diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 9bb83733c82d0..f06a42851f943 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -25,21 +25,21 @@
 import static java.util.Objects.requireNonNull;
 
-public class FullChangeSerde<T> implements Serde<Change<T>> {
+public final class FullChangeSerde<T> implements Serde<Change<T>> {
     private final Serde<T> inner;
 
     @SuppressWarnings("unchecked")
-    public static <T> FullChangeSerde<T> castOrWrap(final Serde serde) {
+    public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
         if (serde == null) {
             return null;
         } else if (serde instanceof FullChangeSerde) {
             return (FullChangeSerde<T>) serde;
         } else {
-            return new FullChangeSerde<T>((Serde<T>) serde);
+            return new FullChangeSerde<>(serde);
         }
     }
 
-    public FullChangeSerde(final Serde<T> inner) {
+    private FullChangeSerde(final Serde<T> inner) {
         this.inner = requireNonNull(inner);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ba08b89ed2927..529270c5f22c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -391,18 +391,26 @@ public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
 
     @Override
     public KStream<K, V> through(final String topic) {
-        return through(topic, Produced.with(null, null, null));
+        return through(topic, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(produced, "Produced can't be null");
         final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
         to(topic, producedInternal);
         return builder.stream(
             Collections.singleton(topic),
             new ConsumedInternal<>(
-                producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
-                producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+                producedInternal.keySerde(),
+                producedInternal.valueSerde(),
                 new FailOnInvalidTimestamp(),
                 null
             )
@@ -411,26 +419,40 @@ public KStream<K, V> through(final String topic, final Produced<K, V> produced)
 
     @Override
     public void to(final String topic) {
-        to(topic, Produced.with(null, null, null));
+        to(topic, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(new StaticTopicNameExtractor<>(topic), new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(new StaticTopicNameExtractor<>(topic), producedInternal);
     }
 
     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor) {
-        to(topicExtractor, Produced.with(null, null, null));
+        to(topicExtractor, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) {
         Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(topicExtractor, new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(topicExtractor, producedInternal);
     }
 
     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6e65b89fd9ddb..7d059ee65df53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -359,18 +359,13 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
             suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
 
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
-            () -> new KTableSuppressProcessor<>(
-                suppressedInternal,
-                storeName,
-                keySerde,
-                valSerde == null ? null : new FullChangeSerde<>(valSerde)
-            );
+            () -> new KTableSuppressProcessor<>(suppressedInternal, storeName);
 
         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
+            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde)),
             false
         );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 686002a11dbfa..7184d7abbd115 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -19,8 +19,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
@@ -30,7 +28,6 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 
 import static java.util.Objects.requireNonNull;
@@ -44,22 +41,14 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private final boolean safeToDropTombstones;
     private final String storeName;
 
-    private TimeOrderedKeyValueBuffer buffer;
+    private TimeOrderedKeyValueBuffer<K, Change<V>> buffer;
     private InternalProcessorContext internalProcessorContext;
     private Sensor suppressionEmitSensor;
-    private Serde<K> keySerde;
-    private FullChangeSerde<V> valueSerde;
-
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
-    public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
-                                   final String storeName,
-                                   final Serde<K> keySerde,
-                                   final FullChangeSerde<V> valueSerde) {
+    public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
         this.storeName = storeName;
         requireNonNull(suppress);
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
         maxRecords = suppress.bufferConfig().maxRecords();
         maxBytes = suppress.bufferConfig().maxBytes();
         suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
@@ -74,9 +63,8 @@ public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
         suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
 
-        keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
-        valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
-        buffer = requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
+        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName));
+        buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde()));
     }
 
     @Override
@@ -88,12 +76,7 @@ public void process(final K key, final Change<V> value) {
 
     private void buffer(final K key, final Change<V> value) {
         final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
 
-        final ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
-
-        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key));
-        final byte[] serializedValue = valueSerde.serializer().serialize(null, value);
-
-        buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
     }
 
     private void enforceConstraints() {
@@ -114,6 +97,11 @@ private void enforceConstraints() {
                         buffer.numRecords(), maxRecords, buffer.bufferSize(), maxBytes
                     ));
+                default:
+                    throw new UnsupportedOperationException(
+                        "The bufferFullStrategy [" + bufferFullStrategy +
+                            "] is not implemented. This is a bug in Kafka Streams."
+                    );
             }
         }
     }
@@ -122,14 +110,12 @@ private boolean overCapacity() {
         return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
     }
 
-    private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
-        final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value());
-        if (shouldForward(value)) {
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) {
+        if (shouldForward(toEmit.value())) {
             final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
-            internalProcessorContext.setRecordContext(toEmit.value.recordContext());
+            internalProcessorContext.setRecordContext(toEmit.recordContext());
             try {
-                final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
-                internalProcessorContext.forward(key, value);
+                internalProcessorContext.forward(toEmit.key(), toEmit.value());
                 suppressionEmitSensor.record();
             } finally {
                 internalProcessorContext.setRecordContext(prevRecordContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index d7fe3e49c7d83..654f11bfe14fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -147,7 +147,7 @@ public <K, V> void forward(final K key, final V value, final To to) {
 
         final ProcessorNode previousNode = currentNode();
-        final long currentTimestamp = recordContext.timestamp;
+        final long currentTimestamp = recordContext.timestamp();
 
         try {
             toInternal.update(to);
@@ -170,7 +170,7 @@ public <K, V> void forward(final K key,
                 forward(child, key, value);
             }
         } finally {
-            recordContext.timestamp = currentTimestamp;
+            recordContext.setTimestamp(currentTimestamp);
             setCurrentNode(previousNode);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index aacb801b5819f..cc512ae90bb0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -29,11 +29,11 @@
 public class ProcessorRecordContext implements RecordContext {
 
-    long timestamp;
-    final long offset;
-    final String topic;
-    final int partition;
-    final Headers headers;
+    private long timestamp;
+    private final long offset;
+    private final String topic;
+    private final int partition;
+    private final Headers headers;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -48,13 +48,6 @@ public ProcessorRecordContext(final long timestamp,
         this.headers = headers;
     }
 
-    public ProcessorRecordContext(final long timestamp,
-                                  final long offset,
-                                  final int partition,
-                                  final String topic) {
-        this(timestamp, offset, partition, topic, null);
-    }
-
     public void setTimestamp(final long timestamp) {
         this.timestamp = timestamp;
     }
@@ -225,9 +218,13 @@ public boolean equals(final Object o) {
             Objects.equals(headers, that.headers);
     }
 
+    /**
+     * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
+     */
+    @Deprecated
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, offset, topic, partition, headers);
+        throw new UnsupportedOperationException("ProcessorRecordContext is unsafe for use in Hash collections");
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 8e58b16b7004a..e11df7caf38f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -23,8 +23,8 @@
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -50,7 +50,7 @@
 
 import static java.util.Objects.requireNonNull;
 
-public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =
@@ -63,6 +63,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
     private final String storeName;
     private final boolean loggingEnabled;
 
+    private Serde<K> keySerde;
+    private Serde<V> valueSerde;
+
     private long memBufferSize = 0L;
     private long minTimestamp = Long.MAX_VALUE;
     private RecordCollector collector;
@@ -74,13 +77,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
     private int partition;
 
-    public static class Builder implements StoreBuilder<StateStore> {
+    public static class Builder<K, V> implements StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> {
 
         private final String storeName;
+        private final Serde<K> keySerde;
+        private final Serde<V> valSerde;
         private boolean loggingEnabled = true;
 
-        public Builder(final String storeName) {
+        public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) {
             this.storeName = storeName;
+            this.keySerde = keySerde;
+            this.valSerde = valSerde;
         }
 
         /**
@@ -119,8 +126,8 @@ public StoreBuilder withLoggingDisabled() {
         }
 
         @Override
-        public StateStore build() {
-            return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled);
+        public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {
+            return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valSerde);
         }
 
         @Override
@@ -182,9 +189,14 @@ public String toString() {
         }
     }
 
-    private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) {
+    private InMemoryTimeOrderedKeyValueBuffer(final String storeName,
+                                              final boolean loggingEnabled,
+                                              final Serde<K> keySerde,
+                                              final Serde<V> valueSerde) {
         this.storeName = storeName;
         this.loggingEnabled = loggingEnabled;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
     }
 
     @Override
@@ -198,9 +210,16 @@ public boolean persistent() {
         return false;
     }
 
+    @Override
+    public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
+        this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
+        this.valueSerde = this.valueSerde == null ? valueSerde : this.valueSerde;
+    }
+
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+
         bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
         bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
 
@@ -359,7 +378,7 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
 
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
-                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+                           final Consumer<Eviction<K, V>> callback) {
         final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
         int evictions = 0;
 
@@ -377,7 +396,9 @@ public void evictWhile(final Supplier<Boolean> predicate,
                         next.getKey().time + "]"
                     );
                 }
-                callback.accept(new KeyValue<>(next.getKey().key, next.getValue()));
+                final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key.get());
+                final V value = valueSerde.deserializer().deserialize(changelogTopic, next.getValue().value());
+                callback.accept(new Eviction<>(key, value, next.getValue().recordContext()));
 
                 delegate.remove();
                 index.remove(next.getKey().key);
@@ -405,13 +426,17 @@ public void evictWhile(final Supplier<Boolean> predicate,
 
     @Override
     public void put(final long time,
-                    final Bytes key,
-                    final ContextualRecord contextualRecord) {
-        requireNonNull(contextualRecord.value(), "value cannot be null");
-        requireNonNull(contextualRecord.recordContext(), "recordContext cannot be null");
+                    final K key,
+                    final V value,
+                    final ProcessorRecordContext recordContext) {
+        requireNonNull(value, "value cannot be null");
+        requireNonNull(recordContext, "recordContext cannot be null");
+
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+        final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
 
-        cleanPut(time, key, contextualRecord);
-        dirtyKeys.add(key);
+        cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        dirtyKeys.add(serializedKey);
         updateBufferMetrics();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 86a8c1e651dab..ffa1f49c4bca7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -16,17 +16,64 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 
+import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public interface TimeOrderedKeyValueBuffer extends StateStore {
-    void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
+public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
+    final class Eviction<K, V> {
+        private final K key;
+        private final V value;
+        private final ProcessorRecordContext recordContext;
 
-    void put(final long time, final Bytes key, final ContextualRecord value);
+        Eviction(final K key, final V value, final ProcessorRecordContext recordContext) {
+            this.key = key;
+            this.value = value;
+            this.recordContext = recordContext;
+        }
+
+        public K key() {
+            return key;
+        }
+
+        public V value() {
+            return value;
+        }
+
+        public ProcessorRecordContext recordContext() {
+            return recordContext;
+        }
+
+        @Override
+        public String toString() {
+            return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final Eviction eviction = (Eviction) o;
+            return Objects.equals(key, eviction.key) &&
+                Objects.equals(value, eviction.value) &&
+                Objects.equals(recordContext, eviction.recordContext);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value, recordContext);
+        }
+    }
+
+    void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde);
+
+    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
+
+    void put(long time, K key, V value, ProcessorRecordContext recordContext);
 
     int numRecords();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 46a4f13cada31..e3f0a4121e351 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -16,10 +16,16 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
@@
-31,21 +37,23 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; +import org.hamcrest.Matchers; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.time.Duration; import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.Properties; +import java.util.stream.Collectors; import static java.lang.Long.MAX_VALUE; import static java.time.Duration.ofMillis; @@ -64,7 +72,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -@Category({IntegrationTest.class}) +@Category(IntegrationTest.class) public class SuppressionIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( @@ -76,7 +84,7 @@ public class SuppressionIntegrationTest { private static final Serde STRING_SERDE = Serdes.String(); private static final int COMMIT_INTERVAL = 100; - private KTable buildCountsTable(final String input, final StreamsBuilder builder) { + private static KTable buildCountsTable(final String input, final StreamsBuilder builder) { return builder .table( input, @@ -89,6 +97,139 @@ private KTable buildCountsTable(final String input, final StreamsB .count(Materialized.>as("counts").withCachingDisabled()); } + @Test + public void shouldUseDefaultSerdes() { + final String testId = "-shouldInheritSerdes"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream inputStream = builder.stream(input); + + final KTable valueCounts = inputStream + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")"); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull())) + .toStream() + .to(outputSuppressed); + + valueCounts + .toStream() + .to(outputRaw); + + final Properties streamsConfig = getStreamsConfig(appId); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + final boolean rawRecords = waitForAnyRecord(outputRaw); + final boolean suppressedRecords = waitForAnyRecord(outputSuppressed); + assertThat(rawRecords, Matchers.is(true)); + assertThat(suppressedRecords, is(true)); + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + 
+ @Test + public void shouldInheritSerdes() { + final String testId = "-shouldInheritSerdes"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream inputStream = builder.stream(input); + + // count sets the serde to Long + final KTable valueCounts = inputStream + .groupByKey() + .count(); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull())) + .toStream() + .to(outputSuppressed); + + valueCounts + .toStream() + .to(outputRaw); + + final Properties streamsConfig = getStreamsConfig(appId); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + final boolean rawRecords = waitForAnyRecord(outputRaw); + final boolean suppressedRecords = waitForAnyRecord(outputSuppressed); + assertThat(rawRecords, Matchers.is(true)); + assertThat(suppressedRecords, is(true)); + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + + private static boolean waitForAnyRecord(final String topic) { + final Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + try (final Consumer consumer = new KafkaConsumer<>(properties)) { + final List partitions = + consumer.partitionsFor(topic) + .stream() + .map(pi -> new TopicPartition(pi.topic(), pi.partition())) + .collect(Collectors.toList()); + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + final long start = System.currentTimeMillis(); + while ((System.currentTimeMillis() - start) < DEFAULT_TIMEOUT) { + final ConsumerRecords records = consumer.poll(ofMillis(500)); + + if (!records.isEmpty()) { + return true; + } + } + + return false; + } + } + @Test public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException { final String testId = "-shouldShutdownWhenRecordConstraintIsViolated"; @@ -145,7 +286,7 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce valueCounts // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size. 
- .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull())) + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull())) .toStream() .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); @@ -172,7 +313,7 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce } } - private Properties getStreamsConfig(final String appId) { + private static Properties getStreamsConfig(final String appId) { return mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), @@ -187,11 +328,11 @@ private Properties getStreamsConfig(final String appId) { * scaling to ensure that there are commits in between the various test events, * just to exercise that everything works properly in the presence of commits. */ - private long scaledTime(final long unscaledTime) { + private static long scaledTime(final long unscaledTime) { return COMMIT_INTERVAL * 2 * unscaledTime; } - private void produceSynchronously(final String topic, final List> toProduce) { + private static void produceSynchronously(final String topic, final List> toProduce) { final Properties producerConfig = mkProperties(mkMap( mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer) STRING_SERIALIZER).getClass().getName()), @@ -201,7 +342,7 @@ private void produceSynchronously(final String topic, final List !driver.state().isRunning(), DEFAULT_TIMEOUT, "Streams didn't shut down."); assertThat(driver.state(), is(KafkaStreams.State.ERROR)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java index a6a888840fdf4..ddba05e06414f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java @@ -29,7 +29,7 @@ import static org.hamcrest.core.Is.is; public class FullChangeSerdeTest { - private final FullChangeSerde serde = new FullChangeSerde<>(Serdes.String()); + private final FullChangeSerde serde = FullChangeSerde.castOrWrap(Serdes.String()); @Test public void shouldRoundTripNull() { @@ -77,31 +77,28 @@ public void shouldRoundTripChange() { ); } - @SuppressWarnings("unchecked") @Test public void shouldConfigureSerde() { final Serde mock = EasyMock.mock(Serde.class); mock.configure(emptyMap(), false); EasyMock.expectLastCall(); EasyMock.replay(mock); - final FullChangeSerde serde = new FullChangeSerde<>(mock); + final FullChangeSerde serde = FullChangeSerde.castOrWrap(mock); serde.configure(emptyMap(), false); EasyMock.verify(mock); } - @SuppressWarnings("unchecked") @Test public void shouldCloseSerde() { final Serde mock = EasyMock.mock(Serde.class); mock.close(); EasyMock.expectLastCall(); EasyMock.replay(mock); - final FullChangeSerde serde = new FullChangeSerde<>(mock); + final FullChangeSerde serde = FullChangeSerde.castOrWrap(mock); serde.close(); EasyMock.verify(mock); } - @SuppressWarnings("unchecked") @Test public void shouldConfigureSerializer() { final Serde mockSerde = EasyMock.mock(Serde.class); @@ -111,13 +108,12 @@ public void shouldConfigureSerializer() { mockSerializer.configure(emptyMap(), false); EasyMock.expectLastCall(); EasyMock.replay(mockSerializer); - final Serializer> serializer = new 
FullChangeSerde<>(mockSerde).serializer(); + final Serializer> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer(); serializer.configure(emptyMap(), false); EasyMock.verify(mockSerde); EasyMock.verify(mockSerializer); } - @SuppressWarnings("unchecked") @Test public void shouldCloseSerializer() { final Serde mockSerde = EasyMock.mock(Serde.class); @@ -127,13 +123,12 @@ public void shouldCloseSerializer() { mockSerializer.close(); EasyMock.expectLastCall(); EasyMock.replay(mockSerializer); - final Serializer> serializer = new FullChangeSerde<>(mockSerde).serializer(); + final Serializer> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer(); serializer.close(); EasyMock.verify(mockSerde); EasyMock.verify(mockSerializer); } - @SuppressWarnings("unchecked") @Test public void shouldConfigureDeserializer() { final Serde mockSerde = EasyMock.mock(Serde.class); @@ -143,13 +138,12 @@ public void shouldConfigureDeserializer() { mockDeserializer.configure(emptyMap(), false); EasyMock.expectLastCall(); EasyMock.replay(mockDeserializer); - final Deserializer> serializer = new FullChangeSerde<>(mockSerde).deserializer(); + final Deserializer> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer(); serializer.configure(emptyMap(), false); EasyMock.verify(mockSerde); EasyMock.verify(mockDeserializer); } - @SuppressWarnings("unchecked") @Test public void shouldCloseDeserializer() { final Serde mockSerde = EasyMock.mock(Serde.class); @@ -159,7 +153,7 @@ public void shouldCloseDeserializer() { mockDeserializer.close(); EasyMock.expectLastCall(); EasyMock.replay(mockDeserializer); - final Deserializer> serializer = new FullChangeSerde<>(mockSerde).deserializer(); + final Deserializer> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer(); serializer.close(); EasyMock.verify(mockSerde); EasyMock.verify(mockDeserializer); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java index 228cfc8d96272..62ae3bfc41a37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.FullChangeSerde; @@ -31,8 +32,6 @@ import java.time.Duration; import java.util.Map; -import static org.apache.kafka.common.serialization.Serdes.Long; -import static org.apache.kafka.common.serialization.Serdes.String; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; @@ -40,7 +39,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.core.Is.is; -@SuppressWarnings("PointlessArithmeticExpression") public class KTableSuppressProcessorMetricsTest { private static final long ARBITRARY_LONG = 5L; @@ -136,16 +134,17 @@ public class KTableSuppressProcessorMetricsTest { public void shouldRecordMetrics() { final String storeName = "test-store"; - final StateStore 
buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName) + final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( + storeName, Serdes.String(), + FullChangeSerde.castOrWrap(Serdes.Long()) + ) .withLoggingDisabled() .build(); final KTableSuppressProcessor processor = new KTableSuppressProcessor<>( (SuppressedInternal) Suppressed.untilTimeLimit(Duration.ofDays(100), maxRecords(1)), - storeName, - String(), - new FullChangeSerde<>(Long()) + storeName ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); @@ -191,9 +190,9 @@ public void shouldRecordMetrics() { } @SuppressWarnings("unchecked") - private void verifyMetric(final Map metrics, - final MetricName metricName, - final Matcher matcher) { + private static void verifyMetric(final Map metrics, + final MetricName metricName, + final Matcher matcher) { assertThat(metrics.get(metricName).metricName().description(), is(metricName.description())); assertThat((T) metrics.get(metricName).metricValue(), matcher); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 4b182cbf6fbc3..6c10d91cf90ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -72,16 +72,12 @@ private static class Harness { final String storeName = "test-store"; - final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName) + final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde)) .withLoggingDisabled() .build(); + final KTableSuppressProcessor processor = - new KTableSuppressProcessor<>( - (SuppressedInternal) suppressed, - storeName, - keySerde, - new FullChangeSerde<>(valueSerde) - ); + new KTableSuppressProcessor<>((SuppressedInternal) suppressed, storeName); final MockInternalProcessorContext context = new MockInternalProcessorContext(); context.setCurrentNode(new ProcessorNode("testNode")); @@ -208,7 +204,6 @@ public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() { // note the record is in the past, but the window end is in the future, so we still have to buffer, // even though the grace period is 0. 
final long timestamp = 5L; - final long streamTime = 99L; final long windowEnd = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0, windowEnd)); @@ -446,8 +441,8 @@ public void suppressShouldShutDownWhenOverByteCapacity() { } } - @SuppressWarnings("unchecked") - private SuppressedInternal finalResults(final Duration grace) { + @SuppressWarnings({"unchecked", "rawtypes"}) + private static SuppressedInternal finalResults(final Duration grace) { return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace); } @@ -471,7 +466,7 @@ public boolean matches(final Object item) { }; } - private Serde> timeWindowedSerdeFrom(final Class rawType, final long windowSize) { + private static Serde> timeWindowedSerdeFrom(final Class rawType, final long windowSize) { final Serde kSerde = Serdes.serdeFrom(rawType); return new Serdes.WrapperSerde<>( new TimeWindowedSerializer<>(kSerde.serializer()), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 8afd30298b54c..3ae7cf2061a2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -96,7 +96,7 @@ public void shouldReturnTopicFromRecordContext() { @Test public void shouldReturnNullIfTopicEqualsNonExistTopic() { - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null)); assertThat(context.topic(), nullValue()); } @@ -154,7 +154,7 @@ public void shouldReturnHeadersFromRecordContext() { @Test public void shouldReturnNullIfHeadersAreNotSet() { - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null)); assertThat(context.headers(), nullValue()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java index ddc40462d4d7b..18f689f3cd483 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java @@ -22,11 +22,11 @@ public class InMemoryTimeOrderedKeyValueBufferTest { @Test public void bufferShouldAllowCacheEnablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingEnabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingEnabled(); } @Test public void bufferShouldAllowCacheDisablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingDisabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 2953953714b93..6ae36d4d911e3 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -22,13 +22,14 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction; import org.apache.kafka.test.MockInternalProcessorContext; import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector; import org.apache.kafka.test.TestUtils; @@ -55,7 +56,7 @@ import static org.junit.Assert.fail; @RunWith(Parameterized.class) -public class TimeOrderedKeyValueBufferTest { +public class TimeOrderedKeyValueBufferTest> { private static final RecordHeaders V_1_CHANGELOG_HEADERS = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); @@ -69,9 +70,9 @@ public static Collection parameters() { return singletonList( new Object[] { "in-memory buffer", - (Function) name -> - (InMemoryTimeOrderedKeyValueBuffer) new InMemoryTimeOrderedKeyValueBuffer - .Builder(name) + (Function>) name -> + new InMemoryTimeOrderedKeyValueBuffer + .Builder<>(name, Serdes.String(), Serdes.String()) .build() } ); @@ -96,7 +97,7 @@ private static MockInternalProcessorContext makeContext() { } - private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer buffer) { + private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer buffer) { try { buffer.close(); Utils.delete(context.stateDir()); @@ -107,7 +108,7 @@ private static void cleanup(final MockInternalProcessorContext context, final Ti @Test public void shouldInit() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); cleanup(context, buffer); @@ -115,23 +116,23 @@ public void shouldInit() { @Test public void shouldAcceptData() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "2p93nf", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf"); cleanup(context, buffer); } @Test public void shouldRejectNullValues() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); try { - buffer.put(0, getBytes("asdf"), new ContextualRecord( - null, - new ProcessorRecordContext(0, 0, 0, "topic") - )); + buffer.put(0, "asdf", + null, + getContext(0) + ); fail("expected an exception"); } catch (final NullPointerException expected) 
{ // expected @@ -139,27 +140,12 @@ public void shouldRejectNullValues() { cleanup(context, buffer); } - private static ContextualRecord getRecord(final String value) { - return getRecord(value, 0L); - } - - private static ContextualRecord getRecord(final String value, final long timestamp) { - return new ContextualRecord( - value.getBytes(UTF_8), - new ProcessorRecordContext(timestamp, 0, 0, "topic") - ); - } - - private static Bytes getBytes(final String key) { - return Bytes.wrap(key.getBytes(UTF_8)); - } - @Test public void shouldRemoveData() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "qwer", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "qwer"); assertThat(buffer.numRecords(), is(1)); buffer.evictWhile(() -> true, kv -> { }); assertThat(buffer.numRecords(), is(0)); @@ -168,90 +154,71 @@ public void shouldRemoveData() { @Test public void shouldRespectEvictionPredicate() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - final Bytes firstKey = getBytes("asdf"); - final ContextualRecord firstRecord = getRecord("eyt"); - putRecord(0, buffer, context, firstRecord, firstKey); - putRecord(buffer, context, "rtg", 1, "zxcv"); + putRecord(buffer, context, 0L, 0L, "asdf", "eyt"); + putRecord(buffer, context, 1L, 0L, "zxcv", "rtg"); assertThat(buffer.numRecords(), is(2)); - final List> evicted = new LinkedList<>(); + final List> evicted = new LinkedList<>(); buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add); assertThat(buffer.numRecords(), is(1)); - assertThat(evicted, is(singletonList(new KeyValue<>(firstKey, firstRecord)))); + assertThat(evicted, is(singletonList(new Eviction<>("asdf", "eyt", getContext(0L))))); cleanup(context, buffer); } @Test public void shouldTrackCount() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "oin", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "oin"); assertThat(buffer.numRecords(), is(1)); - putRecord(buffer, context, "wekjn", 1, "asdf"); + putRecord(buffer, context, 1L, 0L, "asdf", "wekjn"); assertThat(buffer.numRecords(), is(1)); - putRecord(buffer, context, "24inf", 0, "zxcv"); + putRecord(buffer, context, 0L, 0L, "zxcv", "24inf"); assertThat(buffer.numRecords(), is(2)); cleanup(context, buffer); } @Test public void shouldTrackSize() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "23roni", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "23roni"); assertThat(buffer.bufferSize(), is(43L)); - putRecord(buffer, context, "3l", 1, "asdf"); + putRecord(buffer, context, 1L, 0L, "asdf", "3l"); assertThat(buffer.bufferSize(), is(39L)); - putRecord(buffer, context, "qfowin", 0, "zxcv"); + putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin"); assertThat(buffer.bufferSize(), 
is(82L)); cleanup(context, buffer); } @Test public void shouldTrackMinTimestamp() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "2093j", 1, "asdf"); + putRecord(buffer, context, 1L, 0L, "asdf", "2093j"); assertThat(buffer.minTimestamp(), is(1L)); - putRecord(buffer, context, "3gon4i", 0, "zxcv"); + putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i"); assertThat(buffer.minTimestamp(), is(0L)); cleanup(context, buffer); } - private static void putRecord(final TimeOrderedKeyValueBuffer buffer, - final MockInternalProcessorContext context, - final String value, - final int time, - final String key) { - putRecord(time, buffer, context, getRecord(value), getBytes(key)); - } - - private static void putRecord(final int time, - final TimeOrderedKeyValueBuffer buffer, - final MockInternalProcessorContext context, - final ContextualRecord firstRecord, - final Bytes firstKey) { - context.setRecordContext(firstRecord.recordContext()); - buffer.put(time, firstKey, firstRecord); - } - @Test public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "o23i4", 1, "zxcv"); + putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4"); assertThat(buffer.numRecords(), is(1)); assertThat(buffer.bufferSize(), is(42L)); assertThat(buffer.minTimestamp(), is(1L)); - putRecord(buffer, context, "3ng", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "3ng"); assertThat(buffer.numRecords(), is(2)); assertThat(buffer.bufferSize(), is(82L)); assertThat(buffer.minTimestamp(), is(0L)); @@ -260,14 +227,14 @@ public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() { buffer.evictWhile(() -> true, kv -> { switch (callbackCount.incrementAndGet()) { case 1: { - assertThat(new String(kv.key.get(), UTF_8), is("asdf")); + assertThat(kv.key(), is("asdf")); assertThat(buffer.numRecords(), is(2)); assertThat(buffer.bufferSize(), is(82L)); assertThat(buffer.minTimestamp(), is(0L)); break; } case 2: { - assertThat(new String(kv.key.get(), UTF_8), is("zxcv")); + assertThat(kv.key(), is("zxcv")); assertThat(buffer.numRecords(), is(1)); assertThat(buffer.bufferSize(), is(42L)); assertThat(buffer.minTimestamp(), is(1L)); @@ -288,12 +255,12 @@ public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() { @Test public void shouldFlush() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(2, buffer, context, getRecord("2093j", 0L), getBytes("asdf")); - putRecord(1, buffer, context, getRecord("3gon4i", 1L), getBytes("zxcv")); - putRecord(0, buffer, context, getRecord("deadbeef", 2L), getBytes("deleteme")); + putRecord(buffer, context, 2L, 0L, "asdf", "2093j"); + putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i"); + putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef"); // replace "deleteme" with a tombstone buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { }); @@ -357,17 +324,16 @@ public void shouldFlush() { 
cleanup(context, buffer); } - @Test public void shouldRestoreOldFormat() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); final RecordBatchingStateRestoreCallback stateRestoreCallback = (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "")); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", @@ -425,7 +391,7 @@ public void shouldRestoreOldFormat() { // flush the buffer into a list in buffer order so we can make assertions about the contents. - final List> evicted = new LinkedList<>(); + final List> evicted = new LinkedList<>(); buffer.evictWhile(() -> true, evicted::add); // Several things to note: @@ -437,22 +403,14 @@ public void shouldRestoreOldFormat() { // original format. assertThat(evicted, is(asList( - new KeyValue<>( - getBytes("zxcv"), - new ContextualRecord("3o4im".getBytes(UTF_8), - new ProcessorRecordContext(2, - 2, - 0, - "changelog-topic", - new RecordHeaders()))), - new KeyValue<>( - getBytes("asdf"), - new ContextualRecord("qwer".getBytes(UTF_8), - new ProcessorRecordContext(1, - 1, - 0, - "changelog-topic", - new RecordHeaders()))) + new Eviction<>( + "zxcv", + "3o4im", + new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new RecordHeaders())), + new Eviction<>( + "asdf", + "qwer", + new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new RecordHeaders())) ))); cleanup(context, buffer); @@ -460,14 +418,14 @@ public void shouldRestoreOldFormat() { @Test public void shouldRestoreNewFormat() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); final RecordBatchingStateRestoreCallback stateRestoreCallback = (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "")); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); @@ -533,7 +491,7 @@ public void shouldRestoreNewFormat() { // flush the buffer into a list in buffer order so we can make assertions about the contents. - final List> evicted = new LinkedList<>(); + final List> evicted = new LinkedList<>(); buffer.evictWhile(() -> true, evicted::add); // Several things to note: @@ -541,41 +499,33 @@ public void shouldRestoreNewFormat() { // * The record timestamps are properly restored, and not conflated with the record's buffer time. 
// * The keys and values are properly restored // * The record topic is set to the original input topic, *not* the changelog topic - // * The record offset preserves the origininal input record's offset, *not* the offset of the changelog record + // * The record offset preserves the original input record's offset, *not* the offset of the changelog record assertThat(evicted, is(asList( - new KeyValue<>( - getBytes("zxcv"), - new ContextualRecord("3o4im".getBytes(UTF_8), - new ProcessorRecordContext(2, - 0, - 0, - "topic", - null))), - new KeyValue<>( - getBytes("asdf"), - new ContextualRecord("qwer".getBytes(UTF_8), - new ProcessorRecordContext(1, - 0, - 0, - "topic", - null))) - ))); + new Eviction<>( + "zxcv", + "3o4im", + getContext(2L)), + new Eviction<>( + "asdf", + "qwer", + getContext(1L) + )))); cleanup(context, buffer); } @Test public void shouldNotRestoreUnrecognizedVersionRecord() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); final RecordBatchingStateRestoreCallback stateRestoreCallback = (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "")); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})}); @@ -601,4 +551,26 @@ public void shouldNotRestoreUnrecognizedVersionRecord() { cleanup(context, buffer); } } + + private static void putRecord(final TimeOrderedKeyValueBuffer buffer, + final MockInternalProcessorContext context, + final long streamTime, + final long recordTimestamp, + final String key, + final String value) { + final ProcessorRecordContext recordContext = getContext(recordTimestamp); + context.setRecordContext(recordContext); + buffer.put(streamTime, key, value, recordContext); + } + + private static ContextualRecord getRecord(final String value, final long timestamp) { + return new ContextualRecord( + value.getBytes(UTF_8), + getContext(timestamp) + ); + } + + private static ProcessorRecordContext getContext(final long recordTimestamp) { + return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null); + } }
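
A rough usage sketch of the typed buffer API this patch introduces (not part of the diff; the store name "my-buffer", the String/Long serdes, and the record-count threshold are illustrative assumptions):

// Hypothetical processor showing how callers now hand typed keys/values plus the
// ProcessorRecordContext to the buffer and receive typed Evictions back, instead
// of serializing to Bytes/ContextualRecord themselves.
public class BufferUsageSketch implements Processor<String, Change<Long>> {
    private TimeOrderedKeyValueBuffer<String, Change<Long>> buffer;
    private InternalProcessorContext internalProcessorContext;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        internalProcessorContext = (InternalProcessorContext) context;
        // The store would be registered via InMemoryTimeOrderedKeyValueBuffer.Builder<>("my-buffer", ...).
        buffer = (TimeOrderedKeyValueBuffer<String, Change<Long>>) context.getStateStore("my-buffer");
        // Fall back to explicit serdes here; the suppress processor instead passes the context serdes.
        buffer.setSerdesIfNull(Serdes.String(), FullChangeSerde.castOrWrap(Serdes.Long()));
    }

    @Override
    public void process(final String key, final Change<Long> value) {
        // Buffer the typed record together with its original record context.
        buffer.put(internalProcessorContext.timestamp(), key, value, internalProcessorContext.recordContext());
        // Evict in buffer-time order; the callback sees Eviction<K, V> rather than raw bytes.
        buffer.evictWhile(() -> buffer.numRecords() > 10, eviction ->
            internalProcessorContext.forward(eviction.key(), eviction.value()));
    }

    @Override
    public void close() {}
}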