From a91e9b7ce27de063449bb6429f7f9223d5e6dc43 Mon Sep 17 00:00:00 2001 From: Yang Cheng <64069615+chengyang14@users.noreply.github.com> Date: Wed, 29 Apr 2020 18:54:58 +0800 Subject: [PATCH] Avoid double-recovery when state recovery delayed Today if state recovery is delayed by the `gateway.recover_after_*` settings then we may end up performing state recovery twice: once when enough nodes have joined the cluster, and again when the timeout elapses. The second state recovery reinitializes the routing table, effectively discarding all recovered/recovering shards and starting again from scratch. This commit adds a check to prevent this second state recovery. Closes #55564 --- .../elasticsearch/gateway/GatewayService.java | 6 +++ .../gateway/GatewayServiceTests.java | 49 ++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index 43bc841eb630b..635848d71ee5a 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -232,6 +232,7 @@ public void onFailure(final Exception e) { @Override protected void doRun() { + logger.debug("performing state recovery..."); recoveryRunnable.run(); } }); @@ -248,6 +249,11 @@ class RecoverStateUpdateTask extends ClusterStateUpdateTask { @Override public ClusterState execute(final ClusterState currentState) { + if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { + logger.debug("cluster is already recovered"); + return currentState; + } + final ClusterState newState = Function.identity() .andThen(ClusterStateUpdaters::updateRoutingTable) .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock) diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index 41c6dc6f72ae7..22c26aee07a24 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -19,21 +19,49 @@ package org.elasticsearch.gateway; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.hamcrest.Matchers; +import java.util.Arrays; +import java.util.HashSet; + +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.hasItem; + public class GatewayServiceTests extends ESTestCase { private GatewayService createService(final Settings.Builder settings) { final ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "GatewayServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null); - return new GatewayService(settings.build(), null, clusterService, null, null, null); + final AllocationService allocationService = new AllocationService(new AllocationDeciders(new HashSet<>( + Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), new ReplicaAfterPrimaryActiveAllocationDecider()))), + new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); + return new GatewayService(settings.build(), allocationService, clusterService, null, null, null); } public void testDefaultRecoverAfterTime() { @@ -69,4 +97,23 @@ public void testDeprecatedSettings() { assertSettingDeprecationsAndWarnings(new Setting[] {GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING }); } + public void testRecoverStateUpdateTask() throws Exception { + GatewayService service = createService(Settings.builder()); + ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(); + String nodeId = randomAlphaOfLength(10); + DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT) + .put(Node.NODE_MASTER_SETTING.getKey(), true).build(), + new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId); + ClusterState stateWithBlock = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()). + blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).build()).build(); + + ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock); + assertNotEquals(recoveredState, stateWithBlock); + assertThat(recoveredState.blocks().global(ClusterBlockLevel.METADATA_WRITE), not(hasItem(STATE_NOT_RECOVERED_BLOCK))); + + ClusterState clusterState = clusterStateUpdateTask.execute(recoveredState); + assertSame(recoveredState, clusterState); + } + }