From 87b1d900f3dd294b630ad5411056f382191805d5 Mon Sep 17 00:00:00 2001 From: David Blewett Date: Thu, 12 Dec 2024 13:58:10 -0500 Subject: [PATCH] Various BaseProducer::flush fixes (#748) * Refactor `BaseProducer::poll` to not return early, but instead continue processing events until the passed timeout. Refactor `BaseProducer::flush` to spend most time in `librdkafka`, and whatever is left in `BaseProducer::poll`. * Simplify and rely on cast truncation. * Fix logic error so that we always poll at least once even when timeout is `Duration::ZERO`. * Introduce Deadline type to simplify `BaseProducer::flush` and `BaseProducer::poll`. * Add `From` impl for `Deadline` * Ensure we always call `poll_event` at least once, even if we have `Timeout::After` for a non-blocking call. * Allow Deadline to express `Timeout::Never` losslessly. * Refactor poll loop to be more idiomatic. * Centralize clamping logic to Deadline. * Remove extraneous From impl. * Simplify `BaseProducer::poll` to rely on `From` impl. * Don't block forever in poll when flushing. * Remove this clamp, in favor of relying on remaining_millis_i32. * Ensure we always poll even if we get a timeout from flush. * Update changelog reflecting behavior change in `BaseProducer::poll`. --- changelog.md | 2 ++ src/client.rs | 4 +-- src/producer/base_producer.rs | 61 +++++++++++++++++------------------ src/util.rs | 57 +++++++++++++++++++++++++++++++- 4 files changed, 90 insertions(+), 34 deletions(-) diff --git a/changelog.md b/changelog.md index d938978bd..5c4fed654 100644 --- a/changelog.md +++ b/changelog.md @@ -8,6 +8,8 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). * Remove testing for old Kafka versions (before 3.0). Add tests for 3.7. * Fix test dependency on docker compose. * Address wakeup races introduced by pivoting to the event API. +* Update `BaseProducer::poll` to not return early, and instead continue + looping until the passed timeout is reached. ## 0.36.2 (2024-01-16) diff --git a/src/client.rs b/src/client.rs index 231bcf908..3c78a2284 100644 --- a/src/client.rs +++ b/src/client.rs @@ -300,10 +300,10 @@ impl Client { &self.context } - pub(crate) fn poll_event( + pub(crate) fn poll_event>( &self, queue: &NativeQueue, - timeout: Timeout, + timeout: T, ) -> EventPollResult { let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) }; if let Some(ev) = event { diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 7edd4befd..19c6ab6b9 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -67,7 +67,7 @@ use crate::producer::{ DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig, }; use crate::topic_partition_list::TopicPartitionList; -use crate::util::{IntoOpaque, NativePtr, Timeout}; +use crate::util::{Deadline, IntoOpaque, NativePtr, Timeout}; pub use crate::message::DeliveryResult; @@ -338,7 +338,6 @@ where client: Client, queue: NativeQueue, _partitioner: PhantomData, - min_poll_interval: Timeout, } impl BaseProducer @@ -353,7 +352,6 @@ where client, queue, _partitioner: PhantomData, - min_poll_interval: Timeout::After(Duration::from_millis(100)), } } @@ -362,19 +360,25 @@ where /// Regular calls to `poll` are required to process the events and execute /// the message delivery callbacks. pub fn poll>(&self, timeout: T) { - let event = self.client().poll_event(&self.queue, timeout.into()); - if let EventPollResult::Event(ev) = event { - let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; - match evtype { - rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), - _ => { - let evname = unsafe { - let evname = rdsys::rd_kafka_event_name(ev.ptr()); - CStr::from_ptr(evname).to_string_lossy() - }; - warn!("Ignored event '{}' on base producer poll", evname); + let deadline: Deadline = timeout.into().into(); + loop { + let event = self.client().poll_event(&self.queue, &deadline); + if let EventPollResult::Event(ev) = event { + let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), + _ => { + let evname = unsafe { + let evname = rdsys::rd_kafka_event_name(ev.ptr()); + CStr::from_ptr(evname).to_string_lossy() + }; + warn!("Ignored event '{}' on base producer poll", evname); + } } } + if deadline.elapsed() { + break; + } } } @@ -494,26 +498,21 @@ where // As this library uses the rdkafka Event API, flush will not call rd_kafka_poll() but instead wait for // the librdkafka-handled message count to reach zero. Runs until value reaches zero or timeout. fn flush>(&self, timeout: T) -> KafkaResult<()> { - let mut timeout = timeout.into(); - loop { - let op_timeout = std::cmp::min(timeout, self.min_poll_interval); - if self.in_flight_count() > 0 { - unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) }; - self.poll(op_timeout); + let deadline: Deadline = timeout.into().into(); + while self.in_flight_count() > 0 && !deadline.elapsed() { + let ret = unsafe { + rdsys::rd_kafka_flush(self.client().native_ptr(), deadline.remaining_millis_i32()) + }; + if let Deadline::Never = &deadline { + self.poll(Timeout::After(Duration::ZERO)); } else { - return Ok(()); + self.poll(&deadline); } - - if op_timeout >= timeout { - let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) }; - if ret.is_error() { - return Err(KafkaError::Flush(ret.into())); - } else { - return Ok(()); - } - } - timeout -= op_timeout; + if ret.is_error() { + return Err(KafkaError::Flush(ret.into())); + }; } + Ok(()) } fn purge(&self, flags: PurgeConfig) { diff --git a/src/util.rs b/src/util.rs index fef12b87f..847bf89dd 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,5 +1,6 @@ //! Utility functions and types. +use std::cmp; use std::ffi::CStr; use std::fmt; use std::future::Future; @@ -12,7 +13,7 @@ use std::slice; use std::sync::Arc; #[cfg(feature = "naive-runtime")] use std::thread; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(feature = "naive-runtime")] use futures_channel::oneshot; @@ -31,6 +32,40 @@ pub fn get_rdkafka_version() -> (i32, String) { (version_number, c_str.to_string_lossy().into_owned()) } +pub(crate) enum Deadline { + At(Instant), + Never, +} + +impl Deadline { + // librdkafka's flush api requires an i32 millisecond timeout + const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64); + + pub(crate) fn new(duration: Option) -> Self { + if let Some(d) = duration { + Self::At(Instant::now() + d) + } else { + Self::Never + } + } + + pub(crate) fn remaining(&self) -> Duration { + if let Deadline::At(i) = self { + *i - Instant::now() + } else { + Duration::MAX + } + } + + pub(crate) fn remaining_millis_i32(&self) -> i32 { + cmp::min(Deadline::MAX_FLUSH_DURATION, self.remaining()).as_millis() as i32 + } + + pub(crate) fn elapsed(&self) -> bool { + self.remaining() <= Duration::ZERO + } +} + /// Specifies a timeout for a Kafka operation. #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub enum Timeout { @@ -76,6 +111,26 @@ impl std::ops::SubAssign for Timeout { } } +impl From for Deadline { + fn from(t: Timeout) -> Deadline { + if let Timeout::After(dur) = t { + Deadline::new(Some(dur)) + } else { + Deadline::new(None) + } + } +} + +impl From<&Deadline> for Timeout { + fn from(d: &Deadline) -> Timeout { + if let Deadline::Never = d { + Timeout::Never + } else { + Timeout::After(d.remaining()) + } + } +} + impl From for Timeout { fn from(d: Duration) -> Timeout { Timeout::After(d)