From 4a685d9b374ee1aae70f7707d5b066d41857ffa0 Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Sat, 30 Mar 2024 09:18:49 +0400 Subject: [PATCH 1/3] Fix StreamConsumer wakeup races --- src/client.rs | 38 ++++++++++++--- src/consumer/base_consumer.rs | 85 +++++++++++++++++++-------------- src/consumer/stream_consumer.rs | 56 ++++++++++++++-------- src/producer/base_producer.rs | 4 +- 4 files changed, 117 insertions(+), 66 deletions(-) diff --git a/src/client.rs b/src/client.rs index 1b9f6bd1c..6ab430fb6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -198,6 +198,21 @@ impl NativeClient { } } +pub(crate) enum EventPollResult { + None, + EventConsumed, + Event(T), +} + +impl Into> for EventPollResult { + fn into(self) -> Option { + match self { + EventPollResult::None | EventPollResult::EventConsumed => None, + EventPollResult::Event(evt) => Some(evt), + } + } +} + /// A low-level rdkafka client. /// /// This type is the basis of the consumers and producers in the [`consumer`] @@ -278,31 +293,42 @@ impl Client { &self.context } - pub(crate) fn poll_event(&self, queue: &NativeQueue, timeout: Timeout) -> Option { + pub(crate) fn poll_event( + &self, + queue: &NativeQueue, + timeout: Timeout, + ) -> EventPollResult { let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) }; if let Some(ev) = event { let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; match evtype { - rdsys::RD_KAFKA_EVENT_LOG => self.handle_log_event(ev.ptr()), - rdsys::RD_KAFKA_EVENT_STATS => self.handle_stats_event(ev.ptr()), + rdsys::RD_KAFKA_EVENT_LOG => { + self.handle_log_event(ev.ptr()); + return EventPollResult::EventConsumed; + } + rdsys::RD_KAFKA_EVENT_STATS => { + self.handle_stats_event(ev.ptr()); + return EventPollResult::EventConsumed; + } rdsys::RD_KAFKA_EVENT_ERROR => { // rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR but producer errors gets // embedded on the ack returned via RD_KAFKA_EVENT_DR. Hence we need to return this event // for the consumer case in order to return the error to the user. self.handle_error_event(ev.ptr()); - return Some(ev); + return EventPollResult::Event(ev); } rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => { if C::ENABLE_REFRESH_OAUTH_TOKEN { self.handle_oauth_refresh_event(ev.ptr()); } + return EventPollResult::EventConsumed; } _ => { - return Some(ev); + return EventPollResult::Event(ev); } } } - None + EventPollResult::None } fn handle_log_event(&self, event: *mut RDKafkaEvent) { diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index c67c90cb2..f240541e7 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -11,7 +11,7 @@ use log::{error, warn}; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -use crate::client::{Client, NativeClient, NativeQueue}; +use crate::client::{Client, EventPollResult, NativeClient, NativeQueue}; use crate::config::{ ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig, }; @@ -115,59 +115,70 @@ where /// /// The returned message lives in the memory of the consumer and cannot outlive it. pub fn poll>(&self, timeout: T) -> Option>> { - self.poll_queue(self.get_queue(), timeout) + self.poll_queue(self.get_queue(), timeout).into() } pub(crate) fn poll_queue>( &self, queue: &NativeQueue, timeout: T, - ) -> Option>> { + ) -> EventPollResult>> { let now = Instant::now(); - let mut timeout = timeout.into(); + let initial_timeout = timeout.into(); + let mut timeout = initial_timeout; let min_poll_interval = self.context().main_queue_min_poll_interval(); loop { let op_timeout = std::cmp::min(timeout, min_poll_interval); let maybe_event = self.client().poll_event(queue, op_timeout); - if let Some(event) = maybe_event { - let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; - match evtype { - rdsys::RD_KAFKA_EVENT_FETCH => { - if let Some(result) = self.handle_fetch_event(event) { - return Some(result); + match maybe_event { + EventPollResult::Event(event) => { + let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_FETCH => { + if let Some(result) = self.handle_fetch_event(event) { + return EventPollResult::Event(result); + } } - } - rdsys::RD_KAFKA_EVENT_ERROR => { - if let Some(err) = self.handle_error_event(event) { - return Some(Err(err)); + rdsys::RD_KAFKA_EVENT_ERROR => { + if let Some(err) = self.handle_error_event(event) { + return EventPollResult::Event(Err(err)); + } } - } - rdsys::RD_KAFKA_EVENT_REBALANCE => { - self.handle_rebalance_event(event); - if timeout != Timeout::Never { - return None; + rdsys::RD_KAFKA_EVENT_REBALANCE => { + self.handle_rebalance_event(event); + if timeout != Timeout::Never { + return EventPollResult::EventConsumed; + } } - } - rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => { - self.handle_offset_commit_event(event); - if timeout != Timeout::Never { - return None; + rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => { + self.handle_offset_commit_event(event); + if timeout != Timeout::Never { + return EventPollResult::EventConsumed; + } + } + _ => { + let buf = unsafe { + let evname = rdsys::rd_kafka_event_name(event.ptr()); + CStr::from_ptr(evname).to_bytes() + }; + let evname = String::from_utf8(buf.to_vec()).unwrap(); + warn!("Ignored event '{}' on consumer poll", evname); } } - _ => { - let evname = unsafe { - let evname = rdsys::rd_kafka_event_name(event.ptr()); - CStr::from_ptr(evname).to_string_lossy() - }; - warn!("Ignored event '{evname}' on consumer poll"); + } + EventPollResult::None => { + timeout = initial_timeout.saturating_sub(now.elapsed()); + if timeout.is_zero() { + return EventPollResult::None; } } - } - - timeout = timeout.saturating_sub(now.elapsed()); - if timeout.is_zero() { - return None; - } + EventPollResult::EventConsumed => { + timeout = initial_timeout.saturating_sub(now.elapsed()); + if timeout.is_zero() { + return EventPollResult::EventConsumed; + } + } + }; } } @@ -802,7 +813,7 @@ where /// associated consumer regularly, even if no messages are expected, to /// serve events. pub fn poll>(&self, timeout: T) -> Option>> { - self.consumer.poll_queue(&self.queue, timeout) + self.consumer.poll_queue(&self.queue, timeout).into() } /// Sets a callback that will be invoked whenever the queue becomes diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 5a7f60552..12aa8ee99 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -18,7 +18,7 @@ use slab::Slab; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -use crate::client::{Client, NativeQueue}; +use crate::client::{Client, EventPollResult, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::consumer::base_consumer::{BaseConsumer, PartitionQueue}; use crate::consumer::{ @@ -122,11 +122,12 @@ impl<'a, C: ConsumerContext> MessageStream<'a, C> { } } - fn poll(&self) -> Option>> { + fn poll(&self) -> EventPollResult>> { if let Some(queue) = self.partition_queue { self.consumer.poll_queue(queue, Duration::ZERO) } else { - self.consumer.poll(Duration::ZERO) + self.consumer + .poll_queue(self.consumer.get_queue(), Duration::ZERO) } } } @@ -135,25 +136,38 @@ impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> { type Item = KafkaResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // If there is a message ready, yield it immediately to avoid the - // taking the lock in `self.set_waker`. - if let Some(message) = self.poll() { - return Poll::Ready(Some(message)); - } - - // Otherwise, we need to wait for a message to become available. Store - // the waker so that we are woken up if the queue flips from non-empty - // to empty. We have to store the waker repatedly in case this future - // migrates between tasks. - self.wakers.set_waker(self.slot, cx.waker().clone()); - - // Check whether a new message became available after we installed the - // waker. This avoids a race where `poll` returns None to indicate that - // the queue is empty, but the queue becomes non-empty before we've - // installed the waker. match self.poll() { - None => Poll::Pending, - Some(message) => Poll::Ready(Some(message)), + EventPollResult::Event(message) => { + // If there is a message ready, yield it immediately to avoid the + // taking the lock in `self.set_waker`. + Poll::Ready(Some(message)) + } + EventPollResult::EventConsumed => { + // Event was consumed, yield to runtime + cx.waker().wake_by_ref(); + Poll::Pending + } + EventPollResult::None => { + // Otherwise, we need to wait for a message to become available. Store + // the waker so that we are woken up if the queue flips from non-empty + // to empty. We have to store the waker repatedly in case this future + // migrates between tasks. + self.wakers.set_waker(self.slot, cx.waker().clone()); + + // Check whether a new message became available after we installed the + // waker. This avoids a race where `poll` returns None to indicate that + // the queue is empty, but the queue becomes non-empty before we've + // installed the waker. + match self.poll() { + EventPollResult::Event(message) => Poll::Ready(Some(message)), + EventPollResult::EventConsumed => { + // Event was consumed, yield to runtime + cx.waker().wake_by_ref(); + Poll::Pending + } + EventPollResult::None => Poll::Pending, + } + } } } } diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 1cc6e05ce..405466537 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -57,7 +57,7 @@ use rdkafka_sys as rdsys; use rdkafka_sys::rd_kafka_vtype_t::*; use rdkafka_sys::types::*; -use crate::client::{Client, NativeQueue}; +use crate::client::{Client, EventPollResult, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::consumer::ConsumerGroupMetadata; use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError}; @@ -363,7 +363,7 @@ where /// the message delivery callbacks. pub fn poll>(&self, timeout: T) { let event = self.client().poll_event(&self.queue, timeout.into()); - if let Some(ev) = event { + if let EventPollResult::Event(ev) = event { let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; match evtype { rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), From 934c580537696e35421ea14f10c0fb31893fdcdf Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Thu, 29 Aug 2024 20:44:37 +0400 Subject: [PATCH 2/3] to_string_lossy --- src/consumer/base_consumer.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index f240541e7..e232095e2 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -157,12 +157,11 @@ where } } _ => { - let buf = unsafe { + let evname = unsafe { let evname = rdsys::rd_kafka_event_name(event.ptr()); - CStr::from_ptr(evname).to_bytes() + CStr::from_ptr(evname).to_string_lossy() }; - let evname = String::from_utf8(buf.to_vec()).unwrap(); - warn!("Ignored event '{}' on consumer poll", evname); + warn!("Ignored event '{evname}' on consumer poll"); } } } From 3f2b74bc5e18f6a77974fcb59073e99108ccdfb5 Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Thu, 29 Aug 2024 20:45:22 +0400 Subject: [PATCH 3/3] use from --- src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 6ab430fb6..08aa5b9dd 100644 --- a/src/client.rs +++ b/src/client.rs @@ -204,9 +204,9 @@ pub(crate) enum EventPollResult { Event(T), } -impl Into> for EventPollResult { - fn into(self) -> Option { - match self { +impl From> for Option { + fn from(val: EventPollResult) -> Self { + match val { EventPollResult::None | EventPollResult::EventConsumed => None, EventPollResult::Event(evt) => Some(evt), }