Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Check store directory empty to decide whether throw task corrupted exception with EOS #8180

Merged
merged 11 commits into from
Mar 6, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
* Indicates a specific task is corrupted and needs to be re-initialized. It can be thrown when
Expand All @@ -32,15 +32,15 @@
*/
public class TaskCorruptedException extends StreamsException {

private final Map<TaskId, Set<TopicPartition>> taskWithChangelogs;
private final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs;

public TaskCorruptedException(final Map<TaskId, Set<TopicPartition>> taskWithChangelogs) {
public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs) {
super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized");

this.taskWithChangelogs = taskWithChangelogs;
}

public Map<TaskId, Set<TopicPartition>> corruptedTaskWithChangelogs() {
public Map<TaskId, Collection<TopicPartition>> corruptedTaskWithChangelogs() {
return taskWithChangelogs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Collection<TopicPartition> changelogPartitions() {
}

@Override
public void markChangelogAsCorrupted(final Set<TopicPartition> partitions) {
public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
stateMgr.markChangelogAsCorrupted(partitions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,

final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
partitions,
Task.TaskType.ACTIVE,
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
logContext,
stateDirectory,
topology.storeToChangelogTopic(),
storeChangelogReader,
logContext
topology.storeToChangelogTopic(),
partitions
);

if (threadProducer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.StateRestoreCallback;
Expand All @@ -34,10 +35,10 @@
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.lang.String.format;
Expand Down Expand Up @@ -141,6 +142,7 @@ public String toString() {
private final TaskId taskId;
private final String logPrefix;
private final TaskType taskType;
private final boolean eosEnabled;
private final ChangelogRegister changelogReader;
private final Map<String, String> storeToChangelogTopic;
private final Collection<TopicPartition> sourcePartitions;
Expand All @@ -152,28 +154,29 @@ public String toString() {
private final File baseDir;
private final OffsetCheckpoint checkpointFile;

public static String storeChangelogTopic(final String applicationId,
final String storeName) {
public static String storeChangelogTopic(final String applicationId, final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}

/**
* @throws ProcessorStateException if the task directory does not exist and could not be created
*/
public ProcessorStateManager(final TaskId taskId,
final Collection<TopicPartition> sources,
final TaskType taskType,
final boolean eosEnabled,
final LogContext logContext,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I just rearranged the parameter order, and below I use the log prefix from the log context rather than creating a new string; there are no critical changes here.

final StateDirectory stateDirectory,
final Map<String, String> storeToChangelogTopic,
final ChangelogRegister changelogReader,
final LogContext logContext) throws ProcessorStateException {
this.logPrefix = format("task [%s] ", taskId);
this.log = logContext.logger(ProcessorStateManager.class);
final Map<String, String> storeToChangelogTopic,
final Collection<TopicPartition> sourcePartitions) throws ProcessorStateException {

this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
this.taskId = taskId;
this.taskType = taskType;
this.sourcePartitions = sources;
this.eosEnabled = eosEnabled;
this.changelogReader = changelogReader;
this.sourcePartitions = sourcePartitions;
this.storeToChangelogTopic = storeToChangelogTopic;

this.baseDir = stateDirectory.directoryForTask(taskId);
Expand All @@ -195,7 +198,7 @@ public StateStore getGlobalStore(final String name) {
}

// package-private for test only
void initializeStoreOffsetsFromCheckpoint() {
void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
try {
final Map<TopicPartition, Long> loadedCheckpoints = checkpointFile.read();

Expand All @@ -211,11 +214,21 @@ void initializeStoreOffsetsFromCheckpoint() {
log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
store.stateStore.name(), store.offset, store.changelogPartition);
} else {
// TODO K9113: for EOS when there's no checkpointed offset, we should treat it as TaskCorrupted

log.info("State store {} did not find checkpoint offset, hence would " +
// with EOS, if the previous run did not shut down gracefully, we may have lost the checkpoint file
// and hence we are uncertain whether the current local state only contains committed data;
// in that case we need to treat it as a task-corrupted exception
if (eosEnabled && !storeDirIsEmpty) {
log.warn("State store {} did not find checkpoint offsets while stores are not empty, " +
"since under EOS it has the risk of getting uncommitted data in stores we have to " +
"treat it as a task corruption error and wipe out the local state of task {} " +
"before re-bootstrapping", store.stateStore.name(), taskId);

throw new TaskCorruptedException(Collections.singletonMap(taskId, changelogPartitions()));
} else {
log.info("State store {} did not find checkpoint offset, hence would " +
"default to the starting offset at changelog {}",
store.stateStore.name(), store.changelogPartition);
store.stateStore.name(), store.changelogPartition);
}
}
}
}
Expand All @@ -225,6 +238,8 @@ void initializeStoreOffsetsFromCheckpoint() {
}

checkpointFile.delete();
} catch (final TaskCorruptedException e) {
throw e;
} catch (final IOException | RuntimeException e) {
// both IOException or runtime exception like number parsing can throw
throw new ProcessorStateException(format("%sError loading and deleting checkpoint file when creating the state manager",
Expand Down Expand Up @@ -287,7 +302,7 @@ Collection<TopicPartition> changelogPartitions() {
return changelogOffsets().keySet();
}

void markChangelogAsCorrupted(final Set<TopicPartition> partitions) {
void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) {
for (final StateStoreMetadata storeMetadata : stores.values()) {
if (partitions.contains(storeMetadata.changelogPartition)) {
storeMetadata.corrupted = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;

class StandbyTaskCreator {
private final InternalTopologyBuilder builder;
private final StreamsConfig config;
Expand Down Expand Up @@ -71,12 +73,13 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
if (topology.hasStateWithChangelogs()) {
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
partitions,
Task.TaskType.STANDBY,
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
logContext,
stateDirectory,
topology.storeToChangelogTopic(),
storeChangelogReader,
logContext
topology.storeToChangelogTopic(),
partitions
);

final StandbyTask task = new StandbyTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.HashMap;
import java.util.regex.Pattern;

import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;

/**
* Manages the directories where the state of Tasks owned by a {@link StreamThread} are
* stored. Handles creation/locking/unlocking/cleaning of the Task Directories. This class is not
Expand Down Expand Up @@ -105,6 +107,20 @@ public File directoryForTask(final TaskId taskId) {
return taskDir;
}

/**
 * Reports whether the directory of the given task contains any entry other than
 * the lock file and the checkpoint file.
 *
 * @param taskId the task whose directory is inspected
 * @return {@code true} if no other entry is found, or if the directory listing
 *         is unavailable (i.e. {@code listFiles} returned {@code null})
 */
boolean directoryForTaskIsEmpty(final TaskId taskId) {
    final File[] storeDirs = directoryForTask(taskId).listFiles(candidate -> {
        final String entryName = candidate.getName();
        // the lock file and the checkpoint file are excluded from the listing
        return !(entryName.equals(LOCK_FILE_NAME) || entryName.equals(CHECKPOINT_FILE_NAME));
    });

    // File#listFiles yields null when the path cannot be listed; the original author
    // notes this is the case for stateless tasks, so it is reported as "empty"
    if (storeDirs == null) {
        return true;
    }
    return storeDirs.length == 0;
}

/**
* Get or create the directory for the global stores.
* @return directory for the global stores
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
Expand All @@ -27,6 +29,9 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.state.internals.RecordConverters.identity;
import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
Expand Down Expand Up @@ -71,6 +76,8 @@ static void registerStateStores(final Logger log,
}
log.debug("Acquired state directory lock");

final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);

// We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
// the state stores have been registered; we should not try to load at the state manager construction time.
// See https://issues.apache.org/jira/browse/KAFKA-8574
Expand All @@ -79,7 +86,8 @@ static void registerStateStores(final Logger log,
store.init(processorContext, store);
log.trace("Registered state store {}", store.name());
}
stateMgr.initializeStoreOffsetsFromCheckpoint();

stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
log.debug("Initialized state stores");
}

Expand All @@ -103,10 +111,15 @@ static void closeStateManager(final Logger log,
log.trace("Closing state manager for {}", id);

try {
final Map<TopicPartition, Long> emptyOffsets = stateMgr.changelogPartitions().stream()
.collect(Collectors.toMap(Function.identity(), entry -> ListOffsetResponse.UNKNOWN_OFFSET));

stateMgr.close();

if (wipeStateStore) {
// we can just delete the whole dir of the task, including the state store images and the checkpoint files
// we can just delete the whole dir of the task, including the state store images and the checkpoint files,
// and then we write an empty checkpoint file indicating that the previous close is graceful and we just
// need to re-bootstrap the restoration from the beginning
Utils.delete(stateMgr.baseDir());
}
} catch (final ProcessorStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public void restore() {
"truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
" it later.", e.getClass().getName(), e.partitions());

final Map<TaskId, Set<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
final Map<TaskId, Collection<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
for (final TopicPartition partition : e.partitions()) {
final TaskId taskId = changelogs.get(partition).stateManager.taskId();
taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,48 +413,50 @@ public void closeDirty() {
*/
private void close(final boolean clean) {
if (state() == State.CREATED) {
// the task is created and not initialized, do nothing
transitionTo(State.CLOSING);
} else {
if (state() == State.RUNNING) {
closeTopology(clean);
// the task is created and not initialized, just re-write the checkpoint file
executeAndMaybeSwallow(clean, () -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an improvement I want to add along with this PR: since we delete the checkpoint file once loading has completed, an exception thrown before we transition to RESTORING could cause us to lose that checkpoint. So I also added the checkpoint logic when closing from the RESTORING / CREATED states.

stateMgr.checkpoint(Collections.emptyMap());
}, "state manager checkpoint");

if (clean) {
commitState();
// whenever we have successfully committed state, it is safe to checkpoint
// the state as well no matter if EOS is enabled or not
stateMgr.checkpoint(checkpointableOffsets());
} else {
executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush");
}
transitionTo(State.CLOSING);
} else if (state() == State.RUNNING) {
closeTopology(clean);

transitionTo(State.CLOSING);
} else if (state() == State.RESTORING) {
executeAndMaybeSwallow(clean, () -> {
stateMgr.flush();
stateMgr.checkpoint(Collections.emptyMap());
}, "state manager flush and checkpoint");

transitionTo(State.CLOSING);
} else if (state() == State.SUSPENDED) {
// do not need to commit / checkpoint, since when suspending we've already committed the state
transitionTo(State.CLOSING);
if (clean) {
commitState();
// whenever we have successfully committed state, it is safe to checkpoint
// the state as well no matter if EOS is enabled or not
stateMgr.checkpoint(checkpointableOffsets());
} else {
executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush");
}

if (state() == State.CLOSING) {
// if EOS is enabled, we wipe out the whole state store for unclean close
// since they are invalid to use anymore
final boolean wipeStateStore = !clean && !eosDisabled;
transitionTo(State.CLOSING);
} else if (state() == State.RESTORING) {
executeAndMaybeSwallow(clean, () -> {
stateMgr.flush();
stateMgr.checkpoint(Collections.emptyMap());
}, "state manager flush and checkpoint");

// first close state manager (which is idempotent) then close the record collector (which could throw),
// if the latter throws and we re-close dirty which would close the state manager again.
StateManagerUtil.closeStateManager(log, logPrefix, clean,
wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE);
transitionTo(State.CLOSING);
} else if (state() == State.SUSPENDED) {
// do not need to commit / checkpoint, since when suspending we've already committed the state
transitionTo(State.CLOSING);
}

executeAndMaybeSwallow(clean, recordCollector::close, "record collector close");
} else {
throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
}
if (state() == State.CLOSING) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have added the stores and then hit an exception before transitioning to RESTORING; hence I always call closeStateManager here, which is just a no-op if the lock has not been grabbed / the stores have not been added.

// if EOS is enabled, we wipe out the whole state store for unclean close
// since they are invalid to use anymore
final boolean wipeStateStore = !clean && !eosDisabled;

// first close state manager (which is idempotent) then close the record collector (which could throw),
// if the latter throws and we re-close dirty which would close the state manager again.
StateManagerUtil.closeStateManager(log, logPrefix, clean,
wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE);

executeAndMaybeSwallow(clean, recordCollector::close, "record collector close");
} else {
throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
}

partitionGroup.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ int getAssignmentErrorCode() {
return assignmentErrorCode.get();
}


private final Time time;
private final Logger log;
private final String logPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ enum TaskType {
*/
Map<TopicPartition, Long> changelogOffsets();

void markChangelogAsCorrupted(final Set<TopicPartition> partitions);
void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);

default Map<TopicPartition, Long> purgableOffsets() {
return Collections.emptyMap();
Expand Down
Loading