Skip to content

Commit

Permalink
Refactor GatewayService
Browse files Browse the repository at this point in the history
This PR refactors GatewayService with the goal to make it easier to add
new features.

Resolves: elastic#89310
  • Loading branch information
ywangd committed Sep 28, 2023
1 parent ae17505 commit 786062a
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 63 deletions.
141 changes: 79 additions & 62 deletions server/src/main/java/org/elasticsearch/gateway/GatewayService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
Expand All @@ -30,14 +32,12 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(GatewayService.class);

Expand Down Expand Up @@ -81,8 +81,8 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
private final int recoverAfterDataNodes;
private final int expectedDataNodes;

private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
@Nullable
private SubscribableListener<Void> recoveryPlanned;

@Inject
public GatewayService(
Expand Down Expand Up @@ -131,8 +131,14 @@ public void clusterChanged(final ClusterChangedEvent event) {
}

final ClusterState state = event.state();
final DiscoveryNodes nodes = state.nodes();

if (nodes.getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
return;
}

if (state.nodes().isLocalNodeElectedMaster() == false) {
if (nodes.isLocalNodeElectedMaster() == false) {
// not our job to recover
return;
}
Expand All @@ -141,77 +147,92 @@ public void clusterChanged(final ClusterChangedEvent event) {
return;
}

final DiscoveryNodes nodes = state.nodes();
if (state.nodes().getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
nodes.getDataNodes().size(),
recoverAfterDataNodes
);
} else {
boolean enforceRecoverAfterTime;
String reason;
if (expectedDataNodes == -1) {
// no expected is set, honor recover_after_data_nodes
enforceRecoverAfterTime = true;
reason = "recover_after_time was set to [" + recoverAfterTime + "]";
} else if (expectedDataNodes <= nodes.getDataNodes().size()) {
// expected is set and satisfied so recover immediately
enforceRecoverAfterTime = false;
reason = "";
return;
}

// At this point, we know this node is qualified for state recovery
// But we still need to check whether a previous one is running already
final SubscribableListener<Void> thisRecoveryPlanned;
synchronized (this) {
if (recoveryPlanned == null) {
recoveryPlanned = thisRecoveryPlanned = new SubscribableListener<>();
} else {
// expected is set but not satisfied so wait until it is satisfied or times out
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
thisRecoveryPlanned = null;
}
performStateRecovery(enforceRecoverAfterTime, reason);
}
}

private void performStateRecovery(final boolean enforceRecoverAfterTime, final String reason) {
if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery failed", e);
resetRecoveredFlags();
}

@Override
protected void doRun() {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
}
}
}, recoverAfterTime, threadPool.generic());
if (thisRecoveryPlanned == null) {
logger.debug("state recovery is in progress");
return;
}

// This node is ready to schedule state recovery
thisRecoveryPlanned.addListener(new ActionListener<>() {
@Override
public void onResponse(Void ignore) {
runRecovery();
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {
try {
logger.debug("performing state recovery...");

@Override
public void onFailure(Exception e) {
if (e instanceof ElasticsearchTimeoutException) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
} else {
onUnexpectedFailure(e);
}
}

private void onUnexpectedFailure(Exception e) {
logger.warn("state recovery failed", e);
resetState();
}

private void runRecovery() {
try {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(this::resetState));
} catch (Exception e) {
logger.warn("state recovery failed", e);
resetRecoveredFlags();
onUnexpectedFailure(e);
}
}
}
}

private void resetRecoveredFlags() {
recoveryInProgress.set(false);
scheduledRecovery.set(false);
private void resetState() {
synchronized (GatewayService.this) {
assert recoveryPlanned == thisRecoveryPlanned;
recoveryPlanned = null;
}
}
});

if (recoverAfterTime == null) {
logger.debug("performing state recovery, no delay time is configured");
thisRecoveryPlanned.onResponse(null);
} else if (expectedDataNodes != -1 && expectedDataNodes <= nodes.getDataNodes().size()) {
logger.debug("performing state recovery, expected data nodes [{}] is reached", expectedDataNodes);
thisRecoveryPlanned.onResponse(null);
} else {
final String reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
thisRecoveryPlanned.addTimeout(recoverAfterTime, threadPool, threadPool.generic());
}
}

private static final String TASK_SOURCE = "local-gateway-elected-state";

class RecoverStateUpdateTask extends ClusterStateUpdateTask {

private final Runnable runAfter;

RecoverStateUpdateTask(Runnable runAfter) {
this.runAfter = runAfter;
}

@Override
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
Expand All @@ -228,7 +249,7 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterStat
logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, that we again do another state recovery.
resetRecoveredFlags();
runAfter.run();
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.noop());
}

Expand All @@ -239,7 +260,7 @@ public void onFailure(final Exception e) {
() -> "unexpected failure during [" + TASK_SOURCE + "]",
e
);
resetRecoveredFlags();
runAfter.run();
}
}

Expand All @@ -248,10 +269,6 @@ TimeValue recoverAfterTime() {
return recoverAfterTime;
}

private void runRecovery() {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask());
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testDefaultRecoverAfterTime() {

public void testRecoverStateUpdateTask() throws Exception {
GatewayService service = createService(Settings.builder());
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask();
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(() -> {});
String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(
settings(IndexVersion.current()).put(masterNode()).build(),
Expand Down

0 comments on commit 786062a

Please sign in to comment.