Refactor GatewayService #99994

Merged: 19 commits, Oct 10, 2023
Changes from 7 commits
214 changes: 145 additions & 69 deletions server/src/main/java/org/elasticsearch/gateway/GatewayService.java
@@ -31,13 +31,13 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(GatewayService.class);

@@ -80,9 +80,7 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener {
private final TimeValue recoverAfterTime;
private final int recoverAfterDataNodes;
private final int expectedDataNodes;

private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
volatile PendingStateRecovery currentPendingStateRecovery;

@Inject
public GatewayService(
@@ -131,8 +129,13 @@ public void clusterChanged(final ClusterChangedEvent event) {
}

final ClusterState state = event.state();
final DiscoveryNodes nodes = state.nodes();

if (state.nodes().isLocalNodeElectedMaster() == false) {
if (nodes.isLocalNodeElectedMaster() == false) {
if (nodes.getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
return;
}
// not our job to recover
return;
}
@@ -141,83 +144,162 @@ public void clusterChanged(final ClusterChangedEvent event) {
return;
}

final DiscoveryNodes nodes = state.nodes();
if (state.nodes().getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
nodes.getDataNodes().size(),
recoverAfterDataNodes
);
} else {
boolean enforceRecoverAfterTime;
String reason;
if (expectedDataNodes == -1) {
// no expected is set, honor recover_after_data_nodes
enforceRecoverAfterTime = true;
reason = "recover_after_time was set to [" + recoverAfterTime + "]";
} else if (expectedDataNodes <= nodes.getDataNodes().size()) {
// expected is set and satisfied so recover immediately
enforceRecoverAfterTime = false;
reason = "";
} else {
// expected is set but not satisfied so wait until it is satisfied or times out
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
}
performStateRecovery(enforceRecoverAfterTime, reason);
// At this point, we know the state is not recovered and this node is qualified for state recovery
// But we still need to check whether a previous one is running already
final long currentTerm = state.term();
final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery;

// Always start a new state recovery if the master term changes
// If there is a previous one still waiting, both will probably run but at most one of them will
// actually make changes to cluster state because either:
// 1. The previous recovers the cluster state and the current one will be skipped
// 2. The previous one sees a new cluster term and skips its own execution
if (existingPendingStateRecovery == null || existingPendingStateRecovery.expectedTerm < currentTerm) {
currentPendingStateRecovery = new PendingStateRecovery(currentTerm);
}
currentPendingStateRecovery.onDataNodeSize(nodes.getDataNodes().size());
}

private void performStateRecovery(final boolean enforceRecoverAfterTime, final String reason) {
if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery failed", e);
resetRecoveredFlags();
}
/**
* This class manages the cluster state recovery behaviours. It has two major scenarios depending
* on whether {@code recoverAfterDataNodes} is configured.
*
* <p> <b>When</b> {@code recoverAfterDataNodes} is configured:
* <ol>
* <li>Nothing can happen until it is reached
* <li>When {@code recoverAfterDataNodes} is reached, the cluster either:
* <ul>
* <li>Recover immediately when {@code expectedDataNodes} is reached or
* both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
* <li>Or schedule a recovery with a delay of {@code recoverAfterTime}
* </ul>
 * <li>The scheduled recovery can be cancelled if the number of data nodes drops below {@code recoverAfterDataNodes}
 * before the recovery can happen. When this happens, the process goes back to the beginning (step 1).
 * <li>The recovery is scheduled only once (??) each time the number of data nodes crosses {@code recoverAfterDataNodes}
* </ol>
*
* <p> <b>When</b> {@code recoverAfterDataNodes} is <b>Not</b> configured, the cluster either:
* <ul>
* <li>Recover immediately when {@code expectedDataNodes} is reached or
* both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
* <li>Or schedule a recovery with a delay of {@code recoverAfterTime}
* </ul>
*/
class PendingStateRecovery {
private final long expectedTerm;
@Nullable
private volatile Scheduler.ScheduledCancellable scheduledRecovery;

@Override
protected void doRun() {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
}
}
}, recoverAfterTime, threadPool.generic());
PendingStateRecovery(long expectedTerm) {
this.expectedTerm = expectedTerm;
}

void onDataNodeSize(int currentDataNodeSize) {
if (recoverAfterDataNodes != -1 && currentDataNodeSize < recoverAfterDataNodes) {
logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
currentDataNodeSize,
recoverAfterDataNodes
);
cancelScheduledRecovery();
} else {
maybePerformOrScheduleRecovery(currentDataNodeSize);
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {
try {
logger.debug("performing state recovery...");
runRecovery();
} catch (Exception e) {
logger.warn("state recovery failed", e);
resetRecoveredFlags();
}

void maybePerformOrScheduleRecovery(int currentDataNodeSize) {
if (expectedDataNodes != -1 && expectedDataNodes <= currentDataNodeSize) {
logger.debug(
"performing state recovery of term [{}], expected data nodes [{}] is reached",
expectedTerm,
expectedDataNodes
);
runRecoveryImmediately();
} else if (recoverAfterTime == null) {
logger.debug("performing state recovery of term [{}], no delay time is configured", expectedTerm);
runRecoveryImmediately();
} else {
if (scheduledRecovery == null) {
logger.info(
"delaying initial state recovery for [{}] of term [{}]. expecting [{}] data nodes, but only have [{}]",
recoverAfterTime,
expectedTerm,
expectedDataNodes,
currentDataNodeSize
);
scheduledRecovery = threadPool.schedule(getScheduleTask(), recoverAfterTime, threadPool.generic());
Member Author:
@DaveCTurner There are still some complexities with this block of code and other related areas.

  1. This class has no state to remember that it has attempted to recover immediately. Therefore, if the cluster can recover immediately, it can potentially submit multiple cluster update tasks. Is that a problem?
  2. For the scheduled case, we can mostly avoid multiple schedules by checking whether scheduledRecovery is null. There can still be edge cases where we schedule more than once due to a race between checking scheduledRecovery and resetting it back to null. If submitting multiple update tasks isn't an issue, we could also choose not to check it at all and just always schedule?
  3. Because we need to reset scheduledRecovery back to null in the scheduled runnable, it needs to be made volatile as well.

Do we need to address the 1st point? If so, it seems we need another state variable for it. I forgot to mention it during the sync, but this was one of the original complexities. Also, because the ClusterStateUpdateTask may end up not performing the recovery inside execute due to dataNodeSize dropping again, the state needs to be reset from within the task, which brings back the need to pass a "runAfter" into the task. To simplify things, I think we don't want to check dataNodeSize again inside the task. It's an edge case anyway, and dropping it makes things simpler. But we will still need some other state management if we want to address the 1st point. What do you think?

Contributor:
  1. Good point. I think we should only ever submit one cluster state update task per term, so we ought to track this with a flag within the per-term state.

  2. I would not expect any races here; or rather, I think that if we keep track of whether we've submitted the cluster state update task, that solves those races.

  3. Good point, although since we only do that when actually submitting the task, again I think the solution is to make this submission a once-only thing.

Member Author:
Pushed 25bbeba to add a new state variable (AtomicBoolean), which solved multiple issues that I had. Thanks!

Please let me know if the main code looks good to you. I'll proceed to add some more tests if you are happy with the main code changes. Please also let me know if you have any ideas for what kinds of tests we might need. Thanks!
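
Commit 25bbeba is not among the 7 commits shown in this view, so the exact change is not visible here. A minimal standalone sketch of the once-per-term submission guard being discussed could look like the following; the class and method names (OncePerTermSubmission, trySubmit) are illustrative, not taken from the commit:

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: lets the recovery task be submitted at most once per term,
// whether the caller is the immediate path or the delayed (scheduled) path.
class OncePerTermSubmission {
    private final long expectedTerm;
    private final AtomicBoolean taskSubmitted = new AtomicBoolean();

    OncePerTermSubmission(long expectedTerm) {
        this.expectedTerm = expectedTerm;
    }

    // Returns true only for the first caller; later callers see the flag already set and do nothing.
    // In GatewayService this is roughly where submitUnbatchedTask(TASK_SOURCE,
    // new RecoverStateUpdateTask(expectedTerm)) would be invoked.
    boolean trySubmit(Runnable submitTask) {
        if (taskSubmitted.compareAndSet(false, true)) {
            submitTask.run();
            return true;
        }
        return false;
    }

    long expectedTerm() {
        return expectedTerm;
    }
}

With a flag like this, both the immediate path and the scheduled path can call trySubmit without ever producing more than one cluster state update task per term, which also sidesteps the race around resetting scheduledRecovery mentioned in point 2.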

Member Author:
I have now added multiple tests to cover different scenarios in bf8f21a. The whole thing now looks ready to me.

} else {
logger.debug("state recovery is in already scheduled for term [{}]", expectedTerm);
}
}
}
}

private void resetRecoveredFlags() {
recoveryInProgress.set(false);
scheduledRecovery.set(false);
private AbstractRunnable getScheduleTask() {
return new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery of term [" + expectedTerm + "] failed", e);
}

@Override
protected void doRun() {
final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery;
if (PendingStateRecovery.this == existingPendingStateRecovery) {
// Reset this back to null so that it can be scheduled again in case the cluster
// update task ends up not running due to recoverAfterDataNodes not met
scheduledRecovery = null;
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(expectedTerm));
} else {
logger.debug(
"skip scheduled state recovery since a new one of term [{}] is ongoing",
existingPendingStateRecovery.expectedTerm
);
}
}
};
}

void runRecoveryImmediately() {
cancelScheduledRecovery();
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(expectedTerm));
}

void cancelScheduledRecovery() {
if (scheduledRecovery != null) {
scheduledRecovery.cancel();
scheduledRecovery = null;
}
}
}

private static final String TASK_SOURCE = "local-gateway-elected-state";

class RecoverStateUpdateTask extends ClusterStateUpdateTask {

private final long expectedTerm;

RecoverStateUpdateTask(long expectedTerm) {
this.expectedTerm = expectedTerm;
}

@Override
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
logger.debug("cluster is already recovered");
return currentState;
}
if (expectedTerm != currentState.term()) {
logger.debug("skip state recovery since current term [{}] != expected term [{}]", currentState.term(), expectedTerm);
return currentState;
}
if (recoverAfterDataNodes != -1 && currentState.nodes().getDataNodes().size() < recoverAfterDataNodes) {
logger.debug(
"skip state recovery since data node size [{}] is less than required number [{}]",
currentState.nodes().getDataNodes().size(),
recoverAfterDataNodes
);
return currentState;
}
return ClusterStateUpdaters.removeStateNotRecoveredBlock(
ClusterStateUpdaters.updateRoutingTable(currentState, shardRoutingRoleStrategy)
);
@@ -228,7 +310,6 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterState newState) {
logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, that we again do another state recovery.
resetRecoveredFlags();
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.noop());
}

@@ -239,7 +320,6 @@ public void onFailure(final Exception e) {
() -> "unexpected failure during [" + TASK_SOURCE + "]",
e
);
resetRecoveredFlags();
}
}

@@ -248,12 +328,8 @@ TimeValue recoverAfterTime() {
return recoverAfterTime;
}

private void runRecovery() {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask());
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
}
}
@@ -14,7 +14,10 @@
import org.elasticsearch.cluster.TestShardRoutingRoleStrategies;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -68,17 +71,10 @@ public void testDefaultRecoverAfterTime() {

public void testRecoverStateUpdateTask() throws Exception {
GatewayService service = createService(Settings.builder());
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask();
String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(
settings(IndexVersion.current()).put(masterNode()).build(),
new TransportAddress(TransportAddress.META_ADDRESS, 9300),
nodeId
);
ClusterState stateWithBlock = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).build())
.build();
final long expectedTerm = randomLongBetween(1, 42);
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(expectedTerm);

ClusterState stateWithBlock = buildClusterState(1, expectedTerm);

ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock);
assertNotEquals(recoveredState, stateWithBlock);
@@ -88,4 +84,38 @@ public void testRecoverStateUpdateTask() throws Exception {
assertSame(recoveredState, clusterState);
}

public void testRecoveryWillAbortIfExpectedTermDoesNotMatch() throws Exception {
GatewayService service = createService(Settings.builder());
final long expectedTerm = randomLongBetween(1, 42);
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(expectedTerm);

ClusterState stateWithBlock = buildClusterState(1, randomLongBetween(43, 99));

ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock);
assertSame(recoveredState, stateWithBlock);
}

private ClusterState buildClusterState(int numberOfNodes, long expectedTerm) {
assert numberOfNodes >= 1;
final String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(
settings(IndexVersion.current()).put(masterNode()).build(),
new TransportAddress(TransportAddress.META_ADDRESS, 9300),
nodeId
);
final DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder()
.localNodeId(nodeId)
.masterNodeId(nodeId)
.add(masterNode);
for (int i = 1; i < numberOfNodes; i++) {
discoveryNodesBuilder.add(DiscoveryNodeUtils.create("node-" + i, randomAlphaOfLength(10)));
}

ClusterState stateWithBlock = ClusterState.builder(ClusterName.DEFAULT)
.nodes(discoveryNodesBuilder.build())
.metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(expectedTerm).build()).build())
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).build())
.build();
return stateWithBlock;
}
}