diff --git a/src/sinks/kafka/config.rs b/src/sinks/kafka/config.rs index 692718b9096de..487b572d73367 100644 --- a/src/sinks/kafka/config.rs +++ b/src/sinks/kafka/config.rs @@ -17,8 +17,6 @@ use crate::{ }, }; -pub(crate) const QUEUED_MIN_MESSAGES: u64 = 100000; - /// Configuration for the `kafka` sink. #[serde_as] #[configurable_component(sink( @@ -159,79 +157,73 @@ impl KafkaSinkConfig { self.auth.apply(&mut client_config)?; - match kafka_role { - // All batch options are producer only. - KafkaRole::Producer => { - client_config - .set("compression.codec", &to_string(self.compression)) - .set( - "message.timeout.ms", - &self.message_timeout_ms.as_millis().to_string(), - ); - - if let Some(value) = self.batch.timeout_secs { - // Delay in milliseconds to wait for messages in the producer queue to accumulate before - // constructing message batches (MessageSets) to transmit to brokers. A higher value - // allows larger and more effective (less overhead, improved compression) batches of - // messages to accumulate at the expense of increased message delivery latency. - // Type: float - let key = "queue.buffering.max.ms"; - if let Some(val) = self.librdkafka_options.get(key) { - return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\ + // All batch options are producer only. + if kafka_role == KafkaRole::Producer { + client_config + .set("compression.codec", &to_string(self.compression)) + .set( + "message.timeout.ms", + &self.message_timeout_ms.as_millis().to_string(), + ); + + if let Some(value) = self.batch.timeout_secs { + // Delay in milliseconds to wait for messages in the producer queue to accumulate before + // constructing message batches (MessageSets) to transmit to brokers. A higher value + // allows larger and more effective (less overhead, improved compression) batches of + // messages to accumulate at the expense of increased message delivery latency. + // Type: float + let key = "queue.buffering.max.ms"; + if let Some(val) = self.librdkafka_options.get(key) { + return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\ The config already sets this as `librdkafka_options.queue.buffering.max.ms={}`.\ Please delete one.", key, value, val).into()); - } - debug!( - librdkafka_option = key, - batch_option = "timeout_secs", - value, - "Applying batch option as librdkafka option." - ); - client_config.set(key, &((value * 1000.0).round().to_string())); } - if let Some(value) = self.batch.max_events { - // Maximum number of messages batched in one MessageSet. The total MessageSet size is - // also limited by batch.size and message.max.bytes. - // Type: integer - let key = "batch.num.messages"; - if let Some(val) = self.librdkafka_options.get(key) { - return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\ + debug!( + librdkafka_option = key, + batch_option = "timeout_secs", + value, + "Applying batch option as librdkafka option." + ); + client_config.set(key, &((value * 1000.0).round().to_string())); + } + if let Some(value) = self.batch.max_events { + // Maximum number of messages batched in one MessageSet. The total MessageSet size is + // also limited by batch.size and message.max.bytes. + // Type: integer + let key = "batch.num.messages"; + if let Some(val) = self.librdkafka_options.get(key) { + return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\ The config already sets this as `librdkafka_options.batch.num.messages={}`.\ Please delete one.", key, value, val).into()); - } - debug!( - librdkafka_option = key, - batch_option = "max_events", - value, - "Applying batch option as librdkafka option." - ); - client_config.set(key, &value.to_string()); } - if let Some(value) = self.batch.max_bytes { - // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol - // framing overhead. This limit is applied after the first message has been added to the - // batch, regardless of the first message's size, this is to ensure that messages that - // exceed batch.size are produced. The total MessageSet size is also limited by - // batch.num.messages and message.max.bytes. - // Type: integer - let key = "batch.size"; - if let Some(val) = self.librdkafka_options.get(key) { - return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\ + debug!( + librdkafka_option = key, + batch_option = "max_events", + value, + "Applying batch option as librdkafka option." + ); + client_config.set(key, &value.to_string()); + } + if let Some(value) = self.batch.max_bytes { + // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol + // framing overhead. This limit is applied after the first message has been added to the + // batch, regardless of the first message's size, this is to ensure that messages that + // exceed batch.size are produced. The total MessageSet size is also limited by + // batch.num.messages and message.max.bytes. + // Type: integer + let key = "batch.size"; + if let Some(val) = self.librdkafka_options.get(key) { + return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\ The config already sets this as `librdkafka_options.batch.size={}`.\ Please delete one.", key, value, val).into()); - } - debug!( - librdkafka_option = key, - batch_option = "max_bytes", - value, - "Applying batch option as librdkafka option." - ); - client_config.set(key, &value.to_string()); } - } - - KafkaRole::Consumer => { - client_config.set("queued.min.messages", QUEUED_MIN_MESSAGES.to_string()); + debug!( + librdkafka_option = key, + batch_option = "max_bytes", + value, + "Applying batch option as librdkafka option." + ); + client_config.set(key, &value.to_string()); } } diff --git a/src/sinks/kafka/request_builder.rs b/src/sinks/kafka/request_builder.rs index 3a8ce886ad085..7a7693a35b032 100644 --- a/src/sinks/kafka/request_builder.rs +++ b/src/sinks/kafka/request_builder.rs @@ -1,42 +1,43 @@ -use std::num::NonZeroUsize; - -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; +use lookup::OwnedTargetPath; use rdkafka::message::{Header, OwnedHeaders}; -use tokio_util::codec::Encoder as _; -use vrl::path::OwnedTargetPath; use crate::{ - codecs::{Encoder, Transformer}, - event::{Event, Finalizable, Value}, - internal_events::{KafkaHeaderExtractionError, TemplateRenderingError}, + internal_events::KafkaHeaderExtractionError, sinks::{ kafka::service::{KafkaRequest, KafkaRequestMetadata}, - util::metadata::RequestMetadataBuilder, + prelude::*, }, - template::Template, }; pub struct KafkaRequestBuilder { pub key_field: Option, pub headers_key: Option, - pub topic_template: Template, - pub transformer: Transformer, - pub encoder: Encoder<()>, + pub encoder: (Transformer, Encoder<()>), } -impl KafkaRequestBuilder { - pub fn build_request(&mut self, mut event: Event) -> Option { - let topic = self - .topic_template - .render_string(&event) - .map_err(|error| { - emit!(TemplateRenderingError { - field: None, - drop_event: true, - error, - }); - }) - .ok()?; +impl RequestBuilder<(String, Event)> for KafkaRequestBuilder { + type Metadata = KafkaRequestMetadata; + type Events = Event; + type Encoder = (Transformer, Encoder<()>); + type Payload = Bytes; + type Request = KafkaRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + input: (String, Event), + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let (topic, mut event) = input; + let builder = RequestMetadataBuilder::from_event(&event); let metadata = KafkaRequestMetadata { finalizers: event.take_finalizers(), @@ -45,23 +46,21 @@ impl KafkaRequestBuilder { headers: get_headers(&event, self.headers_key.as_ref()), topic, }; - self.transformer.transform(&mut event); - let mut body = BytesMut::new(); - - // Ensure the metadata builder is built after transforming the event so we have the event - // size taking into account any dropped fields. - let metadata_builder = RequestMetadataBuilder::from_event(&event); - self.encoder.encode(event, &mut body).ok()?; - let body = body.freeze(); - let bytes_len = NonZeroUsize::new(body.len()).expect("payload should never be zero length"); - let request_metadata = metadata_builder.with_request_size(bytes_len); + (metadata, builder, event) + } - Some(KafkaRequest { - body, + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + KafkaRequest { + body: payload.into_payload(), metadata, request_metadata, - }) + } } } diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 299a9de4ee056..0f1d122b7750c 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -7,9 +7,6 @@ use rdkafka::{ producer::{FutureProducer, FutureRecord}, util::Timeout, }; -use vector_core::internal_event::{ - ByteSize, BytesSent, InternalEventHandle as _, Protocol, Registered, -}; use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*}; @@ -29,6 +26,7 @@ pub struct KafkaRequestMetadata { pub struct KafkaResponse { event_byte_size: GroupedCountByteSize, + raw_byte_size: usize, } impl DriverResponse for KafkaResponse { @@ -39,6 +37,10 @@ impl DriverResponse for KafkaResponse { fn events_sent(&self) -> &GroupedCountByteSize { &self.event_byte_size } + + fn bytes_sent(&self) -> Option { + Some(self.raw_byte_size) + } } impl Finalizable for KafkaRequest { @@ -60,15 +62,13 @@ impl MetaDescriptive for KafkaRequest { #[derive(Clone)] pub struct KafkaService { kafka_producer: FutureProducer, - bytes_sent: Registered, } impl KafkaService { - pub(crate) fn new(kafka_producer: FutureProducer) -> KafkaService { - KafkaService { - kafka_producer, - bytes_sent: register!(BytesSent::from(Protocol("kafka".into()))), - } + pub(crate) const fn new( + kafka_producer: FutureProducer, + ) -> KafkaService { + KafkaService { kafka_producer } } } @@ -104,10 +104,12 @@ impl Service for KafkaService { // rdkafka will internally retry forever if the queue is full match this.kafka_producer.send(record, Timeout::Never).await { Ok((_partition, _offset)) => { - this.bytes_sent.emit(ByteSize( - request.body.len() + request.metadata.key.map(|x| x.len()).unwrap_or(0), - )); - Ok(KafkaResponse { event_byte_size }) + let raw_byte_size = + request.body.len() + request.metadata.key.map_or(0, |x| x.len()); + Ok(KafkaResponse { + event_byte_size, + raw_byte_size, + }) } Err((kafka_err, _original_record)) => Err(kafka_err), } diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index f2d0107524310..8a981a987c79a 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -1,4 +1,5 @@ -use futures::future; +use std::num::NonZeroUsize; + use rdkafka::{ consumer::{BaseConsumer, Consumer}, error::KafkaError, @@ -13,9 +14,7 @@ use vrl::path::OwnedTargetPath; use super::config::{KafkaRole, KafkaSinkConfig}; use crate::{ kafka::KafkaStatisticsContext, - sinks::kafka::{ - config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, service::KafkaService, - }, + sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService}, sinks::prelude::*, }; @@ -65,22 +64,47 @@ impl KafkaSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - // rdkafka will internally retry forever, so we need some limit to prevent this from overflowing - let service = ConcurrencyLimit::new(self.service, QUEUED_MIN_MESSAGES as usize); - let mut request_builder = KafkaRequestBuilder { + // rdkafka will internally retry forever, so we need some limit to prevent this from overflowing. + // 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying + // buffer is full. + let service = ConcurrencyLimit::new(self.service.clone(), 64); + let builder_limit = NonZeroUsize::new(64); + + let request_builder = KafkaRequestBuilder { key_field: self.key_field, headers_key: self.headers_key, - topic_template: self.topic, - transformer: self.transformer, - encoder: self.encoder, + encoder: (self.transformer, self.encoder), }; input - .filter_map(|event| - // request_builder is fallible but the places it can fail are emitting - // `Error` and `DroppedEvent` internal events appropriately so no need to here. - future::ready(request_builder.build_request(event))) + .filter_map(|event| { + // Compute the topic. + future::ready( + self.topic + .render_string(&event) + .map_err(|error| { + emit!(TemplateRenderingError { + field: None, + drop_event: true, + error, + }); + }) + .ok() + .map(|topic| (topic, event)), + ) + }) + .request_builder(builder_limit, request_builder) + .filter_map(|request| async { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) .into_driver(service) + .protocol("kafka") .run() .await }