Skip to content

Commit

Permalink
KAFKA-18026: supply stores for KTable#mapValues using ProcessorSuppli…
Browse files Browse the repository at this point in the history
…er#stores (#18155)

KAFKA-18026: supply stores for KTable#mapValues using ProcessorSupplier#stores

Reviewers: Guozhang Wang <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
rodesai authored Dec 15, 2024
1 parent 0815d70 commit ed10fc6
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? sup

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);

final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName);
final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName, storeFactory);

// leaving in calls to ITB until building topology with graph

Expand All @@ -331,8 +331,7 @@ private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? sup
);
final GraphNode tableNode = new TableProcessorNode<>(
name,
processorParameters,
storeFactory
processorParameters
);
maybeSetOutputVersioned(tableNode, materializedInternal);

Expand Down Expand Up @@ -1358,16 +1357,12 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei

final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal);

final StoreFactory resultStore =
new KeyValueStoreMaterializer<>(materializedInternal);

final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
resultProcessorName,
new ProcessorParameters<>(
resultProcessorSupplier,
resultProcessorName
),
resultStore
)
);
resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);
builder.addGraphNode(responseJoinNode, resultNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;

import java.util.Collections;
import java.util.Set;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
Expand All @@ -33,20 +38,31 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> mapper;
private final String queryableName;
private boolean sendOldValues = false;
private final StoreFactory storeFactory;

KTableMapValues(final KTableImpl<KIn, ?, VIn> parent,
final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> mapper,
final String queryableName) {
final String queryableName,
final StoreFactory storeFactory) {
this.parent = parent;
this.mapper = mapper;
this.queryableName = queryableName;
this.storeFactory = storeFactory;
}

@Override
public Processor<KIn, Change<VIn>, KIn, Change<VOut>> get() {
return new KTableMapValuesProcessor();
}

@Override
public Set<StoreBuilder<?>> stores() {
if (storeFactory == null) {
return null;
}
return Collections.singleton(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
}

@Override
public KTableValueGetterSupplier<KIn, VOut> view() {
// if the KTable is materialized, use the materialized store to return getter value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements V
public TableFilterNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory storeFactory) {
super(nodeName, processorParameters, storeFactory);
super(nodeName, processorParameters, storeFactory, null);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;

import java.util.Arrays;
import java.util.Objects;

public class TableProcessorNode<K, V> extends GraphNode {

Expand All @@ -31,9 +29,8 @@ public class TableProcessorNode<K, V> extends GraphNode {
private final String[] storeNames;

public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory storeFactory) {
this(nodeName, processorParameters, storeFactory, null);
final ProcessorParameters<K, V, ?, ?> processorParameters) {
this(nodeName, processorParameters, null, null);
}

public TableProcessorNode(final String nodeName,
Expand Down Expand Up @@ -62,21 +59,17 @@ public String toString() {
@SuppressWarnings("unchecked")
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());

final String processorName = processorParameters.processorName();
topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());

if (storeNames.length > 0) {
// todo(rodesai): remove me once all operators have been moved to ProcessorSupplier
topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
}

final KTableSource<K, V> tableSource = processorParameters.processorSupplier() instanceof KTableSource ?
(KTableSource<K, V>) processorParameters.processorSupplier() : null;
if (tableSource != null) {
if (tableSource.materialized()) {
topologyBuilder.addStateStore(Objects.requireNonNull(storeFactory, "storeFactory was null"),
processorName);
}
} else if (storeFactory != null) {
if (storeFactory != null) {
// todo(rodesai) remove when KTableImpl#doFilter, KTableImpl#doTransformValues moved to ProcessorSupplier
topologyBuilder.addStateStore(storeFactory, processorName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,26 @@ public void shouldWrapProcessorsForCoGroupedStreamAggregate() {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}

@Test
public void shouldWrapProcessorsForMapValuesWithMaterializedStore() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

builder.table("input", Consumed.as("source-table"))
.mapValues(v -> null, Named.as("map-values"), Materialized.as("map-values-store"))
.toStream(Named.as("to-stream"))
.to("output-topic", Produced.as("sink"));
builder.build();

assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("source-table", "map-values", "to-stream"));
assertThat(counter.numUniqueStateStores(), is(1));
assertThat(counter.numConnectedStateStores(), is(1));
}

@Test
public void shouldWrapProcessorsForTableAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
Expand Down Expand Up @@ -1687,21 +1707,46 @@ public void shouldWrapProcessorsForStatelessOperators() {
.selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
.peek((k, v) -> { }, Named.as("peek")) // wrapped 4
.flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5
.toTable(Named.as("toTable")) // wrapped 6
.filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode
.toStream(Named.as("toStream")) // wrapped 7
.to("output", Produced.as("sink"));

builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8));
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"filter-stream", "map", "selectKey", "peek", "flatMap",
"toTable-repartition-filter", "toStream", "toTable"
"filter-stream", "map", "selectKey", "peek", "flatMap"
));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
}

@Test
public void shouldWrapProcessorsWhenMultipleTableOperators() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);

final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);

final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

builder.stream("input", Consumed.as("source"))
.toTable(Named.as("to-table"))
.mapValues(v -> v, Named.as("map-values"))
.mapValues(v -> v, Named.as("map-values-stateful"), Materialized.as("map-values-stateful"))
.filter((k, v) -> true, Named.as("filter-table"))
.filter((k, v) -> true, Named.as("filter-table-stateful"), Materialized.as("filter-table-stateful"))
.toStream(Named.as("to-stream"))
.to("output", Produced.as("sink"));

builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"to-table", "map-values", "map-values-stateful",
"filter-table", "filter-table-stateful", "to-stream"
));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}

@Test
public void shouldWrapProcessorsForUnmaterializedSourceTable() {
final Map<Object, Object> props = dummyStreamsConfigMap();
Expand Down

0 comments on commit ed10fc6

Please sign in to comment.