KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602)
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
vvcephei authored and guozhangwang committed Apr 26, 2019
1 parent 725e184 commit 607cf8f
Showing 15 changed files with 418 additions and 245 deletions. (Only the first six files are reproduced below.)
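
Background for the diff below: the suppression buffer serializes buffered keys and values, and before this change it invoked those serdes with a null topic (visible in the removed serialize(null, key) calls), which breaks serdes that depend on the topic name, such as schema-registry-aware serdes. With this change the buffer's serdes receive its changelog topic instead. As a hedged orientation sketch (topic names and serdes are illustrative, not from this commit), the affected public API is KTable#suppress:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;

public class SuppressExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<String, Long> counts = builder
            .stream("words", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .count();

        // The suppression buffer serializes buffered keys and values; with this
        // change, those serdes are invoked with the buffer's changelog topic
        // rather than null.
        counts
            .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), Suppressed.BufferConfig.maxRecords(1000L)))
            .toStream()
            .to("suppressed-counts");
    }
}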
FullChangeSerde.java
@@ -25,21 +25,21 @@

 import static java.util.Objects.requireNonNull;

-public class FullChangeSerde<T> implements Serde<Change<T>> {
+public final class FullChangeSerde<T> implements Serde<Change<T>> {
     private final Serde<T> inner;

     @SuppressWarnings("unchecked")
-    public static <T> FullChangeSerde<T> castOrWrap(final Serde<?> serde) {
+    public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
         if (serde == null) {
             return null;
         } else if (serde instanceof FullChangeSerde) {
             return (FullChangeSerde<T>) serde;
         } else {
-            return new FullChangeSerde<T>((Serde<T>) serde);
+            return new FullChangeSerde<>(serde);
         }
     }

-    public FullChangeSerde(final Serde<T> inner) {
+    private FullChangeSerde(final Serde<T> inner) {
         this.inner = requireNonNull(inner);
     }
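
A hedged usage sketch of the factory above (FullChangeSerde is internal API; the class and method are from the hunk, the surrounding demo class is ours). With the constructor now private, castOrWrap is the only entry point, and it avoids both raw casts and double-wrapping:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;

public class CastOrWrapExample {
    public static void main(final String[] args) {
        // A plain serde is wrapped; null passes through unchanged; an existing
        // FullChangeSerde would be returned as-is rather than wrapped again.
        final FullChangeSerde<String> wrapped = FullChangeSerde.castOrWrap(Serdes.String());
        final FullChangeSerde<String> absent = FullChangeSerde.castOrWrap(null);
    }
}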
KStreamImpl.java
@@ -391,18 +391,26 @@ public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {

     @Override
     public KStream<K, V> through(final String topic) {
-        return through(topic, Produced.with(null, null, null));
+        return through(topic, Produced.with(keySerde, valSerde, null));
     }

     @Override
     public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
         final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
         to(topic, producedInternal);
         return builder.stream(
             Collections.singleton(topic),
             new ConsumedInternal<>(
-                producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
-                producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+                producedInternal.keySerde(),
+                producedInternal.valueSerde(),
                 new FailOnInvalidTimestamp(),
                 null
             )
@@ -411,26 +419,40 @@ public KStream<K, V> through(final String topic, final Produced<K, V> produced)

     @Override
     public void to(final String topic) {
-        to(topic, Produced.with(null, null, null));
+        to(topic, Produced.with(keySerde, valSerde, null));
     }

     @Override
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(new StaticTopicNameExtractor<>(topic), new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(new StaticTopicNameExtractor<>(topic), producedInternal);
     }

     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor) {
-        to(topicExtractor, Produced.with(null, null, null));
+        to(topicExtractor, Produced.with(keySerde, valSerde, null));
     }

     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) {
         Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(topicExtractor, new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(topicExtractor, producedInternal);
     }

     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
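
The user-visible effect of these overloads, in a hedged sketch (topic names illustrative): a sink with no explicit Produced now inherits the stream's own serdes instead of leaving them null, which previously made the sink fall back to the default serdes from StreamsConfig:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class SinkSerdeInheritance {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Long> stream =
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long()));

        // No Produced argument: with this change the String/Long serdes declared
        // above travel with the stream to the sink, instead of null serdes that
        // would fall back to the defaults from config.
        stream.to("output");
    }
}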
KTableImpl.java
@@ -359,18 +359,13 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
             suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);

         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
-            () -> new KTableSuppressProcessor<>(
-                suppressedInternal,
-                storeName,
-                keySerde,
-                valSerde == null ? null : new FullChangeSerde<>(valSerde)
-            );
+            () -> new KTableSuppressProcessor<>(suppressedInternal, storeName);

         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
+            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde)),
             false
         );
KTableSuppressProcessor.java
@@ -19,8 +19,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
@@ -30,7 +28,6 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

 import static java.util.Objects.requireNonNull;
@@ -44,22 +41,14 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private final boolean safeToDropTombstones;
     private final String storeName;

-    private TimeOrderedKeyValueBuffer buffer;
+    private TimeOrderedKeyValueBuffer<K, Change<V>> buffer;
     private InternalProcessorContext internalProcessorContext;
     private Sensor suppressionEmitSensor;
-    private Serde<K> keySerde;
-    private FullChangeSerde<V> valueSerde;

     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

-    public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
-                                   final String storeName,
-                                   final Serde<K> keySerde,
-                                   final FullChangeSerde<V> valueSerde) {
+    public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
         this.storeName = storeName;
         requireNonNull(suppress);
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
         maxRecords = suppress.bufferConfig().maxRecords();
         maxBytes = suppress.bufferConfig().maxBytes();
         suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
@@ -74,9 +63,8 @@ public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
         suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);

-        keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
-        valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
-        buffer = requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
+        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName));
+        buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde()));
     }
@@ -88,12 +76,7 @@ public void process(final K key, final Change<V> value) {

     private void buffer(final K key, final Change<V> value) {
         final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
-        final ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
-
-        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key));
-        final byte[] serializedValue = valueSerde.serializer().serialize(null, value);
-
-        buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
     }

     private void enforceConstraints() {
@@ -114,6 +97,11 @@ private void enforceConstraints() {
                         buffer.numRecords(), maxRecords,
                         buffer.bufferSize(), maxBytes
                     ));
+                default:
+                    throw new UnsupportedOperationException(
+                        "The bufferFullStrategy [" + bufferFullStrategy +
+                            "] is not implemented. This is a bug in Kafka Streams."
+                    );
             }
         }
     }
@@ -122,14 +110,12 @@ private boolean overCapacity() {
         return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
     }

-    private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
-        final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value());
-        if (shouldForward(value)) {
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) {
+        if (shouldForward(toEmit.value())) {
             final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
-            internalProcessorContext.setRecordContext(toEmit.value.recordContext());
+            internalProcessorContext.setRecordContext(toEmit.recordContext());
             try {
-                final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
-                internalProcessorContext.forward(key, value);
+                internalProcessorContext.forward(toEmit.key(), toEmit.value());
                 suppressionEmitSensor.record();
             } finally {
                 internalProcessorContext.setRecordContext(prevRecordContext);
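
For orientation, a sketch of the typed buffer contract the processor now codes against, reconstructed only from the calls visible in this diff; the shapes not shown here (for instance the return types of numRecords() and bufferSize()) are assumptions, not the actual interface source:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

interface TimeOrderedKeyValueBufferSketch<K, V> extends StateStore {
    // Lets the processor hand the context's serdes to the store after restore,
    // as done in init() above.
    void setSerdesIfNull(Serde<K> keySerde, Serde<V> valueSerde);

    // The store now takes typed key/value plus the record context; it handles
    // serialization (with the changelog topic) internally.
    void put(long time, K key, V value, ProcessorRecordContext recordContext);

    long numRecords();

    long bufferSize();

    // Evictions carry the deserialized key/value and the original record
    // context, so emit() no longer does its own (de)serialization.
    interface Eviction<K, V> {
        K key();

        V value();

        ProcessorRecordContext recordContext();
    }
}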
ProcessorContextImpl.java
@@ -147,7 +147,7 @@ public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
         final ProcessorNode previousNode = currentNode();
-        final long currentTimestamp = recordContext.timestamp;
+        final long currentTimestamp = recordContext.timestamp();

         try {
             toInternal.update(to);
@@ -170,7 +170,7 @@ public <K, V> void forward(final K key,
                 forward(child, key, value);
             }
         } finally {
-            recordContext.timestamp = currentTimestamp;
+            recordContext.setTimestamp(currentTimestamp);
             setCurrentNode(previousNode);
         }
     }
ProcessorRecordContext.java
@@ -29,11 +29,11 @@

 public class ProcessorRecordContext implements RecordContext {

-    long timestamp;
-    final long offset;
-    final String topic;
-    final int partition;
-    final Headers headers;
+    private long timestamp;
+    private final long offset;
+    private final String topic;
+    private final int partition;
+    private final Headers headers;

     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -48,13 +48,6 @@ public ProcessorRecordContext(final long timestamp,
         this.headers = headers;
     }

-    public ProcessorRecordContext(final long timestamp,
-                                  final long offset,
-                                  final int partition,
-                                  final String topic) {
-        this(timestamp, offset, partition, topic, null);
-    }
-
     public void setTimestamp(final long timestamp) {
         this.timestamp = timestamp;
     }
@@ -225,9 +218,13 @@ public boolean equals(final Object o) {
             Objects.equals(headers, that.headers);
     }

+    /**
+     * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
+     */
+    @Deprecated
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, offset, topic, partition, headers);
+        throw new UnsupportedOperationException("ProcessorRecordContext is unsafe for use in Hash collections");
     }

     @Override
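
Finally, a hedged illustration of the hashCode() change above (constructor arguments are placeholders): since the class is mutable via setTimestamp(), it is no longer usable as a hash-collection key, and doing so now fails fast:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

public class RecordContextHashingDemo {
    public static void main(final String[] args) {
        final ProcessorRecordContext context =
            new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());

        final Map<ProcessorRecordContext, String> seen = new HashMap<>();
        seen.put(context, "x"); // throws UnsupportedOperationException via hashCode()
    }
}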