From d3c067f35d184ca75e8cc59bedd56689cbc8269b Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 6 Mar 2020 15:38:55 -0800 Subject: [PATCH] MINOR: Check store directory empty to decide whether throw task corrupted exception with EOS (#8180) Before we register the stores (and hence create the store dirs), we check if the task dir is empty except the lock / checkpoint files. Then later when loading the checkpoint files if we do not find the offsets AND the store dirs are not empty, meaning that the stores may be not empty, we treat it as task corrupted. Reviewers: John Roesler --- .../errors/TaskCorruptedException.java | 8 +- .../processor/internals/AbstractTask.java | 2 +- .../internals/ActiveTaskCreator.java | 7 +- .../internals/ProcessorStateManager.java | 45 ++++++---- .../internals/StandbyTaskCreator.java | 9 +- .../processor/internals/StateDirectory.java | 16 ++++ .../processor/internals/StateManagerUtil.java | 9 +- .../internals/StoreChangelogReader.java | 2 +- .../processor/internals/StreamTask.java | 74 ++++++++-------- .../processor/internals/StreamThread.java | 1 + .../streams/processor/internals/Task.java | 2 +- .../processor/internals/TaskManager.java | 14 +-- .../EOSUncleanShutdownIntegrationTest.java | 11 +-- .../internals/ProcessorStateManagerTest.java | 87 ++++++++++++++----- .../processor/internals/StandbyTaskTest.java | 12 ++- .../internals/StateDirectoryTest.java | 45 ++++++++++ .../internals/StateManagerUtilTest.java | 16 ++-- .../processor/internals/StreamTaskTest.java | 29 +++++-- .../StreamThreadStateStoreProviderTest.java | 7 +- .../kafka/streams/TopologyTestDriver.java | 7 +- 20 files changed, 281 insertions(+), 122 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java index f1a3b48320b7..770ba7052087 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java @@ -19,8 +19,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; +import java.util.Collection; import java.util.Map; -import java.util.Set; /** * Indicates a specific task is corrupted and need to be re-initialized. It can be thrown when @@ -32,15 +32,15 @@ */ public class TaskCorruptedException extends StreamsException { - private final Map> taskWithChangelogs; + private final Map> taskWithChangelogs; - public TaskCorruptedException(final Map> taskWithChangelogs) { + public TaskCorruptedException(final Map> taskWithChangelogs) { super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized"); this.taskWithChangelogs = taskWithChangelogs; } - public Map> corruptedTaskWithChangelogs() { + public Map> corruptedTaskWithChangelogs() { return taskWithChangelogs; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index ee04b25ab84e..8d456ba8f28d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -63,7 +63,7 @@ public Collection changelogPartitions() { } @Override - public void markChangelogAsCorrupted(final Set partitions) { + public void markChangelogAsCorrupted(final Collection partitions) { stateMgr.markChangelogAsCorrupted(partitions); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 43ae0d412ad6..2f40556fcba5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -123,12 +123,13 @@ Collection createTasks(final Consumer consumer, final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, - partitions, Task.TaskType.ACTIVE, + EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), + logContext, stateDirectory, - topology.storeToChangelogTopic(), storeChangelogReader, - logContext + topology.storeToChangelogTopic(), + partitions ); if (threadProducer == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 9a58d5fd7b2d..59b1deaa5da1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -34,10 +35,10 @@ import java.io.File; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import static java.lang.String.format; @@ -141,6 +142,7 @@ public String toString() { private final TaskId taskId; private final String logPrefix; private final TaskType taskType; + private final boolean eosEnabled; private final ChangelogRegister changelogReader; private final Map storeToChangelogTopic; 
private final Collection sourcePartitions; @@ -152,8 +154,7 @@ public String toString() { private final File baseDir; private final OffsetCheckpoint checkpointFile; - public static String storeChangelogTopic(final String applicationId, - final String storeName) { + public static String storeChangelogTopic(final String applicationId, final String storeName) { return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; } @@ -161,19 +162,21 @@ public static String storeChangelogTopic(final String applicationId, * @throws ProcessorStateException if the task directory does not exist and could not be created */ public ProcessorStateManager(final TaskId taskId, - final Collection sources, final TaskType taskType, + final boolean eosEnabled, + final LogContext logContext, final StateDirectory stateDirectory, - final Map storeToChangelogTopic, final ChangelogRegister changelogReader, - final LogContext logContext) throws ProcessorStateException { - this.logPrefix = format("task [%s] ", taskId); - this.log = logContext.logger(ProcessorStateManager.class); + final Map storeToChangelogTopic, + final Collection sourcePartitions) throws ProcessorStateException { + this.log = logContext.logger(ProcessorStateManager.class); + this.logPrefix = logContext.logPrefix(); this.taskId = taskId; this.taskType = taskType; - this.sourcePartitions = sources; + this.eosEnabled = eosEnabled; this.changelogReader = changelogReader; + this.sourcePartitions = sourcePartitions; this.storeToChangelogTopic = storeToChangelogTopic; this.baseDir = stateDirectory.directoryForTask(taskId); @@ -195,7 +198,7 @@ public StateStore getGlobalStore(final String name) { } // package-private for test only - void initializeStoreOffsetsFromCheckpoint() { + void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { try { final Map loadedCheckpoints = checkpointFile.read(); @@ -211,11 +214,21 @@ void initializeStoreOffsetsFromCheckpoint() { log.debug("State store {} initialized from 
checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { - // TODO K9113: for EOS when there's no checkpointed offset, we should treat it as TaskCorrupted - - log.info("State store {} did not find checkpoint offset, hence would " + + // with EOS, if the previous run did not shut down gracefully, we may have lost the checkpoint file + // and hence we are uncertain that the current local state only contains committed data; + // in that case we need to treat it as a task-corrupted exception + if (eosEnabled && !storeDirIsEmpty) { + log.warn("State store {} did not find checkpoint offsets while stores are not empty, " + + "since under EOS it has the risk of getting uncommitted data in stores we have to " + + "treat it as a task corruption error and wipe out the local state of task {} " + + "before re-bootstrapping", store.stateStore.name(), taskId); + + throw new TaskCorruptedException(Collections.singletonMap(taskId, changelogPartitions())); + } else { + log.info("State store {} did not find checkpoint offset, hence would " + "default to the starting offset at changelog {}", - store.stateStore.name(), store.changelogPartition); + store.stateStore.name(), store.changelogPartition); + } } } } @@ -225,6 +238,8 @@ void initializeStoreOffsetsFromCheckpoint() { } checkpointFile.delete(); + } catch (final TaskCorruptedException e) { + throw e; } catch (final IOException | RuntimeException e) { // both IOException or runtime exception like number parsing can throw throw new ProcessorStateException(format("%sError loading and deleting checkpoint file when creating the state manager", @@ -287,7 +302,7 @@ Collection changelogPartitions() { return changelogOffsets().keySet(); } - void markChangelogAsCorrupted(final Set partitions) { + void markChangelogAsCorrupted(final Collection partitions) { for (final StateStoreMetadata storeMetadata : stores.values()) { if (partitions.contains(storeMetadata.changelogPartition)) { 
storeMetadata.corrupted = true; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index fbebe72ae68b..16f39c922980 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; + class StandbyTaskCreator { private final InternalTopologyBuilder builder; private final StreamsConfig config; @@ -71,12 +73,13 @@ Collection createTasks(final Map> tasksToBeCre if (topology.hasStateWithChangelogs()) { final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, - partitions, Task.TaskType.STANDBY, + EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), + logContext, stateDirectory, - topology.storeToChangelogTopic(), storeChangelogReader, - logContext + topology.storeToChangelogTopic(), + partitions ); final StandbyTask task = new StandbyTask( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index f5c4c31d0839..206867802cdd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -36,6 +36,8 @@ import java.util.HashMap; import java.util.regex.Pattern; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; + /** * Manages the directories where the state of Tasks owned by a {@link StreamThread} are * stored. Handles creation/locking/unlocking/cleaning of the Task Directories. 
This class is not @@ -105,6 +107,20 @@ public File directoryForTask(final TaskId taskId) { return taskDir; } + /** + * Decide if the directory of the task is empty or not + */ + boolean directoryForTaskIsEmpty(final TaskId taskId) { + final File taskDir = directoryForTask(taskId); + + final File[] storeDirs = taskDir.listFiles(pathname -> + !pathname.getName().equals(LOCK_FILE_NAME) && + !pathname.getName().equals(CHECKPOINT_FILE_NAME)); + + // if the task is stateless, storeDirs would be null + return storeDirs == null || storeDirs.length == 0; + } + /** * Get or create the directory for the global stores. * @return directory for the global stores diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index 17d15cfed1ae..dbecc58eb0d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -71,6 +71,8 @@ static void registerStateStores(final Logger log, } log.debug("Acquired state directory lock"); + final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id); + // We should only load checkpoint AFTER the corresponding state directory lock has been acquired and // the state stores have been registered; we should not try to load at the state manager construction time. 
// See https://issues.apache.org/jira/browse/KAFKA-8574 @@ -79,7 +81,8 @@ static void registerStateStores(final Logger log, store.init(processorContext, store); log.trace("Registered state store {}", store.name()); } - stateMgr.initializeStoreOffsetsFromCheckpoint(); + + stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty); log.debug("Initialized state stores"); } @@ -106,7 +109,9 @@ static void closeStateManager(final Logger log, stateMgr.close(); if (wipeStateStore) { - // we can just delete the whole dir of the task, including the state store images and the checkpoint files + // we can just delete the whole dir of the task, including the state store images and the checkpoint files, + // and then we write an empty checkpoint file indicating that the previous close is graceful and we just + // need to re-bootstrap the restoration from the beginning Utils.delete(stateMgr.baseDir()); } } catch (final ProcessorStateException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 1b08177ab3a6..7cd21f84ae31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -423,7 +423,7 @@ public void restore() { "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" + " it later.", e.getClass().getName(), e.partitions()); - final Map> taskWithCorruptedChangelogs = new HashMap<>(); + final Map> taskWithCorruptedChangelogs = new HashMap<>(); for (final TopicPartition partition : e.partitions()) { final TaskId taskId = changelogs.get(partition).stateManager.taskId(); taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition); diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index bdabf43759d4..e3821498c3a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -413,48 +413,50 @@ public void closeDirty() { */ private void close(final boolean clean) { if (state() == State.CREATED) { - // the task is created and not initialized, do nothing - transitionTo(State.CLOSING); - } else { - if (state() == State.RUNNING) { - closeTopology(clean); + // the task is created and not initialized, just re-write the checkpoint file + executeAndMaybeSwallow(clean, () -> { + stateMgr.checkpoint(Collections.emptyMap()); + }, "state manager checkpoint"); - if (clean) { - commitState(); - // whenever we have successfully committed state, it is safe to checkpoint - // the state as well no matter if EOS is enabled or not - stateMgr.checkpoint(checkpointableOffsets()); - } else { - executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush"); - } + transitionTo(State.CLOSING); + } else if (state() == State.RUNNING) { + closeTopology(clean); - transitionTo(State.CLOSING); - } else if (state() == State.RESTORING) { - executeAndMaybeSwallow(clean, () -> { - stateMgr.flush(); - stateMgr.checkpoint(Collections.emptyMap()); - }, "state manager flush and checkpoint"); - - transitionTo(State.CLOSING); - } else if (state() == State.SUSPENDED) { - // do not need to commit / checkpoint, since when suspending we've already committed the state - transitionTo(State.CLOSING); + if (clean) { + commitState(); + // whenever we have successfully committed state, it is safe to checkpoint + // the state as well no matter if EOS is enabled or not + stateMgr.checkpoint(checkpointableOffsets()); + } else { + executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush"); 
} - if (state() == State.CLOSING) { - // if EOS is enabled, we wipe out the whole state store for unclean close - // since they are invalid to use anymore - final boolean wipeStateStore = !clean && !eosDisabled; + transitionTo(State.CLOSING); + } else if (state() == State.RESTORING) { + executeAndMaybeSwallow(clean, () -> { + stateMgr.flush(); + stateMgr.checkpoint(Collections.emptyMap()); + }, "state manager flush and checkpoint"); - // first close state manager (which is idempotent) then close the record collector (which could throw), - // if the latter throws and we re-close dirty which would close the state manager again. - StateManagerUtil.closeStateManager(log, logPrefix, clean, - wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE); + transitionTo(State.CLOSING); + } else if (state() == State.SUSPENDED) { + // do not need to commit / checkpoint, since when suspending we've already committed the state + transitionTo(State.CLOSING); + } - executeAndMaybeSwallow(clean, recordCollector::close, "record collector close"); - } else { - throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id); - } + if (state() == State.CLOSING) { + // if EOS is enabled, we wipe out the whole state store for unclean close + // since they are invalid to use anymore + final boolean wipeStateStore = !clean && !eosDisabled; + + // first close state manager (which is idempotent) then close the record collector (which could throw), + // if the latter throws and we re-close dirty which would close the state manager again. 
+ StateManagerUtil.closeStateManager(log, logPrefix, clean, + wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE); + + executeAndMaybeSwallow(clean, recordCollector::close, "record collector close"); + } else { + throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id); } partitionGroup.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1465110b61c4..d67fff8577ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -240,6 +240,7 @@ int getAssignmentErrorCode() { return assignmentErrorCode.get(); } + private final Time time; private final Logger log; private final String logPrefix; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 332c39e1d52c..bdf4ed4bf4ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -174,7 +174,7 @@ enum TaskType { */ Map changelogOffsets(); - void markChangelogAsCorrupted(final Set partitions); + void markChangelogAsCorrupted(final Collection partitions); default Map purgableOffsets() { return Collections.emptyMap(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 009a856a1143..ba75f86b0c32 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -132,8 +132,8 @@ void handleRebalanceComplete() { 
rebalanceInProgress = false; } - void handleCorruption(final Map> taskWithChangelogs) { - for (final Map.Entry> entry : taskWithChangelogs.entrySet()) { + void handleCorruption(final Map> taskWithChangelogs) { + for (final Map.Entry> entry : taskWithChangelogs.entrySet()) { final TaskId taskId = entry.getKey(); final Task task = tasks.get(taskId); @@ -141,16 +141,10 @@ void handleCorruption(final Map> taskWithChangelogs) changelogReader.remove(task.changelogPartitions()); // mark corrupted partitions to not be checkpointed, and then close the task as dirty - final Set corruptedPartitions = entry.getValue(); + final Collection corruptedPartitions = entry.getValue(); task.markChangelogAsCorrupted(corruptedPartitions); - try { - task.closeClean(); - } catch (final RuntimeException e) { - log.error("Failed to close task {} cleanly while handling corrupted tasks. Attempting to re-close it as dirty.", task.id()); - task.closeDirty(); - } - + task.closeDirty(); task.revive(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index c82759d77560..84019219b52c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -56,6 +56,7 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.junit.Assert.assertFalse; + /** * Test the unclean shutdown behavior around state store cleanup. 
*/ @@ -101,8 +102,8 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE final AtomicInteger recordCount = new AtomicInteger(0); final KTable valueCounts = inputStream - .groupByKey() - .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", Materialized.as("aggregated_value")); + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", Materialized.as("aggregated_value")); valueCounts.toStream().peek((key, value) -> { if (recordCount.incrementAndGet() >= RECORD_TOTAL) { throw new IllegalStateException("Crash on the " + RECORD_TOTAL + " record"); @@ -117,10 +118,9 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE )); final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, builder, true); - final File stateDir = new File( - String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0")); - try { + final File stateDir = new File(String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0")); + try { IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(), singletonList(new KeyValueTimestamp<>("k1", "v1", 0L))); @@ -139,6 +139,7 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE driver.close(); + // the state directory should still exist with the empty checkpoint file assertFalse(stateDir.exists()); cleanStateAfterTest(CLUSTER, driver); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 624f9dc3c8be..0978877f3387 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -25,6 +25,7 @@ import 
org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; @@ -157,16 +158,17 @@ public void shouldReportTaskType() { public void shouldReportChangelogAsSource() { final ProcessorStateManager stateMgr = new ProcessorStateManager( taskId, - mkSet(persistentStorePartition, nonPersistentStorePartition), Task.TaskType.STANDBY, + false, + logContext, stateDirectory, + changelogReader, mkMap( mkEntry(persistentStoreName, persistentStoreTopicName), mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName), mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName) ), - changelogReader, - logContext); + mkSet(persistentStorePartition, nonPersistentStorePartition)); assertTrue(stateMgr.changelogAsSource(persistentStorePartition)); assertTrue(stateMgr.changelogAsSource(nonPersistentStorePartition)); @@ -177,15 +179,15 @@ public void shouldReportChangelogAsSource() { public void shouldFindSingleStoreForChangelog() { final ProcessorStateManager stateMgr = new ProcessorStateManager( taskId, - Collections.emptySet(), Task.TaskType.STANDBY, + false, + logContext, stateDirectory, - mkMap( + changelogReader, mkMap( mkEntry(persistentStoreName, persistentStoreTopicName), mkEntry(persistentStoreTwoName, persistentStoreTopicName) ), - changelogReader, - logContext); + Collections.emptySet()); stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); stateMgr.registerStore(persistentStoreTwo, persistentStore.stateRestoreCallback); @@ -286,12 +288,13 @@ public void shouldRegisterNonPersistentStore() { public void shouldNotRegisterNonLoggedStore() { final ProcessorStateManager stateMgr = new ProcessorStateManager( 
taskId, - emptySet(), Task.TaskType.STANDBY, + false, + logContext, stateDirectory, - emptyMap(), changelogReader, - logContext); + emptyMap(), + emptySet()); try { stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); @@ -318,7 +321,7 @@ public void shouldInitializeOffsetsFromCheckpointFile() throws IOException { stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); stateMgr.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback); stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback); - stateMgr.initializeStoreOffsetsFromCheckpoint(); + stateMgr.initializeStoreOffsetsFromCheckpoint(true); assertFalse(checkpointFile.exists()); assertEquals(mkSet( @@ -406,7 +409,7 @@ public void shouldOverrideOffsetsWhenRestoreAndProcess() throws IOException { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); try { stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); - stateMgr.initializeStoreOffsetsFromCheckpoint(); + stateMgr.initializeStoreOffsetsFromCheckpoint(true); final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition); assertThat(storeMetadata, notNullValue()); @@ -435,7 +438,7 @@ public void shouldWriteCheckpointForPersistentStore() throws IOException { try { stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); - stateMgr.initializeStoreOffsetsFromCheckpoint(); + stateMgr.initializeStoreOffsetsFromCheckpoint(true); final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition); assertThat(storeMetadata, notNullValue()); @@ -457,7 +460,7 @@ public void shouldNotWriteCheckpointForNonPersistent() throws IOException { try { stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback); - stateMgr.initializeStoreOffsetsFromCheckpoint(); + stateMgr.initializeStoreOffsetsFromCheckpoint(true); final StateStoreMetadata 
storeMetadata = stateMgr.storeMetadata(nonPersistentStorePartition); assertThat(storeMetadata, notNullValue()); @@ -475,12 +478,13 @@ public void shouldNotWriteCheckpointForNonPersistent() throws IOException { public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOException { final ProcessorStateManager stateMgr = new ProcessorStateManager( taskId, - emptySet(), Task.TaskType.STANDBY, + false, + logContext, stateDirectory, - emptyMap(), changelogReader, - logContext); + emptyMap(), + emptySet()); try { stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); @@ -621,7 +625,7 @@ public void shouldThrowIfLoadCheckpointThrows() throws IOException { writer.close(); try { - stateMgr.initializeStoreOffsetsFromCheckpoint(); + stateMgr.initializeStoreOffsetsFromCheckpoint(true); fail("should have thrown processor state exception when IO exception happens"); } catch (final ProcessorStateException e) { // pass @@ -705,19 +709,58 @@ public void close() { Assert.assertTrue(closedStore.get()); } - private ProcessorStateManager getStateManager(final Task.TaskType taskType) { + @Test + public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException { + final long checkpointOffset = 10L; + + final Map offsets = mkMap( + mkEntry(persistentStorePartition, checkpointOffset), + mkEntry(nonPersistentStorePartition, checkpointOffset), + mkEntry(irrelevantPartition, 999L) + ); + checkpoint.write(offsets); + + final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE, true); + + try { + stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); + stateMgr.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback); + stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback); + + final TaskCorruptedException exception = assertThrows(TaskCorruptedException.class, + () -> stateMgr.initializeStoreOffsetsFromCheckpoint(false)); + + 
assertEquals(Collections.singletonMap(taskId, stateMgr.changelogPartitions()), exception.corruptedTaskWithChangelogs()); + } finally { + stateMgr.close(); + } + } + + @Test + public void shouldBeAbleToCloseWithoutRegisteringAnyStores() { + final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE, true); + + stateMgr.close(); + } + + private ProcessorStateManager getStateManager(final Task.TaskType taskType, final boolean eosEnabled) { return new ProcessorStateManager( taskId, - emptySet(), taskType, + eosEnabled, + logContext, stateDirectory, + changelogReader, mkMap( mkEntry(persistentStoreName, persistentStoreTopicName), mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName), mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName) ), - changelogReader, - logContext); + emptySet()); + } + + private ProcessorStateManager getStateManager(final Task.TaskType taskType) { + return getStateManager(taskType, false); } private MockKeyValueStore getConverterStore() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index c1129f3185b7..7c43409c2b84 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -156,7 +156,7 @@ public void shouldTransitToRunningAfterInitialization() { EasyMock.expectLastCall(); stateManager.registerStore(store2, store2.stateRestoreCallback); EasyMock.expectLastCall(); - + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); task = createStandbyTask(); @@ -188,6 +188,7 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() { EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); 
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L)); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); EasyMock.replay(stateManager); task = createStandbyTask(); @@ -239,6 +240,7 @@ public void shouldNotCommitAndThrowOnCloseDirty() { EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes(); stateManager.checkpoint(EasyMock.anyObject()); EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -263,6 +265,7 @@ public void shouldCommitOnCloseClean() { stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); EasyMock.expectLastCall(); EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L)); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); EasyMock.replay(stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -280,6 +283,7 @@ public void shouldCommitOnCloseClean() { @Test public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); EasyMock.expect(stateManager.changelogOffsets()) .andReturn(Collections.singletonMap(partition, 50L)) .andReturn(Collections.singletonMap(partition, 50L)) @@ -311,6 +315,7 @@ public void shouldThrowOnCloseCleanError() { stateManager.close(); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L)); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); 
EasyMock.replay(stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -326,6 +331,7 @@ public void shouldThrowOnCloseCleanError() { EasyMock.verify(stateManager); EasyMock.reset(stateManager); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); EasyMock.replay(stateManager); } @@ -333,6 +339,7 @@ public void shouldThrowOnCloseCleanError() { public void shouldThrowOnCloseCleanFlushError() { stateManager.flush(); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -347,6 +354,7 @@ public void shouldThrowOnCloseCleanFlushError() { EasyMock.verify(stateManager); EasyMock.reset(stateManager); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); } @@ -354,6 +362,7 @@ public void shouldThrowOnCloseCleanFlushError() { public void shouldThrowOnCloseCleanCheckpointError() { stateManager.checkpoint(EasyMock.anyObject()); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -369,6 +378,7 @@ public void shouldThrowOnCloseCleanCheckpointError() { EasyMock.verify(stateManager); EasyMock.reset(stateManager); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 1f7163f5d13a..e1ca918848bf 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -30,8 +32,10 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Properties; @@ -39,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -129,6 +134,46 @@ public void shouldBeTrueIfAlreadyHoldsLock() throws Exception { } } + @Test + public void shouldBeAbleToUnlockEvenWithoutLocking() throws Exception { + final TaskId taskId = new TaskId(0, 0); + directory.unlock(taskId); + } + + @Test + public void shouldReportDirectoryEmpty() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + // when task dir first created, it should be empty + assertTrue(directory.directoryForTaskIsEmpty(taskId)); + + // after locking, it should still be empty + directory.lock(taskId); + 
assertTrue(directory.directoryForTaskIsEmpty(taskId)); + + // after writing checkpoint, it should still be empty + final OffsetCheckpoint checkpointFile = new OffsetCheckpoint(new File(directory.directoryForTask(taskId), CHECKPOINT_FILE_NAME)); + assertTrue(directory.directoryForTaskIsEmpty(taskId)); + + checkpointFile.write(Collections.singletonMap(new TopicPartition("topic", 0), 0L)); + assertTrue(directory.directoryForTaskIsEmpty(taskId)); + + // if some store dir is created, it should not be empty + final File dbDir = new File(new File(directory.directoryForTask(taskId), "db"), "store1"); + + Files.createDirectories(dbDir.getParentFile().toPath()); + Files.createDirectories(dbDir.getAbsoluteFile().toPath()); + + assertFalse(directory.directoryForTaskIsEmpty(taskId)); + + // after wiping out the state dir, the dir should show as empty again + Utils.delete(dbDir.getParentFile()); + assertTrue(directory.directoryForTaskIsEmpty(taskId)); + + directory.unlock(taskId); + assertTrue(directory.directoryForTaskIsEmpty(taskId)); + } + @Test public void shouldThrowProcessorStateException() throws Exception { final TaskId taskId = new TaskId(0, 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java index 8d2b184dafec..a9f831b00be1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java @@ -39,6 +39,7 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static java.util.Collections.emptyList; @@ -149,6 +150,7 @@ public void testRegisterStateStores() throws IOException { expect(stateManager.taskId()).andReturn(taskId); expect(stateDirectory.lock(taskId)).andReturn(true); + 
expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); final MockKeyValueStore store1 = new MockKeyValueStore("store1", false); final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); @@ -165,7 +167,7 @@ public void testRegisterStateStores() throws IOException { processorContext.register(store2, store2.stateRestoreCallback); expectLastCall(); - stateManager.initializeStoreOffsetsFromCheckpoint(); + stateManager.initializeStoreOffsetsFromCheckpoint(true); expectLastCall(); ctrl.checkOrder(true); @@ -187,6 +189,8 @@ public void testShouldThrowWhenCleanAndWipeStateAreBothTrue() { public void testCloseStateManagerClean() throws IOException { expect(stateManager.taskId()).andReturn(taskId); + expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); + stateManager.close(); expectLastCall(); @@ -205,7 +209,7 @@ public void testCloseStateManagerClean() throws IOException { @Test public void testCloseStateManagerThrowsExceptionWhenClean() throws IOException { expect(stateManager.taskId()).andReturn(taskId); - + expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); stateManager.close(); expectLastCall(); @@ -228,7 +232,7 @@ public void testCloseStateManagerThrowsExceptionWhenClean() throws IOException { @Test public void testCloseStateManagerOnlyThrowsFirstExceptionWhenClean() throws IOException { expect(stateManager.taskId()).andReturn(taskId); - + expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); stateManager.close(); expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); @@ -254,7 +258,7 @@ public void testCloseStateManagerDirtyShallSwallowException() throws IOException final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); expect(stateManager.taskId()).andReturn(taskId); - + expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); stateManager.close(); expectLastCall().andThrow(new 
ProcessorStateException("state manager failed to close")); @@ -277,7 +281,7 @@ public void testCloseStateManagerDirtyShallSwallowException() throws IOException @Test public void testCloseStateManagerWithStateStoreWipeOut() throws IOException { expect(stateManager.taskId()).andReturn(taskId); - + expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); stateManager.close(); expectLastCall(); @@ -302,7 +306,7 @@ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException( mockStatic(Utils.class); expect(stateManager.taskId()).andReturn(taskId); - + expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); stateManager.close(); expectLastCall(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 585952cd800b..032e1375341a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -253,6 +253,7 @@ public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() thr EasyMock.expectLastCall(); EasyMock.expect(stateManager.taskId()).andReturn(taskId); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); stateManager.close(); EasyMock.expectLastCall(); @@ -1017,6 +1018,7 @@ public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctu @Test public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); task = createFaultyStatefulTask(createConfig(false, "100")); @@ -1045,6 +1047,7 @@ public void shouldCommitWhenSuspending() throws IOException { 
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, 10L)); recordCollector.commit(EasyMock.eq(Collections.emptyMap())); EasyMock.expectLastCall(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, 10L))); EasyMock.expectLastCall(); EasyMock.replay(recordCollector, stateDirectory, stateManager); @@ -1067,6 +1070,7 @@ public void shouldCommitWhenSuspending() throws IOException { public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws IOException { stateDirectory = EasyMock.createNiceMock(StateDirectory.class); EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(recordCollector, stateDirectory, stateManager); @@ -1084,6 +1088,7 @@ public void shouldNotReInitializeTopologyWhenResuming() throws IOException { EasyMock.expect(recordCollector.offsets()).andThrow(new AssertionError("Should not try to read offsets")).anyTimes(); recordCollector.commit(EasyMock.anyObject()); EasyMock.expectLastCall().andThrow(new AssertionError("Should not try to commit")).anyTimes(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); EasyMock.expectLastCall(); @@ -1297,7 +1302,7 @@ public void shouldThrowIfCommittingOnIllegalState() { @Test public void shouldReturnStateManagerChangelogOffsets() { EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1, 50L)).anyTimes(); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1)); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1)).anyTimes(); EasyMock.replay(stateManager); task = 
createOptimizedStatefulTask(createConfig(false, "100"), consumer); @@ -1312,13 +1317,11 @@ public void shouldReturnStateManagerChangelogOffsets() { } @Test - public void shouldDoNothingWithCreatedStateOnClose() { - stateManager.close(); - EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called")).anyTimes(); + public void shouldCheckpointWithCreatedStateOnClose() { stateManager.flush(); EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes(); - stateManager.checkpoint(EasyMock.anyObject()); - EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); + stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); + EasyMock.expectLastCall(); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1)); EasyMock.replay(stateManager); @@ -1338,6 +1341,7 @@ public void shouldDoNothingWithCreatedStateOnClose() { @Test public void shouldNotCommitAndThrowOnCloseDirty() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); stateManager.close(); EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); // should still flush on close dirty @@ -1374,6 +1378,7 @@ public void shouldNotCommitOnSuspendRestoring() { EasyMock.expectLastCall(); recordCollector.commit(EasyMock.anyObject()); EasyMock.expectLastCall().andThrow(new AssertionError("Should not call this function")).anyTimes(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); @@ -1394,6 +1399,7 @@ public void shouldNotCommitOnCloseRestoring() { EasyMock.expectLastCall(); recordCollector.commit(EasyMock.anyObject()); EasyMock.expectLastCall().andThrow(new AssertionError("Should not call this function")).anyTimes(); + 
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); @@ -1419,6 +1425,7 @@ public void shouldCommitOnCloseClean() { EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); EasyMock.expectLastCall(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(recordCollector, stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -1444,6 +1451,7 @@ public void shouldThrowOnCloseCleanError() { EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); EasyMock.expectLastCall(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)).anyTimes(); stateManager.close(); EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); EasyMock.replay(recordCollector, stateManager); @@ -1462,6 +1470,7 @@ public void shouldThrowOnCloseCleanError() { verify(stateManager); EasyMock.reset(stateManager); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)).anyTimes(); EasyMock.replay(stateManager); } @@ -1478,6 +1487,7 @@ public void shouldThrowOnCloseCleanFlushError() { EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); stateManager.close(); EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(recordCollector, stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -1493,6 +1503,7 @@ public void shouldThrowOnCloseCleanFlushError() { verify(stateManager); EasyMock.reset(stateManager); + 
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); } @@ -1509,6 +1520,8 @@ public void shouldThrowOnCloseCleanCheckpointError() { EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); stateManager.close(); EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); + EasyMock.replay(recordCollector, stateManager); final MetricName metricName = setupCloseTaskMetric(); @@ -1524,11 +1537,15 @@ public void shouldThrowOnCloseCleanCheckpointError() { verify(stateManager); EasyMock.reset(stateManager); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); } @Test public void shouldThrowIfClosingOnIllegalState() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); + EasyMock.replay(stateManager); + task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); task.closeClean(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 0307b0c7048c..1d7343d002a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -69,6 +69,7 @@ import java.util.Properties; import java.util.Set; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -358,17 +359,17 @@ private StreamTask createStreamsTask(final 
StreamsConfig streamsConfig, final Set partitions = Collections.singleton(new TopicPartition(topicName, taskId.partition)); final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, - partitions, Task.TaskType.ACTIVE, + EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), + logContext, stateDirectory, - topology.storeToChangelogTopic(), new StoreChangelogReader( new MockTime(), streamsConfig, logContext, clientSupplier.restoreConsumer, new MockStateRestoreListener()), - logContext); + topology.storeToChangelogTopic(), partitions); final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); final RecordCollector recordCollector = new RecordCollectorImpl( logContext, diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 41fef28fa205..65eb17a4beba 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -428,17 +428,18 @@ private void setupTask(final StreamsConfig streamsConfig, final ProcessorStateManager stateManager = new ProcessorStateManager( TASK_ID, - new HashSet<>(partitionsByInputTopic.values()), Task.TaskType.ACTIVE, + EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), + logContext, stateDirectory, - processorTopology.storeToChangelogTopic(), new StoreChangelogReader( mockWallClockTime, streamsConfig, logContext, createRestoreConsumer(processorTopology.storeToChangelogTopic()), stateRestoreListener), - logContext); + processorTopology.storeToChangelogTopic(), + new HashSet<>(partitionsByInputTopic.values())); final RecordCollector recordCollector = new RecordCollectorImpl( logContext, TASK_ID,