diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d59ec66840665..7c858696975fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -577,7 +577,7 @@ public void closeCleanAndRecycleState() { protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { // commitNeeded indicates we may have processed some records since last commit // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not - if (commitNeeded) { + if (commitNeeded || enforceCheckpoint) { stateMgr.updateChangelogOffsets(checkpointableOffsets()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 0d5c18762e190..08cfc4b951b88 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.IsolationLevel; @@ -39,6 +40,8 @@ import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -46,7 +49,12 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.streams.state.internals.RocksDBStore; +import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier; import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockInternalProcessorContext; +import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.AfterClass; @@ -63,6 +71,8 @@ import java.io.File; import java.io.IOException; +import java.math.BigInteger; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -82,10 +92,12 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState; +import static org.apache.kafka.test.TestUtils.consumerConfig; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -135,12 +147,15 @@ public static void closeCluster() { private volatile boolean hasUnexpectedError = false; + private String stateTmpDir; + @SuppressWarnings("deprecation") @Parameters(name = "{0}") public static Collection data() { - return Arrays.asList(new String[][] { - {StreamsConfig.EXACTLY_ONCE}, - {StreamsConfig.EXACTLY_ONCE_V2} + return Arrays.asList(new String[][]{ + {StreamsConfig.AT_LEAST_ONCE}, + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_V2} }); } @@ -396,6 +411,8 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception { @Test public void shouldNotViolateEosIfOneTaskFails() throws Exception { + if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return; + // this test writes 10 + 5 + 5 records per partition (running with 2 partitions) // the app is supposed to copy all 40 records into the output topic // @@ -495,6 +512,8 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception { @Test public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { + if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return; + // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions) // the app is supposed to emit all 40 update records into the output topic // @@ -609,6 +628,8 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { @Test public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception { + if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return; + // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions) // the app is supposed to copy all 60 records into the output topic // @@ -765,6 +786,84 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th } } + @Test + public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception { + final List> writtenData = prepareData(0L, 10, 0L, 1L); + final List> expectedResult = computeExpectedResult(writtenData); + + try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, eosConfig, MAX_POLL_INTERVAL_MS)) { + + startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS); + + writeInputData(writtenData); + + waitForCondition( + () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, + "SteamsTasks did not request commit."); + + final List> committedRecords = readResult(writtenData.size(), CONSUMER_GROUP_ID); + + checkResultPerKey( + committedRecords, + expectedResult, + "The committed records do not match what expected"); + + verifyStateStore( + streams, + getMaxPerKey(expectedResult), + "The state store content do not match what expected"); + } + + final Set> expectedState = getMaxPerKey(expectedResult); + verifyStateIsInStoreAndOffsetsAreInCheckpoint(0, expectedState); + verifyStateIsInStoreAndOffsetsAreInCheckpoint(1, expectedState); + + assertThat("Not all expected state values were found in the state stores", expectedState.isEmpty()); + } + + private void verifyStateIsInStoreAndOffsetsAreInCheckpoint(final int partition, final Set> expectedState) throws IOException { + final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator; + + // Verify that the data in the state store on disk is fully up-to-date + final StateStoreContext context = new MockInternalProcessorContext(new Properties(), new TaskId(0, 0), new File(stateStoreDir)); + final MockKeyValueStore stateStore = new MockKeyValueStore("store", false); + final RocksDBStore store = (RocksDBStore) new RocksDbKeyValueBytesStoreSupplier(storeName, false).get(); + store.init(context, stateStore); + + store.all().forEachRemaining(kv -> { + final KeyValue kv2 = new KeyValue<>(new BigInteger(kv.key.get()).longValue(), new BigInteger(kv.value).longValue()); + expectedState.remove(kv2); + }); + + // Verify that the checkpointed offsets match exactly with max offset of the records in the changelog + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateStoreDir + ".checkpoint")); + final Map checkpointedOffsets = checkpoint.read(); + checkpointedOffsets.forEach(this::verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset); + } + + private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(final TopicPartition tp, final long checkpointedOffset) { + final KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig(CLUSTER.bootstrapServers(), Serdes.ByteArray().deserializer().getClass(), Serdes.ByteArray().deserializer().getClass())); + final List partitions = Collections.singletonList(tp); + consumer.assign(partitions); + consumer.seekToEnd(partitions); + final long topicEndOffset = consumer.position(tp); + + assertTrue("changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset, + topicEndOffset >= checkpointedOffset); + + consumer.seekToBeginning(partitions); + + Long maxRecordOffset = null; + while (consumer.position(tp) != topicEndOffset) { + final List> records = consumer.poll(Duration.ofMillis(0)).records(tp); + if (!records.isEmpty()) { + maxRecordOffset = records.get(records.size() - 1).offset(); + } + } + + assertEquals("Checkpointed offset does not match end of changelog", maxRecordOffset, (Long) checkpointedOffset); + } + private List> prepareData(final long fromInclusive, final long toExclusive, final Long... keys) { @@ -875,6 +974,8 @@ public void close() { } } }, storeNames) .to(SINGLE_PARTITION_OUTPUT_TOPIC); + stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator; + final Properties properties = new Properties(); // Set commit interval to a larger value to avoid affection of controlled stream commit, // but not too large as we need to have a relatively low transaction timeout such @@ -890,7 +991,7 @@ public void close() { } properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir); + properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142"); final Properties config = StreamsTestUtils.getStreamsConfig( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 2c4df6571bbb4..077d921a0eb05 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1659,7 +1659,8 @@ public void shouldThrowIfPostCommittingOnIllegalState() { public void shouldSkipCheckpointingSuspendedCreatedTask() { stateManager.checkpoint(); EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint")); - EasyMock.replay(stateManager); + EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); + EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig("100"), true); task.suspend(); @@ -1672,7 +1673,8 @@ public void shouldCheckpointForSuspendedTask() { EasyMock.expectLastCall().once(); EasyMock.expect(stateManager.changelogOffsets()) .andReturn(singletonMap(partition1, 1L)); - EasyMock.replay(stateManager); + EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); + EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig("100"), true); task.initializeIfNeeded();