[ML] results persisting service should respect node shutdown (elastic#66128)

When a node is shutting down, any scheduled retry tasks should be cancelled.

Also, tasks that are currently executing but fail should not be retried once shutdown has begun.
benwtrent authored Dec 10, 2020
1 parent 887bb16 commit 9c99b5b
Showing 2 changed files with 79 additions and 6 deletions.
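
The pattern the change applies can be summarized with a minimal, self-contained sketch. The names below (ShutdownAwareRetryRegistry, CancellableRetry, register, deregister) are illustrative stand-ins, not the actual Elasticsearch types used in the diff: every in-flight retryable action is registered in a concurrent map, a volatile flag is flipped when the node begins to stop, everything still registered is cancelled, and the caller-supplied shouldRetry predicate is wrapped so no further attempts are scheduled after shutdown.

// Illustrative sketch only: plain-JDK stand-ins for the Elasticsearch classes in the diff below.
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class ShutdownAwareRetryRegistry {

    // Hypothetical stand-in for RetryableAction<?>: anything that can be cancelled mid-retry.
    interface CancellableRetry {
        void cancel(Exception reason);
    }

    private final Map<Object, CancellableRetry> onGoingRetryableActions = new ConcurrentHashMap<>();
    private volatile boolean isShutdown = false;

    // Called from a lifecycle hook (beforeStop in the real service).
    void shutdown() {
        isShutdown = true;
        Exception reason = new CancellationException("Node is shutting down");
        for (CancellableRetry action : onGoingRetryableActions.values()) {
            action.cancel(reason);
        }
        onGoingRetryableActions.clear();
    }

    // Register an action, start it, and re-check the flag afterwards so a shutdown
    // that races with registration still cancels the freshly started action.
    void register(Object key, CancellableRetry action, Runnable start) {
        onGoingRetryableActions.put(key, action);
        start.run();
        if (isShutdown) {
            action.cancel(new CancellationException("Node is shutting down"));
        }
    }

    // Completion listeners call this so finished actions are never cancelled later.
    void deregister(Object key) {
        onGoingRetryableActions.remove(key);
    }

    // Wrap the caller-supplied retry predicate so no new attempt is scheduled after shutdown.
    Supplier<Boolean> shouldRetryWrapper(Supplier<Boolean> shouldRetry) {
        return () -> (isShutdown == false) && shouldRetry.get();
    }
}

The actual change, shown in the diff below, additionally deregisters each action via ActionListener.runBefore once its listener completes, and hooks shutdown() into the cluster service's beforeStop lifecycle callback.
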
@@ -64,6 +64,8 @@
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
@@ -104,6 +106,26 @@ public void testFailOver() throws Exception {
});
}

@Before
public void setLogging() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("logger.org.elasticsearch.xpack.ml.utils.persistence", "TRACE")
.build()).get();
}

@After
public void unsetLogging() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.putNull("logger.org.elasticsearch.xpack.ml.utils.persistence")
.build()).get();
}

public void testLoseDedicatedMasterNode() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);
logger.info("Starting dedicated master node...");
@@ -23,9 +23,12 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -82,7 +85,9 @@ public class ResultsPersisterService {

private final ThreadPool threadPool;
private final OriginSettingClient client;
private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap();
private volatile int maxFailureRetries;
private volatile boolean isShutdown = false;

// Visible for testing
public ResultsPersisterService(ThreadPool threadPool,
@@ -94,6 +99,24 @@ public ResultsPersisterService(ThreadPool threadPool,
this.maxFailureRetries = PERSIST_RESULTS_MAX_RETRIES.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(PERSIST_RESULTS_MAX_RETRIES, this::setMaxFailureRetries);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
shutdown();
}
});
}

void shutdown() {
isShutdown = true;
if (onGoingRetryableActions.isEmpty()) {
return;
}
final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("Node is shutting down");
for (RetryableAction<?> action : onGoingRetryableActions.values()) {
action.cancel(exception);
}
onGoingRetryableActions.clear();
}

void setMaxFailureRetries(int value) {
@@ -150,35 +173,57 @@ private BulkResponse bulkIndexWithRetry(BulkRequest bulkRequest,
Supplier<Boolean> shouldRetry,
Consumer<String> retryMsgHandler,
BiConsumer<BulkRequest, ActionListener<BulkResponse>> actionExecutor) {
PlainActionFuture<BulkResponse> getResponse = PlainActionFuture.newFuture();
final PlainActionFuture<BulkResponse> getResponse = PlainActionFuture.newFuture();
final Object key = new Object();
final ActionListener<BulkResponse> removeListener = ActionListener.runBefore(
getResponse,
() -> onGoingRetryableActions.remove(key)
);
BulkRetryableAction bulkRetryableAction = new BulkRetryableAction(
jobId,
new BulkRequestRewriter(bulkRequest),
shouldRetry,
shouldRetryWrapper(shouldRetry),
retryMsgHandler,
actionExecutor,
getResponse
removeListener
);
onGoingRetryableActions.put(key, bulkRetryableAction);
bulkRetryableAction.run();
if (isShutdown) {
bulkRetryableAction.cancel(new CancellableThreads.ExecutionCancelledException("Node is shutting down"));
}
return getResponse.actionGet();
}

public SearchResponse searchWithRetry(SearchRequest searchRequest,
String jobId,
Supplier<Boolean> shouldRetry,
Consumer<String> retryMsgHandler) {
PlainActionFuture<SearchResponse> getResponse = PlainActionFuture.newFuture();
final PlainActionFuture<SearchResponse> getResponse = PlainActionFuture.newFuture();
final Object key = new Object();
final ActionListener<SearchResponse> removeListener = ActionListener.runBefore(
getResponse,
() -> onGoingRetryableActions.remove(key)
);
SearchRetryableAction mlRetryableAction = new SearchRetryableAction(
jobId,
searchRequest,
client,
shouldRetry,
shouldRetryWrapper(shouldRetry),
retryMsgHandler,
getResponse);
removeListener);
onGoingRetryableActions.put(key, mlRetryableAction);
mlRetryableAction.run();
if (isShutdown) {
mlRetryableAction.cancel(new CancellableThreads.ExecutionCancelledException("Node is shutting down"));
}
return getResponse.actionGet();
}

private Supplier<Boolean> shouldRetryWrapper(Supplier<Boolean> shouldRetry) {
return () -> (isShutdown == false) && shouldRetry.get();
}

static class RecoverableException extends Exception { }
static class IrrecoverableException extends ElasticsearchStatusException {
IrrecoverableException(String msg, RestStatus status, Throwable cause, Object... args) {
@@ -408,6 +453,12 @@ protected long calculateDelay(long previousDelay) {
protected long minimumDelayMillis() {
return currentMin;
}

@Override
public void cancel(Exception e) {
super.cancel(e);
LOGGER.debug(() -> new ParameterizedMessage("[{}] retrying cancelled for action [{}]", jobId, getName()), e);
}
}

private static BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) {
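
One subtlety worth spelling out, again in terms of the illustrative registry sketched above rather than the real ResultsPersisterService: the action is registered and started first, and the shutdown flag is checked afterwards, so a shutdown that arrives between registration and the check still cancels the freshly started action instead of leaving it retrying indefinitely. A hedged caller-side view using the hypothetical names from the earlier sketch:

// Illustrative only: exercises the hypothetical ShutdownAwareRetryRegistry from the earlier sketch.
class RegistryUsageExample {
    public static void main(String[] args) {
        ShutdownAwareRetryRegistry registry = new ShutdownAwareRetryRegistry();
        Object key = new Object(); // fresh identity key per call, as in the diff
        ShutdownAwareRetryRegistry.CancellableRetry action =
            reason -> System.out.println("cancelled: " + reason.getMessage());

        // Register and start; the flag is re-checked after start, so a shutdown that
        // races with registration still cancels the freshly started action.
        registry.register(key, action, () -> System.out.println("started"));

        // In the real service, the completion listener (ActionListener.runBefore) removes
        // the entry, so shutdown() never cancels actions that have already finished.
        registry.deregister(key);

        registry.shutdown(); // nothing left to cancel at this point
    }
}
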
