KAFKA-13249: Always update changelog offsets before writing the checkpoint file (apache#11283)

When using EOS, checkpointed offsets are not updated to the latest offsets from the changelog, because maybeWriteCheckpoint is only ever called at a point where commitNeeded=false, so the offset refresh inside it is skipped. This change forces the update when enforceCheckpoint=true.

I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shut down.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
hutchiko authored and Ralph Debusmann committed Dec 22, 2021
1 parent 1f56f79 commit c3caea2
Showing 3 changed files with 110 additions and 7 deletions.
StreamTask.java
@@ -577,7 +577,7 @@ public void closeCleanAndRecycleState() {
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
// commitNeeded indicates we may have processed some records since last commit
// and hence we need to refresh checkpointable offsets regardless of whether we should checkpoint or not
if (commitNeeded) {
if (commitNeeded || enforceCheckpoint) {
stateMgr.updateChangelogOffsets(checkpointableOffsets());
}

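For context, here is a minimal sketch of how the fixed method is intended to behave end to end. Only the changed condition appears in the hunk above, so the checkpoint-writing part below (including the helper name enoughDataWrittenSinceLastCheckpoint) is an assumption about the surrounding code, not a verbatim excerpt:

protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
    // refresh the checkpointable offsets if records were processed since the last
    // commit OR if the checkpoint is being forced (e.g. on clean shutdown under EOS)
    if (commitNeeded || enforceCheckpoint) {
        stateMgr.updateChangelogOffsets(checkpointableOffsets());
    }

    // only afterwards is the checkpoint file written, so a forced checkpoint now
    // carries the freshly refreshed changelog offsets instead of stale ones
    if (enforceCheckpoint || enoughDataWrittenSinceLastCheckpoint()) { // illustrative condition
        stateMgr.flush();
        stateMgr.checkpoint();
    }
}
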
EosIntegrationTest.java
@@ -20,6 +20,7 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;
@@ -39,14 +40,21 @@
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
@@ -63,6 +71,8 @@

import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -82,10 +92,12 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
import static org.apache.kafka.test.TestUtils.consumerConfig;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

@@ -135,12 +147,15 @@ public static void closeCluster() {

private volatile boolean hasUnexpectedError = false;

private String stateTmpDir;

@SuppressWarnings("deprecation")
@Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_V2}
return Arrays.asList(new String[][]{
{StreamsConfig.AT_LEAST_ONCE},
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_V2}
});
}

@@ -396,6 +411,8 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception {

@Test
public void shouldNotViolateEosIfOneTaskFails() throws Exception {
if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;

// this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to copy all 40 records into the output topic
//
@@ -495,6 +512,8 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception {

@Test
public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;

// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to emit all 40 update records into the output topic
//
@@ -609,6 +628,8 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {

@Test
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;

// this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
// the app is supposed to copy all 60 records into the output topic
//
@@ -765,6 +786,84 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
}
}

@Test
public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);

try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, eosConfig, MAX_POLL_INTERVAL_MS)) {

startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);

writeInputData(writtenData);

waitForCondition(
() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
"SteamsTasks did not request commit.");

final List<KeyValue<Long, Long>> committedRecords = readResult(writtenData.size(), CONSUMER_GROUP_ID);

checkResultPerKey(
committedRecords,
expectedResult,
"The committed records do not match what expected");

verifyStateStore(
streams,
getMaxPerKey(expectedResult),
"The state store content do not match what expected");
}

final Set<KeyValue<Long, Long>> expectedState = getMaxPerKey(expectedResult);
verifyStateIsInStoreAndOffsetsAreInCheckpoint(0, expectedState);
verifyStateIsInStoreAndOffsetsAreInCheckpoint(1, expectedState);

assertThat("Not all expected state values were found in the state stores", expectedState.isEmpty());
}

private void verifyStateIsInStoreAndOffsetsAreInCheckpoint(final int partition, final Set<KeyValue<Long, Long>> expectedState) throws IOException {
final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator;

// Verify that the data in the state store on disk is fully up-to-date
final StateStoreContext context = new MockInternalProcessorContext(new Properties(), new TaskId(0, 0), new File(stateStoreDir));
final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
final RocksDBStore store = (RocksDBStore) new RocksDbKeyValueBytesStoreSupplier(storeName, false).get();
store.init(context, stateStore);

store.all().forEachRemaining(kv -> {
final KeyValue<Long, Long> kv2 = new KeyValue<>(new BigInteger(kv.key.get()).longValue(), new BigInteger(kv.value).longValue());
expectedState.remove(kv2);
});

// Verify that the checkpointed offsets match exactly with max offset of the records in the changelog
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateStoreDir + ".checkpoint"));
final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
checkpointedOffsets.forEach(this::verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset);
}

private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(final TopicPartition tp, final long checkpointedOffset) {
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(CLUSTER.bootstrapServers(), Serdes.ByteArray().deserializer().getClass(), Serdes.ByteArray().deserializer().getClass()));
final List<TopicPartition> partitions = Collections.singletonList(tp);
consumer.assign(partitions);
consumer.seekToEnd(partitions);
final long topicEndOffset = consumer.position(tp);

assertTrue("changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset,
topicEndOffset >= checkpointedOffset);

consumer.seekToBeginning(partitions);

Long maxRecordOffset = null;
while (consumer.position(tp) != topicEndOffset) {
final List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(0)).records(tp);
if (!records.isEmpty()) {
maxRecordOffset = records.get(records.size() - 1).offset();
}
}

assertEquals("Checkpointed offset does not match end of changelog", maxRecordOffset, (Long) checkpointedOffset);
}
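
One non-obvious point in the verification above: under EOS the changelog's end offset includes transactional commit markers, so the helper compares the checkpointed offset against the offset of the last data record rather than against consumer.position() after seekToEnd(). A minimal sketch with assumed offsets (no broker involved; the numbers are illustrative only):

// Assumed offsets, for illustration only (EOS case; markers do not exist under at-least-once):
final long maxRecordOffset    = 9L;                     // last data record the test wrote
final long commitMarkerOffset = maxRecordOffset + 1;    // added by the transactional producer
final long topicEndOffset     = commitMarkerOffset + 1; // what position() reports after seekToEnd()
final long checkpointedOffset = maxRecordOffset;        // what the fix guarantees ends up in the file

// the helper's two checks, instantiated with these numbers:
assert topicEndOffset >= checkpointedOffset;   // checkpoint never runs ahead of the changelog
assert checkpointedOffset == maxRecordOffset;  // and it matches the last data record exactly
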

private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
final long toExclusive,
final Long... keys) {
@@ -875,6 +974,8 @@ public void close() { }
} }, storeNames)
.to(SINGLE_PARTITION_OUTPUT_TOPIC);

stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator;

final Properties properties = new Properties();
// Set commit interval to a larger value to avoid affection of controlled stream commit,
// but not too large as we need to have a relatively low transaction timeout such
Expand All @@ -890,7 +991,7 @@ public void close() { }
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");

final Properties config = StreamsTestUtils.getStreamsConfig(
StreamTaskTest.java
@@ -1659,7 +1659,8 @@ public void shouldThrowIfPostCommittingOnIllegalState() {
public void shouldSkipCheckpointingSuspendedCreatedTask() {
stateManager.checkpoint();
EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
EasyMock.replay(stateManager);
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);

task = createStatefulTask(createConfig("100"), true);
task.suspend();
@@ -1672,7 +1673,8 @@ public void shouldCheckpointForSuspendedTask() {
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogOffsets())
.andReturn(singletonMap(partition1, 1L));
EasyMock.replay(stateManager);
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);

task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
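
The extra recordCollector.offsets() expectations are needed because, with the fix, maybeWriteCheckpoint refreshes the checkpointable offsets even when commitNeeded is false, and computing those offsets consults the record collector. A rough sketch of the assumed shape of that computation (illustrative only, not a verbatim excerpt from StreamTask; consumedOffsets in particular is an assumed detail):

private Map<TopicPartition, Long> checkpointableOffsets() {
    // start from the offsets of records the task has produced via the record collector
    final Map<TopicPartition, Long> offsets = new HashMap<>(recordCollector.offsets());
    // fill in consumed positions for partitions the collector has not written to (assumed detail)
    consumedOffsets.forEach(offsets::putIfAbsent);
    return offsets;
}
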
