diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 952165b5854eb..60bcfca883916 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -179,7 +179,8 @@ private void close(final boolean clean) { } if (state() == State.CLOSING) { - StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory); + StateManagerUtil.closeStateManager(log, logPrefix, clean, + false, stateMgr, stateDirectory, TaskType.STANDBY); // TODO: if EOS is enabled, we should wipe out the state stores like we did for StreamTask too } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index f0c3d3676728d..17d15cfed1aee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; @@ -31,6 +32,10 @@ import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue; import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped; +/** + * Shared functions to handle state store registration and cleanup between + * active and standby tasks. 
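+ * On a dirty close of an EOS task, this shared close path can also wipe out the
+ * task state directory, since the local stores are invalid to use anymore.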
+ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; @@ -78,38 +83,43 @@ static void registerStateStores(final Logger log, log.debug("Initialized state stores"); } - static void wipeStateStores(final Logger log, final ProcessorStateManager stateMgr) { - // we can just delete the whole dir of the task, including the state store images and the checkpoint files - try { - Utils.delete(stateMgr.baseDir()); - } catch (final IOException fatalException) { - // since it is only called under dirty close, we always swallow the exception - log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException); - } - } - /** * @throws ProcessorStateException if there is an error while closing the state manager */ static void closeStateManager(final Logger log, final String logPrefix, final boolean closeClean, + final boolean wipeStateStore, final ProcessorStateManager stateMgr, - final StateDirectory stateDirectory) { + final StateDirectory stateDirectory, + final TaskType taskType) { + if (closeClean && wipeStateStore) { + throw new IllegalArgumentException("State store must not be wiped out during a clean close"); + } + ProcessorStateException exception = null; - log.trace("Closing state manager"); final TaskId id = stateMgr.taskId(); + log.trace("Closing state manager for {}", id); + try { stateMgr.close(); + + if (wipeStateStore) { + // we can just delete the whole dir of the task, including the state store images and the checkpoint files + Utils.delete(stateMgr.baseDir()); + } } catch (final ProcessorStateException e) { exception = e; + } catch (final IOException e) { + throw new ProcessorStateException("Failed to wipe state stores for task " + id, e); } finally { try { stateDirectory.unlock(id); } catch (final IOException e) { if (exception == null) { - exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", logPrefix), e); + exception = new ProcessorStateException( + String.format("%sFailed to release state dir lock", logPrefix), e); } } } @@ -118,7 +128,7 @@ static void closeStateManager(final Logger log, if (closeClean) throw exception; else - log.warn("Closing standby task " + id + " uncleanly throws an exception " + exception); + log.warn("Closing {} task {} uncleanly and swallowing an exception", taskType, id, exception); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index a6b6bb5f6ba47..bdabf43759d4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -402,6 +402,8 @@ public void closeDirty() { * 3. then flush the record collector * 4. then commit the record collector -- for EOS this is the synchronization barrier * 5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed + * 6. then if EOS is enabled and the close is dirty, wipe out the state store directory + * 7. finally release the state manager lock * * * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} -- @@ -440,15 +442,14 @@ private void close(final boolean clean) { } if (state() == State.CLOSING) { - // first close state manager (which is idempotent) then close the record collector (which could throw), - // if the latter throws and we re-close dirty which would close the state manager again.
- StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory); - // if EOS is enabled, we wipe out the whole state store for unclean close - // since they are invalid to use anymore - if (!clean && !eosDisabled) { - StateManagerUtil.wipeStateStores(log, stateMgr); - } + final boolean wipeStateStore = !clean && !eosDisabled; + + // first close state manager (which is idempotent) then close the record collector (which could throw), + // if the latter throws and we re-close dirty which would close the state manager again. + StateManagerUtil.closeStateManager(log, logPrefix, clean, + wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE); executeAndMaybeSwallow(clean, recordCollector::close, "record collector close"); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 7a735cd0ee742..639dda26ee904 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -55,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -93,8 +92,6 @@ public static Collection<Object[]> data() { static final long ANY_UNIQUE_KEY = 0L; StreamsBuilder builder; - int numRecordsExpected = 0; - AtomicBoolean finalResultReached = new AtomicBoolean(false); private final List<Input<String>> input = Arrays.asList( new Input<>(INPUT_TOPIC_LEFT, null), diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java new file mode 100644 index 0000000000000..c82759d77560b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.test.IntegrationTest; + +import org.apache.kafka.test.TestUtils; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Locale; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.junit.Assert.assertFalse; + +/** + * Test the unclean shutdown behavior around state store cleanup. 
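+ *
+ * With EOS enabled, a dirty close invalidates the local state stores, so the
+ * task state directory should be wiped out; this test forces a processing
+ * crash and verifies that the directory gets deleted.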
+ */ +@Category(IntegrationTest.class) +public class EOSUncleanShutdownIntegrationTest { + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); + + @ClassRule + public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory()); + + private static final Properties STREAMS_CONFIG = new Properties(); + private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); + private static final Long COMMIT_INTERVAL = 100L; + + private static final int RECORD_TOTAL = 3; + + @BeforeClass + public static void setupConfigsAndUtils() { + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath()); + } + + @Test + public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException { + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test"; + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + + final String input = "input-topic"; + cleanStateBeforeTest(CLUSTER, input); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<String, String> inputStream = builder.stream(input); + + final AtomicInteger recordCount = new AtomicInteger(0); + + final KTable<String, String> valueCounts = inputStream + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", Materialized.as("aggregated_value")); + valueCounts.toStream().peek((key, value) -> { + if (recordCount.incrementAndGet() >= RECORD_TOTAL) { + throw new IllegalStateException("Crash on record " + RECORD_TOTAL); + } + }); + + final Properties producerConfig = mkProperties(mkMap( + mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()), + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) + )); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, builder, true); + + final File stateDir = new File( + String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0")); + try { + + IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(), + singletonList(new KeyValueTimestamp<>("k1", "v1", 0L))); + + TestUtils.waitForCondition(stateDir::exists, + "Failed awaiting creation of the task state directory"); + + IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(), + asList(new KeyValueTimestamp<>("k2", "v2", 1L), + new KeyValueTimestamp<>("k3", "v3", 2L))); + + TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL, + "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get()); + } finally { + TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR), + "Expected ERROR state but driver is in " + driver.state()); + + driver.close(); + +
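+ // the failed task was closed dirty with EOS enabled, so its state directory should have been wiped out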
assertFalse(stateDir.exists()); + + cleanStateAfterTest(CLUSTER, driver); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java new file mode 100644 index 0000000000000..8d2b184dafec5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.TaskType; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.test.MockKeyValueStore; +import org.apache.kafka.test.TestUtils; +import org.easymock.IMocksControl; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.easymock.EasyMock.createStrictControl; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(Utils.class) +public class StateManagerUtilTest { + + @Mock(type = MockType.NICE) + private ProcessorStateManager stateManager; + + @Mock(type = MockType.NICE) + private StateDirectory stateDirectory; + + @Mock(type = MockType.NICE) + private ProcessorTopology topology; + + @Mock(type = MockType.NICE) + private InternalProcessorContext processorContext; + + private IMocksControl ctrl; + + private Logger logger = new LogContext("test").logger(AbstractTask.class); + + private final TaskId taskId = new TaskId(0, 0); + + @Before + public void setup() { + ctrl = createStrictControl(); + topology = ctrl.createMock(ProcessorTopology.class); + 
processorContext = ctrl.createMock(InternalProcessorContext.class); + + stateManager = ctrl.createMock(ProcessorStateManager.class); + stateDirectory = ctrl.createMock(StateDirectory.class); + } + + @Test + public void testRegisterStateStoreWhenTopologyEmpty() { + expect(topology.stateStores()).andReturn(emptyList()); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.registerStateStores(logger, + "logPrefix:", topology, stateManager, stateDirectory, processorContext); + + ctrl.verify(); + } + + @Test + public void testRegisterStateStoreFailToLockStateDirectory() throws IOException { + expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); + + expect(stateManager.taskId()).andReturn(taskId); + + expect(stateDirectory.lock(taskId)).andReturn(false); + + ctrl.checkOrder(true); + ctrl.replay(); + + final LockException thrown = assertThrows(LockException.class, + () -> StateManagerUtil.registerStateStores(logger, "logPrefix:", + topology, stateManager, stateDirectory, processorContext)); + + assertEquals("logPrefix:Failed to lock the state directory for task 0_0", thrown.getMessage()); + + ctrl.verify(); + } + + @Test + public void testRegisterStateStoreLockThrowIOExceptionWrappedAsStreamException() throws IOException { + expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); + + expect(stateManager.taskId()).andReturn(taskId); + + expect(stateDirectory.lock(taskId)).andThrow(new IOException("Fail to lock state dir")); + + ctrl.checkOrder(true); + ctrl.replay(); + + final StreamsException thrown = assertThrows(StreamsException.class, + () -> StateManagerUtil.registerStateStores(logger, "logPrefix:", + topology, stateManager, stateDirectory, processorContext)); + + assertEquals("logPrefix:Fatal error while trying to " + + "lock the state directory for task 0_0", thrown.getMessage()); + assertEquals(IOException.class, thrown.getCause().getClass()); + assertEquals("Fail to lock state dir", thrown.getCause().getMessage()); + + ctrl.verify(); + } + + @Test + public void testRegisterStateStores() throws IOException { + expect(topology.stateStores()) + .andReturn(Arrays.asList(new MockKeyValueStore("store1", false), + new MockKeyValueStore("store2", false))); + + expect(stateManager.taskId()).andReturn(taskId); + + expect(stateDirectory.lock(taskId)).andReturn(true); + + final MockKeyValueStore store1 = new MockKeyValueStore("store1", false); + final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); + + expect(topology.stateStores()).andReturn(Arrays.asList(store1, store2)); + + processorContext.uninitialize(); + expectLastCall(); + processorContext.register(store1, store1.stateRestoreCallback); + expectLastCall(); + + processorContext.uninitialize(); + expectLastCall(); + processorContext.register(store2, store2.stateRestoreCallback); + expectLastCall(); + + stateManager.initializeStoreOffsetsFromCheckpoint(); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.registerStateStores(logger, "logPrefix:", + topology, stateManager, stateDirectory, processorContext); + + ctrl.verify(); + } + + @Test + public void testShouldThrowWhenCleanAndWipeStateAreBothTrue() { + assertThrows(IllegalArgumentException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, true, stateManager, stateDirectory, TaskType.ACTIVE)); + } + + @Test + public void testCloseStateManagerClean() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); 
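+ // a clean close should only close the state manager and release the lock, without touching baseDir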
+ + stateManager.close(); + expectLastCall(); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerThrowsExceptionWhenClean() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall(); + + stateDirectory.unlock(taskId); + expectLastCall().andThrow(new IOException("Timeout")); + + ctrl.checkOrder(true); + ctrl.replay(); + + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE)); + + assertEquals("logPrefix:Failed to release state dir lock", thrown.getMessage()); + assertEquals(IOException.class, thrown.getCause().getClass()); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerOnlyThrowsFirstExceptionWhenClean() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); + + // The unlock logic should still be executed. + stateDirectory.unlock(taskId); + expectLastCall().andThrow(new IOException("Timeout")); + + ctrl.checkOrder(true); + ctrl.replay(); + + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE)); + + // Thrown stateMgr exception will not be wrapped. + assertEquals("state manager failed to close", thrown.getMessage()); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerDirtyShallSwallowException() throws IOException { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE); + + ctrl.verify(); + + LogCaptureAppender.unregister(appender); + final List<String> strings = appender.getMessages(); + assertTrue(strings.contains("testClosing ACTIVE task 0_0 uncleanly and swallowing an exception")); + } + + @Test + public void testCloseStateManagerWithStateStoreWipeOut() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall(); + + // The `baseDir` will be accessed when attempting to delete the state store.
+ expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { + final File unknownFile = new File("/unknown/path"); + mockStatic(Utils.class); + + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall(); + + expect(stateManager.baseDir()).andReturn(unknownFile); + + Utils.delete(unknownFile); + expectLastCall().andThrow(new IOException("Deletion failed")); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + replayAll(); + + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); + + assertEquals("Failed to wipe state stores for task 0_0", thrown.getMessage()); + assertEquals(IOException.class, thrown.getCause().getClass()); + + ctrl.verify(); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1662196b8ce19..585952cd800bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; +import org.easymock.IMocksControl; import org.easymock.Mock; import org.easymock.MockType; import org.junit.After; @@ -82,6 +83,7 @@ import static org.apache.kafka.streams.processor.internals.StreamTask.encodeTimestamp; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24; +import static org.easymock.EasyMock.createStrictControl; import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; @@ -241,6 +243,37 @@ public void shouldNotAttemptToLockIfNoStores() { verify(stateDirectory); } + @Test + public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { + final IMocksControl ctrl = createStrictControl(); + final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); + stateDirectory = ctrl.createMock(StateDirectory.class); + + stateManager.registerGlobalStateStores(Collections.emptyList()); + EasyMock.expectLastCall(); + + EasyMock.expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + EasyMock.expectLastCall(); + + // The `baseDir` will be accessed when attempting to delete the state store.
+ EasyMock.expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); + + stateDirectory.unlock(taskId); + EasyMock.expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + task = createStatefulTask(createConfig(true, "100"), true, stateManager); + task.transitionTo(Task.State.CLOSING); + task.closeDirty(); + task = null; + + ctrl.verify(); + } + @Test public void shouldReadCommittedStreamTimeOnInitialize() { stateDirectory = EasyMock.createNiceMock(StateDirectory.class); @@ -1580,6 +1613,10 @@ private StreamTask createFaultyStatefulTask(final StreamsConfig config) { } private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) { + return createStatefulTask(config, logged, stateManager); + } + + private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged, final ProcessorStateManager stateManager) { final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, logged); final ProcessorTopology topology = ProcessorTopologyFactories.with(