Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dekaf: Implement DeletionMode to allow representing deletions as a Kafka header instead of a tombstone #1738

Merged
merged 3 commits into from
Oct 31, 2024

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Oct 25, 2024

This adds support for customizable deletion handling:

  • DeleteMode::Default, which is what you get if you don't specify a deletion mode, operates the same was as before where deletes are mapped into Kafka records with a key, and null value.
  • DeleteMode::Header changes this behavior to include an _is_deleted header on every returned Record. Non-deletes get 0, deletes get 1. In addition, the full deletion tombstone stored in the Flow collection is returned for the Record's value instead of null.

I tested this out in Tinybird and it looks like this works as expected: setting a username of {"deletions": "header"} causes Flow deletion tombstones (op: d) to cause the corresponding row to get deleted in Tinybird.

Question: Should we just go all the way and name this DeleteMode::Tinybird? Or does someone have a better suggestion than Header?

Also included is a small change to Read to make data preview UIs work reliably again. I realized they stopped working consistently after merging #1733, and this fixes that.


This change is Reviewable

@jshearer jshearer force-pushed the jshearer/dekaf_deletions_in_header branch 7 times, most recently from 1ab8273 to cb3fd03 Compare October 25, 2024 18:42
@jshearer jshearer marked this pull request as ready for review October 25, 2024 19:35
@jshearer jshearer marked this pull request as draft October 25, 2024 20:09
Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

@jshearer jshearer force-pushed the jshearer/dekaf_deletions_in_header branch from e49688a to cb3fd03 Compare October 28, 2024 16:08
@jshearer jshearer marked this pull request as ready for review October 28, 2024 19:33
@jshearer jshearer mentioned this pull request Oct 28, 2024
@jshearer jshearer force-pushed the jshearer/dekaf_deletions_in_header branch 2 times, most recently from 972aecd to b391c01 Compare October 30, 2024 17:14
…_deleted` for deletions

Also needed to add this field to the generated Avro schemas
@jshearer jshearer force-pushed the jshearer/dekaf_deletions_in_header branch from b391c01 to e337f39 Compare October 31, 2024 15:22
if matches!(self.deletes, DeletionMode::CDC) {
let mut heap_node = HeapNode::from_node(root.get(), &self.alloc);
let foo = DELETION_INDICATOR_PTR
.create_heap_node(&mut heap_node, &self.alloc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a memory leak here -- you're never resetting the bump allocator.

I would recommend not having it as a field of self and instead creating one on the stack. Use it, and then call reset() on it immediately after each avro::encode so that it's only retaining a single HeapNode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to ask you to take a look at this as I wasn't sure I was doing it correctly. That makes sense to me, thanks 👍

@jshearer jshearer merged commit 7e7f612 into master Oct 31, 2024
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants