Skip to content

Commit

Permalink
Timeout health API on busy master (#57587)
Browse files Browse the repository at this point in the history
Today `GET _cluster/health?wait_for_events=...&timeout=...` will wait
indefinitely for the master to process the pending cluster health task,
ignoring the specified timeout. This could take a very long time if the master
is overloaded. This commit fixes this by adding a timeout to the pending
cluster health task.
  • Loading branch information
DaveCTurner authored Jun 4, 2020
1 parent 6e4606b commit d81ea8e
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

Expand Down Expand Up @@ -261,12 +263,13 @@ public void run() {
clusterHealthThread.join();
}

public void testWaitForEventsRetriesIfOtherConditionsNotMet() throws Exception {
public void testWaitForEventsRetriesIfOtherConditionsNotMet() {
final ActionFuture<ClusterHealthResponse> healthResponseFuture
= client().admin().cluster().prepareHealth("index").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute();

final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -275,26 +278,33 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
completionFuture.onFailure(e);
throw new AssertionError(source, e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (keepSubmittingTasks.get()) {
clusterService.submitStateUpdateTask("looping task", this);
} else {
completionFuture.onResponse(null);
}
}
});

createIndex("index");
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());
try {
createIndex("index");
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());

// at this point the original health response should not have returned: there was never a point where the index was green AND
// the master had processed all pending tasks above LANGUID priority.
assertFalse(healthResponseFuture.isDone());

keepSubmittingTasks.set(false);
assertFalse(healthResponseFuture.get().isTimedOut());
// at this point the original health response should not have returned: there was never a point where the index was green AND
// the master had processed all pending tasks above LANGUID priority.
assertFalse(healthResponseFuture.isDone());
keepSubmittingTasks.set(false);
assertFalse(healthResponseFuture.actionGet(TimeValue.timeValueSeconds(30)).isTimedOut());
} finally {
keepSubmittingTasks.set(false);
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
}
}

public void testHealthOnMasterFailover() throws Exception {
Expand All @@ -311,4 +321,42 @@ public void testHealthOnMasterFailover() throws Exception {
assertSame(responseFuture.get().getStatus(), ClusterHealthStatus.GREEN);
}
}

public void testWaitForEventsTimesOutIfMasterBusy() {
final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
completionFuture.onFailure(e);
throw new AssertionError(source, e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (keepSubmittingTasks.get()) {
clusterService.submitStateUpdateTask("looping task", this);
} else {
completionFuture.onResponse(null);
}
}
});

try {
final ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(TimeValue.timeValueSeconds(1))
.get(TimeValue.timeValueSeconds(30));
assertTrue(clusterHealthResponse.isTimedOut());
} finally {
keepSubmittingTasks.set(false);
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -96,7 +97,7 @@ protected void masterOperation(final Task task,
waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis());
} else {
executeHealth(request, clusterService.state(), listener, waitCount,
clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, false)));
clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, TimeoutState.OK)));
}
}

Expand Down Expand Up @@ -129,13 +130,19 @@ public void onFailure(String source, Exception e) {
}
});
} else {
final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])",
new ClusterStateUpdateTask(request.waitForEvents()) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public TimeValue timeout() {
return taskTimeout;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
Expand All @@ -161,8 +168,12 @@ public void onNoLongerMaster(String source) {

@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
listener.onFailure(e);
if (e instanceof ProcessClusterEventTimeoutException) {
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
} else {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
listener.onFailure(e);
}
}
});
}
Expand All @@ -175,13 +186,13 @@ private void executeHealth(final ClusterHealthRequest request,
final Consumer<ClusterState> onNewClusterStateAfterDelay) {

if (request.timeout().millis() == 0) {
listener.onResponse(getResponse(request, currentState, waitCount, true));
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.ZERO_TIMEOUT));
return;
}

final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
if (validationPredicate.test(currentState)) {
listener.onResponse(getResponse(request, currentState, waitCount, false));
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.OK));
} else {
final ClusterStateObserver observer
= new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
Expand All @@ -198,7 +209,7 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, true));
listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, TimeoutState.TIMED_OUT));
}
};
observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
Expand Down Expand Up @@ -234,19 +245,23 @@ private boolean validateRequest(final ClusterHealthRequest request, ClusterState
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
}

private enum TimeoutState {
OK,
TIMED_OUT,
ZERO_TIMEOUT
}

private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
final int waitFor, boolean timedOut) {
final int waitFor, TimeoutState timeoutState) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
boolean valid = (readyCounter == waitFor);
assert valid || timedOut;
// we check for a timeout here since this method might be called from the wait_for_events
// response handler which might have timed out already.
// if the state is sufficient for what we where waiting for we don't need to mark this as timedOut.
// We spend too much time in waiting for events such that we might already reached a valid state.
// this should not mark the request as timed out
response.setTimedOut(timedOut && valid == false);
assert valid || (timeoutState != TimeoutState.OK);
// If valid && timeoutState == TimeoutState.ZERO_TIMEOUT then we immediately found **and processed** a valid state, so we don't
// consider this a timeout. However if timeoutState == TimeoutState.TIMED_OUT then we didn't process a valid state (perhaps we
// failed on wait_for_events) so this does count as a timeout.
response.setTimedOut(valid == false || timeoutState == TimeoutState.TIMED_OUT);
return response;
}

Expand Down

0 comments on commit d81ea8e

Please sign in to comment.