From 98041fc2fe7090964bcc1ca8f7bb369177ccb6ce Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 25 Oct 2024 14:32:17 -0400 Subject: [PATCH] dekaf: Remove readback, as it wasn't the solution to the lag issue, and it is causing issues by allowing consumers to periodically read _just_ the last ack message --- crates/dekaf/src/read.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 4b676bf44d..3b3f1ffa7f 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -49,8 +49,6 @@ pub enum ReadTarget { Docs(usize), } -const OFFSET_READBACK: i64 = 2 << 25 + 1; // 64mb, single document max size - impl Read { pub fn new( client: journal::Client, @@ -65,8 +63,7 @@ impl Read { let stream = client.clone().read_json_lines( broker::ReadRequest { - // Start reading at least 1 document in the past - offset: std::cmp::max(0, offset - OFFSET_READBACK), + offset, block: true, journal: partition.spec.name.clone(), begin_mod_time: not_before_sec as i64, @@ -322,7 +319,8 @@ impl Read { last_write_head = self.last_write_head, ratio = buf.len() as f64 / (records_bytes + 1) as f64, records_bytes, - "returning records" + did_timeout, + "batch complete" ); metrics::counter!("dekaf_documents_read", "journal_name" => self.journal_name.to_owned())