Refactor GatewayService #99994

Merged: 19 commits, Oct 10, 2023

Changes from 1 commit

141 changes: 79 additions & 62 deletions server/src/main/java/org/elasticsearch/gateway/GatewayService.java
@@ -11,7 +11,9 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@@ -30,14 +32,12 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(GatewayService.class);

@@ -81,8 +81,8 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener {
private final int recoverAfterDataNodes;
private final int expectedDataNodes;

private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
@Nullable
private SubscribableListener<Void> recoveryPlanned;

@Inject
public GatewayService(
@@ -131,8 +131,14 @@ public void clusterChanged(final ClusterChangedEvent event) {
}

final ClusterState state = event.state();
final DiscoveryNodes nodes = state.nodes();

if (nodes.getMasterNodeId() == null) {
Contributor:

I think this is already covered by the check on nodes.isLocalNodeElectedMaster().

Member Author:

Yep, I kept it because it has a separate logging message. I have now moved it inside the isLocalNodeElectedMaster check to retain the logging message.
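
For context, a minimal sketch of the shape described above (the follow-up change, not part of the commit shown in this diff), reusing the names from the surrounding code:

```java
// Sketch only: keep the dedicated debug message by nesting the null-master
// check inside the elected-master check, so both paths still return early.
if (nodes.isLocalNodeElectedMaster() == false) {
    if (nodes.getMasterNodeId() == null) {
        logger.debug("not recovering from gateway, no master elected yet");
    }
    // not our job to recover
    return;
}
```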

logger.debug("not recovering from gateway, no master elected yet");
return;
}

if (state.nodes().isLocalNodeElectedMaster() == false) {
if (nodes.isLocalNodeElectedMaster() == false) {
// not our job to recover
return;
}
@@ -141,77 +147,92 @@ public void clusterChanged(final ClusterChangedEvent event) {
return;
}

final DiscoveryNodes nodes = state.nodes();
if (state.nodes().getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
Contributor:

Maybe move this condition into the per-term check too?

Member Author:

I moved the check inside the new PendingStateRecovery class, before scheduling the recovery. I assume this is what you mean by "per-term"; it is not about checking it inside ClusterStateUpdateTask#execute. The former keeps the same semantics as today, i.e. no action at all until the required number of data nodes is met. A sketch of the idea follows.
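
A hypothetical sketch of that ordering (PendingStateRecovery is introduced in a later commit, so this is illustrative rather than the actual class): the recover_after_data_nodes gate is evaluated before anything is scheduled for the current term, preserving today's semantics of taking no action until enough data nodes have joined.

```java
// Illustrative only: gate on recover_after_data_nodes before scheduling any
// recovery for the current term; re-evaluated on each cluster state update.
if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
    logger.debug(
        "not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
        nodes.getDataNodes().size(),
        recoverAfterDataNodes
    );
    return;
}
// enough data nodes are present: schedule (or immediately run) state recovery
```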

logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
nodes.getDataNodes().size(),
recoverAfterDataNodes
);
} else {
boolean enforceRecoverAfterTime;
String reason;
if (expectedDataNodes == -1) {
// no expected is set, honor recover_after_data_nodes
enforceRecoverAfterTime = true;
reason = "recover_after_time was set to [" + recoverAfterTime + "]";
} else if (expectedDataNodes <= nodes.getDataNodes().size()) {
// expected is set and satisfied so recover immediately
enforceRecoverAfterTime = false;
reason = "";
return;
}

// At this point, we know this node is qualified for state recovery
// But we still need to check whether a previous one is running already
final SubscribableListener<Void> thisRecoveryPlanned;
synchronized (this) {
if (recoveryPlanned == null) {
recoveryPlanned = thisRecoveryPlanned = new SubscribableListener<>();
} else {
// expected is set but not satisfied so wait until it is satisfied or times out
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
thisRecoveryPlanned = null;
}
performStateRecovery(enforceRecoverAfterTime, reason);
}
}

private void performStateRecovery(final boolean enforceRecoverAfterTime, final String reason) {
if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery failed", e);
resetRecoveredFlags();
}

@Override
protected void doRun() {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
}
}
}, recoverAfterTime, threadPool.generic());
if (thisRecoveryPlanned == null) {
logger.debug("state recovery is in progress");
return;
}

// This node is ready to schedule state recovery
thisRecoveryPlanned.addListener(new ActionListener<>() {
Member Author:

I wanted to use andThen, but it does not expose the exception to the consumer, and we need the exception here to handle the timeout.

@Override
public void onResponse(Void ignore) {
runRecovery();
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {
try {
logger.debug("performing state recovery...");

@Override
public void onFailure(Exception e) {
if (e instanceof ElasticsearchTimeoutException) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
} else {
onUnexpectedFailure(e);
}
}

private void onUnexpectedFailure(Exception e) {
logger.warn("state recovery failed", e);
resetState();
}

private void runRecovery() {
try {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(this::resetState));
} catch (Exception e) {
logger.warn("state recovery failed", e);
resetRecoveredFlags();
onUnexpectedFailure(e);
}
}
}
}

private void resetRecoveredFlags() {
recoveryInProgress.set(false);
scheduledRecovery.set(false);
private void resetState() {
synchronized (GatewayService.this) {
assert recoveryPlanned == thisRecoveryPlanned;
recoveryPlanned = null;
}
}
});

if (recoverAfterTime == null) {
logger.debug("performing state recovery, no delay time is configured");
thisRecoveryPlanned.onResponse(null);
} else if (expectedDataNodes != -1 && expectedDataNodes <= nodes.getDataNodes().size()) {
logger.debug("performing state recovery, expected data nodes [{}] is reached", expectedDataNodes);
thisRecoveryPlanned.onResponse(null);
} else {
final String reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
thisRecoveryPlanned.addTimeout(recoverAfterTime, threadPool, threadPool.generic());
}
}

private static final String TASK_SOURCE = "local-gateway-elected-state";

class RecoverStateUpdateTask extends ClusterStateUpdateTask {

private final Runnable runAfter;

RecoverStateUpdateTask(Runnable runAfter) {
this.runAfter = runAfter;
}

@Override
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
@@ -228,7 +249,7 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterState newState) {
logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, that we again do another state recovery.
resetRecoveredFlags();
runAfter.run();
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.noop());
}

@@ -239,7 +260,7 @@ public void onFailure(final Exception e) {
() -> "unexpected failure during [" + TASK_SOURCE + "]",
e
);
resetRecoveredFlags();
runAfter.run();
}
}

@@ -248,10 +269,6 @@ TimeValue recoverAfterTime() {
return recoverAfterTime;
}

private void runRecovery() {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask());
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java
@@ -68,7 +68,7 @@ public void testDefaultRecoverAfterTime() {

public void testRecoverStateUpdateTask() throws Exception {
GatewayService service = createService(Settings.builder());
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask();
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(() -> {});
String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(
settings(IndexVersion.current()).put(masterNode()).build(),