diff --git a/src/dataflow/sink/kafka.rs b/src/dataflow/sink/kafka.rs
index 318b65cd91c2..8f6283dd21e6 100644
--- a/src/dataflow/sink/kafka.rs
+++ b/src/dataflow/sink/kafka.rs
@@ -16,31 +16,31 @@ use rdkafka::config::ClientConfig;
 use rdkafka::producer::FutureProducer;
 use rdkafka::producer::FutureRecord;
 
-/// TODO@jldlaughlin: Progress tracking for kafka sinks #1442
-///
-/// Right now, every time Materialize crashes and recovers these sinks
-/// will resend each record to Kafka. This is not entirely horrible, but also
-/// obviously not ideal! But until we have a more concrete idea of what
-/// people will require from Kafka sinks, we're punting on implementing this.
-///
-/// For posterity, here are some of the options we discussed to
-/// implement this:
-///   - Use differential's Consolidate operator on the batches we are
-///     iterating through. Track our progress in sending batches to Kafka
-///     using the "watermarks" from the consolidated batch (inclusive on the
-///     lower bound, exclusive on the upper bound). Store these timestamps
-///     persistently (in mzdata/catalog) in order to either:
-///       - Resend everything including and after the last successfully sent batch.
-///         This assumes the Kafka topic we are sending to handles duplicates.
-///       - First, send a negative diff of each record in the last successful
-///         batch. Then, resend everything after.
-///
-///   - Append something like a "Materialize start up timestamp" to the
-///     end of the Kafka topic name. This accepts resending all of the data,
-///     but will not duplicate data in a single topic.
-///       - NB: This, like other decisions we've made, assumes that
-///         the user has configured their Kafka instance to automatically
-///         create new topics.
+// TODO@jldlaughlin: Progress tracking for kafka sinks #1442
+//
+// Right now, every time Materialize crashes and recovers these sinks
+// will resend each record to Kafka. This is not entirely horrible, but also
+// obviously not ideal! But until we have a more concrete idea of what
+// people will require from Kafka sinks, we're punting on implementing this.
+//
+// For posterity, here are some of the options we discussed to
+// implement this:
+//   - Use differential's Consolidate operator on the batches we are
+//     iterating through. Track our progress in sending batches to Kafka
+//     using the "watermarks" from the consolidated batch (inclusive on the
+//     lower bound, exclusive on the upper bound). Store these timestamps
+//     persistently (in mzdata/catalog) in order to either:
+//       - Resend everything including and after the last successfully sent batch.
+//         This assumes the Kafka topic we are sending to handles duplicates.
+//       - First, send a negative diff of each record in the last successful
+//         batch. Then, resend everything after.
+//
+//   - Append something like a "Materialize start up timestamp" to the
+//     end of the Kafka topic name. This accepts resending all of the data,
+//     but will not duplicate data in a single topic.
+//       - NB: This, like other decisions we've made, assumes that
+//         the user has configured their Kafka instance to automatically
+//         create new topics.
 pub fn kafka(
     stream: &Stream,
     id: GlobalId,
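
For illustration only (not part of this diff): a minimal sketch of how the watermark-based option described in the TODO could be tracked. `SinkProgress`, the plain `u64` timestamps, and the persistence step are assumptions for this sketch, not an existing Materialize API.

```rust
// Hypothetical sketch of per-sink progress tracking using the batch
// "watermarks" (inclusive lower bound, exclusive upper bound) mentioned
// in the TODO above. Names and types here are illustrative only.
use std::ops::Range;

#[derive(Debug, Clone)]
struct SinkProgress {
    /// Upper bound (exclusive) of the last batch fully acknowledged by Kafka.
    /// The real implementation would persist this (e.g. in mzdata/catalog)
    /// so it survives a crash and restart.
    last_sent_upper: u64,
}

impl SinkProgress {
    /// A batch still needs to be (re)sent after restart if any part of it
    /// lies at or beyond the persisted watermark.
    fn needs_send(&self, batch_bounds: &Range<u64>) -> bool {
        batch_bounds.end > self.last_sent_upper
    }

    /// Record a successfully delivered batch; the caller would persist the
    /// new watermark before treating the batch as done.
    fn record_sent(&mut self, batch_bounds: &Range<u64>) {
        self.last_sent_upper = self.last_sent_upper.max(batch_bounds.end);
    }
}

fn main() {
    let mut progress = SinkProgress { last_sent_upper: 100 };
    // A batch covering [90, 100) was already delivered; [100, 110) was not.
    assert!(!progress.needs_send(&(90..100)));
    assert!(progress.needs_send(&(100..110)));
    progress.record_sent(&(100..110));
    assert_eq!(progress.last_sent_upper, 110);
}
```

Whether replayed batches are simply resent (relying on the topic to tolerate duplicates) or first retracted with negative diffs, as the TODO discusses, is a separate policy decision layered on top of this bookkeeping.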