Skip to content

Commit

Permalink
KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint f…
Browse files Browse the repository at this point in the history
…ile does not exist (#5657)

Reviewers: Guozhang Wang <[email protected]>, John Roesler <[email protected]>, Bill Bejeck <[email protected]>
  • Loading branch information
mjsax authored Oct 8, 2018
1 parent 788ebe8 commit 88bca08
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,10 @@ public ThreadCache getCache() {
public void initialized() {
initialized = true;
}

@Override
public void uninitialize() {
initialized = false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public final String applicationId() {
}

@Override
public final Set<TopicPartition> partitions() {
public Set<TopicPartition> partitions() {
return partitions;
}

Expand Down Expand Up @@ -226,6 +226,9 @@ void registerStateStores() {
}
}

void reinitializeStateStoresForPartitions(final TopicPartition partitions) {
stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext);
}

/**
* @throws ProcessorStateException if there is an error while closing the state manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class AssignedTasks implements RestoringTasks {
// IQ may access this map.
private Map<TaskId, Task> running = new ConcurrentHashMap<>();
private Map<TopicPartition, Task> runningByPartition = new HashMap<>();
private Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
private Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
private int committed = 0;


Expand Down Expand Up @@ -122,7 +122,8 @@ Set<TopicPartition> initializeNewTasks() {
try {
if (!entry.getValue().initializeStateStores()) {
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
addToRestoring(entry.getValue());
// cast is safe, because StandbyTasks always returns `true` in `initializeStateStores()` above
addToRestoring((StreamTask) entry.getValue());
} else {
transitionToRunning(entry.getValue(), readyPartitions);
}
Expand Down Expand Up @@ -278,7 +279,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
return false;
}

private void addToRestoring(final Task task) {
private void addToRestoring(final StreamTask task) {
restoring.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
restoringByPartition.put(topicPartition, task);
Expand Down Expand Up @@ -307,7 +308,7 @@ private void transitionToRunning(final Task task, final Set<TopicPartition> read
}

@Override
public Task restoringTaskFor(final TopicPartition partition) {
public StreamTask restoringTaskFor(final TopicPartition partition) {
return restoringByPartition.get(partition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public interface InternalProcessorContext extends ProcessorContext {
* Mark this contex as being initialized
*/
void initialized();

/**
* Mark this context as being uninitialized
*/
void uninitialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
Expand All @@ -33,9 +35,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;


public class ProcessorStateManager implements StateManager {
Expand All @@ -49,6 +53,7 @@ public class ProcessorStateManager implements StateManager {
private final String logPrefix;
private final boolean isStandby;
private final ChangelogReader changelogReader;
private final boolean eosEnabled;
private final Map<String, StateStore> stores;
private final Map<String, StateStore> globalStores;
private final Map<TopicPartition, Long> offsetLimits;
Expand Down Expand Up @@ -98,6 +103,7 @@ public ProcessorStateManager(final TaskId taskId,
checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
checkpointedOffsets = new HashMap<>(checkpoint.read());

this.eosEnabled = eosEnabled;
if (eosEnabled) {
// delete the checkpoint file after finish loading its stored offsets
checkpoint.delete();
Expand Down Expand Up @@ -176,6 +182,61 @@ public Map<TopicPartition, Long> checkpointed() {
return partitionsAndOffsets;
}

void reinitializeStateStoresForPartitions(final TopicPartition topicPartition,
final InternalProcessorContext processorContext) {
final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic);
final Set<String> storeToBeReinitialized = new HashSet<>();
final Map<String, StateStore> storesCopy = new HashMap<>(stores);

checkpointedOffsets.remove(topicPartition);
storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));

if (!eosEnabled) {
try {
checkpoint.write(checkpointedOffsets);
} catch (final IOException fatalException) {
log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stores, fatalException);
throw new StreamsException("Failed to reinitialize stores.", fatalException);
}
}

for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
final StateStore stateStore = entry.getValue();
final String storeName = stateStore.name();
if (storeToBeReinitialized.contains(storeName)) {
try {
stateStore.close();
} catch (final RuntimeException ignoreAndSwallow) { /* ignore */ }
processorContext.uninitialize();
stores.remove(entry.getKey());

try {
Utils.delete(new File(baseDir + File.separator + "rocksdb" + File.separator + storeName));
} catch (final IOException fatalException) {
log.error("Failed to reinitialize store {}.", storeName, fatalException);
throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
}

try {
Utils.delete(new File(baseDir + File.separator + storeName));
} catch (final IOException fatalException) {
log.error("Failed to reinitialize store {}.", storeName, fatalException);
throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
}

stateStore.init(processorContext, stateStore);
}
}
}

private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) {
final Map<String, String> reversedMap = new HashMap<>();
for (final Map.Entry<String, String> entry : origin.entrySet()) {
reversedMap.put(entry.getValue(), entry.getKey());
}
return reversedMap;
}

