KAFKA-9618: Directory deletion failure leading to error task RocksDB open #8186

Merged (2 commits) on Mar 4, 2020

Changes from all commits:
---
@@ -179,7 +179,8 @@ private void close(final boolean clean) {
        }

        if (state() == State.CLOSING) {
-           StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory);
+           StateManagerUtil.closeStateManager(log, logPrefix, clean,
+               false, stateMgr, stateDirectory, TaskType.STANDBY);

            // TODO: if EOS is enabled, we should wipe out the state stores like we did for StreamTask too
        } else {
---
@@ -22,6 +22,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

@@ -31,6 +32,10 @@
import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;

+/**
+ * Shared functions to handle state store registration and cleanup between
+ * active and standby tasks.
+ */
final class StateManagerUtil {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

@@ -78,38 +83,43 @@ static void registerStateStores(final Logger log,
        log.debug("Initialized state stores");
    }

-   static void wipeStateStores(final Logger log, final ProcessorStateManager stateMgr) {
-       // we can just delete the whole dir of the task, including the state store images and the checkpoint files
-       try {
-           Utils.delete(stateMgr.baseDir());
-       } catch (final IOException fatalException) {
-           // since it is only called under dirty close, we always swallow the exception
-           log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException);
-       }
-   }

Review thread on the removed method:

  Member: Why do we now always wipe the state stores (even on clean close)? cc/ @guozhangwang

  Contributor: We do not wipe state stores on clean close, only on dirty close.

  Contributor: wipeStateStores is only triggered on a dirty close, under EOS.

  Member: Ah, I wasn't looking at the latest trunk; I see you fixed that in a recent PR. In that case, should we keep the comment?
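The upshot of this thread: wiping only ever happens on a dirty close under EOS, and after this PR a failed deletion is no longer just logged. Below is a minimal, self-contained sketch of that behavior change, with hypothetical stand-ins for Kafka's Utils.delete and ProcessorStateException (only the old helper swallowed the IOException):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical stand-ins for Utils.delete(...) and ProcessorStateException.
final class WipeBehaviorSketch {

    static final class StateException extends RuntimeException {
        StateException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Recursively delete a task directory, akin to Utils.delete(stateMgr.baseDir()).
    static void deleteDirectory(final File dir) throws IOException {
        if (!dir.exists()) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir.toPath())) {
            paths.sorted(Comparator.reverseOrder())        // children before parents
                 .forEach(path -> path.toFile().delete());
        }
    }

    // Old behavior (the removed helper): swallow the failure with a warning and move on.
    static void wipeAndSwallow(final File taskDir) {
        try {
            deleteDirectory(taskDir);
        } catch (final IOException e) {
            System.err.println("Failed to wipe state stores under " + taskDir + ": " + e);
        }
    }

    // New behavior: surface the failure so a task cannot silently reopen
    // RocksDB on a half-deleted directory.
    static void wipeOrThrow(final File taskDir) {
        try {
            deleteDirectory(taskDir);
        } catch (final IOException e) {
            throw new StateException("Failed to wipe state stores for task directory " + taskDir, e);
        }
    }
}
```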

    /**
     * @throws ProcessorStateException if there is an error while closing the state manager
     */
    static void closeStateManager(final Logger log,
                                  final String logPrefix,
                                  final boolean closeClean,
+                                 final boolean wipeStateStore,
                                  final ProcessorStateManager stateMgr,
-                                 final StateDirectory stateDirectory) {
+                                 final StateDirectory stateDirectory,
+                                 final TaskType taskType) {
+       if (closeClean && wipeStateStore) {
+           throw new IllegalArgumentException("State store could not be wiped out during clean close");
+       }
+
        ProcessorStateException exception = null;
-       log.trace("Closing state manager");

        final TaskId id = stateMgr.taskId();
+       log.trace("Closing state manager for {}", id);

        try {
            stateMgr.close();
+
+           if (wipeStateStore) {
+               // we can just delete the whole dir of the task, including the state store images and the checkpoint files
+               Utils.delete(stateMgr.baseDir());
+           }
        } catch (final ProcessorStateException e) {
            exception = e;
+       } catch (final IOException e) {
+           throw new ProcessorStateException("Failed to wipe state stores for task " + id, e);
        } finally {
            try {
                stateDirectory.unlock(id);
            } catch (final IOException e) {
                if (exception == null) {
-                   exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", logPrefix), e);
+                   exception = new ProcessorStateException(
+                       String.format("%sFailed to release state dir lock", logPrefix), e);
                }
            }
        }
@@ -118,7 +128,7 @@ static void closeStateManager(final Logger log,
        if (closeClean)
            throw exception;
        else
-           log.warn("Closing standby task " + id + " uncleanly throws an exception " + exception);
+           log.warn("Closing {} task {} uncleanly and swallowing an exception", taskType, id, exception);
    }
}
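With wiping folded into closeStateManager, the IllegalArgumentException guard above encodes the invariant from the review thread: a clean close must never wipe local state. A minimal, self-contained model of that call-site contract (class name and bodies here are illustrative, not the real Kafka Streams code):

```java
// Minimal model of the contract: wipe only on a dirty close under EOS.
final class CloseContractSketch {

    static void closeStateManager(final boolean closeClean, final boolean wipeStateStore) {
        if (closeClean && wipeStateStore) {
            throw new IllegalArgumentException("State store could not be wiped out during clean close");
        }
        // ... close stores, optionally delete the task directory, release the lock ...
    }

    public static void main(final String[] args) {
        final boolean eosEnabled = true;

        // Active task, dirty close under EOS: local state is suspect, so wipe it.
        boolean clean = false;
        closeStateManager(clean, !clean && eosEnabled);

        // Standby task: after this PR it always passes false for wipeStateStore.
        clean = true;
        closeStateManager(clean, false);
    }
}
```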
---
@@ -402,6 +402,8 @@ public void closeDirty() {
     * 3. then flush the record collector
     * 4. then commit the record collector -- for EOS this is the synchronization barrier
     * 5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
+    * 6. then if we are closing on EOS and dirty, wipe out the state store directory
+    * 7. finally release the state manager lock
     * </pre>
     *
     * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
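Steps 6 and 7 are the new ones. As a reading aid, here is a skeletal sketch of that ordering; every method name below is a placeholder, not the real StreamTask/StateManagerUtil internals, and the control flow on a dirty close is inferred from the javadoc:

```java
// Skeleton of the close sequence; bodies are elided, the ordering is the point.
final class CloseOrderSketch {
    void suspendProcessing()     { /* 1. stop processing                    */ }
    void flushStateStores()      { /* 2. flush state stores                 */ }
    void flushRecordCollector()  { /* 3. flush pending sends                */ }
    void commitRecordCollector() { /* 4. EOS synchronization barrier        */ }
    void checkpointStateMgr()    { /* 5. safe even if we crash before this  */ }
    void wipeTaskDirectory()     { /* 6. dirty EOS close only               */ }
    void unlockStateDirectory()  { /* 7. always last                        */ }

    void close(final boolean clean, final boolean eosEnabled) {
        suspendProcessing();
        try {
            if (clean) {
                flushStateStores();
                flushRecordCollector();
                commitRecordCollector();
                checkpointStateMgr();
            } else if (eosEnabled) {
                wipeTaskDirectory();     // uncommitted writes may linger in RocksDB
            }
        } finally {
            unlockStateDirectory();      // release the state dir lock no matter what
        }
    }
}
```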
@@ -440,15 +442,14 @@ private void close(final boolean clean) {
        }

        if (state() == State.CLOSING) {
-           // first close state manager (which is idempotent) then close the record collector (which could throw),
-           // if the latter throws and we re-close dirty which would close the state manager again.
-           StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory);
-
-           // if EOS is enabled, we wipe out the whole state store for unclean close
-           // since they are invalid to use anymore
-           if (!clean && !eosDisabled) {
-               StateManagerUtil.wipeStateStores(log, stateMgr);
-           }
+           final boolean wipeStateStore = !clean && !eosDisabled;
+
+           // first close state manager (which is idempotent) then close the record collector (which could throw),
+           // if the latter throws and we re-close dirty which would close the state manager again.
+           StateManagerUtil.closeStateManager(log, logPrefix, clean,
+               wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE);

            executeAndMaybeSwallow(clean, recordCollector::close, "record collector close");
        } else {
---
@@ -55,7 +55,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -93,8 +92,6 @@ public static Collection<Object[]> data() {
    static final long ANY_UNIQUE_KEY = 0L;

    StreamsBuilder builder;
-   int numRecordsExpected = 0;
-   AtomicBoolean finalResultReached = new AtomicBoolean(false);

    private final List<Input<String>> input = Arrays.asList(
        new Input<>(INPUT_TOPIC_LEFT, null),
---
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;

import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.junit.Assert.assertFalse;

/**
 * Test the unclean shutdown behavior around state store cleanup.
 */
@Category(IntegrationTest.class)
public class EOSUncleanShutdownIntegrationTest {

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    @ClassRule
    public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory());

    private static final Properties STREAMS_CONFIG = new Properties();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Long COMMIT_INTERVAL = 100L;

    private static final int RECORD_TOTAL = 3;

    @BeforeClass
    public static void setupConfigsAndUtils() {
        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);

        STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath());
    }

    @Test
    public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test";
        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);

        final String input = "input-topic";
        cleanStateBeforeTest(CLUSTER, input);

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> inputStream = builder.stream(input);

        final AtomicInteger recordCount = new AtomicInteger(0);

        final KTable<String, String> valueCounts = inputStream
            .groupByKey()
            .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", Materialized.as("aggregated_value"));
        valueCounts.toStream().peek((key, value) -> {
            if (recordCount.incrementAndGet() >= RECORD_TOTAL) {
                throw new IllegalStateException("Crash on record " + RECORD_TOTAL);
            }
        });

        final Properties producerConfig = mkProperties(mkMap(
            mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
            mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
        ));
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, builder, true);

        final File stateDir = new File(
            String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0"));
        try {
            IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(),
                singletonList(new KeyValueTimestamp<>("k1", "v1", 0L)));

            TestUtils.waitForCondition(stateDir::exists,
                "Task state directory 0_0 was never created");

            IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(),
                asList(new KeyValueTimestamp<>("k2", "v2", 1L),
                    new KeyValueTimestamp<>("k3", "v3", 2L)));

            TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL,
                "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get());
        } finally {
            TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR),
                "Expected ERROR state but driver is on " + driver.state());

            driver.close();

            assertFalse(stateDir.exists());

            cleanStateAfterTest(CLUSTER, driver);
        }
    }
}
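For context on why the test watches a directory literally named 0_0: Kafka Streams keeps each task's stores under <state.dir>/<application.id>/<taskId>, where the task id is <subtopology>_<partition>. A small illustrative sketch of the path the final assertFalse checks (all values below are placeholders):

```java
import java.io.File;

final class StateDirLayoutSketch {
    public static void main(final String[] args) {
        final File stateDir = new File("/tmp/kafka-streams");          // StreamsConfig.STATE_DIR_CONFIG
        final String appId = "eosuncleanshutdownintegrationtest-test"; // APPLICATION_ID_CONFIG
        final String taskId = "0_0";                                   // subtopology 0, partition 0

        final File taskDir = new File(new File(stateDir, appId), taskId);
        // After an unclean EOS shutdown, this directory should have been wiped:
        System.out.println(taskDir.getPath() + " exists? " + taskDir.exists());
    }
}
```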