diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 2e0301ce4d090..060b949f331c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import java.time.Duration; +import java.util.Map; public interface Suppressed extends NamedOperation> { @@ -118,6 +119,25 @@ static StrictBufferConfig unbounded() { * duplicate results downstream, but does not promise to eliminate them. */ EagerBufferConfig emitEarlyWhenFull(); + + /** + * Disable the changelog for this suppression's internal buffer. + * This will turn off fault-tolerance for the suppression, and will result in data loss in the event of a rebalance. + * By default the changelog is enabled. + * @return this + */ + BC withLoggingDisabled(); + + /** + * Indicates that a changelog topic should be created containing the currently suppressed + * records. Due to the short-lived nature of records in this topic it is likely more + * compactable than changelog topics for KTables. + * + * @param config Configs that should be applied to the changelog. Note: Any unrecognized + * configs will be ignored. 
+ * @return this + */ + BC withLoggingEnabled(final Map config); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index ddff497818540..91a01d14e85fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -72,6 +72,7 @@ import java.time.Duration; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Function; @@ -529,10 +530,27 @@ public KTable suppress(final Suppressed suppressed) { this ); + final StoreBuilder> storeBuilder; + + if (suppressedInternal.bufferConfig().isLoggingEnabled()) { + final Map topicConfig = suppressedInternal.bufferConfig().getLogConfig(); + storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( + storeName, + keySerde, + valSerde) + .withLoggingEnabled(topicConfig); + } else { + storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( + storeName, + keySerde, + valSerde) + .withLoggingDisabled(); + } + final ProcessorGraphNode> node = new StatefulProcessorNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valSerde) + storeBuilder ); builder.addGraphNode(streamsGraphNode, node); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java index 2087945ab882e..74de6ef1d452f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java @@ -18,9 +18,11 @@ import 
org.apache.kafka.streams.kstream.Suppressed; +import java.util.Map; + import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; -abstract class BufferConfigInternal> implements Suppressed.BufferConfig { +public abstract class BufferConfigInternal> implements Suppressed.BufferConfig { public abstract long maxRecords(); public abstract long maxBytes(); @@ -46,4 +48,8 @@ public Suppressed.StrictBufferConfig shutDownWhenFull() { public Suppressed.EagerBufferConfig emitEarlyWhenFull() { return new EagerBufferConfigImpl(maxRecords(), maxBytes()); } + + public abstract boolean isLoggingEnabled(); + + public abstract Map getLogConfig(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java index 1c1b30c3edc2a..c56532d7f4775 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -18,26 +18,38 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Collections; +import java.util.Map; import java.util.Objects; public class EagerBufferConfigImpl extends BufferConfigInternal implements Suppressed.EagerBufferConfig { private final long maxRecords; private final long maxBytes; + private final Map logConfig; public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) { this.maxRecords = maxRecords; this.maxBytes = maxBytes; + this.logConfig = Collections.emptyMap(); + } + + private EagerBufferConfigImpl(final long maxRecords, + final long maxBytes, + final Map logConfig) { + this.maxRecords = maxRecords; + this.maxBytes = maxBytes; + this.logConfig = logConfig; } @Override public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) { - return new 
EagerBufferConfigImpl(recordLimit, maxBytes); + return new EagerBufferConfigImpl(recordLimit, maxBytes, logConfig); } @Override public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) { - return new EagerBufferConfigImpl(maxRecords, byteLimit); + return new EagerBufferConfigImpl(maxRecords, byteLimit, logConfig); } @Override @@ -55,6 +67,26 @@ public BufferFullStrategy bufferFullStrategy() { return BufferFullStrategy.EMIT; } + @Override + public Suppressed.EagerBufferConfig withLoggingDisabled() { + return new EagerBufferConfigImpl(maxRecords, maxBytes, null); + } + + @Override + public Suppressed.EagerBufferConfig withLoggingEnabled(final Map config) { + return new EagerBufferConfigImpl(maxRecords, maxBytes, config); + } + + @Override + public boolean isLoggingEnabled() { + return logConfig != null; + } + + @Override + public Map getLogConfig() { + return isLoggingEnabled() ? logConfig : Collections.emptyMap(); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java index 30427b7a154e8..13ffccdfffb85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -18,6 +18,8 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Collections; +import java.util.Map; import java.util.Objects; import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; @@ -27,6 +29,17 @@ public class StrictBufferConfigImpl extends BufferConfigInternal logConfig; + + public StrictBufferConfigImpl(final long maxRecords, + final long maxBytes, + final BufferFullStrategy bufferFullStrategy, + final Map logConfig) { + 
this.maxRecords = maxRecords; + this.maxBytes = maxBytes; + this.bufferFullStrategy = bufferFullStrategy; + this.logConfig = logConfig; + } public StrictBufferConfigImpl(final long maxRecords, final long maxBytes, @@ -34,12 +47,14 @@ public StrictBufferConfigImpl(final long maxRecords, this.maxRecords = maxRecords; this.maxBytes = maxBytes; this.bufferFullStrategy = bufferFullStrategy; + this.logConfig = Collections.emptyMap(); } public StrictBufferConfigImpl() { this.maxRecords = Long.MAX_VALUE; this.maxBytes = Long.MAX_VALUE; this.bufferFullStrategy = SHUT_DOWN; + this.logConfig = Collections.emptyMap(); } @Override @@ -67,6 +82,26 @@ public BufferFullStrategy bufferFullStrategy() { return bufferFullStrategy; } + @Override + public Suppressed.StrictBufferConfig withLoggingDisabled() { + return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, null); + } + + @Override + public Suppressed.StrictBufferConfig withLoggingEnabled(final Map config) { + return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, config); + } + + @Override + public boolean isLoggingEnabled() { + return logConfig != null; + } + + @Override + public Map getLogConfig() { + return isLoggingEnabled() ? 
logConfig : Collections.emptyMap(); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index d24988dcbf204..5206821a6336c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -67,7 +67,8 @@ public String name() { return name; } - BufferConfigInternal bufferConfig() { + @SuppressWarnings("unchecked") + public > BufferConfigInternal bufferConfig() { return bufferConfig; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 4043ceace5afe..d58adbc54ea14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -94,6 +94,7 @@ public static class Builder implements StoreBuilder keySerde; private final Serde valSerde; private boolean loggingEnabled = true; + private Map logConfig = new HashMap<>(); public Builder(final String storeName, final Serde keySerde, final Serde valSerde) { this.storeName = storeName; @@ -127,7 +128,8 @@ public StoreBuilder> withCachingDisabled @Override public StoreBuilder> withLoggingEnabled(final Map config) { - throw new UnsupportedOperationException(); + logConfig = config; + return this; } @Override @@ -143,7 +145,7 @@ public InMemoryTimeOrderedKeyValueBuffer build() { @Override public Map logConfig() { - return Collections.emptyMap(); + return loggingEnabled() ? 
Collections.unmodifiableMap(logConfig) : Collections.emptyMap(); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 8e6b6acf20a7e..8c3be2f378aea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -49,10 +49,13 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; import static java.lang.Long.MAX_VALUE; @@ -69,8 +72,10 @@ import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; @Category(IntegrationTest.class) public class SuppressionIntegrationTest { @@ -313,6 +318,184 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce } } + @Test + public void shouldAllowOverridingChangelogConfig() { + final String testId = "-shouldAllowOverridingChangelogConfig"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + final Map logConfig = Collections.singletonMap("retention.ms", "1000"); + final String changeLog = 
"suppressionintegrationtest-shouldAllowOverridingChangelogConfig-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog"; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream inputStream = builder.stream(input); + + final KTable valueCounts = inputStream + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")"); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L) + .emitEarlyWhenFull() + .withLoggingEnabled(logConfig))) + .toStream() + .to(outputSuppressed); + + valueCounts + .toStream() + .to(outputRaw); + + final Properties streamsConfig = getStreamsConfig(appId); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + final boolean rawRecords = waitForAnyRecord(outputRaw); + final boolean suppressedRecords = waitForAnyRecord(outputSuppressed); + final Properties config = CLUSTER.getLogConfig(changeLog); + + assertThat(config.getProperty("retention.ms"), is(logConfig.get("retention.ms"))); + assertThat(CLUSTER.getAllTopicsInCluster(), hasItem(changeLog)); + assertThat(rawRecords, is(true)); + assertThat(suppressedRecords, is(true)); + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + + @Test + public void shouldCreateChangelogByDefault() { + final String testId = "-shouldCreateChangelogByDefault"; + final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + final String changeLog = "suppressionintegrationtest-shouldCreateChangelogByDefault-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog"; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream inputStream = builder.stream(input); + + final KTable valueCounts = inputStream + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")"); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L) + .emitEarlyWhenFull())) + .toStream() + .to(outputSuppressed); + + valueCounts + .toStream() + .to(outputRaw); + + final Properties streamsConfig = getStreamsConfig(appId); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + final boolean rawRecords = waitForAnyRecord(outputRaw); + final boolean suppressedRecords = waitForAnyRecord(outputSuppressed); + + assertThat(CLUSTER.getAllTopicsInCluster(), hasItem(changeLog)); + assertThat(rawRecords, is(true)); + assertThat(suppressedRecords, is(true)); + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + + @Test + public void shouldAllowDisablingChangelog() { + final String testId = 
"-shouldAllowDisablingChangelog"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + final String changeLog = "suppressionintegrationtest-shouldAllowDisablingChangelog-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog"; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream inputStream = builder.stream(input); + + final KTable valueCounts = inputStream + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")"); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L) + .emitEarlyWhenFull() + .withLoggingDisabled())) + .toStream() + .to(outputSuppressed); + + valueCounts + .toStream() + .to(outputRaw); + + final Properties streamsConfig = getStreamsConfig(appId); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + final boolean rawRecords = waitForAnyRecord(outputRaw); + final boolean suppressedRecords = waitForAnyRecord(outputSuppressed); + final Set suppressChangeLog = CLUSTER.getAllTopicsInCluster() + .stream() + .filter(s -> s.contains("-changelog")) + .filter(s -> s.contains("KTABLE-SUPPRESS")) + .collect(Collectors.toSet()); + + assertThat(suppressChangeLog, is(empty())); + 
assertThat(rawRecords, is(true)); + assertThat(suppressedRecords, is(true)); + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + private static Properties getStreamsConfig(final String appId) { return mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index f30ecedc3aaa1..d3ea631b73190 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration.utils; +import kafka.server.ConfigType; import kafka.server.KafkaConfig$; import kafka.server.KafkaServer; import kafka.utils.MockTime; @@ -339,4 +340,12 @@ private List brokers() { } return servers; } + + public Properties getLogConfig(final String topic) { + return brokers[0].kafkaServer().zkClient().getEntityConfigs(ConfigType.Topic(), topic); + } + + public Set getAllTopicsInCluster() { + return JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava(); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java index 18f689f3cd483..3f0b3477102b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java @@ -16,8 +16,17 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.streams.state.StoreBuilder; import org.junit.Test; +import java.util.HashMap; 
+import java.util.Map; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + public class InMemoryTimeOrderedKeyValueBufferTest { @Test @@ -29,4 +38,25 @@ public void bufferShouldAllowCacheEnablement() { public void bufferShouldAllowCacheDisablement() { new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled(); } + + @Test + public void bufferShouldAllowLoggingEnablement() { + final String expect = "3"; + final Map logConfig = new HashMap<>(); + logConfig.put("min.insync.replicas", expect); + final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withLoggingEnabled(logConfig); + + assertThat(builder.logConfig(), is(singletonMap("min.insync.replicas", expect))); + assertThat(builder.loggingEnabled(), is(true)); + } + + @Test + public void bufferShouldAllowLoggingDisablement() { + final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withLoggingDisabled(); + + assertThat(builder.logConfig(), is(emptyMap())); + assertThat(builder.loggingEnabled(), is(false)); + } }