8000 fix(kafka sink): Use rdkafka::client::Client instead of Consumer by belltoy · Pull Request #21129 · vectordotdev/vector · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(kafka sink): Use rdkafka::client::Client instead of Consumer #21129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits on
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/21129_suppress_warnings_for_kafka_sink.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `kafka` sink no longer emits warnings due to applying rdkafka options to a consumer used for the health check. Now it uses the producer client for the health check.

authors: belltoy
136 changes: 63 additions & 73 deletions src/sinks/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,8 @@ fn example_librdkafka_options() -> HashMap<String, String> {
])
}

/// Identifies which Kafka client role a generated config is for. Consumers and
/// producers share the same config struct but accept different option sets, and
/// rdkafka warns/errors if options for the wrong role are applied — so callers
/// pass the intended role when building a `ClientConfig`.
#[derive(Debug, PartialOrd, PartialEq, Eq)]
pub enum KafkaRole {
Consumer,
Producer,
}

impl KafkaSinkConfig {
pub(crate) fn to_rdkafka(&self, kafka_role: KafkaRole) -> crate::Result<ClientConfig> {
pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", &self.bootstrap_servers)
Expand All @@ -164,73 +156,71 @@ impl KafkaSinkConfig {
self.auth.apply(&mut client_config)?;

// All batch options are producer only.
if kafka_role == KafkaRole::Producer {
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.queue.buffering.max.ms={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{key}={value}`.\
The config already sets this as `librdkafka_options.queue.buffering.max.ms={val}`.\
Please delete one.").into());
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.num.messages={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{key}={value}`.\
The config already sets this as `librdkafka_options.batch.num.messages={val}`.\
Please delete one.").into());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.size={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{key}={value}`.\
The config already sets this as `librdkafka_options.batch.size={val}`.\
Please delete one.").into());
}
debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}

for (key, value) in self.librdkafka_options.iter() {
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/kafka/sink.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use rdkafka::{
consumer::{BaseConsumer, Consumer},
error::KafkaError,
producer::FutureProducer,
producer::{BaseProducer, FutureProducer, Producer},
ClientConfig,
};
use snafu::{ResultExt, Snafu};
use tokio::time::Duration;
use tracing::Span;
use vrl::path::OwnedTargetPath;

use super::config::{KafkaRole, KafkaSinkConfig};
use super::config::KafkaSinkConfig;
use crate::{
kafka::KafkaStatisticsContext,
sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
Expand Down Expand Up @@ -46,7 +45,7 @@ pub(crate) fn create_producer(

impl KafkaSink {
pub(crate) fn new(config: KafkaSinkConfig) -> crate::Result<Self> {
let producer_config = config.to_rdkafka(KafkaRole::Producer)?;
let producer_config = config.to_rdkafka()?;
let producer = create_producer(producer_config)?;
let transformer = config.encoding.transformer();
let serializer = config.encoding.build()?;
Expand Down Expand Up @@ -105,7 +104,7 @@ impl KafkaSink {

pub(crate) async fn healthcheck(config: KafkaSinkConfig) -> crate::Result<()> {
trace!("Healthcheck started.");
let client = config.to_rdkafka(KafkaRole::Consumer).unwrap();
let client_config = config.to_rdkafka().unwrap();
let topic: Option<String> = match config.healthcheck_topic {
Some(topic) => Some(topic),
_ => match config.topic.render_string(&LogEvent::from_str_legacy("")) {
Expand All @@ -121,10 +120,11 @@ pub(crate) async fn healthcheck(config: KafkaSinkConfig) -> crate::Result<()> {
};

tokio::task::spawn_blocking(move || {
let consumer: BaseConsumer = client.create().unwrap();
let producer: BaseProducer = client_config.create().unwrap();
let topic = topic.as_ref().map(|topic| &topic[..]);

consumer
producer
.client()
.fetch_metadata(topic, Duration::from_secs(3))
.map(|_| ())
})
Expand Down
9 changes: 2 additions & 7 deletions src/sinks/kafka/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ mod integration_test {
event::{BatchNotifier, BatchStatus},
};

use super::super::{
config::{KafkaRole, KafkaSinkConfig},
sink::KafkaSink,
*,
};
use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
use crate::{
event::{ObjectMap, Value},
kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
Expand Down Expand Up @@ -190,8 +186,7 @@ mod integration_test {
headers_key: None,
acknowledgements: Default::default(),
};
config.clone().to_rdkafka(KafkaRole::Consumer)?;
config.clone().to_rdkafka(KafkaRole::Producer)?;
config.clone().to_rdkafka()?;
self::sink::healthcheck(config.clone()).await?;
KafkaSink::new(config)
}
Expand Down
Loading
0