From c2ec974e81f1c65aa2f2e43f7f4dc820b1957bca Mon Sep 17 00:00:00 2001 From: Boyang Chen Date: Tue, 3 Mar 2020 21:45:11 -0800 Subject: [PATCH] KAFKA-9618: Directory deletion failure leading to error task RocksDB open (#8186) We should have the following order: 1) close state stores 2) wipe out local directory 3) release directory lock to avoid the issue. There's a known problem that on some file systems the lock file cannot be deleted while the calling thread still holds the file lock; this will be fixed in another ticket. Reviewers: A. Sophie Blee-Goldman , Guozhang Wang --- .../processor/internals/StandbyTask.java | 3 +- .../processor/internals/StateManagerUtil.java | 38 +- .../processor/internals/StreamTask.java | 15 +- .../AbstractJoinIntegrationTest.java | 3 - .../EOSUncleanShutdownIntegrationTest.java | 147 ++++++++ .../internals/StateManagerUtilTest.java | 331 ++++++++++++++++++ .../processor/internals/StreamTaskTest.java | 37 ++ 7 files changed, 549 insertions(+), 25 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 952165b5854eb..60bcfca883916 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -179,7 +179,8 @@ private void close(final boolean clean) { } if (state() == State.CLOSING) { - StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory); + StateManagerUtil.closeStateManager(log, logPrefix, clean, + false, stateMgr, stateDirectory, TaskType.STANDBY); // TODO: if EOS is enabled, we should wipe out the state stores 
like we did for StreamTask too } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index f0c3d3676728d..17d15cfed1aee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; @@ -31,6 +32,10 @@ import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue; import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped; +/** + * Shared functions to handle state store registration and cleanup between + * active and standby tasks. 
+ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; @@ -78,38 +83,43 @@ static void registerStateStores(final Logger log, log.debug("Initialized state stores"); } - static void wipeStateStores(final Logger log, final ProcessorStateManager stateMgr) { - // we can just delete the whole dir of the task, including the state store images and the checkpoint files - try { - Utils.delete(stateMgr.baseDir()); - } catch (final IOException fatalException) { - // since it is only called under dirty close, we always swallow the exception - log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException); - } - } - /** * @throws ProcessorStateException if there is an error while closing the state manager */ static void closeStateManager(final Logger log, final String logPrefix, final boolean closeClean, + final boolean wipeStateStore, final ProcessorStateManager stateMgr, - final StateDirectory stateDirectory) { + final StateDirectory stateDirectory, + final TaskType taskType) { + if (closeClean && wipeStateStore) { + throw new IllegalArgumentException("State store could not be wiped out during clean close"); + } + ProcessorStateException exception = null; - log.trace("Closing state manager"); final TaskId id = stateMgr.taskId(); + log.trace("Closing state manager for {}", id); + try { stateMgr.close(); + + if (wipeStateStore) { + // we can just delete the whole dir of the task, including the state store images and the checkpoint files + Utils.delete(stateMgr.baseDir()); + } } catch (final ProcessorStateException e) { exception = e; + } catch (final IOException e) { + throw new ProcessorStateException("Failed to wiping state stores for task " + id, e); } finally { try { stateDirectory.unlock(id); } catch (final IOException e) { if (exception == null) { - exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", logPrefix), e); + exception = new ProcessorStateException( + 
String.format("%sFailed to release state dir lock", logPrefix), e); } } } @@ -118,7 +128,7 @@ static void closeStateManager(final Logger log, if (closeClean) throw exception; else - log.warn("Closing standby task " + id + " uncleanly throws an exception " + exception); + log.warn("Closing {} task {} uncleanly and swallows an exception", taskType, id, exception); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index a6b6bb5f6ba47..bdabf43759d4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -402,6 +402,8 @@ public void closeDirty() { * 3. then flush the record collector * 4. then commit the record collector -- for EOS this is the synchronization barrier * 5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed + * 6. then if we are closing on EOS and dirty, wipe out the state store directory + * 7. finally release the state manager lock * * * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} -- @@ -440,15 +442,14 @@ private void close(final boolean clean) { } if (state() == State.CLOSING) { - // first close state manager (which is idempotent) then close the record collector (which could throw), - // if the latter throws and we re-close dirty which would close the state manager again. 
- StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory); - // if EOS is enabled, we wipe out the whole state store for unclean close // since they are invalid to use anymore - if (!clean && !eosDisabled) { - StateManagerUtil.wipeStateStores(log, stateMgr); - } + final boolean wipeStateStore = !clean && !eosDisabled; + + // first close state manager (which is idempotent) then close the record collector (which could throw), + // if the latter throws and we re-close dirty which would close the state manager again. + StateManagerUtil.closeStateManager(log, logPrefix, clean, + wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE); executeAndMaybeSwallow(clean, recordCollector::close, "record collector close"); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 7a735cd0ee742..639dda26ee904 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -55,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -93,8 +92,6 @@ public static Collection data() { static final long ANY_UNIQUE_KEY = 0L; StreamsBuilder builder; - int numRecordsExpected = 0; - AtomicBoolean finalResultReached = new AtomicBoolean(false); private final List> input = Arrays.asList( new Input<>(INPUT_TOPIC_LEFT, null), diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java new file mode 100644 index 0000000000000..c82759d77560b --- /dev/null 
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.test.IntegrationTest; + +import org.apache.kafka.test.TestUtils; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; 
+import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Locale; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.junit.Assert.assertFalse; + +/** + * Test the unclean shutdown behavior around state store cleanup. + */ +@Category(IntegrationTest.class) +public class EOSUncleanShutdownIntegrationTest { + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); + + @ClassRule + public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory()); + + private static final Properties STREAMS_CONFIG = new Properties(); + private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); + private static final Long COMMIT_INTERVAL = 100L; + + private static final int RECORD_TOTAL = 3; + + @BeforeClass + public static void setupConfigsAndUtils() { + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + + 
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath()); + } + + @Test + public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException { + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test"; + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + + final String input = "input-topic"; + cleanStateBeforeTest(CLUSTER, input); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream inputStream = builder.stream(input); + + final AtomicInteger recordCount = new AtomicInteger(0); + + final KTable valueCounts = inputStream + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", Materialized.as("aggregated_value")); + valueCounts.toStream().peek((key, value) -> { + if (recordCount.incrementAndGet() >= RECORD_TOTAL) { + throw new IllegalStateException("Crash on the " + RECORD_TOTAL + " record"); + } + }); + + final Properties producerConfig = mkProperties(mkMap( + mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer) STRING_SERIALIZER).getClass().getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer) STRING_SERIALIZER).getClass().getName()), + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) + )); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, builder, true); + + final File stateDir = new File( + String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0")); + try { + + IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(), + singletonList(new KeyValueTimestamp<>("k1", "v1", 0L))); + + TestUtils.waitForCondition(stateDir::exists, + "Failed awaiting CreateTopics first request failure"); + + 
IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(), + asList(new KeyValueTimestamp<>("k2", "v2", 1L), + new KeyValueTimestamp<>("k3", "v3", 2L))); + + TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL, + "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get()); + } finally { + TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR), + "Expected ERROR state but driver is on " + driver.state()); + + driver.close(); + + assertFalse(stateDir.exists()); + + cleanStateAfterTest(CLUSTER, driver); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java new file mode 100644 index 0000000000000..8d2b184dafec5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.TaskType; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.test.MockKeyValueStore; +import org.apache.kafka.test.TestUtils; +import org.easymock.IMocksControl; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.easymock.EasyMock.createStrictControl; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(Utils.class) +public class StateManagerUtilTest { + + @Mock(type = MockType.NICE) + private ProcessorStateManager stateManager; + + @Mock(type = MockType.NICE) + private StateDirectory stateDirectory; + + @Mock(type = MockType.NICE) + private ProcessorTopology topology; + + @Mock(type = MockType.NICE) + private InternalProcessorContext processorContext; + + private 
IMocksControl ctrl; + + private Logger logger = new LogContext("test").logger(AbstractTask.class); + + private final TaskId taskId = new TaskId(0, 0); + + @Before + public void setup() { + ctrl = createStrictControl(); + topology = ctrl.createMock(ProcessorTopology.class); + processorContext = ctrl.createMock(InternalProcessorContext.class); + + stateManager = ctrl.createMock(ProcessorStateManager.class); + stateDirectory = ctrl.createMock(StateDirectory.class); + } + + @Test + public void testRegisterStateStoreWhenTopologyEmpty() { + expect(topology.stateStores()).andReturn(emptyList()); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.registerStateStores(logger, + "logPrefix:", topology, stateManager, stateDirectory, processorContext); + + ctrl.verify(); + } + + @Test + public void testRegisterStateStoreFailToLockStateDirectory() throws IOException { + expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); + + expect(stateManager.taskId()).andReturn(taskId); + + expect(stateDirectory.lock(taskId)).andReturn(false); + + ctrl.checkOrder(true); + ctrl.replay(); + + final LockException thrown = assertThrows(LockException.class, + () -> StateManagerUtil.registerStateStores(logger, "logPrefix:", + topology, stateManager, stateDirectory, processorContext)); + + assertEquals("logPrefix:Failed to lock the state directory for task 0_0", thrown.getMessage()); + + ctrl.verify(); + } + + @Test + public void testRegisterStateStoreLockThrowIOExceptionWrappedAsStreamException() throws IOException { + expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); + + expect(stateManager.taskId()).andReturn(taskId); + + expect(stateDirectory.lock(taskId)).andThrow(new IOException("Fail to lock state dir")); + + ctrl.checkOrder(true); + ctrl.replay(); + + final StreamsException thrown = assertThrows(StreamsException.class, + () -> StateManagerUtil.registerStateStores(logger, "logPrefix:", + 
topology, stateManager, stateDirectory, processorContext)); + + assertEquals("logPrefix:Fatal error while trying to " + + "lock the state directory for task 0_0", thrown.getMessage()); + assertEquals(IOException.class, thrown.getCause().getClass()); + assertEquals("Fail to lock state dir", thrown.getCause().getMessage()); + + ctrl.verify(); + } + + @Test + public void testRegisterStateStores() throws IOException { + expect(topology.stateStores()) + .andReturn(Arrays.asList(new MockKeyValueStore("store1", false), + new MockKeyValueStore("store2", false))); + + expect(stateManager.taskId()).andReturn(taskId); + + expect(stateDirectory.lock(taskId)).andReturn(true); + + final MockKeyValueStore store1 = new MockKeyValueStore("store1", false); + final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); + + expect(topology.stateStores()).andReturn(Arrays.asList(store1, store2)); + + processorContext.uninitialize(); + expectLastCall(); + processorContext.register(store1, store1.stateRestoreCallback); + expectLastCall(); + + processorContext.uninitialize(); + expectLastCall(); + processorContext.register(store2, store2.stateRestoreCallback); + expectLastCall(); + + stateManager.initializeStoreOffsetsFromCheckpoint(); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.registerStateStores(logger, "logPrefix:", + topology, stateManager, stateDirectory, processorContext); + + ctrl.verify(); + } + + @Test + public void testShouldThrowWhenCleanAndWipeStateAreBothTrue() { + assertThrows(IllegalArgumentException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, true, stateManager, stateDirectory, TaskType.ACTIVE)); + } + + @Test + public void testCloseStateManagerClean() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall(); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + 
StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerThrowsExceptionWhenClean() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall(); + + stateDirectory.unlock(taskId); + expectLastCall().andThrow(new IOException("Timeout")); + + ctrl.checkOrder(true); + ctrl.replay(); + + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE)); + + assertEquals("logPrefix:Failed to release state dir lock", thrown.getMessage()); + assertEquals(IOException.class, thrown.getCause().getClass()); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerOnlyThrowsFirstExceptionWhenClean() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); + + // The unlock logic should still be executed. + stateDirectory.unlock(taskId); + expectLastCall().andThrow(new IOException("Timeout")); + + ctrl.checkOrder(true); + ctrl.replay(); + + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE)); + + // Thrown stateMgr exception will not be wrapped. 
+ assertEquals("state manager failed to close", thrown.getMessage()); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerDirtyShallSwallowException() throws IOException { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE); + + ctrl.verify(); + + LogCaptureAppender.unregister(appender); + final List strings = appender.getMessages(); + assertTrue(strings.contains("testClosing ACTIVE task 0_0 uncleanly and swallows an exception")); + } + + @Test + public void testCloseStateManagerWithStateStoreWipeOut() throws IOException { + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall(); + + // The `baseDir` will be accessed when attempting to delete the state store. 
+ expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); + + ctrl.verify(); + } + + @Test + public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { + final File unknownFile = new File("/unknown/path"); + mockStatic(Utils.class); + + expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + expectLastCall(); + + expect(stateManager.baseDir()).andReturn(unknownFile); + + Utils.delete(unknownFile); + expectLastCall().andThrow(new IOException("Deletion failed")); + + stateDirectory.unlock(taskId); + expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + replayAll(); + + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); + + assertEquals("Failed to wiping state stores for task 0_0", thrown.getMessage()); + assertEquals(IOException.class, thrown.getCause().getClass()); + + ctrl.verify(); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1662196b8ce19..585952cd800bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; +import org.easymock.IMocksControl; import org.easymock.Mock; import org.easymock.MockType; import org.junit.After; @@ -82,6 +83,7 @@ import static 
org.apache.kafka.streams.processor.internals.StreamTask.encodeTimestamp; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24; +import static org.easymock.EasyMock.createStrictControl; import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; @@ -241,6 +243,37 @@ public void shouldNotAttemptToLockIfNoStores() { verify(stateDirectory); } + @Test + public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { + final IMocksControl ctrl = createStrictControl(); + final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); + stateDirectory = ctrl.createMock(StateDirectory.class); + + stateManager.registerGlobalStateStores(Collections.emptyList()); + EasyMock.expectLastCall(); + + EasyMock.expect(stateManager.taskId()).andReturn(taskId); + + stateManager.close(); + EasyMock.expectLastCall(); + + // The `baseDir` will be accessed when attempting to delete the state store. 
+ EasyMock.expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); + + stateDirectory.unlock(taskId); + EasyMock.expectLastCall(); + + ctrl.checkOrder(true); + ctrl.replay(); + + task = createStatefulTask(createConfig(true, "100"), true, stateManager); + task.transitionTo(Task.State.CLOSING); + task.closeDirty(); + task = null; + + ctrl.verify(); + } + @Test public void shouldReadCommittedStreamTimeOnInitialize() { stateDirectory = EasyMock.createNiceMock(StateDirectory.class); @@ -1580,6 +1613,10 @@ private StreamTask createFaultyStatefulTask(final StreamsConfig config) { } private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) { + return createStatefulTask(config, logged, stateManager); + } + + private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged, final ProcessorStateManager stateManager) { final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, logged); final ProcessorTopology topology = ProcessorTopologyFactories.with(