[7.x][Transform] Improve robustness when saving state (#62927)
Refactor how state is persisted: call doSaveState only from the indexer thread, except when there is none.

fixes #60781
fixes #52931
fixes #51629
fixes #52035
Hendrik Muhs authored Sep 28, 2020
1 parent 21e534e commit b1a8437
Showing 10 changed files with 931 additions and 62 deletions.
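
The heart of the change is a hand-off: callers that need the transform state persisted no longer write it themselves; they register a listener, and the indexer thread notifies all registered listeners the next time it runs doSaveState. The standalone Java sketch below illustrates only that pattern — the class, the simplified Listener interface, and the method names are stand-ins for the Elasticsearch types in the diff, not the actual implementation.

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

class SaveStateSketch {

    interface Listener {
        void onResponse(Void unused);
        void onFailure(Exception e);
    }

    // listeners registered by arbitrary threads, drained only by the indexer thread
    private final AtomicReference<Collection<Listener>> saveStateListeners = new AtomicReference<>();

    // may be called from any thread, e.g. when should_stop_at_checkpoint changes
    void requestSaveState(Listener listener) {
        saveStateListeners.updateAndGet(current -> {
            Collection<Listener> copy = current == null ? new ArrayList<>() : new ArrayList<>(current);
            copy.add(listener);
            return copy;
        });
    }

    // called only from the indexer thread between search/bulk iterations
    void doSaveState(Runnable next) {
        // take ownership of whatever listeners registered up to this point
        Collection<Listener> listeners = saveStateListeners.getAndSet(null);
        try {
            persistState(); // stand-in for indexing the transform state document
            if (listeners != null) {
                listeners.forEach(l -> l.onResponse(null));
            }
        } catch (Exception e) {
            if (listeners != null) {
                listeners.forEach(l -> l.onFailure(e));
            }
        } finally {
            next.run(); // keep the indexer rolling regardless of the outcome
        }
    }

    private void persistState() {
        // pretend this writes the state; failures propagate to the listeners above
    }
}
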
@@ -165,9 +165,7 @@ public synchronized IndexerState stop() {
});

// a throttled search might be waiting to be executed, stop it
if (scheduledNextSearch != null) {
scheduledNextSearch.reschedule(TimeValue.ZERO);
}
runSearchImmediately();

return indexerState;
}
@@ -243,6 +241,17 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
}
}

/**
* Checks whether the state should be persisted; if true, doSaveState is called before continuing. Inheriting classes
* can override this to provide better logic for deciding when state should be saved.
*
* @return true if state should be saved, false if not.
*/
protected boolean triggerSaveState() {
// implementors can override this with something more intelligent than saving every 50 pages
return (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0);
}

/**
* Re-schedules the search request if necessary, this method can be called to apply a change
* in maximumRequestsPerSecond immediately
@@ -256,11 +265,28 @@ protected void rethrottle() {
reQueueThrottledSearch();
}

/**
* Re-schedules the current search request to run immediately, iff one is scheduled.
*
* Call this if you need the indexer to fast-forward a scheduled (throttled) search once in order to
* complete a full cycle.
*/
protected void runSearchImmediately() {
if (scheduledNextSearch != null) {
scheduledNextSearch.reschedule(TimeValue.ZERO);
}
}

// protected, so it can be overridden by tests
protected long getTimeNanos() {
return System.nanoTime();
}

// only for testing purposes
protected ScheduledRunnable getScheduledNextSearch() {
return scheduledNextSearch;
}

/**
* Called to get max docs per second. To be overwritten if
* throttling is implemented, the default -1 turns off throttling.
@@ -490,7 +516,11 @@ private void onSearchResponse(SearchResponse searchResponse) {
JobPosition newPosition = iterationResult.getPosition();
position.set(newPosition);

nextSearch();
if (triggerSaveState()) {
doSaveState(IndexerState.INDEXING, newPosition, () -> { nextSearch(); });
} else {
nextSearch();
}
} catch (Exception e) {
finishWithFailure(e);
}
@@ -509,11 +539,8 @@ private void onBulkResponse(BulkResponse response, JobPosition position) {
}

try {
// TODO probably something more intelligent than every-50 is needed
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
doSaveState(IndexerState.INDEXING, position, () -> {
nextSearch();
});
if (triggerSaveState()) {
doSaveState(IndexerState.INDEXING, position, () -> { nextSearch(); });
} else {
nextSearch();
}
@@ -547,8 +574,8 @@ protected void nextSearch() {
() -> triggerNextSearch(executionDelay.getNanos())
);

// corner case: if for whatever reason stop() has been called meanwhile fast forward
if (getState().equals(IndexerState.STOPPING)) {
// corner case: if stop() has been called in the meantime or state persistence has been requested, fast forward and run the search now
if (getState().equals(IndexerState.STOPPING) || triggerSaveState()) {
scheduledNextSearch.reschedule(TimeValue.ZERO);
}
return;
@@ -563,6 +590,8 @@ private void triggerNextSearch(long waitTimeInNanos) {
return;
}

// clean up the scheduled runnable
scheduledNextSearch = null;
stats.markStartSearch();
lastSearchStartTimeNanos = getTimeNanos();

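
The new triggerSaveState() hook above keeps the old every-50-pages default but is meant to be overridden. As a purely hypothetical illustration (not part of this commit), a subclass could switch to a time-based policy; the class and field names below are invented for the sketch:

class IntervalSavingIndexer /* extends AsyncTwoPhaseIndexer<...> in real code */ {

    private static final long SAVE_INTERVAL_NANOS = java.util.concurrent.TimeUnit.MINUTES.toNanos(5);
    private volatile long lastSaveNanos = System.nanoTime();

    protected boolean triggerSaveState() {
        // persist at most once every five minutes instead of every 50 pages
        long now = System.nanoTime();
        if (now - lastSaveNanos >= SAVE_INTERVAL_NANOS) {
            lastSaveNanos = now;
            return true;
        }
        return false;
    }
}
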
@@ -220,7 +220,7 @@ public void testContinuousTransformUpdate() throws Exception {

public void testStopWaitForCheckpoint() throws Exception {
String indexName = "wait-for-checkpoint-reviews";
String transformId = "data-frame-transform-wait-for-checkpoint";
String transformId = "transform-wait-for-checkpoint";
createReviewsIndex(indexName, 1000);

Map<String, SingleGroupSource> groups = new HashMap<>();
@@ -243,10 +243,11 @@ public void testStopWaitForCheckpoint() throws Exception {
).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build();

assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());

assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());

// waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop
stopTransform(transformId, false, null, true);
assertTrue(stopTransform(transformId, false, null, true).isAcknowledged());

// Wait until the first checkpoint
waitUntilCheckpoint(config.getId(), 1L);
@@ -258,7 +259,31 @@ public void testStopWaitForCheckpoint() throws Exception {
assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), equalTo(1000L));
});

stopTransform(config.getId());
int additionalRuns = randomIntBetween(1, 10);

for (int i = 0; i < additionalRuns; ++i) {
// index some more docs using a new user
long timeStamp = Instant.now().toEpochMilli() - 1_000;
long user = 42 + i;
indexMoreDocs(timeStamp, user, indexName);
assertTrue(startTransformWithRetryOnConflict(config.getId(), RequestOptions.DEFAULT).isAcknowledged());

boolean waitForCompletion = randomBoolean();
assertTrue(stopTransform(transformId, waitForCompletion, null, true).isAcknowledged());

assertBusy(() -> {
TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0);
assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED));
});
TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0);
assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED));
}

TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0);
assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED));
assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), greaterThan(1000L));

assertTrue(stopTransform(transformId).isAcknowledged());
deleteTransform(config.getId());
}

@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.transform.integration;

import org.apache.logging.log4j.Level;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -55,6 +56,7 @@
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -140,6 +142,24 @@ protected StartTransformResponse startTransform(String id, RequestOptions option
}
}

// workaround for https://github.com/elastic/elasticsearch/issues/62204
protected StartTransformResponse startTransformWithRetryOnConflict(String id, RequestOptions options) throws Exception {
ElasticsearchStatusException lastConflict = null;
for (int retries = 10; retries > 0; --retries) {
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
return restClient.transform().startTransform(new StartTransformRequest(id), options);
} catch (ElasticsearchStatusException e) {
if (RestStatus.CONFLICT.equals(e.status()) == false) {
throw e;
}

lastConflict = e;
Thread.sleep(5);
}
}
throw lastConflict;
}

protected AcknowledgedResponse deleteTransform(String id) throws IOException {
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
AcknowledgedResponse response = restClient.transform().deleteTransform(new DeleteTransformRequest(id), RequestOptions.DEFAULT);
@@ -6,7 +6,6 @@

package org.elasticsearch.xpack.transform.integration.continuous;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
@@ -38,7 +37,6 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/60781")
public class DateHistogramGroupByIT extends ContinuousTestCase {
private static final String NAME = "continuous-date-histogram-pivot-test";
private static final String MISSING_BUCKET_KEY = ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC"))
@@ -6,7 +6,6 @@

package org.elasticsearch.xpack.transform.integration.continuous;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
@@ -34,7 +33,6 @@

import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/60781")
public class TermsGroupByIT extends ContinuousTestCase {

private static final String NAME = "continuous-terms-pivot-test";
@@ -40,6 +40,7 @@
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -97,34 +98,6 @@ class ClientTransformIndexer extends TransformIndexer {
context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
}

void persistShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener<Void> shouldStopAtCheckpointListener) {
if (context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint
|| getState() == IndexerState.STOPPED
|| getState() == IndexerState.STOPPING) {
shouldStopAtCheckpointListener.onResponse(null);
return;
}
TransformState state = new TransformState(
context.getTaskState(),
getState(),
getPosition(),
context.getCheckpoint(),
context.getStateReason(),
getProgress(),
null, // Node attributes
shouldStopAtCheckpoint
);
doSaveState(state, ActionListener.wrap(r -> {
// We only want to update this internal value if it is persisted as such
context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", getJobId(), shouldStopAtCheckpoint);
shouldStopAtCheckpointListener.onResponse(null);
}, statsExc -> {
logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]", getJobId(), shouldStopAtCheckpoint);
shouldStopAtCheckpointListener.onFailure(statsExc);
}));
}

@Override
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
if (context.getTaskState() == TransformTaskState.FAILED) {
@@ -249,6 +222,9 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p
return;
}

// get the listeners that have registered up to now; in theory a new listener could sneak in between this line
// and the next, however this is benign: we would store `shouldStopAtCheckpoint = true` twice, which is ok
Collection<ActionListener<Void>> saveStateListenersAtTheMomentOfCalling = saveStateListeners.getAndSet(null);
boolean shouldStopAtCheckpoint = context.shouldStopAtCheckpoint();

// If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states
@@ -267,6 +243,9 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p
// If the state is `STOPPED` this means that TransformTask#stop was called while we were checking for changes.
// Allow the stop call path to continue
if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) {
if (saveStateListenersAtTheMomentOfCalling != null) {
ActionListener.onResponse(saveStateListenersAtTheMomentOfCalling, null);
}
next.run();
return;
}
@@ -308,11 +287,33 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p
);
logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString());

doSaveState(state, ActionListener.wrap(r -> next.run(), e -> next.run()));
// we might need to call the save state listeners, but do not want to stop rolling
doSaveState(state, ActionListener.wrap(r -> {
try {
if (saveStateListenersAtTheMomentOfCalling != null) {
ActionListener.onResponse(saveStateListenersAtTheMomentOfCalling, r);
}
} catch (Exception onResponseException) {
String msg = LoggerMessageFormat.format("[{}] failed notifying saveState listeners, ignoring.", getJobId());
logger.warn(msg, onResponseException);
} finally {
next.run();
}
}, e -> {
try {
if (saveStateListenersAtTheMomentOfCalling != null) {
ActionListener.onFailure(saveStateListenersAtTheMomentOfCalling, e);
}
} catch (Exception onFailureException) {
String msg = LoggerMessageFormat.format("[{}] failed notifying saveState listeners, ignoring.", getJobId());
logger.warn(msg, onFailureException);
} finally {
next.run();
}
}));
}

private void doSaveState(TransformState state, ActionListener<Void> listener) {

// This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = getSeqNoPrimaryTermAndIndex();

@@ -328,6 +329,7 @@ private void doSaveState(TransformState state, ActionListener<Void> listener) {
if (state.getTaskState().equals(TransformTaskState.STOPPED)) {
context.shutdown();
}

// Only do this clean up once, if it succeeded, no reason to do the query again.
if (oldStatsCleanedUp.compareAndSet(false, true)) {
transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap(nil -> {