Skip to content

Commit

Permalink
Poll until return in poll_event
Browse files Browse the repository at this point in the history
otherwise librdkafka queue waker does not work
  • Loading branch information
trtt committed Mar 14, 2024
1 parent 36dc43c commit c1836f6
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::os::raw::c_char;
use std::ptr;
use std::string::ToString;
use std::sync::Arc;
use std::time::Instant;

use libc::c_void;
use rdkafka_sys as rdsys;
Expand Down Expand Up @@ -286,8 +287,18 @@ impl<C: ClientContext> Client<C> {
}

pub(crate) fn poll_event(&self, queue: &NativeQueue, timeout: Timeout) -> Option<NativeEvent> {
let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) };
if let Some(ev) = event {
let now = if timeout.is_zero() || timeout == Timeout::Never {
None
} else {
Some(Instant::now())
};

let get_event = || {
let timeout = now.map_or_else(|| timeout, |now| timeout.saturating_sub(now.elapsed()));
unsafe { NativeEvent::from_ptr(queue.poll(timeout)) }
};

while let Some(ev) = get_event() {
let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
match evtype {
rdsys::RD_KAFKA_EVENT_LOG => self.handle_log_event(ev.ptr()),
Expand Down

0 comments on commit c1836f6

Please sign in to comment.