[Transform] make shouldStopAtCheckpoint more robust (#70461)
shouldStopAtCheckpoint tells the transform to stop at the next checkpoint. If this API is
called while a checkpoint is finishing, it can cause a race condition in state persistence.
This is similar to #69551, but in a different place.

With this change, _stop?shouldStopAtCheckpoint=true does not call doSaveState if the indexer
is shutting down, yet it still ensures the job stops after the indexer has shut down. In
addition, the change fixes a logging problem, adds error handling for a timeout during
_stop?shouldStopAtCheckpoint=true, and moves some logic from the task to the indexer.

fixes #70416
Hendrik Muhs committed Mar 22, 2021
1 parent 8d3ca4e commit 0f08edb
Showing 4 changed files with 153 additions and 18 deletions.
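
The core of the change is a guard in the indexer: a stop-at-checkpoint request that arrives while the indexer thread is shutting down is only recorded, and the deferred state persistence is replayed once the shutdown has completed. The following minimal sketch illustrates that pattern; the class and the persistState helper are illustrative stand-ins, not the actual TransformIndexer code shown in the diffs below.

// Minimal sketch of the shutdown guard introduced by this commit.
// ShutdownGuardedIndexer and persistState are illustrative names, not Elasticsearch classes.
class ShutdownGuardedIndexer {

    private volatile boolean indexerThreadShuttingDown = false;
    private volatile boolean saveStateRequestedDuringIndexerThreadShutdown = false;
    private volatile boolean shouldStopAtCheckpoint = false;

    // entry point for _stop?shouldStopAtCheckpoint=true
    synchronized void requestStopAtCheckpoint() {
        shouldStopAtCheckpoint = true;
        if (indexerThreadShuttingDown) {
            // the indexer thread is about to persist state itself; persisting here as well
            // would race with it, so only remember that a save was requested
            saveStateRequestedDuringIndexerThreadShutdown = true;
            return;
        }
        // safe: the indexer thread is not writing state concurrently
        persistState();
    }

    synchronized void startIndexerThreadShutdown() {
        indexerThreadShuttingDown = true;
        saveStateRequestedDuringIndexerThreadShutdown = false;
    }

    synchronized void finishIndexerThreadShutdown() {
        indexerThreadShuttingDown = false;
        if (saveStateRequestedDuringIndexerThreadShutdown) {
            // replay the deferred request now that the shutdown has completed
            persistState();
        }
    }

    private void persistState() {
        // stand-in for the asynchronous doSaveState call of the real indexer
    }
}

The real implementation additionally blocks the caller until the state is persisted, logs an error on timeout, and may stop the indexer outright after shutdown; those details are in the TransformIndexer hunks below.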
@@ -84,8 +84,8 @@ long getCheckpoint() {
         return currentCheckpoint.get();
     }

-    long getAndIncrementCheckpoint() {
-        return currentCheckpoint.getAndIncrement();
+    long incrementAndGetCheckpoint() {
+        return currentCheckpoint.incrementAndGet();
     }

     void setNumFailureRetries(int numFailureRetries) {
---- next changed file ----

@@ -125,7 +125,7 @@ private enum RunState {
     private volatile long lastCheckpointCleanup = 0L;

     protected volatile boolean indexerThreadShuttingDown = false;
-    protected volatile boolean stopCalledDuringIndexerThreadShutdown = false;
+    protected volatile boolean saveStateRequestedDuringIndexerThreadShutdown = false;

     public TransformIndexer(
         ThreadPool threadPool,
@@ -490,7 +490,7 @@ private void finalizeCheckpoint(ActionListener<Void> listener) {
             changeCollector.clear();
         }

-        long checkpoint = context.getAndIncrementCheckpoint();
+        long checkpoint = context.incrementAndGetCheckpoint();
         lastCheckpoint = getNextCheckpoint();
         nextCheckpoint = null;
         // Reset our failure count as we have finished and may start again with a new checkpoint
@@ -647,7 +647,7 @@ final void setStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener<Vo
         } catch (InterruptedException e) {
             logger.error(
                 new ParameterizedMessage(
-                    "[{}] Timed out ({}s) waiting for transform state to be stored.",
+                    "[{}] Interrupt waiting ({}s) for transform state to be stored.",
                     getJobId(),
                     PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
                 ),
@@ -671,6 +671,14 @@ private synchronized boolean addSetStopAtCheckpointListener(
         boolean shouldStopAtCheckpoint,
         ActionListener<Void> shouldStopAtCheckpointListener
     ) throws InterruptedException {
+
+        // in case the indexer is already shutting down
+        if (indexerThreadShuttingDown) {
+            context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
+            saveStateRequestedDuringIndexerThreadShutdown = true;
+            return false;
+        }
+
         IndexerState state = getState();

         // in case the indexer isn't running, respond immediately
@@ -680,9 +688,19 @@ private synchronized boolean addSetStopAtCheckpointListener(
             // because save state is async we need to block the call until state is persisted, so that the job can not
             // be triggered (ensured by synchronized)
             CountDownLatch latch = new CountDownLatch(1);
+            logger.debug("[{}] persisiting stop at checkpoint", getJobId());

             doSaveState(IndexerState.STARTED, getPosition(), () -> { latch.countDown(); });

-            latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS);
+            if (latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS) == false) {
+                logger.error(
+                    new ParameterizedMessage(
+                        "[{}] Timed out ({}s) waiting for transform state to be stored.",
+                        getJobId(),
+                        PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
+                    )
+                );
+            }
             return false;
         }

@@ -717,11 +735,14 @@ private synchronized boolean addSetStopAtCheckpointListener(
         return true;
     }

-    synchronized void stopAndSaveState() {
+    synchronized void stopAndMaybeSaveState() {
         onStop();
+        IndexerState state = stop();
+
         if (indexerThreadShuttingDown) {
-            stopCalledDuringIndexerThreadShutdown = true;
-        } else {
+            saveStateRequestedDuringIndexerThreadShutdown = true;
+            // if stop() returned STOPPED we need to persist state, otherwise the indexer does it for us
+        } else if (state == IndexerState.STOPPED) {
             doSaveState(IndexerState.STOPPED, getPosition(), () -> {});
         }
     }
@@ -1109,13 +1130,18 @@ private void configurePageSize(Integer newPageSize) {

     private synchronized void startIndexerThreadShutdown() {
         indexerThreadShuttingDown = true;
-        stopCalledDuringIndexerThreadShutdown = false;
+        saveStateRequestedDuringIndexerThreadShutdown = false;
     }

     private synchronized void finishIndexerThreadShutdown() {
         indexerThreadShuttingDown = false;
-        if (stopCalledDuringIndexerThreadShutdown) {
-            doSaveState(IndexerState.STOPPED, getPosition(), () -> {});
+        if (saveStateRequestedDuringIndexerThreadShutdown) {
+            // if stop has been called and set shouldStopAtCheckpoint to true,
+            // we should stop if we just finished a checkpoint
+            if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) {
+                stop();
+            }
+            doSaveState(getState(), getPosition(), () -> {});
         }
     }

---- next changed file ----

@@ -346,7 +346,7 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) {

         // If state was in a failed state, we should stop immediately
         if (wasFailed) {
-            getIndexer().stopAndSaveState();
+            getIndexer().stopAndMaybeSaveState();
             return;
         }

@@ -363,10 +363,7 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) {
             // or has yet to even start one.
             // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time).
             (indexerState == IndexerState.STARTED && getIndexer().initialRun())) {
-            IndexerState state = getIndexer().stop();
-            if (state == IndexerState.STOPPED) {
-                getIndexer().stopAndSaveState();
-            }
+            getIndexer().stopAndMaybeSaveState();
         }
     }

---- next changed file ----

@@ -9,6 +9,7 @@

 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -53,6 +54,7 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;

@@ -98,6 +100,9 @@ class MockedTransformIndexer extends TransformIndexer {
         // used for synchronizing with the test
         private CountDownLatch searchLatch;
         private CountDownLatch doProcessLatch;
+        private CountDownLatch doSaveStateLatch;
+
+        private AtomicBoolean saveStateInProgress = new AtomicBoolean(false);

         // how many loops to execute until reporting done
         private int numberOfLoops;
@@ -146,6 +151,10 @@ public CountDownLatch createCountDownOnResponseLatch(int count) {
             return doProcessLatch = new CountDownLatch(count);
         }

+        public CountDownLatch createAwaitForDoSaveStateLatch(int count) {
+            return doSaveStateLatch = new CountDownLatch(count);
+        }
+
         @Override
         void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
             responseListener.onResponse(ONE_HIT_SEARCH_RESPONSE);
@@ -214,7 +223,21 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next

         @Override
         protected void doSaveState(IndexerState state, TransformIndexerPosition position, Runnable next) {
+            // assert that the indexer does not call doSaveState again, while it is still saving state
+            // this is only useful together with the doSaveStateLatch
+            assertTrue("doSaveState called again while still in progress", saveStateInProgress.compareAndSet(false, true));
+            if (doSaveStateLatch != null) {
+                try {
+                    doSaveStateLatch.await();
+
+                } catch (InterruptedException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
             assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
+
+            assertTrue(saveStateInProgress.compareAndSet(true, false));
             next.run();
         }

@@ -288,7 +311,7 @@ public void testRetentionPolicyExecution() throws Exception {
         assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
         assertThat(indexer.getState(), oneOf(IndexerState.INDEXING, IndexerState.STARTED));

-        assertBusy(() -> assertEquals(1L, indexer.getLastCheckpoint().getCheckpoint()), 5, TimeUnit.HOURS);
+        assertBusy(() -> assertEquals(1L, indexer.getLastCheckpoint().getCheckpoint()), 5, TimeUnit.SECONDS);

         // delete by query has been executed
         assertEquals(1, indexer.getDeleteByQueryCallCount());
@@ -340,6 +363,63 @@ public void testRetentionPolicyExecution() throws Exception {
         }
     }

+    /**
+     * This test ensures correct handling of async behavior during indexer shutdown
+     *
+     * Indexer shutdown is not atomic: 1st the state is set back to e.g. STARTED, afterwards state is stored.
+     * State is stored async and is IO based, therefore it can take time until this is done.
+     *
+     * Between setting the state and storing it, some race condition occurred, this test acts
+     * as regression test.
+     */
+    public void testInterActionWhileIndexerShutsdown() throws Exception {
+        TransformConfig config = new TransformConfig(
+            randomAlphaOfLength(10),
+            randomSourceConfig(),
+            randomDestConfig(),
+            null,
+            new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
+            null,
+            randomPivotConfig(),
+            null,
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
+            null,
+            null,
+            null,
+            null
+        );
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);
+
+        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+        final MockedTransformIndexer indexer = createMockIndexer(
+            5,
+            config,
+            state,
+            null,
+            threadPool,
+            auditor,
+            new TransformIndexerStats(),
+            context
+        );
+
+        // add a latch at doSaveState
+        CountDownLatch saveStateLatch = indexer.createAwaitForDoSaveStateLatch(1);
+
+        indexer.start();
+        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+        assertEquals(indexer.getState(), IndexerState.INDEXING);
+
+        assertBusy(() -> assertEquals(IndexerState.STARTED, indexer.getState()), 5, TimeUnit.SECONDS);
+
+        // the indexer thread is shutting down, the trigger should be ignored
+        assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+        this.<Void>assertAsync(listener -> setStopAtCheckpoint(indexer, true, listener), v -> {});
+        saveStateLatch.countDown();
+
+        // after the indexer has shutdown, it should check for stop at checkpoint and shutdown
+        assertBusy(() -> assertEquals(IndexerState.STOPPED, indexer.getState()), 5, TimeUnit.SECONDS);
+    }
+
     private MockedTransformIndexer createMockIndexer(
         int numberOfLoops,
         TransformConfig config,
@@ -370,4 +450,36 @@ private MockedTransformIndexer createMockIndexer(
         indexer.initialize();
         return indexer;
     }
+
+    private void setStopAtCheckpoint(
+        TransformIndexer indexer,
+        boolean shouldStopAtCheckpoint,
+        ActionListener<Void> shouldStopAtCheckpointListener
+    ) {
+        // we need to simulate that this is called from the task, which offloads it to the generic threadpool
+        CountDownLatch latch = new CountDownLatch(1);
+        threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+            indexer.setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener);
+            latch.countDown();
+        });
+        try {
+            assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            fail("timed out after 5s");
+        }
+    }
+
+    private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean listenerCalled = new AtomicBoolean(false);
+
+        LatchedActionListener<T> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
+            assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
+            furtherTests.accept(r);
+        }, e -> { fail("got unexpected exception: " + e); }), latch);
+
+        function.accept(listener);
+        assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
+    }
+
 }
