Skip to content

Commit

Permalink
dekaf: Refactor to avoid leaking modified HeapNodes when using `Del…
Browse files Browse the repository at this point in the history
…etionMode::CDC`
  • Loading branch information
jshearer committed Oct 31, 2024
1 parent 34e8d07 commit 7e7f612
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub struct Read {
offset_start: i64,

deletes: DeletionMode,
alloc: bumpalo::Bump,

pub(crate) rewrite_offsets_from: Option<i64>,
}
Expand Down Expand Up @@ -105,7 +104,6 @@ impl Read {
journal_name: partition.spec.name.clone(),
rewrite_offsets_from,
deletes,
alloc: Default::default(),
offset_start: offset,
}
}
Expand All @@ -120,6 +118,8 @@ impl Read {
Compression, Record, RecordBatchEncoder, RecordEncodeOptions,
};

let mut alloc = bumpalo::Bump::new();

let mut records: Vec<Record> = Vec::new();
let mut records_bytes: usize = 0;

Expand Down Expand Up @@ -277,14 +277,16 @@ impl Read {
tmp.extend(self.value_schema_id.to_be_bytes());

if matches!(self.deletes, DeletionMode::CDC) {
let mut heap_node = HeapNode::from_node(root.get(), &self.alloc);
let mut heap_node = HeapNode::from_node(root.get(), &alloc);
let foo = DELETION_INDICATOR_PTR
.create_heap_node(&mut heap_node, &self.alloc)
.create_heap_node(&mut heap_node, &alloc)
.context("Unable to add deletion meta indicator")?;

*foo = HeapNode::PosInt(if is_deletion { 1 } else { 0 });

() = avro::encode(&mut tmp, &self.value_schema, &heap_node)?;

alloc.reset();
} else {
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;
}
Expand Down

0 comments on commit 7e7f612

Please sign in to comment.