List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(final TopicPartition storePartition,
final List<ConsumerRecord<byte[], byte[]>> records) {
final long limit = offsetLimit(storePartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
import org.apache.kafka.common.TopicPartition;

public interface RestoringTasks {
Task restoringTaskFor(final TopicPartition partition);
StreamTask restoringTaskFor(final TopicPartition partition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public class StateRestorer {

static final int NO_CHECKPOINT = -1;

private final Long checkpoint;
private final long offsetLimit;
private final boolean persistent;
private final String storeName;
private final TopicPartition partition;
private final CompositeRestoreListener compositeRestoreListener;

private long checkpointOffset;
private long restoredOffset;
private long startingOffset;
private long endingOffset;
Expand All @@ -45,7 +45,7 @@ public class StateRestorer {
final String storeName) {
this.partition = partition;
this.compositeRestoreListener = compositeRestoreListener;
this.checkpoint = checkpoint;
this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
this.offsetLimit = offsetLimit;
this.persistent = persistent;
this.storeName = storeName;
Expand All @@ -56,7 +56,15 @@ public TopicPartition partition() {
}

long checkpoint() {
return checkpoint == null ? NO_CHECKPOINT : checkpoint;
return checkpointOffset;
}

void setCheckpointOffset(final long checkpointOffset) {
this.checkpointOffset = checkpointOffset;
}

public String storeName() {
return storeName;
}

void restoreStarted() {
Expand All @@ -67,7 +75,8 @@ void restoreDone() {
compositeRestoreListener.onRestoreEnd(partition, storeName, restoredNumRecords());
}

void restoreBatchCompleted(long currentRestoredOffset, int numRestored) {
void restoreBatchCompleted(final long currentRestoredOffset,
final int numRestored) {
compositeRestoreListener.onBatchRestored(partition, storeName, currentRestoredOffset, numRestored);
}

Expand All @@ -79,7 +88,7 @@ boolean isPersistent() {
return persistent;
}

void setUserRestoreListener(StateRestoreListener userRestoreListener) {
void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
this.compositeRestoreListener.setUserRestoreListener(userRestoreListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void register(final StateRestorer restorer) {
*/
public Collection<TopicPartition> restore(final RestoringTasks active) {
if (!needsInitializing.isEmpty()) {
initialize();
initialize(active);
}

if (needsRestoring.isEmpty()) {
Expand All @@ -90,7 +90,7 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {
return completed();
}

private void initialize() {
private void initialize(final RestoringTasks active) {
if (!consumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
}
Expand All @@ -99,8 +99,8 @@ private void initialize() {
// the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
refreshChangelogInfo();

Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
for (Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
final TopicPartition topicPartition = entry.getKey();
if (hasPartition(topicPartition)) {
initializable.put(entry.getKey(), entry.getValue());
Expand Down Expand Up @@ -144,11 +144,12 @@ private void initialize() {

// set up restorer for those initializable
if (!initializable.isEmpty()) {
startRestoration(initializable);
startRestoration(initializable, active);
}
}

private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
final RestoringTasks active) {
log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());

final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
Expand All @@ -157,26 +158,47 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ

final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
for (final StateRestorer restorer : initialized.values()) {
final TopicPartition restoringPartition = restorer.partition();
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
consumer.seek(restorer.partition(), restorer.checkpoint());
logRestoreOffsets(restorer.partition(),
restorer.checkpoint(),
endOffsets.get(restorer.partition()));
restorer.setStartingOffset(consumer.position(restorer.partition()));
consumer.seek(restoringPartition, restorer.checkpoint());
logRestoreOffsets(restoringPartition,
restorer.checkpoint(),
endOffsets.get(restoringPartition));
restorer.setStartingOffset(consumer.position(restoringPartition));
restorer.restoreStarted();
} else {
consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
consumer.seekToBeginning(Collections.singletonList(restoringPartition));
needsPositionUpdate.add(restorer);
}
}

for (final StateRestorer restorer : needsPositionUpdate) {
final long position = consumer.position(restorer.partition());
logRestoreOffsets(restorer.partition(),
position,
endOffsets.get(restorer.partition()));
restorer.setStartingOffset(position);
restorer.restoreStarted();
final TopicPartition restoringPartition = restorer.partition();
final StreamTask task = active.restoringTaskFor(restoringPartition);

// If checkpoint does not exist it means the task was not shutdown gracefully before;
// and in this case if EOS is turned on we should wipe out the state and re-initialize the task
if (task.eosEnabled) {
log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
"Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restoringPartition);

// we move the partitions here, because they will be added back within
// `task.reinitializeStateStoresForPartitions()` that calls `register()` internally again
needsInitializing.remove(restoringPartition);
restorer.setCheckpointOffset(consumer.position(restoringPartition));

task.reinitializeStateStoresForPartitions(restoringPartition);
stateRestorers.get(restoringPartition).restoreStarted();
} else {
log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);

final long position = consumer.position(restoringPartition);
logRestoreOffsets(restoringPartition,
position,
endOffsets.get(restoringPartition));
restorer.setStartingOffset(position);
restorer.restoreStarted();
}
}

needsRestoring.putAll(initialized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
// the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
//
// the failure gets inject after 20 committed and 30 uncommitted records got received
// the failure gets inject after 20 committed and 10 uncommitted records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records and the state stores should contain the correct sums
// per key (even if some records got processed twice)
Expand All @@ -402,7 +402,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
streams.start();

final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);

final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
dataBeforeFailure.addAll(committedDataBeforeFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@

public class AssignedTasksTest {

private final Task t1 = EasyMock.createMock(Task.class);
private final Task t2 = EasyMock.createMock(Task.class);
private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
private final StreamTask t2 = EasyMock.createMock(StreamTask.class);
private final TopicPartition tp1 = new TopicPartition("t1", 0);
private final TopicPartition tp2 = new TopicPartition("t2", 0);
private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
Expand Down
Loading

0 comments on commit 88bca08

Please sign in to comment.