Properly support task batches in ReservedStateUpdateTaskExecutor #116353

Closed · wants to merge 16 commits
@@ -21,19 +21,22 @@
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -357,7 +360,7 @@ private void assertClusterStateNotSaved(CountDownLatch savedClusterState, Atomic
updateClusterSettings(Settings.builder().put("search.allow_expensive_queries", "false"));
}

public void testErrorSaved() throws Exception {
public void testErrorNotSaved() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
@@ -381,6 +384,40 @@ public void testErrorSaved() throws Exception {
assertClusterStateNotSaved(savedClusterState.v1(), savedClusterState.v2());
}

public void testLastSettingsInBatchApplied() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

assertFalse(dataFileSettingsService.watching());

logger.info("--> start master node");
final String masterNode = internalCluster().startMasterOnlyNode();
assertMasterNode(internalCluster().nonMasterClient(), masterNode);
var savedClusterState = setupClusterStateListener(masterNode);

FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

assertTrue(masterFileSettingsService.watching());
assertFalse(dataFileSettingsService.watching());

final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
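// Submit a task that blocks the master service on a barrier. Cluster state batches are
// executed one at a time, so while this task is parked on the barrier the two file-settings
// updates written below queue up and are then processed as a single batch of
// ReservedStateUpdateTasks. The first safeAwait below waits for the blocking task to start
// running before the files are written.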
final var barrier = new CyclicBarrier(2);
masterNodeClusterService.createTaskQueue("block", Priority.NORMAL, batchExecutionContext -> {
safeAwait(barrier);
safeAwait(barrier);
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
return batchExecutionContext.initialState();
}).submitTask("block", ESTestCase::fail, null);

safeAwait(barrier);
writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet()); // Valid but skipped
writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.incrementAndGet()); // The last valid setting
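// Release the blocking task; both queued updates are now executed in a single batch and the
// later (higher-version) update is the one that takes effect.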
safeAwait(barrier);
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
}

public void testErrorCanRecoverOnRestart() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
@@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -70,6 +71,13 @@ public ReservedStateUpdateTask(
this.listener = listener;
}

/**
* @return true if {@code this} would overwrite the effects of {@code prev} assuming {@code this} is executed later.
*/
public boolean supersedes(ReservedStateUpdateTask prev) {
return versionCheck.test(prev.stateChunk.metadata().version(), this.stateChunk.metadata().version());
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
@@ -213,4 +221,13 @@ static boolean checkMetadataVersion(
return false;
}

@Override
public String toString() {
return "ReservedStateUpdateTask{" + "namespace='" + namespace + '\'' + ", metadata=" + stateChunk.metadata() + '}';
}

/**
* Sorts a task before any task it {@linkplain #supersedes}, so the task that would take effect comes first.
*/
public static final Comparator<ReservedStateUpdateTask> SUPERSEDING_FIRST = (x, y) -> x.supersedes(y) ? -1 : y.supersedes(x) ? 1 : 0;
}
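The following is a minimal, self-contained sketch (not the Elasticsearch classes) of how the supersedes relation and the SUPERSEDING_FIRST comparator fit together. The record, the BiPredicate standing in for ReservedStateVersionCheck, and the version values are hypothetical; the point is only that sorting puts the task that would take effect first, followed by the fallback candidates.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiPredicate;

public class SupersedingFirstSketch {

    // A task reduced to its metadata version plus the version check it applies.
    record Task(long version, BiPredicate<Long, Long> versionCheck) {
        // Mirrors the shape of ReservedStateUpdateTask#supersedes: "this" wins if its check
        // accepts moving from prev's version to this task's version.
        boolean supersedes(Task prev) {
            return versionCheck.test(prev.version(), this.version());
        }
    }

    // Same shape as ReservedStateUpdateTask.SUPERSEDING_FIRST.
    static final Comparator<Task> SUPERSEDING_FIRST =
        (x, y) -> x.supersedes(y) ? -1 : y.supersedes(x) ? 1 : 0;

    public static void main(String[] args) {
        BiPredicate<Long, Long> higherVersionOnly = (prev, next) -> next > prev;
        List<Task> batch = new ArrayList<>(
            List.of(new Task(5, higherVersionOnly), new Task(7, higherVersionOnly), new Task(6, higherVersionOnly))
        );
        batch.sort(SUPERSEDING_FIRST);
        // Prints 7, 6, 5: the effective task first, then the candidates that would be
        // tried, in order, if it failed.
        batch.forEach(t -> System.out.println(t.version()));
    }
}
```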
@@ -14,15 +14,19 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Tuple;

import java.util.ArrayList;

import static java.util.Comparator.comparing;
import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.SUPERSEDING_FIRST;

/**
* Reserved cluster state update task executor
*/
public class ReservedStateUpdateTaskExecutor extends SimpleBatchedExecutor<ReservedStateUpdateTask, Void> {
public class ReservedStateUpdateTaskExecutor implements ClusterStateTaskExecutor<ReservedStateUpdateTask> {

private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTaskExecutor.class);

@@ -34,17 +38,56 @@ public ReservedStateUpdateTaskExecutor(RerouteService rerouteService) {
}

@Override
public Tuple<ClusterState, Void> executeTask(ReservedStateUpdateTask task, ClusterState clusterState) {
return Tuple.tuple(task.execute(clusterState), null);
}
public final ClusterState execute(BatchExecutionContext<ReservedStateUpdateTask> batchExecutionContext) throws Exception {
var initState = batchExecutionContext.initialState();
var taskContexts = batchExecutionContext.taskContexts();
if (taskContexts.isEmpty()) {
return initState;
}

@Override
public void taskSucceeded(ReservedStateUpdateTask task, Void unused) {
task.listener().onResponse(ActionResponse.Empty.INSTANCE);
// In a given batch of update tasks, only one will actually take effect,
// and we want to execute only that task, because if we execute all the tasks
// one after another, that will require all the tasks to be capable of executing
// correctly even without prior state updates being applied.
//
// The correct task to run would be whichever one would take effect if we were to
// run the tasks one-per-batch. In effect, this is the task with the highest version number;
// if multiple tasks have the same version number, their ReservedStateVersionCheck fields
// will be used to break the tie.
//
// One wrinkle is: if the task fails, then we will know retroactively that it was
// not the task that actually took effect, and we must then identify which of the
// remaining tasks would have taken effect. We achieve this by sorting the tasks
// using the SUPERSEDING_FIRST comparator.
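//
// For example, given a batch of tasks with (hypothetical) versions 5, 7 and 6, all using
// "higher version only" checks, the sort order is [7, 6, 5]: the version-7 task runs first,
// and only if it throws is the version-6 task tried against the same initial state, and so on.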

var candidates = new ArrayList<>(taskContexts);
candidates.sort(comparing(TaskContext::getTask, SUPERSEDING_FIRST));
for (var iter = candidates.iterator(); iter.hasNext();) {
TaskContext<ReservedStateUpdateTask> taskContext = iter.next();
logger.info("Effective task: {}", taskContext.getTask());
ClusterState clusterState = initState;
try (var ignored = taskContext.captureResponseHeaders()) {
var task = taskContext.getTask();
clusterState = task.execute(clusterState);
taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE));
logger.debug("-> Update task succeeded");
// All the others conceptually "succeeded" and then were superseded by the effective task
iter.forEachRemaining(c -> c.success(() -> c.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE)));
return clusterState;
} catch (Exception e) {
taskContext.onFailure(e);
if (iter.hasNext()) {
logger.warn("-> Update task failed; falling back to the task it superseded");
}
}
}

logger.warn("All {} update tasks failed; returning initial state", taskContexts.size());
return initState;
}

@Override
public void clusterStatePublished() {
public final void clusterStatePublished(ClusterState newClusterState) {
rerouteService.reroute(
"reroute after saving and reserving part of the cluster state",
Priority.NORMAL,
@@ -54,4 +97,5 @@ public void clusterStatePublished() {
)
);
}

}