Skip to content

Commit

Permalink
Merge pull request #1548 from JLDLaughlin/kafka_todo
Browse files Browse the repository at this point in the history
Add progress tracking todo to Kafka sink
  • Loading branch information
JLDLaughlin authored Jan 15, 2020
2 parents dc19e0e + 2d7f2aa commit c8c227c
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/dataflow/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@ use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

// TODO@jldlaughlin: Progress tracking for kafka sinks #1442
//
// Right now, every time Materialize crashes and recovers these sinks
// will resend each record to Kafka. This is not entirely horrible, but also
// obviously not ideal! But until we have a more concrete idea of what
// people will require from Kafka sinks, we're punting on implementing this.
//
// For posterity, here are some of the options we discussed to
// implement this:
// - Use differential's Consolidate operator on the batches we are
// iterating through. Track our progress in sending batches to Kafka
// using the "watermarks" from the consolidated batch (inclusive on the
// lower bound, exclusive on the upper bound). Store these timestamps
// persistently (in mzdata/catalog) in order to either:
// - Resend everything including and after the last successfully sent batch.
// This assumes the Kafka topic we are sending to handles duplicates.
// - First, send a negative diff of each record in the last successful
// batch. Then, resend everything after.
//
// - Append something like a "Materialize start up timestamp" to the
// end of the Kafka topic name. This accepts resending all of the data,
// but will not duplicate data in a single topic.
// - NB: This, like other decisions we've made, assumes that
// the user has configured their Kafka instance to automatically
// create new topics.
pub fn kafka<G>(
stream: &Stream<G, (Row, Timestamp, Diff)>,
id: GlobalId,
Expand Down

0 comments on commit c8c227c

Please sign in to comment.