Skip to content

Commit

Permalink
dekaf: Remove readback, as it wasn't the solution to the lag issue, a…
Browse files Browse the repository at this point in the history
…nd it is causing issues by allowing consumers to periodically read _just_ the last ack message
  • Loading branch information
jshearer committed Oct 29, 2024
1 parent 6fa40a2 commit 98041fc
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ pub enum ReadTarget {
Docs(usize),
}

const OFFSET_READBACK: i64 = 2 << 25 + 1; // 64mb, single document max size

impl Read {
pub fn new(
client: journal::Client,
Expand All @@ -65,8 +63,7 @@ impl Read {

let stream = client.clone().read_json_lines(
broker::ReadRequest {
// Start reading at least 1 document in the past
offset: std::cmp::max(0, offset - OFFSET_READBACK),
offset,
block: true,
journal: partition.spec.name.clone(),
begin_mod_time: not_before_sec as i64,
Expand Down Expand Up @@ -322,7 +319,8 @@ impl Read {
last_write_head = self.last_write_head,
ratio = buf.len() as f64 / (records_bytes + 1) as f64,
records_bytes,
"returning records"
did_timeout,
"batch complete"
);

metrics::counter!("dekaf_documents_read", "journal_name" => self.journal_name.to_owned())
Expand Down

0 comments on commit 98041fc

Please sign in to comment.