
Commit

/// -> //
JLDLaughlin committed Jan 15, 2020
1 parent c2cc49f commit 2d7f2aa
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions src/dataflow/sink/kafka.rs
@@ -16,31 +16,31 @@ use rdkafka::config::ClientConfig;
 use rdkafka::producer::FutureProducer;
 use rdkafka::producer::FutureRecord;
 
-/// TODO@jldlaughlin: Progress tracking for kafka sinks #1442
-///
-/// Right now, every time Materialize crashes and recovers these sinks
-/// will resend each record to Kafka. This is not entirely horrible, but also
-/// obviously not ideal! But until we have a more concrete idea of what
-/// people will require from Kafka sinks, we're punting on implementing this.
-///
-/// For posterity, here are some of the options we discussed to
-/// implement this:
-/// - Use differential's Consolidate operator on the batches we are
-/// iterating through. Track our progress in sending batches to Kafka
-/// using the "watermarks" from the consolidated batch (inclusive on the
-/// lower bound, exclusive on the upper bound). Store these timestamps
-/// persistently (in mzdata/catalog) in order to either:
-/// - Resend everything including and after the last successfully sent batch.
-/// This assumes the Kafka topic we are sending to handles duplicates.
-/// - First, send a negative diff of each record in the last successful
-/// batch. Then, resend everything after.
-///
-/// - Append something like a "Materialize start up timestamp" to the
-/// end of the Kafka topic name. This accepts resending all of the data,
-/// but will not duplicate data in a single topic.
-/// - NB: This, like other decisions we've made, assumes that
-/// the user has configured their Kafka instance to automatically
-/// create new topics.
+// TODO@jldlaughlin: Progress tracking for kafka sinks #1442
+//
+// Right now, every time Materialize crashes and recovers these sinks
+// will resend each record to Kafka. This is not entirely horrible, but also
+// obviously not ideal! But until we have a more concrete idea of what
+// people will require from Kafka sinks, we're punting on implementing this.
+//
+// For posterity, here are some of the options we discussed to
+// implement this:
+// - Use differential's Consolidate operator on the batches we are
+// iterating through. Track our progress in sending batches to Kafka
+// using the "watermarks" from the consolidated batch (inclusive on the
+// lower bound, exclusive on the upper bound). Store these timestamps
+// persistently (in mzdata/catalog) in order to either:
+// - Resend everything including and after the last successfully sent batch.
+// This assumes the Kafka topic we are sending to handles duplicates.
+// - First, send a negative diff of each record in the last successful
+// batch. Then, resend everything after.
+//
+// - Append something like a "Materialize start up timestamp" to the
+// end of the Kafka topic name. This accepts resending all of the data,
+// but will not duplicate data in a single topic.
+// - NB: This, like other decisions we've made, assumes that
+// the user has configured their Kafka instance to automatically
+// create new topics.
 pub fn kafka<G>(
     stream: &Stream<G, (Row, Timestamp, Diff)>,
     id: GlobalId,
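For reference, the first option described in the comment (track the "watermarks" of each consolidated batch and persist them) boils down to the bookkeeping below. This is a minimal, self-contained sketch: the ProgressFile type, the plain file standing in for mzdata/catalog, and the u64 timestamps are illustrative assumptions, not Materialize's actual catalog or differential dataflow APIs.

// Sketch only: plain-file persistence stands in for mzdata/catalog, and
// u64 stands in for the real Timestamp type.
use std::fs;
use std::io;
use std::path::PathBuf;

// Tracks the exclusive upper bound of batches already sent to Kafka.
struct ProgressFile {
    path: PathBuf,
}

impl ProgressFile {
    fn new(path: PathBuf) -> Self {
        ProgressFile { path }
    }

    // Returns the upper bound we had reached before a crash, or None if no
    // progress has been recorded yet.
    fn load(&self) -> io::Result<Option<u64>> {
        match fs::read_to_string(&self.path) {
            Ok(s) => Ok(s.trim().parse().ok()),
            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    // Records that every record with a timestamp below `upper` has been
    // handed to the Kafka producer.
    fn record(&self, upper: u64) -> io::Result<()> {
        fs::write(&self.path, upper.to_string())
    }
}

fn main() -> io::Result<()> {
    let progress = ProgressFile::new(PathBuf::from("kafka_sink_progress.txt"));

    // On startup, decide where to resume. Per the comment, a real sink would
    // either resend everything at or after this bound (relying on the topic
    // handling duplicates) or first send negative diffs for the last batch.
    let resume_from = progress.load()?.unwrap_or(0);
    println!("resuming Kafka sink from timestamp {}", resume_from);

    // After each consolidated batch [lower, upper) is sent, persist `upper`.
    let batch_upper = resume_from + 100;
    progress.record(batch_upper)?;
    Ok(())
}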

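The second option (a per-startup topic name) amounts to deriving the topic from a timestamp taken once at boot, accepting a full resend into a fresh topic after a crash. Another sketch under the same caveat: the sink_topic_name helper and the use of UNIX seconds are assumptions for illustration, and, as the comment notes, this relies on the broker auto-creating topics.

use std::time::{SystemTime, UNIX_EPOCH};

// Builds a topic name like "my_sink-1579046400" from the base name and the
// process start time. Hypothetical helper, not part of the sink today.
fn sink_topic_name(base: &str, startup: SystemTime) -> String {
    let secs = startup
        .duration_since(UNIX_EPOCH)
        .expect("startup time is after the UNIX epoch")
        .as_secs();
    format!("{}-{}", base, secs)
}

fn main() {
    // Each Materialize process would compute this once at boot and reuse it
    // for every record it sends to this sink.
    let startup = SystemTime::now();
    println!("{}", sink_topic_name("my_sink", startup));
}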