From ab5e4f52ecb072df55c7f5cd8941122a135cdf79 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 3 Apr 2020 19:17:57 -0700 Subject: [PATCH] MINOR: Refactor StreamsProducer (#8380) Reviewers: Boyang Chen , Guozhang Wang , Andrew Choi --- .../apache/kafka/streams/StreamsConfig.java | 2 +- .../internals/ActiveTaskCreator.java | 52 +- .../processor/internals/ClientUtils.java | 21 +- .../processor/internals/StreamThread.java | 30 +- .../processor/internals/StreamsProducer.java | 87 ++- .../kafka/streams/KafkaStreamsTest.java | 5 +- .../internals/ActiveTaskCreatorTest.java | 159 +---- .../internals/RecordCollectorTest.java | 152 +++-- .../processor/internals/StreamThreadTest.java | 4 +- .../internals/StreamsProducerTest.java | 592 +++++++++++++----- .../state/KeyValueStoreTestDriver.java | 36 +- .../StreamThreadStateStoreProviderTest.java | 16 +- .../apache/kafka/test/MockClientSupplier.java | 5 +- .../kafka/streams/TopologyTestDriver.java | 32 +- .../internals/TestDriverProducer.java | 11 +- 15 files changed, 719 insertions(+), 485 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 9c8044389155f..068fcb869de10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1007,7 +1007,7 @@ private Map getCommonConsumerConfigs() { checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); final Map consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); - if (StreamThread.eosBetaEnabled(this)) { + if (StreamThread.processingMode(this) == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA) { consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true); } consumerProps.putAll(getClientCustomProps()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index b81c6c43b3f7c..ad1a3ccae27e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; @@ -45,7 +44,6 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId; import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId; -import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.AT_LEAST_ONCE; import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; @@ -61,7 +59,6 @@ class ActiveTaskCreator { private final String threadId; private final Logger log; private final Sensor createTaskSensor; - private final String applicationId; private final StreamsProducer threadProducer; private final Map taskProducers; private final StreamThread.ProcessingMode processingMode; @@ -69,7 +66,6 @@ class ActiveTaskCreator { ActiveTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, - final StreamThread.ProcessingMode processingMode, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, @@ -81,7 +77,6 @@ class ActiveTaskCreator { final Logger log) { this.builder = builder; this.config = config; - this.processingMode = processingMode; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; @@ -92,8 +87,9 @@ class ActiveTaskCreator { this.log = log; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); - applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); transactionalId = applicationId + "-" + processId + "-StreamThread-" + threadId.split("-StreamThread-")[1]; + processingMode = StreamThread.processingMode(config); if (processingMode == EXACTLY_ONCE_ALPHA) { threadProducer = null; @@ -104,31 +100,19 @@ class ActiveTaskCreator { final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); final LogContext logContext = new LogContext(threadIdPrefix); - final String threadProducerClientId = getThreadProducerClientId(threadId); - final Map producerConfigs = config.getProducerConfigs(threadProducerClientId); - - if (processingMode == EXACTLY_ONCE_BETA) { - producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); - threadProducer = new StreamsProducer(clientSupplier.getProducer(producerConfigs), EXACTLY_ONCE_BETA, logContext); - } else { - threadProducer = new StreamsProducer(clientSupplier.getProducer(producerConfigs), AT_LEAST_ONCE, logContext); - } + threadProducer = new StreamsProducer( + config, + threadId, + clientSupplier, + null, + processId, + logContext); taskProducers = Collections.emptyMap(); } } public void reInitializeThreadProducer() { - if (processingMode != EXACTLY_ONCE_BETA) { - throw new IllegalStateException("Exactly-once beta is not enabled."); - } - - threadProducer.kafkaProducer().close(); - - final String threadProducerClientId = getThreadProducerClientId(threadId); - final Map producerConfigs = config.getProducerConfigs(threadProducerClientId); - producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); - - threadProducer.resetProducer(clientSupplier.getProducer(producerConfigs)); + threadProducer.resetProducer(); } StreamsProducer streamsProducerForTask(final TaskId taskId) { @@ -175,14 +159,14 @@ Collection createTasks(final Consumer consumer, ); final StreamsProducer streamsProducer; - if (processingMode == EXACTLY_ONCE_ALPHA) { - final String taskProducerClientId = getTaskProducerClientId(threadId, taskId); - final Map producerConfigs = config.getProducerConfigs(taskProducerClientId); - producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId); + if (processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) { log.info("Creating producer client for task {}", taskId); streamsProducer = new StreamsProducer( - clientSupplier.getProducer(producerConfigs), - EXACTLY_ONCE_ALPHA, + config, + threadId, + clientSupplier, + taskId, + null, logContext); taskProducers.put(taskId, streamsProducer); } else { @@ -221,7 +205,7 @@ Collection createTasks(final Consumer consumer, void closeThreadProducerIfNeeded() { if (threadProducer != null) { try { - threadProducer.kafkaProducer().close(); + threadProducer.close(); } catch (final RuntimeException e) { throw new StreamsException("Thread producer encounter error trying to close.", e); } @@ -232,7 +216,7 @@ void closeAndRemoveTaskProducerIfNeeded(final TaskId id) { final StreamsProducer taskProducer = taskProducers.remove(id); if (taskProducer != null) { try { - taskProducer.kafkaProducer().close(); + taskProducer.close(); } catch (final RuntimeException e) { throw new StreamsException("[" + id + "] task producer encounter error trying to close.", e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java index 21a750d898bad..216753d3fa38c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java @@ -16,15 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.time.Duration; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.OffsetSpec; @@ -36,6 +27,16 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; +import java.time.Duration; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; + public class ClientUtils { // currently admin client is shared among all threads @@ -77,7 +78,7 @@ public static Map adminClientMetrics(final Admin adminClient public static Map producerMetrics(final Collection producers) { final Map result = new LinkedHashMap<>(); for (final StreamsProducer producer : producers) { - final Map producerMetrics = producer.kafkaProducer().metrics(); + final Map producerMetrics = producer.metrics(); if (producerMetrics != null) { result.putAll(producerMetrics); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e6d5be7624c7e..bdd9ae1db92ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -317,19 +317,9 @@ public static StreamThread create(final InternalTopologyBuilder builder, final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); - final ProcessingMode processingMode; - if (StreamThread.eosAlphaEnabled(config)) { - processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; - } else if (StreamThread.eosBetaEnabled(config)) { - processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; - } else { - processingMode = StreamThread.ProcessingMode.AT_LEAST_ONCE; - } - final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( builder, config, - processingMode, streamsMetrics, stateDirectory, changelogReader, @@ -359,7 +349,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, builder, adminClient, stateDirectory, - processingMode + StreamThread.processingMode(config) ); log.info("Creating consumer client"); @@ -413,16 +403,20 @@ public enum ProcessingMode { } } - public static boolean eosAlphaEnabled(final StreamsConfig config) { - return EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); - } - - public static boolean eosBetaEnabled(final StreamsConfig config) { - return EXACTLY_ONCE_BETA.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); + public static ProcessingMode processingMode(final StreamsConfig config) { + if (EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { + return StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; + } else if (EXACTLY_ONCE_BETA.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { + return StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; + } else { + return StreamThread.ProcessingMode.AT_LEAST_ONCE; + } } public static boolean eosEnabled(final StreamsConfig config) { - return eosAlphaEnabled(config) || eosBetaEnabled(config); + final ProcessingMode processingMode = processingMode(config); + return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || + processingMode == ProcessingMode.EXACTLY_ONCE_BETA; } public StreamThread(final Time time, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index b1d765eac7214..5f3c3865080eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -25,22 +25,29 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.Future; -import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId; +import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId; import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; /** @@ -55,20 +62,68 @@ public class StreamsProducer { private final Logger log; private final String logPrefix; - private Producer producer; + private final Map eosBetaProducerConfigs; + private final KafkaClientSupplier clientSupplier; private final StreamThread.ProcessingMode processingMode; + private Producer producer; private boolean transactionInFlight = false; private boolean transactionInitialized = false; - public StreamsProducer(final Producer producer, - final StreamThread.ProcessingMode processingMode, + public StreamsProducer(final StreamsConfig config, + final String threadId, + final KafkaClientSupplier clientSupplier, + final TaskId taskId, + final UUID processId, final LogContext logContext) { - this.producer = Objects.requireNonNull(producer, "producer cannot be null"); - this.processingMode = Objects.requireNonNull(processingMode, "processingMode cannot be null"); - + Objects.requireNonNull(config, "config cannot be null"); + Objects.requireNonNull(threadId, "threadId cannot be null"); + this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier cannot be null"); log = Objects.requireNonNull(logContext, "logContext cannot be null").logger(getClass()); logPrefix = logContext.logPrefix().trim(); + + processingMode = StreamThread.processingMode(config); + + final Map producerConfigs; + switch (processingMode) { + case AT_LEAST_ONCE: { + producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId)); + eosBetaProducerConfigs = null; + + break; + } + case EXACTLY_ONCE_ALPHA: { + producerConfigs = config.getProducerConfigs( + getTaskProducerClientId( + threadId, + Objects.requireNonNull(taskId, "taskId cannot be null for exactly-once alpha") + ) + ); + + final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId); + + eosBetaProducerConfigs = null; + + break; + } + case EXACTLY_ONCE_BETA: { + producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId)); + + final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + producerConfigs.put( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + applicationId + "-" + Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta")); + + eosBetaProducerConfigs = producerConfigs; + + break; + } + default: + throw new IllegalArgumentException("Unknown processing mode: " + processingMode); + } + + producer = clientSupplier.getProducer(producerConfigs); } private String formatException(final String message) { @@ -76,7 +131,8 @@ private String formatException(final String message) { } boolean eosEnabled() { - return processingMode == EXACTLY_ONCE_ALPHA || processingMode == EXACTLY_ONCE_BETA; + return processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA || + processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; } /** @@ -113,12 +169,14 @@ void initTransaction() { } } - public void resetProducer(final Producer producer) { + public void resetProducer() { if (processingMode != EXACTLY_ONCE_BETA) { throw new IllegalStateException(formatException("Exactly-once beta is not enabled")); } - this.producer = Objects.requireNonNull(producer, "producer cannot be null"); + producer.close(); + + producer = clientSupplier.getProducer(eosBetaProducerConfigs); transactionInitialized = false; } @@ -233,10 +291,19 @@ List partitionsFor(final String topic) throws TimeoutException { return producer.partitionsFor(topic); } + Map metrics() { + return producer.metrics(); + } + void flush() { producer.flush(); } + void close() { + producer.close(); + } + + // for testing only Producer kafkaProducer() { return producer; } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index c79cad539e13e..bd90ed4a238c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -148,7 +148,7 @@ public void onChange(final KafkaStreams.State newState, public void before() throws Exception { time = new MockTime(); supplier = new MockClientSupplier(); - supplier.setClusterForAdminClient(Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)))); + supplier.setCluster(Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)))); streamsStateListener = new StateListenerStub(); threadStatelistenerCapture = EasyMock.newCapture(); metricsReportersCapture = EasyMock.newCapture(); @@ -212,8 +212,7 @@ private void prepareStreams() throws Exception { )).andReturn(streamThreadOne).andReturn(streamThreadTwo); EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes(); - EasyMock.expect(StreamThread.eosAlphaEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes(); - EasyMock.expect(StreamThread.eosBetaEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes(); + EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes(); EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes(); EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes(); prepareStreamThread(streamThreadOne, true); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 4b9dabe260559..7442b1a84a705 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -16,12 +16,9 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Metrics; @@ -55,7 +52,6 @@ import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -80,7 +76,6 @@ public class ActiveTaskCreatorTest { ); final UUID uuid = UUID.randomUUID(); - private StreamsConfig config; private ActiveTaskCreator activeTaskCreator; @@ -89,24 +84,6 @@ public class ActiveTaskCreatorTest { // functional test - @Test - public void shouldCreateThreadProducerIfEosDisabled() { - final Map mockProducerConfig = mock(Map.class); - expect(mockProducerConfig.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn(false); - replay(mockProducerConfig); - - config = new StreamsConfig(properties) { - @Override - public Map getProducerConfigs(final String clientId) { - return mockProducerConfig; - } - }; - createTasks(); - - assertThat(mockClientSupplier.producers.size(), is(1)); - verify(mockProducerConfig); - } - @Test public void shouldConstructProducerMetricsWithEosDisabled() { shouldConstructThreadProducerMetric(); @@ -142,18 +119,6 @@ public void shouldNoOpCloseTaskProducerIfEosDisabled() { // error handling - @Test - public void shouldFailOnReInitializeProducerIfEosDisabled() { - createTasks(); - - final IllegalStateException thrown = assertThrows( - IllegalStateException.class, - activeTaskCreator::reInitializeThreadProducer - ); - - assertThat(thrown.getMessage(), is("Exactly-once beta is not enabled.")); - } - @Test public void shouldFailOnStreamsProducerPerTaskIfEosDisabled() { createTasks(); @@ -198,30 +163,6 @@ public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosDisabled() // functional test - @Test - public void shouldCreateProducerPerTaskIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - - final Map mockProducerConfig = mock(Map.class); - expect(mockProducerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-0_0")).andReturn(null); - expect(mockProducerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-0_1")).andReturn(null); - expect(mockProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) - .andReturn("appId-0_0").andReturn("appId-0_1"); - replay(mockProducerConfig); - - config = new StreamsConfig(properties) { - @Override - public Map getProducerConfigs(final String clientId) { - return mockProducerConfig; - } - }; - createTasks(); - - assertThat(mockClientSupplier.producers.size(), is(2)); - verify(mockProducerConfig); - } - @Test public void shouldReturnStreamsProducerPerTaskIfEosAlphaEnabled() { properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); @@ -281,21 +222,6 @@ public void shouldCloseTaskProducersIfEosAlphaEnabled() { // error handling @Test - public void shouldFailOnReInitializeProducerIfEosAlphaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - mockClientSupplier.setApplicationIdForProducer("appId"); - - createTasks(); - - final IllegalStateException thrown = assertThrows( - IllegalStateException.class, - activeTaskCreator::reInitializeThreadProducer - ); - - assertThat(thrown.getMessage(), is("Exactly-once beta is not enabled.")); - } - @Test - public void shouldFailForUnknownTaskOnStreamsProducerPerTaskIfEosAlphaEnabled() { properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); mockClientSupplier.setApplicationIdForProducer("appId"); @@ -360,29 +286,6 @@ public void shouldThrowStreamsExceptionOnErrorCloseTaskProducerIfEosAlphaEnabled // functional test - @Test - public void shouldCreateThreadProducerIfEosBetaEnabled() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); - mockClientSupplier.setApplicationIdForProducer("appId"); - - final String txId = "appId-" + uuid + "-StreamThread-0"; - final Map mockProducerConfig = mock(Map.class); - expect(mockProducerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId)).andReturn(null); - expect(mockProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn(txId); - replay(mockProducerConfig); - - config = new StreamsConfig(properties) { - @Override - public Map getProducerConfigs(final String clientId) { - return mockProducerConfig; - } - }; - createTasks(); - - assertThat(mockClientSupplier.producers.size(), is(1)); - verify(mockProducerConfig); - } - @Test public void shouldReturnThreadProducerIfEosBetaEnabled() { properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); @@ -396,50 +299,6 @@ public void shouldReturnThreadProducerIfEosBetaEnabled() { assertThat(threadProducer.kafkaProducer(), is(mockClientSupplier.producers.get(0))); } - @Test - public void shouldAbortTransactionAndCloseOldProducerOnReInitializeProducer() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); - mockClientSupplier.setApplicationIdForProducer("appId"); - - createTasks(); - activeTaskCreator.threadProducer().initTransaction(); - activeTaskCreator.threadProducer().send(new ProducerRecord<>("topic", 0, 0L, new byte[0], new byte[0], new RecordHeaders()), null); - - activeTaskCreator.reInitializeThreadProducer(); - - assertThat(mockClientSupplier.producers.get(0).closed(), is(true)); - } - - @Test - public void shouldCreateNewProducerOnReInitializeProducer() { - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); - mockClientSupplier.setApplicationIdForProducer("appId"); - - final String txId = "appId-" + uuid + "-StreamThread-0"; - final Map mockProducerConfig = mock(Map.class); - expect(mockProducerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId)).andReturn(null); - expect(mockProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn(txId); - replay(mockProducerConfig); - - config = new StreamsConfig(properties) { - @Override - public Map getProducerConfigs(final String clientId) { - return mockProducerConfig; - } - }; - createTasks(); - - reset(mockProducerConfig); - expect(mockProducerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId)).andReturn(null); - expect(mockProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn(txId); - replay(mockProducerConfig); - - activeTaskCreator.reInitializeThreadProducer(); - - assertThat(mockClientSupplier.producers.size(), is(2)); - verify(mockProducerConfig); - } - @Test public void shouldConstructProducerMetricsWithEosBetaEnabled() { properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); @@ -593,25 +452,9 @@ private void createTasks() { expect(topology.globalStateStores()).andReturn(Collections.emptyList()).anyTimes(); replay(builder, stateDirectory, topology, sourceNode); - final StreamThread.ProcessingMode processingMode; - final String eosConfig = (String) properties.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG); - if (eosConfig == null || StreamsConfig.AT_LEAST_ONCE.equals(eosConfig)) { - processingMode = StreamThread.ProcessingMode.AT_LEAST_ONCE; - } else if (StreamsConfig.EXACTLY_ONCE.equals(eosConfig)) { - processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; - } else if (StreamsConfig.EXACTLY_ONCE_BETA.equals(eosConfig)) { - processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; - } else { - throw new IllegalArgumentException("argument `" + eosConfig + "` for config `processing.guarantees` invalid."); - } - if (config == null) { - config = new StreamsConfig(properties); - } - activeTaskCreator = new ActiveTaskCreator( builder, - config, - processingMode, + new StreamsConfig(properties), streamsMetrics, stateDirectory, changeLogReader, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 369825b935405..47681afc93919 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; @@ -41,6 +42,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.ProductionExceptionHandler; @@ -50,6 +52,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.test.MockClientSupplier; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,8 +64,8 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.AT_LEAST_ONCE; -import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; @@ -82,6 +85,15 @@ public class RecordCollectorTest { private final TaskId taskId = new TaskId(0, 0); private final ProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler(); private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics()); + private final StreamsConfig config = new StreamsConfig(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") + )); + private final StreamsConfig eosConfig = new StreamsConfig(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE) + )); private final String topic = "topic"; private final Cluster cluster = new Cluster( @@ -101,14 +113,24 @@ public class RecordCollectorTest { private final StreamPartitioner streamPartitioner = (topic, key, value, numPartitions) -> Integer.parseInt(key) % numPartitions; - private final MockProducer mockProducer = new MockProducer<>( - cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer); - private final StreamsProducer streamsProducer = new StreamsProducer(mockProducer, AT_LEAST_ONCE, logContext); + private MockProducer mockProducer; + private StreamsProducer streamsProducer; private RecordCollectorImpl collector; @Before public void setup() { + final MockClientSupplier clientSupplier = new MockClientSupplier(); + clientSupplier.setCluster(cluster); + streamsProducer = new StreamsProducer( + config, + "threadId", + clientSupplier, + null, + null, + logContext + ); + mockProducer = clientSupplier.producers.get(0); collector = new RecordCollectorImpl( logContext, taskId, @@ -414,14 +436,22 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn logContext, taskId, new StreamsProducer( - new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + eosConfig, + "threadId", + new MockClientSupplier() { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, exception); - return null; + public Producer getProducer(final Map config) { + return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public synchronized Future send(final ProducerRecord record, final Callback callback) { + callback.onCompletion(null, exception); + return null; + } + }; } }, - EXACTLY_ONCE_ALPHA, + taskId, + null, logContext ), productionExceptionHandler, @@ -454,14 +484,22 @@ public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultEx logContext, taskId, new StreamsProducer( - new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + config, + "threadId", + new MockClientSupplier() { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, exception); - return null; + public Producer getProducer(final Map config) { + return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public synchronized Future send(final ProducerRecord record, final Callback callback) { + callback.onCompletion(null, exception); + return null; + } + }; } }, - AT_LEAST_ONCE, + null, + null, logContext ), productionExceptionHandler, @@ -493,14 +531,22 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin logContext, taskId, new StreamsProducer( - new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + config, + "threadId", + new MockClientSupplier() { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new Exception()); - return null; + public Producer getProducer(final Map config) { + return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public synchronized Future send(final ProducerRecord record, final Callback callback) { + callback.onCompletion(null, new Exception()); + return null; + } + }; } }, - AT_LEAST_ONCE, + null, + null, logContext ), new AlwaysContinueProductionExceptionHandler(), @@ -514,7 +560,7 @@ public synchronized Future send(final ProducerRecord messages = logCaptureAppender.getMessages(); @@ -533,14 +579,22 @@ public void shouldThrowStreamsExceptionOnSubsequentCallIfFatalEvenWithContinueEx logContext, taskId, new StreamsProducer( - new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + config, + "threadId", + new MockClientSupplier() { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, exception); - return null; + public Producer getProducer(final Map config) { + return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public synchronized Future send(final ProducerRecord record, final Callback callback) { + callback.onCompletion(null, exception); + return null; + } + }; } }, - AT_LEAST_ONCE, + null, + null, logContext ), new AlwaysContinueProductionExceptionHandler(), @@ -572,13 +626,21 @@ public void shouldNotAbortTxnOnEOSCloseIfNothingSent() { logContext, taskId, new StreamsProducer( - new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + eosConfig, + "threadId", + new MockClientSupplier() { @Override - public void abortTransaction() { - functionCalled.set(true); + public Producer getProducer(final Map config) { + return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public void abortTransaction() { + functionCalled.set(true); + } + }; } }, - EXACTLY_ONCE_ALPHA, + taskId, + null, logContext ), productionExceptionHandler, @@ -595,13 +657,21 @@ public void shouldThrowIfTopicIsUnknownOnSendWithPartitioner() { logContext, taskId, new StreamsProducer( - new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + config, + "threadId", + new MockClientSupplier() { @Override - public List partitionsFor(final String topic) { - return Collections.emptyList(); + public Producer getProducer(final Map config) { + return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public List partitionsFor(final String topic) { + return Collections.emptyList(); + } + }; } }, - AT_LEAST_ONCE, + null, + null, logContext ), productionExceptionHandler, @@ -621,7 +691,19 @@ public void shouldNotCloseInternalProducerForEOS() { final RecordCollector collector = new RecordCollectorImpl( logContext, taskId, - new StreamsProducer(mockProducer, EXACTLY_ONCE_ALPHA, logContext), + new StreamsProducer( + eosConfig, + "threadId", + new MockClientSupplier() { + @Override + public Producer getProducer(final Map config) { + return mockProducer; + } + }, + taskId, + null, + logContext + ), productionExceptionHandler, streamsMetrics ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 69d2855d87640..f890a420f95a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -188,7 +188,7 @@ private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") clientSupplier.setApplicationIdForProducer(APPLICATION_ID); } - clientSupplier.setClusterForAdminClient(createCluster()); + clientSupplier.setCluster(createCluster()); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, @@ -1174,7 +1174,7 @@ public void shouldNotCloseTaskProducerWhenSuspending() { public void shouldReturnActiveTaskMetadataWhileRunningState() { internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); - clientSupplier.setClusterForAdminClient(createCluster()); + clientSupplier.setCluster(createCluster()); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index 09eb038bfaba0..12779d4ea1ddf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -20,8 +20,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -31,31 +31,36 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.header.internals.RecordHeaders; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.test.MockClientSupplier; import org.junit.Before; import org.junit.Test; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.AT_LEAST_ONCE; -import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; -import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; public class StreamsProducerTest { @@ -69,32 +74,109 @@ public class StreamsProducerTest { Collections.emptySet() ); - private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); - private final Map offsetsAndMetadata = mkMap( - mkEntry(new TopicPartition(topic, 0), new OffsetAndMetadata(0L, null)) + private final StreamsConfig nonEosConfig = new StreamsConfig(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")) + ); + + private final StreamsConfig eosAlphaConfig = new StreamsConfig(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)) + ); + + private final StreamsConfig eosBetaConfig = new StreamsConfig(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA)) + ); + + final Producer mockedProducer = mock(Producer.class); + final KafkaClientSupplier clientSupplier = new MockClientSupplier() { + @Override + public Producer getProducer(final Map config) { + return mockedProducer; + } + }; + final StreamsProducer streamsProducerWithMock = new StreamsProducer( + nonEosConfig, + "threadId", + clientSupplier, + null, + null, + logContext + ); + final StreamsProducer eosAlphaStreamsProducerWithMock = new StreamsProducer( + eosAlphaConfig, + "threadId", + clientSupplier, + new TaskId(0, 0), + null, + logContext ); - private final MockProducer nonEosMockProducer = new MockProducer<>( - cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer); - private final StreamsProducer nonEosStreamsProducer = - new StreamsProducer(nonEosMockProducer, AT_LEAST_ONCE, logContext); + private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + private StreamsProducer nonEosStreamsProducer; + private MockProducer nonEosMockProducer; - private final MockProducer eosAlphaMockProducer = new MockProducer<>( - cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer); - private final StreamsProducer eosAlphaStreamsProducer = - new StreamsProducer(eosAlphaMockProducer, EXACTLY_ONCE_ALPHA, logContext); + private final MockClientSupplier eosAlphaMockClientSupplier = new MockClientSupplier(); + private StreamsProducer eosAlphaStreamsProducer; + private MockProducer eosAlphaMockProducer; - private final MockProducer eosBetaMockProducer = new MockProducer<>( - cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer); - private final StreamsProducer eosBetaStreamsProducer = - new StreamsProducer(eosBetaMockProducer, EXACTLY_ONCE_BETA, logContext); + private final MockClientSupplier eosBetaMockClientSupplier = new MockClientSupplier(); + private StreamsProducer eosBetaStreamsProducer; + private MockProducer eosBetaMockProducer; private final ProducerRecord record = new ProducerRecord<>(topic, 0, 0L, new byte[0], new byte[0], new RecordHeaders()); + private final Map offsetsAndMetadata = mkMap( + mkEntry(new TopicPartition(topic, 0), new OffsetAndMetadata(0L, null)) + ); + + + @Before public void before() { + mockClientSupplier.setCluster(cluster); + nonEosStreamsProducer = + new StreamsProducer( + nonEosConfig, + "threadId", + mockClientSupplier, + null, + null, + logContext + ); + nonEosMockProducer = mockClientSupplier.producers.get(0); + + eosAlphaMockClientSupplier.setCluster(cluster); + eosAlphaMockClientSupplier.setApplicationIdForProducer("appId"); + eosAlphaStreamsProducer = + new StreamsProducer( + eosAlphaConfig, + "threadId", + eosAlphaMockClientSupplier, + new TaskId(0, 0), + null, + logContext + ); eosAlphaStreamsProducer.initTransaction(); + eosAlphaMockProducer = eosAlphaMockClientSupplier.producers.get(0); + + eosBetaMockClientSupplier.setCluster(cluster); + eosBetaMockClientSupplier.setApplicationIdForProducer("appId"); + eosBetaStreamsProducer = + new StreamsProducer( + eosBetaConfig, + "threadId", + eosBetaMockClientSupplier, + null, + UUID.randomUUID(), + logContext + ); + eosBetaStreamsProducer.initTransaction(); + eosBetaMockProducer = eosBetaMockClientSupplier.producers.get(0); } @@ -104,105 +186,118 @@ public void before() { // functional tests @Test - public void shouldForwardCallToPartitionsFor() { - final Producer producer = mock(Producer.class); + public void shouldCreateProducer() { + assertThat(mockClientSupplier.producers.size(), is(1)); + assertThat(eosAlphaMockClientSupplier.producers.size(), is(1)); + } + @Test + public void shouldForwardCallToPartitionsFor() { final List expectedPartitionInfo = Collections.emptyList(); - expect(producer.partitionsFor("topic")).andReturn(expectedPartitionInfo); - replay(producer); + expect(mockedProducer.partitionsFor("topic")).andReturn(expectedPartitionInfo); + replay(mockedProducer); - final StreamsProducer streamsProducer = - new StreamsProducer(producer, AT_LEAST_ONCE, logContext); - - final List partitionInfo = streamsProducer.partitionsFor(topic); + final List partitionInfo = streamsProducerWithMock.partitionsFor(topic); assertThat(partitionInfo, sameInstance(expectedPartitionInfo)); - verify(producer); + verify(mockedProducer); } @Test public void shouldForwardCallToFlush() { - final Producer producer = mock(Producer.class); - - producer.flush(); + mockedProducer.flush(); expectLastCall(); - replay(producer); - - final StreamsProducer streamsProducer = - new StreamsProducer(producer, AT_LEAST_ONCE, logContext); + replay(mockedProducer); - streamsProducer.flush(); + streamsProducerWithMock.flush(); - verify(producer); + verify(mockedProducer); } - // error handling tests - + @SuppressWarnings({"unchecked", "rawtypes"}) @Test - public void shouldFailIfProducerIsNullForAtLeastOnce() { - final NullPointerException thrown = assertThrows( - NullPointerException.class, - () -> new StreamsProducer(null, AT_LEAST_ONCE, logContext) - ); - - assertThat(thrown.getMessage(), is("producer cannot be null")); - } + public void shouldForwardCallToMetrics() { + final Map metrics = new HashMap<>(); + expect(mockedProducer.metrics()).andReturn(metrics); + replay(mockedProducer); - @Test - public void shouldFailIfProcessingModeIsNull() { - final NullPointerException thrown = assertThrows( - NullPointerException.class, - () -> new StreamsProducer(nonEosMockProducer, null, logContext) - ); + assertSame(metrics, streamsProducerWithMock.metrics()); - assertThat(thrown.getMessage(), is("processingMode cannot be null")); + verify(mockedProducer); } @Test - public void shouldFailIfProducerIsNullForExactlyOnceAlpha() { - final NullPointerException thrown = assertThrows( - NullPointerException.class, - () -> new StreamsProducer(null, EXACTLY_ONCE_ALPHA, logContext) - ); + public void shouldForwardCallToClose() { + mockedProducer.close(); + expectLastCall(); + replay(mockedProducer); - assertThat(thrown.getMessage(), is("producer cannot be null")); + streamsProducerWithMock.close(); + + verify(mockedProducer); } + // error handling tests + @Test - public void shouldFailIfProducerIsNullForExactlyOnceBeta() { + public void shouldFailIfStreamsConfigIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, - () -> new StreamsProducer(null, EXACTLY_ONCE_BETA, logContext) + () -> new StreamsProducer( + null, + "threadId", + mockClientSupplier, + new TaskId(0, 0), + UUID.randomUUID(), + logContext) ); - assertThat(thrown.getMessage(), is("producer cannot be null")); + assertThat(thrown.getMessage(), is("config cannot be null")); } @Test - public void shouldFailIfLogContextIsNullForAtLeastOnce() { + public void shouldFailIfThreadIdIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, - () -> new StreamsProducer(nonEosMockProducer, AT_LEAST_ONCE, null) + () -> new StreamsProducer( + nonEosConfig, + null, + mockClientSupplier, + new TaskId(0, 0), + UUID.randomUUID(), + logContext) ); - assertThat(thrown.getMessage(), is("logContext cannot be null")); + assertThat(thrown.getMessage(), is("threadId cannot be null")); } @Test - public void shouldFailIfLogContextIsNullForExactlyOnceAlpha() { + public void shouldFailIfClientSupplierIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, - () -> new StreamsProducer(nonEosMockProducer, EXACTLY_ONCE_ALPHA, null) + () -> new StreamsProducer( + nonEosConfig, + "threadId", + null, + new TaskId(0, 0), + UUID.randomUUID(), + logContext) ); - assertThat(thrown.getMessage(), is("logContext cannot be null")); + assertThat(thrown.getMessage(), is("clientSupplier cannot be null")); } @Test - public void shouldFailIfLogContextIsNullForExactlyOnceBeta() { + public void shouldFailIfLogContextIsNull() { final NullPointerException thrown = assertThrows( NullPointerException.class, - () -> new StreamsProducer(nonEosMockProducer, EXACTLY_ONCE_BETA, null) + () -> new StreamsProducer( + nonEosConfig, + "threadId", + mockClientSupplier, + new TaskId(0, 0), + UUID.randomUUID(), + null) ); assertThat(thrown.getMessage(), is("logContext cannot be null")); @@ -212,7 +307,7 @@ public void shouldFailIfLogContextIsNullForExactlyOnceBeta() { public void shouldFailOnResetProducerForAtLeastOnce() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, - () -> nonEosStreamsProducer.resetProducer(nonEosMockProducer) + () -> nonEosStreamsProducer.resetProducer() ); assertThat(thrown.getMessage(), is("Exactly-once beta is not enabled [test]")); @@ -222,7 +317,7 @@ public void shouldFailOnResetProducerForAtLeastOnce() { public void shouldFailOnResetProducerForExactlyOnceAlpha() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, - () -> eosAlphaStreamsProducer.resetProducer(eosAlphaMockProducer) + () -> eosAlphaStreamsProducer.resetProducer() ); assertThat(thrown.getMessage(), is("Exactly-once beta is not enabled [test]")); @@ -233,6 +328,28 @@ public void shouldFailOnResetProducerForExactlyOnceAlpha() { // functional tests + @Test + public void shouldNotSetTransactionIdIfEosDisable() { + final StreamsConfig mockConfig = mock(StreamsConfig.class); + expect(mockConfig.getProducerConfigs("threadId-producer")).andReturn(mock(Map.class)); + expect(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).andReturn(StreamsConfig.AT_LEAST_ONCE).anyTimes(); + replay(mockConfig); + + new StreamsProducer( + mockConfig, + "threadId", + mockClientSupplier, + null, + null, + logContext + ); + } + + @Test + public void shouldNotHaveEosEnabledIfEosDisabled() { + assertThat(nonEosStreamsProducer.eosEnabled(), is(false)); + } + @Test public void shouldNotInitTxIfEosDisable() { assertThat(nonEosMockProducer.transactionInitialized(), is(false)); @@ -323,6 +440,68 @@ public void shouldEnableEosIfEosBetaEnabled() { assertThat(eosBetaStreamsProducer.eosEnabled(), is(true)); } + @Test + public void shouldSetTransactionIdUsingTaskIdIfEosAlphaEnabled() { + final Map mockMap = mock(Map.class); + expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-0_0")).andReturn(null); + expect(mockMap.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn("appId-0_0"); + + final StreamsConfig mockConfig = mock(StreamsConfig.class); + expect(mockConfig.getProducerConfigs("threadId-0_0-producer")).andReturn(mockMap); + expect(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("appId"); + expect(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).andReturn(StreamsConfig.EXACTLY_ONCE); + + replay(mockMap, mockConfig); + + new StreamsProducer( + mockConfig, + "threadId", + eosAlphaMockClientSupplier, + new TaskId(0, 0), + null, + logContext + ); + + verify(mockMap); + } + + @Test + public void shouldSetTransactionIdUsingProcessIdIfEosBetaEnable() { + final UUID processId = UUID.randomUUID(); + + final Map mockMap = mock(Map.class); + expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + processId)).andReturn(null); + expect(mockMap.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn("appId-" + processId); + + final StreamsConfig mockConfig = mock(StreamsConfig.class); + expect(mockConfig.getProducerConfigs("threadId-producer")).andReturn(mockMap); + expect(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("appId"); + expect(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).andReturn(StreamsConfig.EXACTLY_ONCE_BETA).anyTimes(); + + replay(mockMap, mockConfig); + + new StreamsProducer( + mockConfig, + "threadId", + eosAlphaMockClientSupplier, + null, + processId, + logContext + ); + + verify(mockMap); + } + + @Test + public void shouldNotHaveEosEnabledIfEosAlphaEnable() { + assertThat(eosAlphaStreamsProducer.eosEnabled(), is(true)); + } + + @Test + public void shouldHaveEosEnabledIfEosBetaEnabled() { + assertThat(eosBetaStreamsProducer.eosEnabled(), is(true)); + } + @Test public void shouldInitTxOnEos() { assertThat(eosAlphaMockProducer.transactionInitialized(), is(true)); @@ -353,22 +532,18 @@ public void shouldForwardRecordButNotCommitOnEosSend() { @Test public void shouldBeginTxOnEosCommit() { - final Producer producer = mock(Producer.class); - - producer.initTransactions(); - producer.beginTransaction(); - producer.sendOffsetsToTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId")); - producer.commitTransaction(); + mockedProducer.initTransactions(); + mockedProducer.beginTransaction(); + mockedProducer.sendOffsetsToTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId")); + mockedProducer.commitTransaction(); expectLastCall(); - replay(producer); + replay(mockedProducer); - final StreamsProducer streamsProducer = - new StreamsProducer(producer, EXACTLY_ONCE_ALPHA, logContext); - streamsProducer.initTransaction(); + eosAlphaStreamsProducerWithMock.initTransaction(); - streamsProducer.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId")); + eosAlphaStreamsProducerWithMock.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId")); - verify(producer); + verify(mockedProducer); } @Test @@ -395,54 +570,54 @@ public void shouldCommitTxOnEosCommit() { @Test public void shouldCommitTxWithApplicationIdOnEosAlphaCommit() { - final Producer producer = mock(Producer.class); - - producer.initTransactions(); + mockedProducer.initTransactions(); expectLastCall(); - producer.beginTransaction(); + mockedProducer.beginTransaction(); expectLastCall(); - expect(producer.send(record, null)).andReturn(null); - producer.sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId")); + expect(mockedProducer.send(record, null)).andReturn(null); + mockedProducer.sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId")); expectLastCall(); - producer.commitTransaction(); + mockedProducer.commitTransaction(); expectLastCall(); - replay(producer); + replay(mockedProducer); - final StreamsProducer streamsProducer = - new StreamsProducer(producer, EXACTLY_ONCE_ALPHA, logContext); - streamsProducer.initTransaction(); + eosAlphaStreamsProducerWithMock.initTransaction(); // call `send()` to start a transaction - streamsProducer.send(record, null); + eosAlphaStreamsProducerWithMock.send(record, null); - streamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId")); + eosAlphaStreamsProducerWithMock.commitTransaction(null, new ConsumerGroupMetadata("appId")); - verify(producer); + verify(mockedProducer); } @Test public void shouldCommitTxWithConsumerGroupMetadataOnEosBetaCommit() { - final Producer producer = mock(Producer.class); - - producer.initTransactions(); + mockedProducer.initTransactions(); expectLastCall(); - producer.beginTransaction(); + mockedProducer.beginTransaction(); expectLastCall(); - expect(producer.send(record, null)).andReturn(null); - producer.sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId")); + expect(mockedProducer.send(record, null)).andReturn(null); + mockedProducer.sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId")); expectLastCall(); - producer.commitTransaction(); + mockedProducer.commitTransaction(); expectLastCall(); - replay(producer); - - final StreamsProducer streamsProducer = - new StreamsProducer(producer, EXACTLY_ONCE_ALPHA, logContext); + replay(mockedProducer); + + final StreamsProducer streamsProducer = new StreamsProducer( + eosBetaConfig, + "threadId", + clientSupplier, + null, + UUID.randomUUID(), + logContext + ); streamsProducer.initTransaction(); // call `send()` to start a transaction streamsProducer.send(record, null); streamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId")); - verify(producer); + verify(mockedProducer); } @Test @@ -464,29 +639,70 @@ public void shouldAbortTxOnEosAbort() { @Test public void shouldSkipAbortTxOnEosAbortIfNotTxInFlight() { - final Producer producer = mock(Producer.class); - - producer.initTransactions(); + mockedProducer.initTransactions(); expectLastCall(); - replay(producer); + replay(mockedProducer); - final StreamsProducer streamsProducer = - new StreamsProducer(producer, EXACTLY_ONCE_ALPHA, logContext); - streamsProducer.initTransaction(); + eosAlphaStreamsProducerWithMock.initTransaction(); - streamsProducer.abortTransaction(); + eosAlphaStreamsProducerWithMock.abortTransaction(); - verify(producer); + verify(mockedProducer); } // error handling tests + @Test + public void shouldFailIfTaskIdIsNullForEosAlpha() { + final NullPointerException thrown = assertThrows( + NullPointerException.class, + () -> new StreamsProducer( + eosAlphaConfig, + "threadId", + mockClientSupplier, + null, + UUID.randomUUID(), + logContext) + ); + + assertThat(thrown.getMessage(), is("taskId cannot be null for exactly-once alpha")); + } + + @Test + public void shouldFailIfProcessIdNullForEosBeta() { + final NullPointerException thrown = assertThrows( + NullPointerException.class, + () -> new StreamsProducer( + eosBetaConfig, + "threadId", + mockClientSupplier, + new TaskId(0, 0), + null, + logContext) + ); + + assertThat(thrown.getMessage(), is("processId cannot be null for exactly-once beta")); + } + @Test public void shouldThrowTimeoutExceptionOnEosInitTxTimeout() { - // use `mockProducer` instead of `eosMockProducer` to avoid double Tx-Init + // use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init nonEosMockProducer.initTransactionException = new TimeoutException("KABOOM!"); - final StreamsProducer streamsProducer = - new StreamsProducer(nonEosMockProducer, EXACTLY_ONCE_ALPHA, logContext); + final KafkaClientSupplier clientSupplier = new MockClientSupplier() { + @Override + public Producer getProducer(final Map config) { + return nonEosMockProducer; + } + }; + + final StreamsProducer streamsProducer = new StreamsProducer( + eosAlphaConfig, + "threadId", + clientSupplier, + new TaskId(0, 0), + null, + logContext + ); final TimeoutException thrown = assertThrows( TimeoutException.class, @@ -499,7 +715,14 @@ public void shouldThrowTimeoutExceptionOnEosInitTxTimeout() { @Test public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceAlpha() { final StreamsProducer streamsProducer = - new StreamsProducer(nonEosMockProducer, EXACTLY_ONCE_ALPHA, logContext); + new StreamsProducer( + eosAlphaConfig, + "threadId", + eosAlphaMockClientSupplier, + new TaskId(0, 0), + null, + logContext + ); final IllegalStateException thrown = assertThrows( IllegalStateException.class, @@ -512,7 +735,14 @@ public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExac @Test public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceBeta() { final StreamsProducer streamsProducer = - new StreamsProducer(nonEosMockProducer, EXACTLY_ONCE_BETA, logContext); + new StreamsProducer( + eosBetaConfig, + "threadId", + eosBetaMockClientSupplier, + null, + UUID.randomUUID(), + logContext + ); final IllegalStateException thrown = assertThrows( IllegalStateException.class, @@ -524,10 +754,23 @@ public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExac @Test public void shouldThrowStreamsExceptionOnEosInitError() { - // use `mockProducer` instead of `eosMockProducer` to avoid double Tx-Init + // use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init nonEosMockProducer.initTransactionException = new KafkaException("KABOOM!"); - final StreamsProducer streamsProducer = - new StreamsProducer(nonEosMockProducer, EXACTLY_ONCE_ALPHA, logContext); + final KafkaClientSupplier clientSupplier = new MockClientSupplier() { + @Override + public Producer getProducer(final Map config) { + return nonEosMockProducer; + } + }; + + final StreamsProducer streamsProducer = new StreamsProducer( + eosAlphaConfig, + "threadId", + clientSupplier, + new TaskId(0, 0), + null, + logContext + ); final StreamsException thrown = assertThrows( StreamsException.class, @@ -540,10 +783,23 @@ public void shouldThrowStreamsExceptionOnEosInitError() { @Test public void shouldFailOnEosInitFatal() { - // use `mockProducer` instead of `eosMockProducer` to avoid double Tx-Init + // use `nonEosMockProducer` instead of `eosMockProducer` to avoid double Tx-Init nonEosMockProducer.initTransactionException = new RuntimeException("KABOOM!"); - final StreamsProducer streamsProducer = - new StreamsProducer(nonEosMockProducer, EXACTLY_ONCE_ALPHA, logContext); + final KafkaClientSupplier clientSupplier = new MockClientSupplier() { + @Override + public Producer getProducer(final Map config) { + return nonEosMockProducer; + } + }; + + final StreamsProducer streamsProducer = new StreamsProducer( + eosAlphaConfig, + "threadId", + clientSupplier, + new TaskId(0, 0), + null, + logContext + ); final RuntimeException thrown = assertThrows( RuntimeException.class, @@ -754,24 +1010,20 @@ public void shouldFailOnEosCommitTxFatal() { @Test public void shouldSwallowExceptionOnEosAbortTxFenced() { - final Producer producer = mock(Producer.class); - - producer.initTransactions(); - producer.beginTransaction(); - expect(producer.send(record, null)).andReturn(null); - producer.abortTransaction(); + mockedProducer.initTransactions(); + mockedProducer.beginTransaction(); + expect(mockedProducer.send(record, null)).andReturn(null); + mockedProducer.abortTransaction(); expectLastCall().andThrow(new ProducerFencedException("KABOOM!")); - replay(producer); + replay(mockedProducer); - final StreamsProducer streamsProducer = - new StreamsProducer(producer, EXACTLY_ONCE_ALPHA, logContext); - streamsProducer.initTransaction(); + eosAlphaStreamsProducerWithMock.initTransaction(); // call `send()` to start a transaction - streamsProducer.send(record, null); + eosAlphaStreamsProducerWithMock.send(record, null); - streamsProducer.abortTransaction(); + eosAlphaStreamsProducerWithMock.abortTransaction(); - verify(producer); + verify(mockedProducer); } @Test @@ -806,40 +1058,42 @@ public void shouldFailOnEosAbortTxFatal() { // functional tests @Test - public void shouldSetSetNewProducerOnResetProducer() { - final Producer producer = mock(Producer.class); - - eosBetaStreamsProducer.resetProducer(producer); + public void shouldCloseExistingProducerOnResetProducer() { + eosBetaStreamsProducer.resetProducer(); - assertThat(eosBetaStreamsProducer.kafkaProducer(), is(producer)); + assertTrue(eosBetaMockProducer.closed()); } @Test - public void shouldResetTransactionInitializedOnResetProducer() { - final Producer producer = mock(Producer.class); + public void shouldSetNewProducerOnResetProducer() { + eosBetaStreamsProducer.resetProducer(); - eosBetaStreamsProducer.initTransaction(); - eosBetaStreamsProducer.resetProducer(producer); - - producer.initTransactions(); - expectLastCall(); - replay(producer); - - eosBetaStreamsProducer.initTransaction(); - - verify(producer); + assertThat(eosBetaMockClientSupplier.producers.size(), is(2)); + assertThat(eosBetaStreamsProducer.kafkaProducer(), is(eosBetaMockClientSupplier.producers.get(1))); } - // error handling tests - @Test - public void shouldFailIfProducerIsNullOnReInitializeProducer() { - final NullPointerException thrown = assertThrows( - NullPointerException.class, - () -> eosBetaStreamsProducer.resetProducer(null) + public void shouldResetTransactionInitializedOnResetProducer() { + final StreamsProducer streamsProducer = new StreamsProducer( + eosBetaConfig, + "threadId", + clientSupplier, + null, + UUID.randomUUID(), + logContext ); + streamsProducer.initTransaction(); + + reset(mockedProducer); + mockedProducer.close(); + mockedProducer.initTransactions(); + expectLastCall(); + replay(mockedProducer); + + streamsProducer.resetProducer(); + streamsProducer.initTransaction(); - assertThat(thrown.getMessage(), is("producer cannot be null")); + verify(mockedProducer); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 4d18718caf14b..67630b9a0da59 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -16,14 +16,8 @@ */ package org.apache.kafka.streams.state; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.clients.producer.MockProducer; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; @@ -39,12 +33,12 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; -import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsProducer; import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; @@ -193,15 +187,26 @@ public static KeyValueStoreTestDriver create(final Serializer ke private final StateSerdes stateSerdes; private KeyValueStoreTestDriver(final StateSerdes serdes) { - final ByteArraySerializer rawSerializer = new ByteArraySerializer(); - final Producer producer = new MockProducer<>(true, rawSerializer, rawSerializer); - final Consumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class); + props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); final LogContext logContext = new LogContext("KeyValueStoreTestDriver "); final RecordCollector recordCollector = new RecordCollectorImpl( logContext, new TaskId(0, 0), - new StreamsProducer(producer, StreamThread.ProcessingMode.AT_LEAST_ONCE, logContext), + new StreamsProducer( + new StreamsConfig(props), + "threadId", + new MockClientSupplier(), + null, + null, + logContext), new DefaultProductionExceptionHandler(), new MockStreamsMetrics(new Metrics()) ) { @@ -240,15 +245,6 @@ public void send(final String topic, stateDir.mkdirs(); stateSerdes = serdes; - props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); - props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class); - props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); - context = new InternalMockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) { ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 203c8c1cb195e..da8f434c21d52 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -68,6 +68,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.UUID; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.hamcrest.MatcherAssert.assertThat; @@ -370,20 +371,15 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, clientSupplier.restoreConsumer, new MockStateRestoreListener()), topology.storeToChangelogTopic(), partitions); - final StreamThread.ProcessingMode processingMode; - if (StreamThread.eosAlphaEnabled(streamsConfig)) { - processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA; - } else if (StreamThread.eosBetaEnabled(streamsConfig)) { - processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_BETA; - } else { - processingMode = StreamThread.ProcessingMode.AT_LEAST_ONCE; - } final RecordCollector recordCollector = new RecordCollectorImpl( logContext, taskId, new StreamsProducer( - clientSupplier.getProducer(new HashMap<>()), - processingMode, + streamsConfig, + "threadId", + clientSupplier, + new TaskId(0, 0), + UUID.randomUUID(), logContext ), streamsConfig.defaultProductionExceptionHandler(), diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index bf956c018c175..641f6e528e2b0 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.streams.KafkaClientSupplier; @@ -52,7 +53,7 @@ public void setApplicationIdForProducer(final String applicationId) { this.applicationId = applicationId; } - public void setClusterForAdminClient(final Cluster cluster) { + public void setCluster(final Cluster cluster) { this.cluster = cluster; } @@ -68,7 +69,7 @@ public Producer getProducer(final Map config) { } else { assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); } - final MockProducer producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); + final MockProducer producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); producers.add(producer); return producer; } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 8959f98fa856f..bfcfb87e1cb4c 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -299,13 +299,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, logContext = new LogContext("topology-test-driver "); mockWallClockTime = new MockTime(initialWallClockTimeMs); - if (StreamThread.eosAlphaEnabled(streamsConfig)) { - processingMode = EXACTLY_ONCE_ALPHA; - } else if (StreamThread.eosBetaEnabled(streamsConfig)) { - processingMode = EXACTLY_ONCE_BETA; - } else { - processingMode = AT_LEAST_ONCE; - } + processingMode = StreamThread.processingMode(streamsConfig); final StreamsMetricsImpl streamsMetrics = setupMetrics(streamsConfig); setupTopology(builder, streamsConfig); @@ -325,8 +319,28 @@ public List partitionsFor(final String topic) { } }; testDriverProducer = new TestDriverProducer( - producer, - processingMode, + streamsConfig, + new KafkaClientSupplier() { + @Override + public Producer getProducer(final Map config) { + return producer; + } + + @Override + public Consumer getConsumer(final Map config) { + throw new IllegalStateException(); + } + + @Override + public Consumer getRestoreConsumer(final Map config) { + throw new IllegalStateException(); + } + + @Override + public Consumer getGlobalConsumer(final Map config) { + throw new IllegalStateException(); + } + }, logContext ); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/internals/TestDriverProducer.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/internals/TestDriverProducer.java index fce346594357d..14e5e1352936b 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/internals/TestDriverProducer.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/internals/TestDriverProducer.java @@ -18,19 +18,22 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; import java.util.Map; +import java.util.UUID; public class TestDriverProducer extends StreamsProducer { - public TestDriverProducer(final Producer producer, - final StreamThread.ProcessingMode processingMode, + public TestDriverProducer(final StreamsConfig config, + final KafkaClientSupplier clientSupplier, final LogContext logContext) { - super(producer, processingMode, logContext); + super(config, "TopologyTestDriver-Thread", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext); } @Override