Refactor GatewayService #99994

Merged: 19 commits, Oct 10, 2023
Changes from 13 commits
199 changes: 132 additions & 67 deletions server/src/main/java/org/elasticsearch/gateway/GatewayService.java
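
For orientation while reading the diff: the fields recoverAfterTime, recoverAfterDataNodes and expectedDataNodes used throughout are backed by node-level gateway settings. Below is a minimal sketch of setting them programmatically, assuming the standard Settings builder and the usual gateway.* setting keys this class consumes; the class name and values are purely illustrative and not part of this change.

import org.elasticsearch.common.settings.Settings;

// Illustrative values only: recover immediately once 3 data nodes have joined,
// do nothing below 2 data nodes, and otherwise wait up to 5 minutes.
class GatewaySettingsSketch {
    static Settings exampleGatewaySettings() {
        return Settings.builder()
            .put("gateway.expected_data_nodes", 3)      // expectedDataNodes
            .put("gateway.recover_after_data_nodes", 2) // recoverAfterDataNodes
            .put("gateway.recover_after_time", "5m")    // recoverAfterTime
            .build();
    }
}
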
@@ -31,9 +31,11 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;
@@ -80,9 +82,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
private final TimeValue recoverAfterTime;
private final int recoverAfterDataNodes;
private final int expectedDataNodes;

private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
volatile PendingStateRecovery currentPendingStateRecovery;

@Inject
public GatewayService(
@@ -131,8 +131,9 @@ public void clusterChanged(final ClusterChangedEvent event) {
}

final ClusterState state = event.state();
final DiscoveryNodes nodes = state.nodes();

if (state.nodes().isLocalNodeElectedMaster() == false) {
if (nodes.isLocalNodeElectedMaster() == false) {
// not our job to recover
return;
}
@@ -141,83 +142,153 @@ public void clusterChanged(final ClusterChangedEvent event) {
return;
}

final DiscoveryNodes nodes = state.nodes();
if (state.nodes().getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
nodes.getDataNodes().size(),
recoverAfterDataNodes
);
} else {
boolean enforceRecoverAfterTime;
String reason;
if (expectedDataNodes == -1) {
// no expected is set, honor recover_after_data_nodes
enforceRecoverAfterTime = true;
reason = "recover_after_time was set to [" + recoverAfterTime + "]";
} else if (expectedDataNodes <= nodes.getDataNodes().size()) {
// expected is set and satisfied so recover immediately
enforceRecoverAfterTime = false;
reason = "";
// At this point, we know the state is not recovered and this node is qualified for state recovery
// But we still need to check whether a previous one is running already
final long currentTerm = state.term();
final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery;

// Always start a new state recovery if the master term changes
// If there is a previous one still waiting, both will probably run but at most one of them will
// actually make changes to cluster state because either:
// 1. The previous one recovers the cluster state and the current one will be skipped
// 2. The previous one sees a new cluster term and skips its own execution
if (existingPendingStateRecovery == null || existingPendingStateRecovery.expectedTerm < currentTerm) {
currentPendingStateRecovery = new PendingStateRecovery(currentTerm);
}
currentPendingStateRecovery.onDataNodeSize(nodes.getDataNodes().size());
}

/**
* This class manages the cluster state recovery behaviours. It has two major scenarios depending
* on whether {@code recoverAfterDataNodes} is configured.
*
* <p> <b>When</b> {@code recoverAfterDataNodes} is configured:
* <ol>
* <li>Nothing can happen until it is reached
* <li>When {@code recoverAfterDataNodes} is reached, the cluster either:
* <ul>
* <li>Recovers immediately when {@code expectedDataNodes} is reached, or when
* both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
* <li>Or schedules a recovery with a delay of {@code recoverAfterTime}
* </ul>
* <li>The scheduled recovery can be cancelled if the data node count drops back below {@code recoverAfterDataNodes}
* before the recovery can happen. When this happens, the process goes back to the beginning (step 1).
* <li>The recovery is scheduled only once each time the data node count crosses the {@code recoverAfterDataNodes} threshold
* </ol>
*
* <p> <b>When</b> {@code recoverAfterDataNodes} is <b>not</b> configured, the cluster either:
* <ul>
* <li>Recovers immediately when {@code expectedDataNodes} is reached, or when
* both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
* <li>Or schedules a recovery with a delay of {@code recoverAfterTime}
* </ul>
*/
class PendingStateRecovery {
private final long expectedTerm;
@Nullable
private Scheduler.ScheduledCancellable scheduledRecovery;
private final AtomicBoolean taskSubmitted = new AtomicBoolean();

PendingStateRecovery(long expectedTerm) {
this.expectedTerm = expectedTerm;
}

void onDataNodeSize(int currentDataNodeSize) {
if (recoverAfterDataNodes != -1 && currentDataNodeSize < recoverAfterDataNodes) {
logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
currentDataNodeSize,
recoverAfterDataNodes
);
cancelScheduledRecovery();
} else {
// expected is set but not satisfied so wait until it is satisfied or times out
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
maybePerformOrScheduleRecovery(currentDataNodeSize);
}
performStateRecovery(enforceRecoverAfterTime, reason);
}
}

private void performStateRecovery(final boolean enforceRecoverAfterTime, final String reason) {
if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery failed", e);
resetRecoveredFlags();
}

@Override
protected void doRun() {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
void maybePerformOrScheduleRecovery(int currentDataNodeSize) {
if (expectedDataNodes != -1 && expectedDataNodes <= currentDataNodeSize) {
logger.debug(
"performing state recovery of term [{}], expected data nodes [{}] is reached",
expectedTerm,
expectedDataNodes
);
cancelScheduledRecovery();
runRecoveryImmediately();
} else if (recoverAfterTime == null) {
logger.debug("performing state recovery of term [{}], no delay time is configured", expectedTerm);
cancelScheduledRecovery();
runRecoveryImmediately();
} else {
if (scheduledRecovery == null) {
logger.info(
"delaying initial state recovery for [{}] of term [{}]. expecting [{}] data nodes, but only have [{}]",
recoverAfterTime,
expectedTerm,
expectedDataNodes,
currentDataNodeSize
);
scheduledRecovery = threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery of term [" + expectedTerm + "] failed", e);
}
}
}, recoverAfterTime, threadPool.generic());
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {
try {
logger.debug("performing state recovery...");
runRecovery();
} catch (Exception e) {
logger.warn("state recovery failed", e);
resetRecoveredFlags();

@Override
protected void doRun() {
final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery;
if (PendingStateRecovery.this == existingPendingStateRecovery) {
runRecoveryImmediately();
} else {
logger.debug(
"skip scheduled state recovery since a new one of term [{}] has started",
existingPendingStateRecovery.expectedTerm
);
}
}
}, recoverAfterTime, threadPool.generic());
} else {
logger.debug("state recovery is in already scheduled for term [{}]", expectedTerm);
}
}
}
}

private void resetRecoveredFlags() {
recoveryInProgress.set(false);
scheduledRecovery.set(false);
void runRecoveryImmediately() {
if (taskSubmitted.compareAndSet(false, true)) {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(expectedTerm));
} else {
logger.debug("state recovery task is already submitted");
}
}

void cancelScheduledRecovery() {
if (scheduledRecovery != null) {
scheduledRecovery.cancel();
scheduledRecovery = null;
}
}
}

private static final String TASK_SOURCE = "local-gateway-elected-state";

class RecoverStateUpdateTask extends ClusterStateUpdateTask {

private final long expectedTerm;

RecoverStateUpdateTask(long expectedTerm) {
this.expectedTerm = expectedTerm;
}

@Override
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
logger.debug("cluster is already recovered");
return currentState;
}
if (expectedTerm != currentState.term()) {
logger.debug("skip state recovery since current term [{}] != expected term [{}]", currentState.term(), expectedTerm);
return currentState;
}
return ClusterStateUpdaters.removeStateNotRecoveredBlock(
ClusterStateUpdaters.updateRoutingTable(currentState, shardRoutingRoleStrategy)
);
@@ -228,7 +299,6 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterStat
logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, that we again do another state recovery.
resetRecoveredFlags();
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.noop());
}

@@ -239,7 +309,6 @@ public void onFailure(final Exception e) {
() -> "unexpected failure during [" + TASK_SOURCE + "]",
e
);
resetRecoveredFlags();
}
}
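
The comments in clusterChanged above explain that a new PendingStateRecovery is started whenever the master term changes, and that at most one of two overlapping recoveries actually changes the cluster state; RecoverStateUpdateTask.execute enforces this through the expectedTerm check. The following is a minimal, self-contained sketch of that guard; it is not part of this diff, and ClusterStateStub and the other names are invented for illustration.

final class TermGuardSketch {
    // Stand-in for the real ClusterState: just the master term and a recovered flag.
    record ClusterStateStub(long term, boolean recovered) {}

    // Mirrors the shape of RecoverStateUpdateTask.execute: apply recovery only if the state
    // is not yet recovered and the task's expected term still matches the cluster term.
    static ClusterStateStub applyRecovery(ClusterStateStub current, long expectedTerm) {
        if (current.recovered()) {
            return current; // an earlier task already recovered the state
        }
        if (expectedTerm != current.term()) {
            return current; // stale task from an older master term is skipped
        }
        return new ClusterStateStub(current.term(), true);
    }

    public static void main(String[] args) {
        ClusterStateStub state = new ClusterStateStub(5, false);
        state = applyRecovery(state, 4); // task created under term 4: skipped
        state = applyRecovery(state, 5); // task for the current term: recovers
        System.out.println(state);       // ClusterStateStub[term=5, recovered=true]
    }
}
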

@@ -248,12 +317,8 @@ TimeValue recoverAfterTime() {
return recoverAfterTime;
}

private void runRecovery() {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask());
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
}
}
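
Finally, a compact standalone sketch of the decision flow that onDataNodeSize and maybePerformOrScheduleRecovery implement. It is not part of the change: the enum and method names are invented, -1 stands for "not configured" as in the real class, and the actual scheduling and cancellation through the ThreadPool are omitted.

class RecoveryDecisionSketch {
    enum Decision { WAIT, RECOVER_NOW, SCHEDULE_DELAYED }

    static Decision decide(int dataNodes, int recoverAfterDataNodes, int expectedDataNodes, boolean recoverAfterTimeConfigured) {
        // 1. With recover_after_data_nodes configured, nothing happens below the threshold
        //    (the real code also cancels any previously scheduled recovery here).
        if (recoverAfterDataNodes != -1 && dataNodes < recoverAfterDataNodes) {
            return Decision.WAIT;
        }
        // 2. Recover immediately once expected_data_nodes is reached ...
        if (expectedDataNodes != -1 && expectedDataNodes <= dataNodes) {
            return Decision.RECOVER_NOW;
        }
        // ... or when no recover_after_time delay is configured.
        if (recoverAfterTimeConfigured == false) {
            return Decision.RECOVER_NOW;
        }
        // 3. Otherwise a single delayed recovery is scheduled for recover_after_time.
        return Decision.SCHEDULE_DELAYED;
    }
}

Here dataNodes is the number of data nodes observed in the cluster state; the real class additionally guarantees that the delayed recovery is scheduled at most once until it either runs or is cancelled.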