diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 851620032cf7c..c040ebae576f6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -165,9 +165,7 @@ public synchronized IndexerState stop() { }); // a throttled search might be waiting to be executed, stop it - if (scheduledNextSearch != null) { - scheduledNextSearch.reschedule(TimeValue.ZERO); - } + runSearchImmediately(); return indexerState; } @@ -243,6 +241,17 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { } } + /** + * Checks if the state should be persisted, if true doSaveState is called before continuing. Inherited classes + * can override this, to provide a better logic, when state should be saved. + * + * @return true if state should be saved, false if not. + */ + protected boolean triggerSaveState() { + // implementors can overwrite this with something more intelligent than every-50 + return (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0); + } + /** * Re-schedules the search request if necessary, this method can be called to apply a change * in maximumRequestsPerSecond immediately @@ -256,11 +265,28 @@ protected void rethrottle() { reQueueThrottledSearch(); } + /** + * Re-schedules the current search request to run immediately, iff one is scheduled. + * + * Call this if you need the indexer to fast forward a scheduled(in case it's throttled) search once in order to + * complete a full cycle. 
+ */ + protected void runSearchImmediately() { + // read the field once: triggerNextSearch() resets it to null and is not synchronized with this method, + // so checking and then re-reading the field could hit a NullPointerException + final ScheduledRunnable scheduled = scheduledNextSearch; + if (scheduled != null) { + scheduled.reschedule(TimeValue.ZERO); + } + } + // protected, so it can be overwritten by tests protected long getTimeNanos() { return System.nanoTime(); } + // only for testing purposes + protected ScheduledRunnable getScheduledNextSearch() { + return scheduledNextSearch; + } + /** * Called to get max docs per second. To be overwritten if * throttling is implemented, the default -1 turns off throttling. @@ -490,7 +516,11 @@ private void onSearchResponse(SearchResponse searchResponse) { JobPosition newPosition = iterationResult.getPosition(); position.set(newPosition); - nextSearch(); + if (triggerSaveState()) { + doSaveState(IndexerState.INDEXING, newPosition, () -> { nextSearch(); }); + } else { + nextSearch(); + } } catch (Exception e) { finishWithFailure(e); } @@ -509,11 +539,8 @@ private void onBulkResponse(BulkResponse response, JobPosition position) { } try { - // TODO probably something more intelligent than every-50 is needed - if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) { - doSaveState(IndexerState.INDEXING, position, () -> { - nextSearch(); - }); + if (triggerSaveState()) { + doSaveState(IndexerState.INDEXING, position, () -> { nextSearch(); }); } else { nextSearch(); } @@ -547,8 +574,8 @@ protected void nextSearch() { () -> triggerNextSearch(executionDelay.getNanos()) ); - // corner case: if for whatever reason stop() has been called meanwhile fast forward - if (getState().equals(IndexerState.STOPPING)) { + // corner case: if meanwhile stop() has been called or state persistence has been requested: fast forward, run search now + if (getState().equals(IndexerState.STOPPING) || triggerSaveState()) { scheduledNextSearch.reschedule(TimeValue.ZERO); } return; @@ -563,6 +590,8 @@ private void triggerNextSearch(long waitTimeInNanos) { return; } + // cleanup the scheduled runnable + scheduledNextSearch = null; stats.markStartSearch();
lastSearchStartTimeNanos = getTimeNanos(); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index c52effdf8eb8a..70ef9b9b5de36 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -220,7 +220,7 @@ public void testContinuousTransformUpdate() throws Exception { public void testStopWaitForCheckpoint() throws Exception { String indexName = "wait-for-checkpoint-reviews"; - String transformId = "data-frame-transform-wait-for-checkpoint"; + String transformId = "transform-wait-for-checkpoint"; createReviewsIndex(indexName, 1000); Map groups = new HashMap<>(); @@ -243,10 +243,11 @@ public void testStopWaitForCheckpoint() throws Exception { ).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build(); assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop - stopTransform(transformId, false, null, true); + assertTrue(stopTransform(transformId, false, null, true).isAcknowledged()); // Wait until the first checkpoint waitUntilCheckpoint(config.getId(), 1L); @@ -258,7 +259,31 @@ public void testStopWaitForCheckpoint() throws Exception { assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), equalTo(1000L)); }); - stopTransform(config.getId()); + int additionalRuns = randomIntBetween(1, 10); + + for (int i = 0; i < additionalRuns; ++i) { + // index some more docs using a new user + 
long timeStamp = Instant.now().toEpochMilli() - 1_000; + long user = 42 + i; + indexMoreDocs(timeStamp, user, indexName); + assertTrue(startTransformWithRetryOnConflict(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + + boolean waitForCompletion = randomBoolean(); + assertTrue(stopTransform(transformId, waitForCompletion, null, true).isAcknowledged()); + + assertBusy(() -> { + TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); + assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); + }); + TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); + assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); + } + + TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); + assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); + assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), greaterThan(1000L)); + + assertTrue(stopTransform(transformId).isAcknowledged()); deleteTransform(config.getId()); } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java index 7cb8d927cfa7d..e51de6a303c6f 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.integration; import org.apache.logging.log4j.Level; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import 
org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequest; @@ -55,6 +56,7 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -140,6 +142,24 @@ protected StartTransformResponse startTransform(String id, RequestOptions option } } + // workaround for https://github.com/elastic/elasticsearch/issues/62204 + protected StartTransformResponse startTransformWithRetryOnConflict(String id, RequestOptions options) throws Exception { + ElasticsearchStatusException lastConflict = null; + for (int retries = 10; retries > 0; --retries) { + try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { + return restClient.transform().startTransform(new StartTransformRequest(id), options); + } catch (ElasticsearchStatusException e) { + if (RestStatus.CONFLICT.equals(e.status()) == false) { + throw e; + } + + lastConflict = e; + Thread.sleep(5); + } + } + throw lastConflict; + } + protected AcknowledgedResponse deleteTransform(String id) throws IOException { try (RestHighLevelClient restClient = new TestRestHighLevelClient()) { AcknowledgedResponse response = restClient.transform().deleteTransform(new DeleteTransformRequest(id), RequestOptions.DEFAULT); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/DateHistogramGroupByIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/DateHistogramGroupByIT.java index 73ad2263a03f6..f26279602dc78 100644 --- 
a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/DateHistogramGroupByIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/DateHistogramGroupByIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.transform.integration.continuous; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -38,7 +37,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/60781") public class DateHistogramGroupByIT extends ContinuousTestCase { private static final String NAME = "continuous-date-histogram-pivot-test"; private static final String MISSING_BUCKET_KEY = ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC")) diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TermsGroupByIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TermsGroupByIT.java index 4827cb5f5b591..0b1c189d9c398 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TermsGroupByIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TermsGroupByIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.transform.integration.continuous; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import 
org.elasticsearch.action.support.IndicesOptions; @@ -34,7 +33,6 @@ import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/60781") public class TermsGroupByIT extends ContinuousTestCase { private static final String NAME = "continuous-terms-pivot-test"; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 68770a03e074f..883dcd2f62c22 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -40,6 +40,7 @@ import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; @@ -97,34 +98,6 @@ class ClientTransformIndexer extends TransformIndexer { context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); } - void persistShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener shouldStopAtCheckpointListener) { - if (context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint - || getState() == IndexerState.STOPPED - || getState() == IndexerState.STOPPING) { - shouldStopAtCheckpointListener.onResponse(null); - return; - } - TransformState state = new TransformState( - context.getTaskState(), - getState(), - getPosition(), - context.getCheckpoint(), - context.getStateReason(), - getProgress(), - null, // Node attributes - shouldStopAtCheckpoint - ); - doSaveState(state, ActionListener.wrap(r -> { - // We only want to update this internal value if it is persisted as such - 
context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); - logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", getJobId(), shouldStopAtCheckpoint); - shouldStopAtCheckpointListener.onResponse(null); - }, statsExc -> { - logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]", getJobId(), shouldStopAtCheckpoint); - shouldStopAtCheckpointListener.onFailure(statsExc); - })); - } - @Override protected void doNextSearch(long waitTimeInNanos, ActionListener nextPhase) { if (context.getTaskState() == TransformTaskState.FAILED) { @@ -249,6 +222,9 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p return; } + // getting the listeners that registered till now, in theory a new listener could sneak in between this line + // and the next, however this is benign: we would store `shouldStopAtCheckpoint = true` twice which is ok + Collection> saveStateListenersAtTheMomentOfCalling = saveStateListeners.getAndSet(null); boolean shouldStopAtCheckpoint = context.shouldStopAtCheckpoint(); // If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states @@ -267,6 +243,9 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p // If the state is `STOPPED` this means that TransformTask#stop was called while we were checking for changes. 
// Allow the stop call path to continue if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) { + if (saveStateListenersAtTheMomentOfCalling != null) { + ActionListener.onResponse(saveStateListenersAtTheMomentOfCalling, null); + } next.run(); return; } @@ -308,11 +287,33 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p ); logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); - doSaveState(state, ActionListener.wrap(r -> next.run(), e -> next.run())); + // we might need to call the save state listeners, but do not want to stop rolling + doSaveState(state, ActionListener.wrap(r -> { + try { + if (saveStateListenersAtTheMomentOfCalling != null) { + ActionListener.onResponse(saveStateListenersAtTheMomentOfCalling, r); + } + } catch (Exception onResponseException) { + String msg = LoggerMessageFormat.format("[{}] failed notifying saveState listeners, ignoring.", getJobId()); + logger.warn(msg, onResponseException); + } finally { + next.run(); + } + }, e -> { + try { + if (saveStateListenersAtTheMomentOfCalling != null) { + ActionListener.onFailure(saveStateListenersAtTheMomentOfCalling, e); + } + } catch (Exception onFailureException) { + String msg = LoggerMessageFormat.format("[{}] failed notifying saveState listeners, ignoring.", getJobId()); + logger.warn(msg, onFailureException); + } finally { + next.run(); + } + })); } private void doSaveState(TransformState state, ActionListener listener) { - // This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = getSeqNoPrimaryTermAndIndex(); @@ -328,6 +329,7 @@ private void doSaveState(TransformState state, ActionListener listener) { if (state.getTaskState().equals(TransformTaskState.STOPPED)) { context.shutdown(); } + // Only do this clean up once, if it succeeded, no reason to do the query again. 
if (oldStatsCleanedUp.compareAndSet(false, true)) { transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap(nil -> { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 81e959e13c48e..3af94c7a7a74d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -43,16 +43,22 @@ import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class TransformIndexer extends AsyncTwoPhaseIndexer { + private static final int PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC = 5; + /** * RunState is an internal (non-persisted) state that controls the internal logic * which query filters to run and which index requests to send @@ -87,6 +93,8 @@ private enum RunState { // Indicates that the source has changed for the current run protected volatile boolean hasSourceChanged = true; + protected final AtomicReference>> saveStateListeners = new AtomicReference<>(); + private final Map fieldMappings; // the function of the transform, e.g. 
pivot @@ -159,6 +167,12 @@ protected float getMaxDocsPerSecond() { return docsPerSecond; } + @Override + protected boolean triggerSaveState() { + // trigger in case of listeners waiting for state being saved + return saveStateListeners.get() != null || super.triggerSaveState(); + } + public TransformConfig getConfig() { return transformConfig; } @@ -508,6 +522,100 @@ protected void onAbort() { context.shutdown(); } + /** + * Let the indexer stop at the next checkpoint and call the listener after the flag has been persisted in state. + * + * If the indexer isn't running, persist state if required and call the listener immediately. + */ + final void setStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener shouldStopAtCheckpointListener) { + // this should be called from the generic threadpool + assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); + + try { + if (addSetStopAtCheckpointListener(shouldStopAtCheckpoint, shouldStopAtCheckpointListener) == false) { + shouldStopAtCheckpointListener.onResponse(null); + } + } catch (InterruptedException e) { + logger.error( + new ParameterizedMessage( + "[{}] Timed out ({}s) waiting for transform state to be stored.", + getJobId(), + PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC + ), + e + ); + + // the transport wraps this with a REST status code + shouldStopAtCheckpointListener.onFailure( + new RuntimeException( + "Timed out (" + PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC + "s) waiting for transform state to be stored.", + e + ) + ); + } catch (Exception e) { + logger.error(new ParameterizedMessage("[{}] failed to persist transform state.", getJobId()), e); + shouldStopAtCheckpointListener.onFailure(e); + } + } + + private synchronized boolean addSetStopAtCheckpointListener( + boolean shouldStopAtCheckpoint, + ActionListener shouldStopAtCheckpointListener + ) throws InterruptedException { + IndexerState state = getState(); + + // in case the indexer isn't running, respond immediately + if (state == 
IndexerState.STARTED && context.shouldStopAtCheckpoint() != shouldStopAtCheckpoint) { + context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); + + // because save state is async we need to block the call until state is persisted, so that the job can not + // be triggered (ensured by synchronized) + CountDownLatch latch = new CountDownLatch(1); + doSaveState(IndexerState.STARTED, getPosition(), () -> { latch.countDown(); }); + + // await() signals a timeout by returning false (it does not throw), so fail explicitly instead of + // letting the caller report success for a state that was never confirmed as persisted + if (latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS) == false) { + throw new RuntimeException( + "Timed out (" + PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC + "s) waiting for transform state to be stored." + ); + } + return false; + } + + if (state != IndexerState.INDEXING) { + return false; + } + + if (saveStateListeners.updateAndGet(currentListeners -> { + // check the state again (optimistic locking), while we checked the last time, the indexing thread could have + // saved the state and is finishing. As it first sets the state and _then_ gets saveStateListeners, it's safe + // to just check the indexer state again + if (getState() != IndexerState.INDEXING) { + return null; + } + + if (currentListeners == null) { + // in case shouldStopAtCheckpoint has already the desired value _and_ we know it's _persisted_, respond immediately + if (context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint) { + return null; + } + + return Collections.singletonList(shouldStopAtCheckpointListener); + } + + List> newListeners = new ArrayList<>(currentListeners); + newListeners.add(shouldStopAtCheckpointListener); + return newListeners; + }) == null) { + return false; + } + + context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); + // in case of throttling the indexer might wait for the next search, fast forward, so stop listeners do not wait too long + runSearchImmediately(); + return true; + } + + void stopAndSaveState() { + onStop(); + doSaveState(IndexerState.STOPPED, getPosition(), () -> {}); + } + synchronized void handleFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e); Throwable unwrappedException = 
ExceptionRootCauseFinder.getRootCauseException(e); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index d50894bdbea6e..c921d1f541c83 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -286,16 +286,9 @@ synchronized void start(Long startingCheckpoint, ActionListener { getIndexer().setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener); }); } public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { @@ -346,12 +342,13 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { // If state was in a failed state, we should stop immediately if (wasFailed) { - getIndexer().onStop(); - getIndexer().doSaveState(IndexerState.STOPPED, getIndexer().getPosition(), () -> {}); + getIndexer().stopAndSaveState(); return; } - if (getIndexer().getState() == IndexerState.STOPPED || getIndexer().getState() == IndexerState.STOPPING) { + IndexerState indexerState = getIndexer().getState(); + + if (indexerState == IndexerState.STOPPED || indexerState == IndexerState.STOPPING) { return; } @@ -361,11 +358,10 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint, // or has yet to even start one. // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time). 
- (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) { + (indexerState == IndexerState.STARTED && getIndexer().initialRun())) { IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { - getIndexer().onStop(); - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); + getIndexer().stopAndSaveState(); } } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/MockTimebasedCheckpointProvider.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/MockTimebasedCheckpointProvider.java new file mode 100644 index 0000000000000..13653620c721f --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/MockTimebasedCheckpointProvider.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.transform.checkpoint; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; +import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; + +import java.util.Collections; + +/** + * mock of a time based checkpoint provider for testing + */ +public class MockTimebasedCheckpointProvider implements CheckpointProvider { + + private final TransformConfig transformConfig; + private final TimeSyncConfig timeSyncConfig; + + public MockTimebasedCheckpointProvider(final TransformConfig transformConfig) { + this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); + timeSyncConfig = ExceptionsHelper.requireNonNull((TimeSyncConfig) transformConfig.getSyncConfig(), "timeSyncConfig"); + } + + @Override + public void createNextCheckpoint(TransformCheckpoint lastCheckpoint, ActionListener listener) { + + final long timestamp = System.currentTimeMillis(); + long timeUpperBound = timestamp - timeSyncConfig.getDelay().millis(); + + if (lastCheckpoint == null) { + listener.onResponse(new TransformCheckpoint(transformConfig.getId(), timestamp, 0, Collections.emptyMap(), timeUpperBound)); + return; // checkpoint 0 has been answered: do not fall through, dereference the null lastCheckpoint and respond twice + } + listener.onResponse( + new TransformCheckpoint( + transformConfig.getId(), + timestamp, + lastCheckpoint.getCheckpoint() + 1, + Collections.emptyMap(), + timeUpperBound + ) + ); + } + + @Override + public void sourceHasChanged(TransformCheckpoint lastCheckpoint, ActionListener listener) { + // lets pretend there are
always changes + listener.onResponse(true); + } + + @Override + public void getCheckpointingInfo( + TransformCheckpoint lastCheckpoint, + TransformCheckpoint nextCheckpoint, + TransformIndexerPosition nextCheckpointPosition, + TransformProgress nextCheckpointProgress, + ActionListener listener + ) { + TransformCheckpointingInfoBuilder checkpointingInfoBuilder = new TransformCheckpointingInfoBuilder(); + checkpointingInfoBuilder.setLastCheckpoint(lastCheckpoint) + .setNextCheckpoint(nextCheckpoint) + .setNextCheckpointPosition(nextCheckpointPosition) + .setNextCheckpointProgress(nextCheckpointProgress); + + long timestamp = System.currentTimeMillis(); + TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, Collections.emptyMap(), 0L); + checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint); + checkpointingInfoBuilder.setOperationsBehind(TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint)); + listener.onResponse(checkpointingInfoBuilder); + } + + @Override + public void getCheckpointingInfo( + long lastCheckpointNumber, + TransformIndexerPosition nextCheckpointPosition, + TransformProgress nextCheckpointProgress, + ActionListener listener + ) { + long timestamp = System.currentTimeMillis(); + + // emulate the 2 last checkpoints as if last run 1 second ago and next right now + TransformCheckpoint lastCheckpoint = new TransformCheckpoint( + transformConfig.getId(), + timestamp - 1000, + lastCheckpointNumber, + Collections.emptyMap(), + timestamp - 1000 - timeSyncConfig.getDelay().millis() + ); + + TransformCheckpoint nextCheckpoint = new TransformCheckpoint( + transformConfig.getId(), + timestamp, + lastCheckpointNumber + 1, + Collections.emptyMap(), + timestamp - timeSyncConfig.getDelay().millis() + ); + + getCheckpointingInfo(lastCheckpoint, nextCheckpoint, nextCheckpointPosition, nextCheckpointProgress, listener); + } + +} diff --git 
a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java
new file mode 100644
index 0000000000000..fc4d2b714d2dc
--- /dev/null
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java
@@ -0,0 +1,652 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform.transforms;
+
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.search.profile.SearchProfileShardResults;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.client.NoOpClient;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
+import org.elasticsearch.xpack.core.indexing.IterationResult;
+import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
+import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
+import org.elasticsearch.xpack.core.transform.transforms.TransformState;
+import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
+import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
+import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider;
+import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
+import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
+import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager;
+import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
+import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
+import static org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests.randomPivotConfig;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests how {@link TransformIndexer} handles "stop at checkpoint" requests in the different indexer states,
+ * including a running indexer and a throttled one.
+ */
+public class TransformIndexerStateTests extends ESTestCase {
+
+    private static final SearchResponse ONE_HIT_SEARCH_RESPONSE = new SearchResponse(
+        new InternalSearchResponse(
+            new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
+            // Simulate completely null aggs
+            null,
+            new Suggest(Collections.emptyList()),
+            new SearchProfileShardResults(Collections.emptyMap()),
+            false,
+            false,
+            1
+        ),
+        "",
+        1,
+        1,
+        0,
+        0,
+        ShardSearchFailure.EMPTY_ARRAY,
+        SearchResponse.Clusters.EMPTY
+    );
+
+    private Client client;
+    private ThreadPool threadPool;
+    private TransformAuditor auditor;
+    private TransformConfigManager transformConfigManager;
+
+    /**
+     * A {@link TransformIndexer} whose search, bulk and save-state phases are replaced by in-memory stand-ins,
+     * with latches that let the test pause the indexer and observe its progress.
+     */
+    class MockedTransformIndexer extends TransformIndexer {
+
+        private final ThreadPool threadPool;
+        private final String executorName;
+
+        // written by whichever thread runs doSaveState, read by the test thread
+        private volatile TransformState persistedState;
+        // atomic, because the phases are dispatched to pool threads and doSaveState may run on different ones
+        private final AtomicInteger saveStateListenerCallCount = new AtomicInteger();
+        // used for synchronizing with the test; volatile because the test thread (re-)creates the latches while the
+        // indexer threads read them
+        private volatile CountDownLatch searchLatch;
+        private volatile CountDownLatch doProcessLatch;
+
+        MockedTransformIndexer(
+            ThreadPool threadPool,
+            String executorName,
+            TransformConfigManager transformsConfigManager,
+            CheckpointProvider checkpointProvider,
+            TransformAuditor auditor,
+            TransformConfig transformConfig,
+            Map<String, String> fieldMappings,
+            AtomicReference<IndexerState> initialState,
+            TransformIndexerPosition initialPosition,
+            TransformIndexerStats jobStats,
+            TransformContext context
+        ) {
+            super(
+                threadPool,
+                executorName,
+                transformsConfigManager,
+                checkpointProvider,
+                auditor,
+                transformConfig,
+                fieldMappings,
+                initialState,
+                initialPosition,
+                jobStats,
+                /* TransformProgress */ null,
+                TransformCheckpoint.EMPTY,
+                TransformCheckpoint.EMPTY,
+                context
+            );
+            this.threadPool = threadPool;
+            this.executorName = executorName;
+
+            persistedState = new TransformState(
+                context.getTaskState(),
+                initialState.get(),
+                initialPosition,
+                context.getCheckpoint(),
+                context.getStateReason(),
+                getProgress(),
+                null,
+                context.shouldStopAtCheckpoint()
+            );
+        }
+
+        /** Exposes the indexer's {@code initializeFunction()} to the test. */
+        public void initialize() {
+            this.initializeFunction();
+        }
+
+        /**
+         * Installs a latch that every following {@link #doNextSearch} call waits on before it answers.
+         *
+         * @param count the initial count of the latch
+         * @return the latch; count it down to let the search continue
+         */
+        public CountDownLatch createAwaitForSearchLatch(int count) {
+            return searchLatch = new CountDownLatch(count);
+        }
+
+        /**
+         * Installs a latch that is counted down on every {@link #doNextBulk} call.
+         *
+         * @param count the initial count of the latch
+         * @return the latch; await it to wait until the indexer reached the bulk phase
+         */
+        public CountDownLatch createCountDownOnResponseLatch(int count) {
+            return doProcessLatch = new CountDownLatch(count);
+        }
+
+        @Override
+        void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
+            responseListener.onResponse(ONE_HIT_SEARCH_RESPONSE);
+        }
+
+        @Override
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
+            // read the field once, the test thread may install a new latch concurrently
+            final CountDownLatch latch = searchLatch;
+            if (latch != null) {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    // restore the interrupt flag before failing, so the pool thread still sees the interruption
+                    Thread.currentThread().interrupt();
+                    throw new IllegalStateException(e);
+                }
+            }
+            threadPool.executor(executorName).execute(() -> nextPhase.onResponse(ONE_HIT_SEARCH_RESPONSE));
+        }
+
+        @Override
+        protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
+            final CountDownLatch latch = doProcessLatch;
+            if (latch != null) {
+                latch.countDown();
+            }
+            threadPool.executor(executorName).execute(() -> nextPhase.onResponse(new BulkResponse(new BulkItemResponse[0], 100)));
+        }
+
+        @Override
+        protected void doSaveState(IndexerState state, TransformIndexerPosition position, Runnable next) {
+            persistedState = new TransformState(
+                context.getTaskState(),
+                state,
+                position,
+                context.getCheckpoint(),
+                context.getStateReason(),
+                getProgress(),
+                null,
+                context.shouldStopAtCheckpoint()
+            );
+
+            // take over all listeners registered so far; listeners added afterwards are left for a later call
+            Collection<ActionListener<Void>> saveStateListenersAtTheMomentOfCalling = saveStateListeners.getAndSet(null);
+            try {
+                if (saveStateListenersAtTheMomentOfCalling != null) {
+                    saveStateListenerCallCount.addAndGet(saveStateListenersAtTheMomentOfCalling.size());
+                    ActionListener.onResponse(saveStateListenersAtTheMomentOfCalling, null);
+                }
+            } catch (Exception onResponseException) {
+                // keep the cause, a bare fail(...) would hide what went wrong
+                throw new AssertionError("failed to call save state listeners", onResponseException);
+            } finally {
+                next.run();
+            }
+        }
+
+        @Override
+        protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse searchResponse) {
+            // pretend that we processed 10k documents for each call
+            getStats().incrementNumDocuments(10_000);
+            return new IterationResult<>(Collections.singletonList(new IndexRequest()), new TransformIndexerPosition(null, null), false);
+        }
+
+        /** @return true if a follow-up search is currently scheduled, i.e. the indexer is waiting to run it */
+        public boolean waitingForNextSearch() {
+            return super.getScheduledNextSearch() != null;
+        }
+
+        /** @return the number of save state listeners that {@link #doSaveState} has called so far */
+        public int getSaveStateListenerCallCount() {
+            return saveStateListenerCallCount.get();
+        }
+
+        /** @return the state built by the most recent {@link #doSaveState} call, or the one built in the constructor */
+        public TransformState getPersistedState() {
+            return persistedState;
+        }
+    }
+
+    @Before
+    public void setUpMocks() {
+        auditor = new MockTransformAuditor();
+        transformConfigManager = new InMemoryTransformConfigManager();
+        client = new NoOpClient(getTestName());
+        threadPool = new TestThreadPool(ThreadPool.Names.GENERIC);
+    }
+
+    @After
+    public void tearDownClient() {
+        client.close();
+        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Calls setStopAtCheckpoint on indexers in every {@link IndexerState} and checks how often the save state
+     * listeners get called.
+     */
+    public void testStopAtCheckpoint() throws Exception {
+        TransformConfig config = new TransformConfig(
+            randomAlphaOfLength(10),
+            randomSourceConfig(),
+            randomDestConfig(),
+            null,
+            new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
+            null,
+            randomPivotConfig(),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
+            null
+        );
+
+        for (IndexerState state : IndexerState.values()) {
+            // skip indexing case, tested below
+            if (IndexerState.INDEXING.equals(state)) {
+                continue;
+            }
+            AtomicReference<IndexerState> stateRef = new AtomicReference<>(state);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+            final MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                stateRef,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                new TransformIndexerStats(),
+                context
+            );
+            assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+            assertEquals(0, indexer.getSaveStateListenerCallCount());
+            if (IndexerState.STARTED.equals(state)) {
+                assertTrue(context.shouldStopAtCheckpoint());
+                assertTrue(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+            } else {
+                // shouldStopAtCheckpoint should not be set, because the indexer is already stopped, stopping or aborting
+                assertFalse(context.shouldStopAtCheckpoint());
+                assertFalse(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+            }
+        }
+
+        // let's test a running indexer
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);
+        {
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+            final MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                new TransformIndexerStats(),
+                context
+            );
+            indexer.start();
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertEquals(IndexerState.INDEXING, indexer.getState());
+
+            assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+
+            indexer.stop();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)), 5, TimeUnit.SECONDS);
+
+            // listener must have been called by the indexing thread
+            assertEquals(1, indexer.getSaveStateListenerCallCount());
+
+            // as the indexer is stopped, the listener should be answered directly, without another save state call
+            assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+            assertEquals(1, indexer.getSaveStateListenerCallCount());
+        }
+
+        // do another round
+        {
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+            final MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                new TransformIndexerStats(),
+                context
+            );
+
+            indexer.start();
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertEquals(IndexerState.INDEXING, indexer.getState());
+
+            // this time call it 3 times
+            assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+            assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+            assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+
+            indexer.stop();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)), 5, TimeUnit.SECONDS);
+
+            // listener must have been called by the indexing thread between 1 and 3 times
+            assertThat(indexer.getSaveStateListenerCallCount(), greaterThanOrEqualTo(1));
+            assertThat(indexer.getSaveStateListenerCallCount(), lessThanOrEqualTo(3));
+        }
+
+        // 3rd round with some back and forth
+        {
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+            final MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                new TransformIndexerStats(),
+                context
+            );
+            indexer.start();
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertEquals(IndexerState.INDEXING, indexer.getState());
+
+            // slow down the indexer
+            CountDownLatch searchLatch = indexer.createAwaitForSearchLatch(1);
+
+            // this time call 5 times and change stopAtCheckpoint every time
+            List<CountDownLatch> responseLatches = new ArrayList<>();
+            for (int i = 0; i < 5; ++i) {
+                CountDownLatch latch = new CountDownLatch(1);
+                boolean stopAtCheckpoint = i % 2 == 0;
+                countResponse(listener -> setStopAtCheckpoint(indexer, stopAtCheckpoint, listener), latch);
+                responseLatches.add(latch);
+            }
+
+            // now let the indexer run again
+            searchLatch.countDown();
+
+            indexer.stop();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)), 5, TimeUnit.SECONDS);
+
+            // wait for all listeners
+            for (CountDownLatch l : responseLatches) {
+                assertTrue("timed out after 5s", l.await(5, TimeUnit.SECONDS));
+            }
+
+            // listener must have been called 5 times, because the value changed every time and we slowed down the indexer
+            assertThat(indexer.getSaveStateListenerCallCount(), equalTo(5));
+        }
+
+        // 4th round: go wild
+        {
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+            final MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                new TransformIndexerStats(),
+                context
+            );
+            indexer.start();
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertEquals(IndexerState.INDEXING, indexer.getState());
+
+            // slow down the indexer
+            CountDownLatch searchLatch = indexer.createAwaitForSearchLatch(1);
+
+            List<CountDownLatch> responseLatches = new ArrayList<>();
+            for (int i = 0; i < 3; ++i) {
+                CountDownLatch latch = new CountDownLatch(1);
+                boolean stopAtCheckpoint = randomBoolean();
+                countResponse(listener -> setStopAtCheckpoint(indexer, stopAtCheckpoint, listener), latch);
+                responseLatches.add(latch);
+            }
+
+            // now let the indexer run again
+            searchLatch.countDown();
+
+            // this time call it 3 times
+            assertResponse(listener -> setStopAtCheckpoint(indexer, randomBoolean(), listener));
+            assertResponse(listener -> setStopAtCheckpoint(indexer, randomBoolean(), listener));
+            assertResponse(listener -> setStopAtCheckpoint(indexer, randomBoolean(), listener));
+
+            indexer.stop();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)), 5, TimeUnit.SECONDS);
+
+            // wait for all listeners
+            for (CountDownLatch l : responseLatches) {
+                assertTrue("timed out after 5s", l.await(5, TimeUnit.SECONDS));
+            }
+
+            // listener must have been called by the indexing thread between 1 and 6 times
+            assertThat(indexer.getSaveStateListenerCallCount(), greaterThanOrEqualTo(1));
+            assertThat(indexer.getSaveStateListenerCallCount(), lessThanOrEqualTo(6));
+        }
+    }
+
+    /**
+     * Runs a throttled indexer and checks that setStopAtCheckpoint requests are still answered and persisted,
+     * both before and while the next search is scheduled.
+     */
+    public void testStopAtCheckpointForThrottledTransform() throws Exception {
+        TransformConfig config = new TransformConfig(
+            randomAlphaOfLength(10),
+            randomSourceConfig(),
+            randomDestConfig(),
+            null,
+            new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)),
+            null,
+            randomPivotConfig(),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
+            new SettingsConfig(null, Float.valueOf(1.0f))
+        );
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);
+
+        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+        final MockedTransformIndexer indexer = createMockIndexer(
+            config,
+            state,
+            null,
+            threadPool,
+            ThreadPool.Names.GENERIC,
+            auditor,
+            new TransformIndexerStats(),
+            context
+        );
+
+        // create a latch to wait until data has been processed once
+        CountDownLatch onResponseLatch = indexer.createCountDownOnResponseLatch(1);
+
+        indexer.start();
+        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+        assertEquals(IndexerState.INDEXING, indexer.getState());
+
+        // wait until we processed something, the indexer should throttle
+        assertTrue("timed out after 5s", onResponseLatch.await(5, TimeUnit.SECONDS));
+        // re-create the latch for the next use (before setStop, otherwise the other thread might overtake)
+        onResponseLatch = indexer.createCountDownOnResponseLatch(1);
+
+        // the calls are likely executed _before_ the next search is even scheduled
+        assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+        assertTrue(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+        assertResponse(listener -> setStopAtCheckpoint(indexer, false, listener));
+        assertFalse(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+        assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+        assertTrue(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+        assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+        assertResponse(listener -> setStopAtCheckpoint(indexer, false, listener));
+        assertFalse(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+
+        assertTrue("timed out after 5s", onResponseLatch.await(5, TimeUnit.SECONDS));
+        onResponseLatch = indexer.createCountDownOnResponseLatch(1);
+
+        // wait until a search is scheduled
+        assertBusy(() -> assertTrue(indexer.waitingForNextSearch()), 5, TimeUnit.SECONDS);
+        assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+        assertTrue(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+
+        assertTrue("timed out after 5s", onResponseLatch.await(5, TimeUnit.SECONDS));
+        onResponseLatch = indexer.createCountDownOnResponseLatch(1);
+
+        assertBusy(() -> assertTrue(indexer.waitingForNextSearch()), 5, TimeUnit.SECONDS);
+        assertResponse(listener -> setStopAtCheckpoint(indexer, false, listener));
+        assertFalse(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+
+        assertTrue("timed out after 5s", onResponseLatch.await(5, TimeUnit.SECONDS));
+        assertBusy(() -> assertTrue(indexer.waitingForNextSearch()), 5, TimeUnit.SECONDS);
+        assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+        assertTrue(indexer.getPersistedState().shouldStopAtNextCheckpoint());
+        assertResponse(listener -> setStopAtCheckpoint(indexer, true, listener));
+
+        indexer.stop();
+        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)), 5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Calls {@link TransformIndexer#setStopAtCheckpoint} on the generic thread pool and waits until the call returned.
+     *
+     * @param indexer the indexer to call
+     * @param shouldStopAtCheckpoint the value to set
+     * @param shouldStopAtCheckpointListener the listener handed to the indexer
+     */
+    private void setStopAtCheckpoint(
+        TransformIndexer indexer,
+        boolean shouldStopAtCheckpoint,
+        ActionListener<Void> shouldStopAtCheckpointListener
+    ) {
+        // we need to simulate that this is called from the task, which offloads it to the generic threadpool
+        CountDownLatch latch = new CountDownLatch(1);
+        threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+            indexer.setStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener);
+            latch.countDown();
+        });
+        try {
+            assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            // an interrupt is not a timeout: restore the flag and report it as what it is
+            Thread.currentThread().interrupt();
+            throw new AssertionError("interrupted while waiting for setStopAtCheckpoint", e);
+        }
+    }
+
+    /**
+     * Runs the given function with a fresh listener and asserts that the listener gets a response within 5 seconds.
+     *
+     * @param function receives the listener, expected to pass it on to the code under test
+     * @throws InterruptedException if interrupted while waiting for the response
+     */
+    private void assertResponse(Consumer<ActionListener<Void>> function) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        countResponse(function, latch);
+        assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Runs the given function with a listener that counts down the given latch once it got a response.
+     * On response the listener asserts that the latch count is still 1; on failure it fails the test.
+     *
+     * @param function receives the listener, expected to pass it on to the code under test
+     * @param latch the latch to count down on response
+     */
+    private void countResponse(Consumer<ActionListener<Void>> function, CountDownLatch latch) {
+        LatchedActionListener<Void> listener = new LatchedActionListener<>(
+            ActionListener.wrap(
+                r -> { assertEquals("listener called more than once", 1, latch.getCount()); },
+                e -> { fail("got unexpected exception: " + e.getMessage()); }
+            ),
+            latch
+        );
+        function.accept(listener);
+    }
+
+    /**
+     * Creates and initializes a {@link MockedTransformIndexer} and stores the given config in the config manager.
+     * Note: {@code failureConsumer} is currently not used.
+     */
+    private MockedTransformIndexer createMockIndexer(
+        TransformConfig config,
+        AtomicReference<IndexerState> state,
+        Consumer<String> failureConsumer,
+        ThreadPool threadPool,
+        String executorName,
+        TransformAuditor auditor,
+        TransformIndexerStats jobStats,
+        TransformContext context
+    ) {
+        CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider(config);
+        transformConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {}, e -> {}));
+
+        MockedTransformIndexer indexer = new MockedTransformIndexer(
+            threadPool,
+            executorName,
+            transformConfigManager,
+            checkpointProvider,
+            auditor,
+            config,
+            Collections.emptyMap(),
+            state,
+            null,
+            jobStats,
+            context
+        );
+
+        indexer.initialize();
+        return indexer;
+    }
+}