Refactor GatewayService #99994

Merged: 19 commits, Oct 10, 2023
Changes from 2 commits
163 changes: 99 additions & 64 deletions server/src/main/java/org/elasticsearch/gateway/GatewayService.java
@@ -11,7 +11,9 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@@ -30,14 +32,12 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(GatewayService.class);

@@ -80,9 +80,7 @@ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener
private final TimeValue recoverAfterTime;
private final int recoverAfterDataNodes;
private final int expectedDataNodes;

private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
private PendingStateRecovery pendingStateRecovery = new PendingStateRecovery(0);

@Inject
public GatewayService(
@@ -131,8 +129,14 @@ public void clusterChanged(final ClusterChangedEvent event) {
}

final ClusterState state = event.state();
final DiscoveryNodes nodes = state.nodes();

if (state.nodes().isLocalNodeElectedMaster() == false) {
if (nodes.getMasterNodeId() == null) {
Contributor: I think this is already covered by the check on nodes.isLocalNodeElectedMaster().

Member Author: Yep, I kept it because it has a separate logging message. I have now moved it inside the isLocalNodeElectedMaster check to retain the logging message.
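For illustration, a minimal sketch of the described follow-up (assumed, since this view only shows the first two commits): the null-master check nested inside the local-master branch so that its log line is kept:

```java
// Assumed shape of the follow-up commit, not part of this diff.
if (nodes.isLocalNodeElectedMaster() == false) {
    if (nodes.getMasterNodeId() == null) {
        logger.debug("not recovering from gateway, no master elected yet");
    }
    // not our job to recover
    return;
}
```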

logger.debug("not recovering from gateway, no master elected yet");
return;
}

if (nodes.isLocalNodeElectedMaster() == false) {
// not our job to recover
return;
}
@@ -141,77 +145,112 @@ public void clusterChanged(final ClusterChangedEvent event) {
return;
}

final DiscoveryNodes nodes = state.nodes();
if (state.nodes().getMasterNodeId() == null) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
Contributor: Maybe move this condition into the per-term check too?

Member Author: I moved the check inside the new PendingStateRecovery class, before scheduling the recovery. I assume this is what you mean by "per-term"; it is not about checking it inside ClusterStateUpdateTask#execute. The former allows us to keep the same semantics as today, i.e. no action at all until the required number of data nodes is met.
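A rough sketch of that relocation (assumed; the commit carrying it is not part of this two-commit view), with the gate running in PendingStateRecovery#maybeStart before anything is scheduled:

```java
// Assumed sketch: below the threshold nothing is scheduled at all,
// preserving the existing recover_after_data_nodes semantics.
void maybeStart(int dataNodeSize) {
    if (recoverAfterDataNodes != -1 && dataNodeSize < recoverAfterDataNodes) {
        logger.debug(
            "not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
            dataNodeSize,
            recoverAfterDataNodes
        );
        return;
    }
    // ... continue with the scheduling logic shown in the diff below ...
}
```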

logger.debug(
"not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]",
nodes.getDataNodes().size(),
recoverAfterDataNodes
);
} else {
boolean enforceRecoverAfterTime;
String reason;
if (expectedDataNodes == -1) {
// no expected is set, honor recover_after_data_nodes
enforceRecoverAfterTime = true;
reason = "recover_after_time was set to [" + recoverAfterTime + "]";
} else if (expectedDataNodes <= nodes.getDataNodes().size()) {
// expected is set and satisfied so recover immediately
enforceRecoverAfterTime = false;
reason = "";
} else {
// expected is set but not satisfied so wait until it is satisfied or times out
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
}
performStateRecovery(enforceRecoverAfterTime, reason);
return;
}

// At this point, we know the state is not recovered and this node is qualified for state recovery
// But we still need to check whether a previous one is running already
final long currentTerm = state.term();
if (pendingStateRecovery.term < currentTerm) {
// Always start a new state recovery if the master term changes
// If there is a previous one still waiting, both will run but at most one of them will
// actually make changes to cluster state
pendingStateRecovery = new PendingStateRecovery(currentTerm);
Member Author: Here the code always schedules a new state recovery if it sees a new term (and the cluster state does need recovery). Not sure if this is what you mean by

> I suggested better if we started a new timeout when a new master is elected

This code does not try to cancel the previous pending state recovery, since that can only be a best effort. The task may have already been submitted by the time we try to cancel it. Please let me know if you disagree.

Contributor: Yep, that's the direction in which I was thinking indeed. Looking better.

I think I'd like the RecoverStateUpdateTask to remember the corresponding term with which it was registered, and verify that the term hasn't changed before it does anything. That way we don't need to worry about spurious recoveries from older terms that are no longer correct.

Member Author: That's a good idea. I added the term check.
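A minimal sketch of what such a term check might look like (hypothetical; the commit adding it is not shown in this view): the task records the term it was created for and bails out of execute if a newer election has happened, using the same state.term() accessor seen elsewhere in this diff:

```java
// Hypothetical sketch of the term guard; the field and constructor are assumed.
class RecoverStateUpdateTask extends ClusterStateUpdateTask {
    private final long expectedTerm;

    RecoverStateUpdateTask(long expectedTerm) {
        this.expectedTerm = expectedTerm;
    }

    @Override
    public ClusterState execute(ClusterState currentState) {
        if (currentState.term() > expectedTerm) {
            // a newer master term exists; let that term's own recovery run instead
            return currentState;
        }
        // ... proceed with the existing recovery logic ...
        return currentState; // placeholder for the real recovery result
    }

    @Override
    public void onFailure(Exception e) {
        logger.warn(() -> "unexpected failure during [" + TASK_SOURCE + "]", e);
    }
}
```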

}
assert pendingStateRecovery.term == currentTerm;
pendingStateRecovery.maybeStart(nodes.getDataNodes().size());
}

private void performStateRecovery(final boolean enforceRecoverAfterTime, final String reason) {
if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("delayed state recovery failed", e);
resetRecoveredFlags();
}
class PendingStateRecovery {
private final long term;
@Nullable
private SubscribableListener<Void> recoveryPlanned;

@Override
protected void doRun() {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
runRecovery();
}
}
}, recoverAfterTime, threadPool.generic());
PendingStateRecovery(long term) {
this.term = term;
}

void maybeStart(int dataNodeSize) {
final SubscribableListener<Void> thisRecoveryPlanned;
Contributor: On reflection, doing this with a SubscribableListener actually seems more awkward than just sticking with the original threadPool.schedule.

Member Author: Sure, I changed it to use just threadPool.schedule. SubscribableListener might be more suitable for this use case if it did not have to use an Exception to signal the timeout.
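A sketch of the threadPool.schedule variant the thread settles on (assumed; it mirrors the removed code further down and would re-introduce the AbstractRunnable import that this commit removes):

```java
// Assumed sketch of the delayed path without SubscribableListener.
logger.info("delaying initial state recovery for [{}] of term [{}]. {}", recoverAfterTime, term, reason);
threadPool.schedule(new AbstractRunnable() {
    @Override
    public void onFailure(Exception e) {
        logger.warn("delayed state recovery of term [" + term + "] failed", e);
    }

    @Override
    protected void doRun() {
        logger.info("recover_after_time [{}] elapsed. performing state recovery of term [{}]", recoverAfterTime, term);
        runRecovery();
    }
}, recoverAfterTime, threadPool.generic());
```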

synchronized (this) {
if (recoveryPlanned == null) {
recoveryPlanned = thisRecoveryPlanned = new SubscribableListener<>();
} else {
thisRecoveryPlanned = null;
}
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {
try {
logger.debug("performing state recovery...");

if (thisRecoveryPlanned == null) {
logger.debug("state recovery is in progress for term [{}]", term);
return;
}
recoveryPlanned.addListener(new ActionListener<>() {
@Override
public void onResponse(Void ignore) {
runRecovery();
} catch (Exception e) {
logger.warn("state recovery failed", e);
resetRecoveredFlags();
}

@Override
public void onFailure(Exception e) {
if (e instanceof ElasticsearchTimeoutException) {
logger.info("recover_after_time [{}] elapsed. performing state recovery of term [{}]", recoverAfterTime, term);
runRecovery();
} else {
onUnexpectedFailure(e);
}
}

private void onUnexpectedFailure(Exception e) {
logger.warn("state recovery of term [" + term + "] failed", e);
resetState();
}

private void runRecovery() {
try {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(this::resetState));
} catch (Exception e) {
onUnexpectedFailure(e);
}
}

private void resetState() {
synchronized (GatewayService.this) {
assert recoveryPlanned == thisRecoveryPlanned;
recoveryPlanned = null;
}
}
});

if (recoverAfterTime == null) {
logger.debug("performing state recovery of term [{}], no delay time is configured", term);
thisRecoveryPlanned.onResponse(null);
} else if (expectedDataNodes != -1 && expectedDataNodes <= dataNodeSize) {
logger.debug("performing state recovery of term [{}], expected data nodes [{}] is reached", term, expectedDataNodes);
thisRecoveryPlanned.onResponse(null);
} else {
final String reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + dataNodeSize + "]";
logger.info("delaying initial state recovery for [{}] of term [{}]. {}", recoverAfterTime, term, reason);
thisRecoveryPlanned.addTimeout(recoverAfterTime, threadPool, threadPool.generic());
}
}
}

private void resetRecoveredFlags() {
recoveryInProgress.set(false);
scheduledRecovery.set(false);
}

private static final String TASK_SOURCE = "local-gateway-elected-state";

class RecoverStateUpdateTask extends ClusterStateUpdateTask {

private final Runnable runAfter;

RecoverStateUpdateTask(Runnable runAfter) {
this.runAfter = runAfter;
}

@Override
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
@@ -228,7 +267,7 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterState
logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, that we again do another state recovery.
resetRecoveredFlags();
runAfter.run();
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.noop());
}

@@ -239,7 +278,7 @@ public void onFailure(final Exception e) {
() -> "unexpected failure during [" + TASK_SOURCE + "]",
e
);
resetRecoveredFlags();
runAfter.run();
}
}

@@ -248,10 +287,6 @@ TimeValue recoverAfterTime() {
return recoverAfterTime;
}

private void runRecovery() {
submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask());
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
@@ -68,7 +68,7 @@ public void testDefaultRecoverAfterTime() {

public void testRecoverStateUpdateTask() throws Exception {
GatewayService service = createService(Settings.builder());
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask();
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(() -> {});
String nodeId = randomAlphaOfLength(10);
DiscoveryNode masterNode = DiscoveryNode.createLocal(
settings(IndexVersion.current()).put(masterNode()).build(),