diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 721fd1c2349f0..2a8e532b627b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -132,6 +132,8 @@ private void maybeBeginTxn() {
             producer.beginTransaction();
         } catch (final ProducerFencedException error) {
             throw new TaskMigratedException(taskId, "Producer get fenced trying to begin a new transaction", error);
+        } catch (final KafkaException error) {
+            throw new StreamsException("Producer encountered an unexpected error trying to begin a new transaction", error);
         }
         transactionInFlight = true;
     }
@@ -150,6 +152,8 @@ private void maybeAbortTxn() {
              */
             // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens
+        } catch (final KafkaException error) {
+            throw new StreamsException("Producer encountered an unexpected error trying to abort the transaction", error);
         }
         transactionInFlight = false;
     }
@@ -157,9 +161,9 @@ private void maybeAbortTxn() {
     public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (eosEnabled) {
-            try {
-                maybeBeginTxn();
+            maybeBeginTxn();
+            try {
                 producer.sendOffsetsToTransaction(offsets, applicationId);
                 producer.commitTransaction();
                 transactionInFlight = false;
@@ -247,6 +251,7 @@ public void send(final String topic,
                          final StreamPartitioner<? super K, ? super V> partitioner) {
         final Integer partition;
+        // TODO K9113: we need to decide how to handle exceptions from partitionsFor
         if (partitioner != null) {
            final List<PartitionInfo> partitions = producer.partitionsFor(topic);
            if (partitions.size() > 0) {
@@ -273,9 +278,9 @@ public void send(final String topic,
                          final Serializer<V> valueSerializer) {
         checkForException();
-        try {
-            maybeBeginTxn();
+        maybeBeginTxn();
+        try {
             final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
             final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
@@ -301,7 +306,7 @@ public void send(final String topic,
                     if (isRecoverable(uncaughtException)) {
                         // producer.send() call may throw a KafkaException which wraps a FencedException,
                         // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException
-                        throw new TaskMigratedException(taskId, "Producer cannot send records anymore since it got fenced", uncaughtException);
+                        throw new TaskMigratedException(taskId, "Producer cannot send records anymore since it got fenced", uncaughtException.getCause());
                     } else {
                         final String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, uncaughtException.toString());
                         throw new StreamsException(errorMessage, uncaughtException);
@@ -354,7 +359,7 @@ public void close() {
 
     @Override
     public Map<TopicPartition, Long> offsets() {
-        return Collections.unmodifiableMap(offsets);
+        return Collections.unmodifiableMap(new HashMap<>(offsets));
     }
 
     // for testing only
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 869ddfd0262d8..50f955d7e987a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -32,7 +32,7 @@
 import org.apache.kafka.common.Node;
 import
org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownProducerIdException; @@ -54,6 +54,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.StreamsTestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.Arrays; @@ -62,16 +64,13 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -// TODO K9113: should improve test coverage public class RecordCollectorTest { private final TaskId taskId = new TaskId(0, 0); @@ -79,189 +78,215 @@ public class RecordCollectorTest { private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics()); private final StreamsConfig streamsConfig = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test")); - private final List infos = Arrays.asList( - new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]) + private final String topic = "topic"; + private final Cluster cluster = new Cluster( + "cluster", + Collections.singletonList(Node.noNode()), + Arrays.asList( + new PartitionInfo(topic, 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topic, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topic, 2, Node.noNode(), new Node[0], new Node[0]) + ), + Collections.emptySet(), + Collections.emptySet() ); - private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, - Collections.emptySet(), Collections.emptySet()); - - - private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); private final StringSerializer stringSerializer = new StringSerializer(); - + private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); + private final MockProducer producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer); + private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final StreamPartitioner streamPartitioner = (topic, key, value, numPartitions) -> Integer.parseInt(key) % numPartitions; - @Test - public void testSpecificPartition() { + private RecordCollectorImpl collector; - final RecordCollectorImpl collector = new RecordCollectorImpl( - taskId, - streamsConfig, - logContext, - streamsMetrics, - null, - id -> new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) - ); + @Before + public void setup() { + collector = new RecordCollectorImpl(taskId, streamsConfig, logContext, 
streamsMetrics, consumer, id -> producer); + } - final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); + @After + public void cleanup() { + collector.close(); + } - collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); + @Test + public void shouldSendToSpecificPartition() { + final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); - collector.send("topic1", "999", "0", headers, 1, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", headers, 1, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer); + Map offsets = collector.offsets(); - final Map offsets = collector.offsets(); + assertEquals(2L, (long) offsets.get(new TopicPartition(topic, 0))); + assertEquals(1L, (long) offsets.get(new TopicPartition(topic, 1))); + assertEquals(0L, (long) offsets.get(new TopicPartition(topic, 2))); + assertEquals(6, producer.history().size()); - assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0))); - assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1))); - assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); + collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer); - // ignore StreamPartitioner - collector.send("topic1", "999", "0", null, 0, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", null, 1, null, stringSerializer, stringSerializer); - collector.send("topic1", "999", "0", headers, 2, null, stringSerializer, stringSerializer); + offsets = collector.offsets(); - assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0))); - assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1))); - assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 2))); + assertEquals(3L, (long) offsets.get(new TopicPartition(topic, 0))); + assertEquals(2L, (long) offsets.get(new TopicPartition(topic, 1))); + assertEquals(1L, (long) offsets.get(new TopicPartition(topic, 2))); + assertEquals(9, producer.history().size()); } @Test - public void testStreamPartitioner() { + public void shouldSendWithPartitioner() { + final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); - final RecordCollectorImpl collector = new RecordCollectorImpl( - taskId, - streamsConfig, - logContext, - streamsMetrics, - null, - id -> new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) - ); + 
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "9", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "28", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "82", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "244", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); + final Map offsets = collector.offsets(); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "9", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + assertEquals(4L, (long) offsets.get(new TopicPartition(topic, 0))); + assertEquals(2L, (long) offsets.get(new TopicPartition(topic, 1))); + assertEquals(0L, (long) offsets.get(new TopicPartition(topic, 2))); + assertEquals(9, producer.history().size()); - collector.send("topic1", "28", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "82", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); - collector.send("topic1", "244", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); + // returned offsets should not be modified + final TopicPartition topicPartition = new TopicPartition(topic, 0); + assertThrows(UnsupportedOperationException.class, () -> offsets.put(topicPartition, 50L)); + } - collector.send("topic1", "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + @Test + public void shouldSendWithNoPartition() { + final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); + + collector.send(topic, "3", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "9", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "27", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "81", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "243", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "28", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "82", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "244", "0", headers, null, null, stringSerializer, stringSerializer); + collector.send(topic, "245", "0", headers, null, null, stringSerializer, stringSerializer); final Map offsets = collector.offsets(); - 
assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0))); - assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1))); - assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); + // with mock producer without specific partition, we would use default producer partitioner with murmur hash + assertEquals(3L, (long) offsets.get(new TopicPartition(topic, 0))); + assertEquals(2L, (long) offsets.get(new TopicPartition(topic, 1))); + assertEquals(1L, (long) offsets.get(new TopicPartition(topic, 2))); + assertEquals(9, producer.history().size()); } @Test - public void shouldNotAllowOffsetsToBeUpdatedExternally() { - final String topic = "topic1"; - final TopicPartition topicPartition = new TopicPartition(topic, 0); - - final RecordCollectorImpl collector = new RecordCollectorImpl( + public void shouldUpdateOffsetsUponCompletion() { + final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, logContext, streamsMetrics, - null, - id -> new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) + consumer, + id -> new MockProducer<>(cluster, false, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) ); + Map offsets = collector.offsets(); + collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); - collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); - collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer); + collector.send(topic, "999", "0", null, 2, null, stringSerializer, stringSerializer); - final Map offsets = collector.offsets(); + assertEquals(Collections.emptyMap(), offsets); - assertThat(offsets.get(topicPartition), equalTo(2L)); - assertThrows(UnsupportedOperationException.class, () -> offsets.put(new TopicPartition(topic, 0), 50L)); + collector.flush(); - assertThat(collector.offsets().get(topicPartition), equalTo(2L)); + offsets = collector.offsets(); + assertEquals((Long) 0L, offsets.get(new TopicPartition(topic, 0))); + assertEquals((Long) 0L, offsets.get(new TopicPartition(topic, 1))); + assertEquals((Long) 0L, offsets.get(new TopicPartition(topic, 2))); } - @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() { + @Test + public void shouldThrowStreamsExceptionOnSendFatalException() { + final KafkaException exception = new KafkaException(); final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - throw new KafkaException(); + throw exception; } } ); final StreamsException thrown = assertThrows(StreamsException.class, () -> - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) ); - assertThat(thrown.getCause(), instanceOf(KafkaException.class)); + assertEquals(exception, thrown.getCause()); } - @Test(expected = TaskMigratedException.class) - public void shouldThrowRecoverableExceptionOnProducerFencedException() { + @Test + public void shouldThrowTaskMigratedExceptionOnProducerFencedException() { 
+ final KafkaException exception = new ProducerFencedException("KABOOM!"); final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - throw new KafkaException(new ProducerFencedException("asdf")); + throw new KafkaException(exception); } } ); - final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () -> - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) + final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, () -> + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) ); - assertThat(thrown.getCause(), instanceOf(KafkaException.class)); - assertThat(thrown.getCause().getCause(), instanceOf(ProducerFencedException.class)); + assertEquals(exception, thrown.getCause()); } @Test - public void shouldThrowRecoverableExceptionOnUnknownProducerException() { + public void shouldThrowTaskMigratedExceptionOnUnknownProducerIdException() { + final KafkaException exception = new UnknownProducerIdException("KABOOM!"); final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - throw new KafkaException(new UnknownProducerIdException("asdf")); + throw new KafkaException(exception); } } ); - final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () -> - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) + final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, () -> + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) ); - assertThat(thrown.getCause(), instanceOf(KafkaException.class)); - assertThat(thrown.getCause().getCause(), instanceOf(UnknownProducerIdException.class)); + assertEquals(exception, thrown.getCause()); } @Test - public void shouldThrowRecoverableExceptionWhenProducerFencedInCallback() { + public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedInCallback() { + final KafkaException exception = new ProducerFencedException("KABOOM!"); final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, @@ -271,21 +296,29 @@ public void shouldThrowRecoverableExceptionWhenProducerFencedInCallback() { id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new ProducerFencedException("asdf")); + callback.onCompletion(null, exception); return null; } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () -> - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) + collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner); + + TaskMigratedException thrown = assertThrows(TaskMigratedException.class, () -> + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) ); - assertThat(thrown.getCause(), instanceOf(ProducerFencedException.class)); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(TaskMigratedException.class, collector::flush); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(TaskMigratedException.class, collector::close); + assertEquals(exception, thrown.getCause()); } @Test - public void shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() { + public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerUnknownInCallback() { + final KafkaException exception = new UnknownProducerIdException("KABOOM!"); final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, @@ -295,20 +328,28 @@ public void shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() { id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new UnknownProducerIdException("asdf")); + callback.onCompletion(null, exception); return null; } }); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - final RecoverableClientException thrown = assertThrows(RecoverableClientException.class, () -> - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + + TaskMigratedException thrown = assertThrows(TaskMigratedException.class, () -> + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) ); - assertThat(thrown.getCause(), instanceOf(UnknownProducerIdException.class)); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(TaskMigratedException.class, collector::flush); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(TaskMigratedException.class, collector::close); + assertEquals(exception, thrown.getCause()); } @Test public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() { + final KafkaException exception = new KafkaException("KABOOM!"); final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, @@ -318,18 +359,24 @@ public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultEx id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new Exception()); + callback.onCompletion(null, exception); return null; } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - try { - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - fail("Should have thrown StreamsException"); - } catch (final StreamsException expected) { /* ok */ } + StreamsException thrown = assertThrows(StreamsException.class, () -> + collector.send(topic, "3", "0", null, null, 
stringSerializer, stringSerializer, streamPartitioner) + ); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(StreamsException.class, collector::flush); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(StreamsException.class, collector::close); + assertEquals(exception, thrown.getCause()); } @Test @@ -353,7 +400,7 @@ public synchronized Future send(final ProducerRecord record, fin } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.flush(); final Metric metric = metrics.metrics().get(new MetricName( @@ -367,61 +414,122 @@ public synchronized Future send(final ProducerRecord record, fin assertTrue(messages.get(messages.size() - 1).endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.")); LogCaptureAppender.unregister(logCaptureAppender); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.flush(); + collector.close(); } @Test - public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() { + public void shouldThrowStreamsExceptionOnSubsequentCallIfFatalEvenWithContinueExceptionHandler() { + final KafkaException exception = new AuthenticationException("KABOOM!"); + final Properties props = StreamsTestUtils.getStreamsConfig("test"); + props.setProperty(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, AlwaysContinueProductionExceptionHandler.class.getName()); final RecordCollector collector = new RecordCollectorImpl( taskId, - streamsConfig, + new StreamsConfig(props), logContext, streamsMetrics, null, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new Exception()); + callback.onCompletion(null, exception); return null; } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + + StreamsException thrown = assertThrows(StreamsException.class, () -> + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) + ); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(StreamsException.class, collector::flush); + assertEquals(exception, thrown.getCause()); + + thrown = assertThrows(StreamsException.class, collector::close); + assertEquals(exception, thrown.getCause()); + } + + @Test + public void shouldThrowStreamsExceptionOnEOSInitializeTimeout() { + final KafkaException exception = new TimeoutException("KABOOM!"); + final Properties props = StreamsTestUtils.getStreamsConfig("test"); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + + final StreamsException thrown = assertThrows(StreamsException.class, () -> + new RecordCollectorImpl( + taskId, + new StreamsConfig(props), + logContext, + streamsMetrics, + null, + id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public void initTransactions() { + 
throw exception; + } + } + ) + ); + assertEquals(exception, thrown.getCause()); + } + + @Test + public void shouldThrowStreamsExceptionOnEOSInitializeError() { + final KafkaException exception = new KafkaException("KABOOM!"); + final Properties props = StreamsTestUtils.getStreamsConfig("test"); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - try { - collector.flush(); - fail("Should have thrown StreamsException"); - } catch (final StreamsException expected) { /* ok */ } + final StreamsException thrown = assertThrows(StreamsException.class, () -> + new RecordCollectorImpl( + taskId, + new StreamsConfig(props), + logContext, + streamsMetrics, + null, + id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public void initTransactions() { + throw exception; + } + } + ) + ); + assertEquals(exception, thrown.getCause()); } @Test - public void shouldThrowStreamsExceptionWithTimeoutHintOnProducerTimeoutWithDefaultExceptionHandler() { + public void shouldThrowMigrateExceptionOnEOSFirstSendProducerFenced() { + final KafkaException exception = new ProducerFencedException("KABOOM!"); + final Properties props = StreamsTestUtils.getStreamsConfig("test"); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( taskId, - streamsConfig, + new StreamsConfig(props), logContext, streamsMetrics, null, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new TimeoutException()); - return null; + public synchronized void beginTransaction() { + throw exception; } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - - final StreamsException expected = assertThrows(StreamsException.class, collector::flush); - assertTrue(expected.getCause() instanceof TimeoutException); + final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, () -> + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)); + assertEquals(exception, thrown.getCause()); } @Test - public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExceptionHandler() { + public void shouldThrowMigrateExceptionOnEOSFirstSendFatal() { + final KafkaException exception = new KafkaException("KABOOM!"); final Properties props = StreamsTestUtils.getStreamsConfig("test"); - props.setProperty(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, AlwaysContinueProductionExceptionHandler.class.getName()); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( taskId, new StreamsConfig(props), @@ -430,91 +538,78 @@ public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExcept null, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new Exception()); - return null; + public synchronized void beginTransaction() { + throw exception; } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); 
- - collector.flush(); + final StreamsException thrown = assertThrows(StreamsException.class, () -> + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)); + assertEquals(exception, thrown.getCause()); } @Test - public void shouldThrowStreamsExceptionOnCloseIfASendFailedWithDefaultExceptionHandler() { + public void shouldFailWithMigrateExceptionOnCommitFailed() { final RecordCollector collector = new RecordCollectorImpl( taskId, streamsConfig, logContext, streamsMetrics, - null, - id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new Exception()); - return null; + public void commitSync(final Map offsets) { + throw new CommitFailedException(); } - } + }, + id -> new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - - try { - collector.close(); - fail("Should have thrown StreamsException"); - } catch (final StreamsException expected) { /* ok */ } + assertThrows(TaskMigratedException.class, () -> collector.commit(null)); } @Test - public void shouldNotThrowStreamsExceptionOnCloseIfASendFailedWithContinueExceptionHandler() { - final Properties props = StreamsTestUtils.getStreamsConfig("test"); - props.setProperty(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, AlwaysContinueProductionExceptionHandler.class.getName()); + public void shouldFailWithMigrateExceptionOnCommitUnexpected() { final RecordCollector collector = new RecordCollectorImpl( taskId, - new StreamsConfig(props), + streamsConfig, logContext, streamsMetrics, - null, - id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new Exception()); - return null; + public void commitSync(final Map offsets) { + throw new KafkaException(); } - } + }, + id -> new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - - collector.close(); + assertThrows(StreamsException.class, () -> collector.commit(null)); } @Test - public void shouldThrowStreamsExceptionOnEOSInitializeTimeout() { + public void shouldFailWithMigrateExceptionOnEOSBeginTxnFenced() { final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - - assertThrows(StreamsException.class, () -> - new RecordCollectorImpl( - taskId, - new StreamsConfig(props), - logContext, - streamsMetrics, - null, - id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { - @Override - public void initTransactions() { - throw new TimeoutException("test"); - } + final RecordCollector collector = new RecordCollectorImpl( + taskId, + new StreamsConfig(props), + logContext, + streamsMetrics, + null, + id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + 
public void beginTransaction() { + throw new ProducerFencedException("KABOOM!"); } - ) + } ); + + assertThrows(TaskMigratedException.class, () -> collector.commit(null)); } @Test - public void shouldThrowMigrateExceptionOnEOSProcessFenced() { + public void shouldFailWithMigrateExceptionOnEOSSendOffsetFenced() { final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( @@ -525,17 +620,17 @@ public void shouldThrowMigrateExceptionOnEOSProcessFenced() { null, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - throw new KafkaException(new ProducerFencedException("boom")); + public void sendOffsetsToTransaction(final Map offsets, final String consumerGroupId) { + throw new ProducerFencedException("KABOOM!"); } } ); - assertThrows(TaskMigratedException.class, () -> collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)); + assertThrows(TaskMigratedException.class, () -> collector.commit(null)); } @Test - public void shouldFailWithMigrateExceptionOnEOSProcessFenced() { + public void shouldFailWithMigrateExceptionOnEOSCommitFenced() { final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( @@ -546,20 +641,17 @@ public void shouldFailWithMigrateExceptionOnEOSProcessFenced() { null, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new ProducerFencedException("boom")); - return null; + public void commitTransaction() { + throw new ProducerFencedException("KABOOM!"); } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - - assertThrows(TaskMigratedException.class, () -> collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)); + assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.emptyMap())); } @Test - public void shouldFailWithMigrateExceptionOnEOSProcessUnknownPid() { + public void shouldFailWithMigrateExceptionOnEOSBeginTxnUnexpected() { final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( @@ -570,59 +662,60 @@ public void shouldFailWithMigrateExceptionOnEOSProcessUnknownPid() { null, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public synchronized Future send(final ProducerRecord record, final Callback callback) { - callback.onCompletion(null, new OutOfOrderSequenceException("boom")); - return null; + public void beginTransaction() { + throw new KafkaException("KABOOM!"); } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); - - assertThrows(TaskMigratedException.class, - () -> collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, 
streamPartitioner)); + assertThrows(StreamsException.class, () -> collector.commit(null)); } @Test - public void shouldFailWithMigrateExceptionOnCommitFailed() { + public void shouldFailWithStreamsExceptionOnEOSSendOffsetUnexpected() { + final Properties props = StreamsTestUtils.getStreamsConfig("test"); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( taskId, - streamsConfig, + new StreamsConfig(props), logContext, streamsMetrics, - new MockConsumer(OffsetResetStrategy.EARLIEST) { + null, + id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public void commitSync(final Map offsets) { - throw new CommitFailedException(); + public void sendOffsetsToTransaction(final Map offsets, final String consumerGroupId) { + throw new KafkaException("KABOOM!"); } - }, - id -> new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) + } ); - assertThrows(TaskMigratedException.class, () -> collector.commit(null)); + assertThrows(StreamsException.class, () -> collector.commit(null)); } @Test - public void shouldFailWithMigrateExceptionOnCommitUnexpected() { + public void shouldFailWithMigrateExceptionOnEOSCommitUnexpected() { + final Properties props = StreamsTestUtils.getStreamsConfig("test"); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( taskId, - streamsConfig, + new StreamsConfig(props), logContext, streamsMetrics, - new MockConsumer(OffsetResetStrategy.EARLIEST) { + null, + id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public void commitSync(final Map offsets) { - throw new KafkaException(); + public void commitTransaction() { + throw new KafkaException("KABOOM!"); } - }, - id -> new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) + } ); - assertThrows(StreamsException.class, () -> collector.commit(Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5L)))); + assertThrows(StreamsException.class, () -> collector.commit(Collections.emptyMap())); } @Test - public void shouldFailWithMigrateExceptionOnEOSSendOffsetFenced() { + public void shouldThrowStreamsExceptionOnEOSCloseFatalException() { + final KafkaException exception = new KafkaException(); final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( @@ -630,20 +723,22 @@ public void shouldFailWithMigrateExceptionOnEOSSendOffsetFenced() { new StreamsConfig(props), logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public void sendOffsetsToTransaction(final Map offsets, final String consumerGroupId) { - throw new ProducerFencedException("boom"); + public void close() { + throw exception; } } ); - assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5L)))); + final StreamsException thrown = assertThrows(StreamsException.class, collector::close); + assertEquals(exception, thrown.getCause()); } @Test - public void 
shouldFailWithMigrateExceptionOnEOSCommitFenced() { + public void shouldNotAbortTxnOnEOSCloseIfNothingSent() { + final AtomicBoolean functionCalled = new AtomicBoolean(false); final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( @@ -651,20 +746,24 @@ public void shouldFailWithMigrateExceptionOnEOSCommitFenced() { new StreamsConfig(props), logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public void commitTransaction() { - throw new ProducerFencedException("boom"); + public void abortTransaction() { + functionCalled.set(true); + super.abortTransaction(); } } ); - assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5L)))); + collector.close(); + + assertFalse(functionCalled.get()); } @Test - public void shouldFailWithStreamsExceptionOnEOSSendOffsetUnexpected() { + public void shouldNotAbortTxnOnEOSCloseIfCommitted() { + final AtomicBoolean functionCalled = new AtomicBoolean(false); final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( @@ -672,20 +771,27 @@ public void shouldFailWithStreamsExceptionOnEOSSendOffsetUnexpected() { new StreamsConfig(props), logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public void sendOffsetsToTransaction(final Map offsets, final String consumerGroupId) { - throw new KafkaException("boom"); + public void abortTransaction() { + functionCalled.set(true); + super.abortTransaction(); } } ); - assertThrows(StreamsException.class, () -> collector.commit(Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5L)))); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + collector.commit(Collections.emptyMap()); + + collector.close(); + + assertFalse(functionCalled.get()); } @Test - public void shouldFailWithMigrateExceptionOnEOSCommitUnexpected() { + public void shouldThrowStreamsExceptionOnEOSAbortTxnFatalException() { + final KafkaException exception = new KafkaException(); final Properties props = StreamsTestUtils.getStreamsConfig("test"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( @@ -693,44 +799,50 @@ public void shouldFailWithMigrateExceptionOnEOSCommitUnexpected() { new StreamsConfig(props), logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public void commitTransaction() { - throw new KafkaException("boom"); + public void abortTransaction() { + throw exception; } } ); - assertThrows(StreamsException.class, () -> collector.commit(Collections.singletonMap(new TopicPartition("topic", 0), new OffsetAndMetadata(5L)))); + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + + final StreamsException thrown = assertThrows(StreamsException.class, collector::close); + 
assertEquals(exception, thrown.getCause()); } - @Test(expected = StreamsException.class) - public void shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() { + @Test + public void shouldSwallowOnEOSAbortTxnFatalException() { + final Properties props = StreamsTestUtils.getStreamsConfig("test"); + props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final RecordCollector collector = new RecordCollectorImpl( taskId, - streamsConfig, + new StreamsConfig(props), logContext, streamsMetrics, - null, + consumer, id -> new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override - public List partitionsFor(final String topic) { - return Collections.emptyList(); + public void abortTransaction() { + throw new ProducerFencedException("KABOOM!"); } } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + // this call is to begin an inflight txn + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + + collector.close(); } - @Test(expected = StreamsException.class) - public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() { - final Properties props = StreamsTestUtils.getStreamsConfig("test"); - props.setProperty(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, AlwaysContinueProductionExceptionHandler.class.getName()); + @Test + public void shouldThrowIfTopicIsUnknownOnSendWithPartitioner() { final RecordCollector collector = new RecordCollectorImpl( taskId, - new StreamsConfig(props), + streamsConfig, logContext, streamsMetrics, null, @@ -742,11 +854,12 @@ public List partitionsFor(final String topic) { } ); - collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); + final StreamsException thrown = assertThrows(StreamsException.class, () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)); + assertTrue(thrown.getMessage().startsWith("Could not get partition information for topic")); } @Test - public void testRecordHeaderPassThroughSerializer() { + public void shouldPassThroughRecordHeaderToSerializer() { final CustomStringSerializer keySerializer = new CustomStringSerializer(); final CustomStringSerializer valueSerializer = new CustomStringSerializer(); keySerializer.configure(Collections.emptyMap(), true); @@ -762,7 +875,7 @@ public void testRecordHeaderPassThroughSerializer() { id -> mockProducer ); - collector.send("topic1", "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, streamPartitioner); + collector.send(topic, "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, streamPartitioner); final List> recordHistory = mockProducer.history(); for (final ProducerRecord sentRecord : recordHistory) {