Skip to content

Commit

Permalink
KAFKA-9618: Directory deletion failure leading to error task RocksDB …
Browse files Browse the repository at this point in the history
…open (#8186)

We should have the following order:

1) close state stores
2) wipe out local directory
3) release directory lock

to avoid the issue. There's an known problem that with some FS one cannot delete the lock file while the calling thread still grabs the file lock, and this would be fixed in another ticket.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
Boyang Chen authored Mar 4, 2020
1 parent 69b670a commit c2ec974
Show file tree
Hide file tree
Showing 7 changed files with 549 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ private void close(final boolean clean) {
}

if (state() == State.CLOSING) {
StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory);
StateManagerUtil.closeStateManager(log, logPrefix, clean,
false, stateMgr, stateDirectory, TaskType.STANDBY);

// TODO: if EOS is enabled, we should wipe out the state stores like we did for StreamTask too
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

Expand All @@ -31,6 +32,10 @@
import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;

/**
* Shared functions to handle state store registration and cleanup between
* active and standby tasks.
*/
final class StateManagerUtil {
static final String CHECKPOINT_FILE_NAME = ".checkpoint";

Expand Down Expand Up @@ -78,38 +83,43 @@ static void registerStateStores(final Logger log,
log.debug("Initialized state stores");
}

static void wipeStateStores(final Logger log, final ProcessorStateManager stateMgr) {
// we can just delete the whole dir of the task, including the state store images and the checkpoint files
try {
Utils.delete(stateMgr.baseDir());
} catch (final IOException fatalException) {
// since it is only called under dirty close, we always swallow the exception
log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException);
}
}

/**
* @throws ProcessorStateException if there is an error while closing the state manager
*/
static void closeStateManager(final Logger log,
final String logPrefix,
final boolean closeClean,
final boolean wipeStateStore,
final ProcessorStateManager stateMgr,
final StateDirectory stateDirectory) {
final StateDirectory stateDirectory,
final TaskType taskType) {
if (closeClean && wipeStateStore) {
throw new IllegalArgumentException("State store could not be wiped out during clean close");
}

ProcessorStateException exception = null;
log.trace("Closing state manager");

final TaskId id = stateMgr.taskId();
log.trace("Closing state manager for {}", id);

try {
stateMgr.close();

if (wipeStateStore) {
// we can just delete the whole dir of the task, including the state store images and the checkpoint files
Utils.delete(stateMgr.baseDir());
}
} catch (final ProcessorStateException e) {
exception = e;
} catch (final IOException e) {
throw new ProcessorStateException("Failed to wiping state stores for task " + id, e);
} finally {
try {
stateDirectory.unlock(id);
} catch (final IOException e) {
if (exception == null) {
exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", logPrefix), e);
exception = new ProcessorStateException(
String.format("%sFailed to release state dir lock", logPrefix), e);
}
}
}
Expand All @@ -118,7 +128,7 @@ static void closeStateManager(final Logger log,
if (closeClean)
throw exception;
else
log.warn("Closing standby task " + id + " uncleanly throws an exception " + exception);
log.warn("Closing {} task {} uncleanly and swallows an exception", taskType, id, exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ public void closeDirty() {
* 3. then flush the record collector
* 4. then commit the record collector -- for EOS this is the synchronization barrier
* 5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
* 6. then if we are closing on EOS and dirty, wipe out the state store directory
* 7. finally release the state manager lock
* </pre>
*
* @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
Expand Down Expand Up @@ -440,15 +442,14 @@ private void close(final boolean clean) {
}

if (state() == State.CLOSING) {
// first close state manager (which is idempotent) then close the record collector (which could throw),
// if the latter throws and we re-close dirty which would close the state manager again.
StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory);

// if EOS is enabled, we wipe out the whole state store for unclean close
// since they are invalid to use anymore
if (!clean && !eosDisabled) {
StateManagerUtil.wipeStateStores(log, stateMgr);
}
final boolean wipeStateStore = !clean && !eosDisabled;

// first close state manager (which is idempotent) then close the record collector (which could throw),
// if the latter throws and we re-close dirty which would close the state manager again.
StateManagerUtil.closeStateManager(log, logPrefix, clean,
wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE);

executeAndMaybeSwallow(clean, recordCollector::close, "record collector close");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
Expand Down Expand Up @@ -93,8 +92,6 @@ public static Collection<Object[]> data() {
static final long ANY_UNIQUE_KEY = 0L;

StreamsBuilder builder;
int numRecordsExpected = 0;
AtomicBoolean finalResultReached = new AtomicBoolean(false);

private final List<Input<String>> input = Arrays.asList(
new Input<>(INPUT_TOPIC_LEFT, null),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;

import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.junit.Assert.assertFalse;

/**
* Test the unclean shutdown behavior around state store cleanup.
*/
@Category(IntegrationTest.class)
public class EOSUncleanShutdownIntegrationTest {

@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

@ClassRule
public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory());

private static final Properties STREAMS_CONFIG = new Properties();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Long COMMIT_INTERVAL = 100L;

private static final int RECORD_TOTAL = 3;

@BeforeClass
public static void setupConfigsAndUtils() {
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);

STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath());
}

@Test
public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);

final String input = "input-topic";
cleanStateBeforeTest(CLUSTER, input);

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> inputStream = builder.stream(input);

final AtomicInteger recordCount = new AtomicInteger(0);

final KTable<String, String> valueCounts = inputStream
.groupByKey()
.aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", Materialized.as("aggregated_value"));
valueCounts.toStream().peek((key, value) -> {
if (recordCount.incrementAndGet() >= RECORD_TOTAL) {
throw new IllegalStateException("Crash on the " + RECORD_TOTAL + " record");
}
});

final Properties producerConfig = mkProperties(mkMap(
mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
));
final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, builder, true);

final File stateDir = new File(
String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0"));
try {

IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(),
singletonList(new KeyValueTimestamp<>("k1", "v1", 0L)));

TestUtils.waitForCondition(stateDir::exists,
"Failed awaiting CreateTopics first request failure");

IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(),
asList(new KeyValueTimestamp<>("k2", "v2", 1L),
new KeyValueTimestamp<>("k3", "v3", 2L)));

TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL,
"Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get());
} finally {
TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR),
"Expected ERROR state but driver is on " + driver.state());

driver.close();

assertFalse(stateDir.exists());

cleanStateAfterTest(CLUSTER, driver);
}
}
}
Loading

0 comments on commit c2ec974

Please sign in to comment.