From 0f08edb9b9cdfe591f27b55f950eb03b1b2eefb1 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 22 Mar 2021 11:28:27 +0100 Subject: [PATCH] [Transform] make shouldStopAtCheckpoint more robust (#70461) shouldStopAtCheckpoint tells transform to stop at the next checkpoint, if this API is called while a checkpoint is finishing, it can cause a race condition in state persistence. This is similar to #69551, but this time in a different place. With this change _stop?shouldStopAtCheckpoint=true does not call doSaveState if indexer is shutting down. Still it ensures the job stops after the indexer has shutdown. Apart from that the change fixes: a logging problem, it adds error handling in case of a timeout during _stop?shouldStopAtCheckpoint=true. Some logic has been moved from the task to the indexer. fixes #70416 --- .../transforms/TransformContext.java | 4 +- .../transforms/TransformIndexer.java | 46 +++++-- .../transform/transforms/TransformTask.java | 7 +- .../transforms/TransformIndexerTests.java | 114 +++++++++++++++++- 4 files changed, 153 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index c6877a3d75344..f5383e264e8a5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -84,8 +84,8 @@ long getCheckpoint() { return currentCheckpoint.get(); } - long getAndIncrementCheckpoint() { - return currentCheckpoint.getAndIncrement(); + long incrementAndGetCheckpoint() { + return currentCheckpoint.incrementAndGet(); } void setNumFailureRetries(int numFailureRetries) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index c1a1c63ce1aa3..33050a4bd2c0a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -125,7 +125,7 @@ private enum RunState { private volatile long lastCheckpointCleanup = 0L; protected volatile boolean indexerThreadShuttingDown = false; - protected volatile boolean stopCalledDuringIndexerThreadShutdown = false; + protected volatile boolean saveStateRequestedDuringIndexerThreadShutdown = false; public TransformIndexer( ThreadPool threadPool, @@ -490,7 +490,7 @@ private void finalizeCheckpoint(ActionListener listener) { changeCollector.clear(); } - long checkpoint = context.getAndIncrementCheckpoint(); + long checkpoint = context.incrementAndGetCheckpoint(); lastCheckpoint = getNextCheckpoint(); nextCheckpoint = null; // Reset our failure count as we have finished and may start again with a new checkpoint @@ -647,7 +647,7 @@ final void setStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener shouldStopAtCheckpointListener ) throws InterruptedException { + + // in case the indexer is already shutting down + if (indexerThreadShuttingDown) { + context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); + saveStateRequestedDuringIndexerThreadShutdown = true; + return false; + } + IndexerState state = getState(); // in case the indexer isn't running, respond immediately @@ -680,9 +688,19 @@ private synchronized boolean addSetStopAtCheckpointListener( // because save state is async we need to block the call until state is persisted, so that the job can not // be triggered (ensured by synchronized) CountDownLatch latch = new CountDownLatch(1); + logger.debug("[{}] persisiting stop at checkpoint", getJobId()); + doSaveState(IndexerState.STARTED, getPosition(), () -> { latch.countDown(); }); - latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS); + if (latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS) == false) { + logger.error( + new ParameterizedMessage( + "[{}] Timed out ({}s) waiting for transform state to be stored.", + getJobId(), + PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC + ) + ); + } return false; } @@ -717,11 +735,14 @@ private synchronized boolean addSetStopAtCheckpointListener( return true; } - synchronized void stopAndSaveState() { + synchronized void stopAndMaybeSaveState() { onStop(); + IndexerState state = stop(); + if (indexerThreadShuttingDown) { - stopCalledDuringIndexerThreadShutdown = true; - } else { + saveStateRequestedDuringIndexerThreadShutdown = true; + // if stop() returned STOPPED we need to persist state, otherwise the indexer does it for us + } else if (state == IndexerState.STOPPED) { doSaveState(IndexerState.STOPPED, getPosition(), () -> {}); } } @@ -1109,13 +1130,18 @@ private void configurePageSize(Integer newPageSize) { private synchronized void startIndexerThreadShutdown() { indexerThreadShuttingDown = true; - stopCalledDuringIndexerThreadShutdown = false; + saveStateRequestedDuringIndexerThreadShutdown = false; } private synchronized void finishIndexerThreadShutdown() { indexerThreadShuttingDown = false; - if (stopCalledDuringIndexerThreadShutdown) { - doSaveState(IndexerState.STOPPED, getPosition(), () -> {}); + if (saveStateRequestedDuringIndexerThreadShutdown) { + // if stop has been called and set shouldStopAtCheckpoint to true, + // we should stop if we just finished a checkpoint + if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) { + stop(); + } + doSaveState(getState(), getPosition(), () -> {}); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index c33d057b43d67..a575aa1bebdeb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -346,7 +346,7 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { // If state was in a failed state, we should stop immediately if (wasFailed) { - getIndexer().stopAndSaveState(); + getIndexer().stopAndMaybeSaveState(); return; } @@ -363,10 +363,7 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { // or has yet to even start one. // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time). (indexerState == IndexerState.STARTED && getIndexer().initialRun())) { - IndexerState state = getIndexer().stop(); - if (state == IndexerState.STOPPED) { - getIndexer().stopAndSaveState(); - } + getIndexer().stopAndMaybeSaveState(); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 4da79b9fa3c9c..55dc482cd356c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -9,6 +9,7 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -98,6 +100,9 @@ class MockedTransformIndexer extends TransformIndexer { // used for synchronizing with the test private CountDownLatch searchLatch; private CountDownLatch doProcessLatch; + private CountDownLatch doSaveStateLatch; + + private AtomicBoolean saveStateInProgress = new AtomicBoolean(false); // how many loops to execute until reporting done private int numberOfLoops; @@ -146,6 +151,10 @@ public CountDownLatch createCountDownOnResponseLatch(int count) { return doProcessLatch = new CountDownLatch(count); } + public CountDownLatch createAwaitForDoSaveStateLatch(int count) { + return doSaveStateLatch = new CountDownLatch(count); + } + @Override void doGetInitialProgress(SearchRequest request, ActionListener responseListener) { responseListener.onResponse(ONE_HIT_SEARCH_RESPONSE); @@ -214,7 +223,21 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, TransformIndexerPosition position, Runnable next) { + // assert that the indexer does not call doSaveState again, while it is still saving state + // this is only useful together with the doSaveStateLatch + assertTrue("doSaveState called again while still in progress", saveStateInProgress.compareAndSet(false, true)); + if (doSaveStateLatch != null) { + try { + doSaveStateLatch.await(); + + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED; + + assertTrue(saveStateInProgress.compareAndSet(true, false)); next.run(); } @@ -288,7 +311,7 @@ public void testRetentionPolicyExecution() throws Exception { assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), oneOf(IndexerState.INDEXING, IndexerState.STARTED)); - assertBusy(() -> assertEquals(1L, indexer.getLastCheckpoint().getCheckpoint()), 5, TimeUnit.HOURS); + assertBusy(() -> assertEquals(1L, indexer.getLastCheckpoint().getCheckpoint()), 5, TimeUnit.SECONDS); // delete by query has been executed assertEquals(1, indexer.getDeleteByQueryCallCount()); @@ -340,6 +363,63 @@ public void testRetentionPolicyExecution() throws Exception { } } + /** + * This test ensures correct handling of async behavior during indexer shutdown + * + * Indexer shutdown is not atomic: 1st the state is set back to e.g. STARTED, afterwards state is stored. + * State is stored async and is IO based, therefore it can take time until this is done. + * + * Between setting the state and storing it, some race condition occurred, this test acts + * as regression test. + */ + public void testInterActionWhileIndexerShutsdown() throws Exception { + TransformConfig config = new TransformConfig( + randomAlphaOfLength(10), + randomSourceConfig(), + randomDestConfig(), + null, + new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)), + null, + randomPivotConfig(), + null, + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), + null, + null, + null, + null + ); + AtomicReference state = new AtomicReference<>(IndexerState.STARTED); + + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); + final MockedTransformIndexer indexer = createMockIndexer( + 5, + config, + state, + null, + threadPool, + auditor, + new TransformIndexerStats(), + context + ); + + // add a latch at doSaveState + CountDownLatch saveStateLatch = indexer.createAwaitForDoSaveStateLatch(1); + + indexer.start(); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertEquals(indexer.getState(), IndexerState.INDEXING); + + assertBusy(() -> assertEquals(IndexerState.STARTED, indexer.getState()), 5, TimeUnit.SECONDS); + + // the indexer thread is shutting down, the trigger should be ignored + assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + this.assertAsync(listener -> setStopAtCheckpoint(indexer, true, listener), v -> {}); + saveStateLatch.countDown(); + + // after the indexer has shutdown, it should check for stop at checkpoint and shutdown + assertBusy(() -> assertEquals(IndexerState.STOPPED, indexer.getState()), 5, TimeUnit.SECONDS); + } + private MockedTransformIndexer createMockIndexer( int numberOfLoops, TransformConfig config, @@ -370,4 +450,36 @@ private MockedTransformIndexer createMockIndexer( indexer.initialize(); return indexer; } + + private void setStopAtCheckpoint( + TransformIndexer indexer, + boolean shouldStopAtCheckpoint, + ActionListener shouldStopAtCheckpointListener + ) { + // we need to simulate that this is called from the task, which offloads it to the generic threadpool + CountDownLatch latch = new CountDownLatch(1); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + indexer.setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener); + latch.countDown(); + }); + try { + assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + fail("timed out after 5s"); + } + } + + private void assertAsync(Consumer> function, Consumer furtherTests) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + furtherTests.accept(r); + }, e -> { fail("got unexpected exception: " + e); }), latch); + + function.accept(listener); + assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS)); + } + }