From ed10fc63a9fd3032a529f7e10a0f863ee71b710a Mon Sep 17 00:00:00 2001 From: Rohan Date: Sat, 14 Dec 2024 20:18:49 -0800 Subject: [PATCH] KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores (#18155) KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores Reviewers: Guozhang Wang , Anna Sophie Blee-Goldman --- .../streams/kstream/internals/KTableImpl.java | 11 +--- .../kstream/internals/KTableMapValues.java | 18 +++++- .../internals/graph/TableFilterNode.java | 2 +- .../internals/graph/TableProcessorNode.java | 21 +++---- .../kafka/streams/StreamsBuilderTest.java | 57 +++++++++++++++++-- 5 files changed, 79 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index f90d35827f196..e661d78fefcd5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -322,7 +322,7 @@ private KTable doMapValues(final ValueMapperWithKey processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName); + final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName, storeFactory); // leaving in calls to ITB until building topology with graph @@ -331,8 +331,7 @@ private KTable doMapValues(final ValueMapperWithKey( name, - processorParameters, - storeFactory + processorParameters ); maybeSetOutputVersioned(tableNode, materializedInternal); @@ -1358,16 +1357,12 @@ private KTable doJoinOnForeignKey(final KTable forei final KTableSource resultProcessorSupplier = new KTableSource<>(materializedInternal); - final StoreFactory resultStore = - new KeyValueStoreMaterializer<>(materializedInternal); - final TableProcessorNode resultNode = new TableProcessorNode<>( resultProcessorName, new ProcessorParameters<>( resultProcessorSupplier, resultProcessorName - ), - resultStore + ) ); resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier); builder.addGraphNode(responseJoinNode, resultNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index c26488c12a178..af495c9b9a34a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -20,9 +20,14 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; +import java.util.Collections; +import java.util.Set; + import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; @@ -33,13 +38,16 @@ class KTableMapValues implements KTableProcessorSupplier mapper; private final String queryableName; private boolean sendOldValues = false; + private final StoreFactory storeFactory; KTableMapValues(final KTableImpl parent, final ValueMapperWithKey mapper, - final String queryableName) { + final String queryableName, + final StoreFactory storeFactory) { this.parent = parent; this.mapper = mapper; this.queryableName = queryableName; + this.storeFactory = storeFactory; } @Override @@ -47,6 +55,14 @@ public Processor, KIn, Change> get() { return new KTableMapValuesProcessor(); } + @Override + public Set> stores() { + if (storeFactory == null) { + return null; + } + return Collections.singleton(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public KTableValueGetterSupplier view() { // if the KTable is materialized, use the materialized store to return getter value; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java index 1874bd807ed47..38033693ebb62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java @@ -26,7 +26,7 @@ public class TableFilterNode extends TableProcessorNode implements V public TableFilterNode(final String nodeName, final ProcessorParameters processorParameters, final StoreFactory storeFactory) { - super(nodeName, processorParameters, storeFactory); + super(nodeName, processorParameters, storeFactory, null); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java index b47252068e638..af3ab15d4903c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java @@ -17,12 +17,10 @@ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StoreFactory; import java.util.Arrays; -import java.util.Objects; public class TableProcessorNode extends GraphNode { @@ -31,9 +29,8 @@ public class TableProcessorNode extends GraphNode { private final String[] storeNames; public TableProcessorNode(final String nodeName, - final ProcessorParameters processorParameters, - final StoreFactory storeFactory) { - this(nodeName, processorParameters, storeFactory, null); + final ProcessorParameters processorParameters) { + this(nodeName, processorParameters, null, null); } public TableProcessorNode(final String nodeName, @@ -62,21 +59,17 @@ public String toString() { @SuppressWarnings("unchecked") @Override public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { + processorParameters.addProcessorTo(topologyBuilder, parentNodeNames()); + final String processorName = processorParameters.processorName(); - topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames()); if (storeNames.length > 0) { + // todo(rodesai): remove me once all operators have been moved to ProcessorSupplier topologyBuilder.connectProcessorAndStateStores(processorName, storeNames); } - final KTableSource tableSource = processorParameters.processorSupplier() instanceof KTableSource ? - (KTableSource) processorParameters.processorSupplier() : null; - if (tableSource != null) { - if (tableSource.materialized()) { - topologyBuilder.addStateStore(Objects.requireNonNull(storeFactory, "storeFactory was null"), - processorName); - } - } else if (storeFactory != null) { + if (storeFactory != null) { + // todo(rodesai) remove when KTableImpl#doFilter, KTableImpl#doTransformValues moved to ProcessorSupplier topologyBuilder.addStateStore(storeFactory, processorName); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 738f753f532cd..be38f0514929c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -1621,6 +1621,26 @@ public void shouldWrapProcessorsForCoGroupedStreamAggregate() { assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); } + @Test + public void shouldWrapProcessorsForMapValuesWithMaterializedStore() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.table("input", Consumed.as("source-table")) + .mapValues(v -> null, Named.as("map-values"), Materialized.as("map-values-store")) + .toStream(Named.as("to-stream")) + .to("output-topic", Produced.as("sink")); + builder.build(); + + assertThat(counter.wrappedProcessorNames(), + Matchers.containsInAnyOrder("source-table", "map-values", "to-stream")); + assertThat(counter.numUniqueStateStores(), is(1)); + assertThat(counter.numConnectedStateStores(), is(1)); + } + @Test public void shouldWrapProcessorsForTableAggregate() { final Map props = dummyStreamsConfigMap(); @@ -1687,21 +1707,46 @@ public void shouldWrapProcessorsForStatelessOperators() { .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3 .peek((k, v) -> { }, Named.as("peek")) // wrapped 4 .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5 - .toTable(Named.as("toTable")) // wrapped 6 - .filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode - .toStream(Named.as("toStream")) // wrapped 7 .to("output", Produced.as("sink")); builder.build(); - assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8)); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5)); assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( - "filter-stream", "map", "selectKey", "peek", "flatMap", - "toTable-repartition-filter", "toStream", "toTable" + "filter-stream", "map", "selectKey", "peek", "flatMap" )); assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0)); assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0)); } + @Test + public void shouldWrapProcessorsWhenMultipleTableOperators() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) + .toTable(Named.as("to-table")) + .mapValues(v -> v, Named.as("map-values")) + .mapValues(v -> v, Named.as("map-values-stateful"), Materialized.as("map-values-stateful")) + .filter((k, v) -> true, Named.as("filter-table")) + .filter((k, v) -> true, Named.as("filter-table-stateful"), Materialized.as("filter-table-stateful")) + .toStream(Named.as("to-stream")) + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "to-table", "map-values", "map-values-stateful", + "filter-table", "filter-table-stateful", "to-stream" + )); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + } + @Test public void shouldWrapProcessorsForUnmaterializedSourceTable() { final Map props = dummyStreamsConfigMap();