Clean GatewayAllocator when stepping down as master #38885

Merged (3 commits) on Feb 25, 2019
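
In short: GatewayAllocator used to clear its async shard-fetch caches from a ClusterStateApplier registered in its own constructor. This PR removes that applier and instead exposes an explicit cleanCaches() method that the coordination layer calls directly, both when a master steps down and when a node is newly elected master. The resulting call chain, condensed from the hunks below:

    Coordinator.becomeCandidate()                  // previous mode was LEADER
        -> cleanMasterService()                    // local-only cluster state task
            -> AllocationService.cleanCaches()
                -> GatewayAllocator.cleanCaches()

    becomeMasterAndTrimConflictingNodes()          // on the newly elected master
        -> AllocationService.cleanCaches()
            -> GatewayAllocator.cleanCaches()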

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
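The Coordinator now keeps a reference to the AllocationService. When becomeCandidate() runs on a node whose previous mode was LEADER, it submits a clean-up task (cleanMasterService(), added further down in this file), and a new assertion makes explicit that a leader never switches directly to FOLLOWER without first becoming a candidate.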
@@ -29,6 +29,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalClusterUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
@@ -99,6 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Settings settings;
private final TransportService transportService;
private final MasterService masterService;
+private final AllocationService allocationService;
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
@@ -143,6 +145,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
+this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
@@ -497,6 +500,7 @@ void becomeCandidate(String method) {
method, getCurrentTerm(), mode, lastKnownLeader);

if (mode != Mode.CANDIDATE) {
+final Mode prevMode = mode;
mode = Mode.CANDIDATE;
cancelActivePublication("become candidate: " + method);
joinAccumulator.close(mode);
@@ -512,6 +516,10 @@ void becomeCandidate(String method) {
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
lagDetector.clearTrackedNodes();

+if (prevMode == Mode.LEADER) {
+    cleanMasterService();
+}
+
if (applierState.nodes().getMasterNodeId() != null) {
applierState = clusterStateWithNoMasterBlock(applierState);
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
@@ -547,6 +555,7 @@ void becomeLeader(String method) {
void becomeFollower(String method, DiscoveryNode leaderNode) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible";
+assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)";

if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) {
logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}",
@@ -581,6 +590,26 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
lagDetector.clearTrackedNodes();
}

+private void cleanMasterService() {
+    masterService.submitStateUpdateTask("clean-up after stepping down as master",
+        new LocalClusterUpdateTask() {
+            @Override
+            public void onFailure(String source, Exception e) {
+                // ignore
+                logger.trace("failed to clean-up after stepping down as master", e);
+            }
+
+            @Override
+            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) {
+                if (currentState.nodes().isLocalNodeElectedMaster() == false) {
+                    allocationService.cleanCaches();
+                }
+                return unchanged();
+            }
+
+        });
+}
+
private PreVoteResponse getPreVoteResponse() {
return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(),
coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion());
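
Note that cleanMasterService() is a LocalClusterUpdateTask: execute() returns unchanged(), so no new cluster state is published, and because the node may have been re-elected by the time the task runs on the master service, the caches are only cleared if the local node is still not the elected master.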

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java
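On the other side of an election, becomeMasterAndTrimConflictingNodes() now clears the allocator caches before dead nodes are disassociated, so a newly elected master starts from an empty GatewayAllocator cache.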
@@ -194,6 +194,7 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
.build();
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
+allocationService.cleanCaches();
tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState);
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
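AllocationService gains the new cleanCaches() entry point and simply delegates to the GatewayAllocator.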
@@ -460,6 +460,10 @@ protected long currentNanoTime() {
return System.nanoTime();
}

+public void cleanCaches() {
+    gatewayAllocator.cleanCaches();
+}
+
/**
* this class is used to describe results of applying a set of
* {@link org.elasticsearch.cluster.routing.allocation.command.AllocationCommand}

server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
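The GatewayAllocator no longer depends on the ClusterService: the ClusterStateApplier it registered in its constructor to detect the local node losing mastership is removed, and its cache-clearing body becomes the public cleanCaches() method, which releases any in-flight async shard fetches and clears both per-shard caches.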
@@ -23,14 +23,12 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
-import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
-import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -55,28 +53,19 @@ public class GatewayAllocator {
asyncFetchStore = ConcurrentCollections.newConcurrentMap();

@Inject
-public GatewayAllocator(ClusterService clusterService, RoutingService routingService,
-                        TransportNodesListGatewayStartedShards startedAction, TransportNodesListShardStoreMetaData storeAction) {
+public GatewayAllocator(RoutingService routingService,
+                        TransportNodesListGatewayStartedShards startedAction,
+                        TransportNodesListShardStoreMetaData storeAction) {
this.routingService = routingService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
-clusterService.addStateApplier(event -> {
-    boolean cleanCache = false;
-    DiscoveryNode localNode = event.state().nodes().getLocalNode();
-    if (localNode != null) {
-        if (localNode.isMasterNode() && event.localNodeMaster() == false) {
-            cleanCache = true;
-        }
-    } else {
-        cleanCache = true;
-    }
-    if (cleanCache) {
-        Releasables.close(asyncFetchStarted.values());
-        asyncFetchStarted.clear();
-        Releasables.close(asyncFetchStore.values());
-        asyncFetchStore.clear();
-    }
-});
}

+public void cleanCaches() {
+    Releasables.close(asyncFetchStarted.values());
+    asyncFetchStarted.clear();
+    Releasables.close(asyncFetchStore.values());
+    asyncFetchStore.clear();
+}

// for tests
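
Finally, a new integration test starts three nodes with a very short initial election timeout, so elections are contested and masters are likely to step down and be re-elected, and then verifies that a single-shard index still goes green, both initially and after a full cluster restart.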
@@ -29,8 +29,10 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@@ -577,4 +579,16 @@ public Settings onNodeStopped(String nodeName) throws Exception {
// start another node so cluster consistency checks won't time out due to the lack of state
internalCluster().startNode();
}

+public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
+    internalCluster().startNodes(3,
+        Settings.builder().put(ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING.getKey(),
+            "2ms").build());
+    createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+        .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms").build());
+    ensureGreen("test");
+    internalCluster().fullRestart();
+    ensureGreen("test");
+}
}