From a8376f44a88cf5c1356751430640d025f41706ab Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 25 Sep 2019 12:31:36 +0100 Subject: [PATCH] Assert no exceptions during state application (#47090) Today we log and swallow exceptions during cluster state application, but such an exception should not occur. This commit adds assertions of this fact, and updates the Javadocs to explain it. Relates #47038 --- .../cluster/ClusterStateApplier.java | 6 +++++- .../cluster/service/ClusterApplierService.java | 9 ++++++++- .../common/settings/AbstractScopedSettings.java | 1 - .../cluster/IndicesClusterStateService.java | 15 ++++++++++++--- .../cluster/coordination/CoordinatorTests.java | 2 ++ .../service/ClusterApplierServiceTests.java | 12 ++++++++++++ .../coordination/AbstractCoordinatorTestCase.java | 14 ++++++++++++++ 7 files changed, 53 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java index c339a8ed97e7a..ad983f43bf2cf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java @@ -28,7 +28,11 @@ public interface ClusterStateApplier { /** - * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied + * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied. The cluster state to be applied is already + * committed when this method is called, so an applier must therefore be prepared to deal with any state it receives without throwing + * an exception. Throwing an exception from an applier is very bad because it will stop the application of this state before it has + * reached all the other appliers, and will likely result in another attempt to apply the same (or very similar) cluster state which + * might continue until this node is removed from the cluster. */ void applyClusterState(ClusterChangedEvent event); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index eb41e710cca67..f5bbe2d420be9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -390,7 +390,7 @@ private static boolean assertNotCalledFromClusterStateApplier(String reason) { return true; } - protected void runTask(UpdateTask task) { + private void runTask(UpdateTask task) { if (!lifecycle.started()) { logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source); return; @@ -447,6 +447,9 @@ protected void runTask(UpdateTask task) { "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]", executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e); } + // failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we + // continue we will retry with the same cluster state but that might not help. + assert applicationMayFail(); task.listener.onFailure(task.source, e); } } @@ -667,4 +670,8 @@ protected long currentTimeInMillis() { return threadPool.relativeTimeInMillis(); } + // overridden by tests that need to check behaviour in the event of an application failure + protected boolean applicationMayFail() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 9d3e278e889f6..9db52b9eb9301 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -193,7 +193,6 @@ public synchronized Settings applySettings(Settings newSettings) { } catch (Exception ex) { logger.warn("failed to apply settings", ex); throw ex; - } finally { } return lastSettingsApplied = newSettings; } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 11b7f03da1042..21cf49a949dfa 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -532,8 +532,17 @@ private void updateIndices(ClusterChangedEvent event) { final IndexMetaData newIndexMetaData = state.metaData().index(index); assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices"; if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) { - indexService.updateMetaData(currentIndexMetaData, newIndexMetaData); + String reason = null; try { + reason = "metadata update failed"; + try { + indexService.updateMetaData(currentIndexMetaData, newIndexMetaData); + } catch (Exception e) { + assert false : e; + throw e; + } + + reason = "mapping update failed"; if (indexService.updateMapping(currentIndexMetaData, newIndexMetaData) && sendRefreshMapping) { nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetaData.getIndex().getName(), @@ -541,14 +550,14 @@ private void updateIndices(ClusterChangedEvent event) { ); } } catch (Exception e) { - indicesService.removeIndex(indexService.index(), FAILURE, "removing index (mapping update failed)"); + indicesService.removeIndex(indexService.index(), FAILURE, "removing index (" + reason + ")"); // fail shards that would be created or updated by createOrUpdateShards RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); if (localRoutingNode != null) { for (final ShardRouting shardRouting : localRoutingNode) { if (shardRouting.index().equals(index) && failedShardsCache.containsKey(shardRouting.shardId()) == false) { - sendFailShard(shardRouting, "failed to update mapping for index", e, state); + sendFailShard(shardRouting, "failed to update index (" + reason + ")", e, state); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index f968f6f4742d6..1735c8ba033ea 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -586,6 +586,7 @@ public void testAckListenerReceivesNackFromFollower() { final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + follower0.allowClusterStateApplicationFailure(); follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); @@ -605,6 +606,7 @@ public void testAckListenerReceivesNackFromLeader() { final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); final long startingTerm = leader.coordinator.getCurrentTerm(); + leader.allowClusterStateApplicationFailure(); leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 4da5de7941ecc..a851cb7069ec0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -358,6 +358,7 @@ public void testClusterStateApplierBubblesUpExceptionsInApplier() throws Interru clusterApplierService.addStateApplier(event -> { throw new RuntimeException("dummy exception"); }); + clusterApplierService.allowClusterStateApplicationFailure(); CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), @@ -386,6 +387,7 @@ public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws AtomicReference error = new AtomicReference<>(); clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, v -> {}); + clusterApplierService.allowClusterStateApplicationFailure(); CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()) @@ -496,6 +498,7 @@ static class TimedClusterApplierService extends ClusterApplierService { final ClusterSettings clusterSettings; volatile Long currentTimeOverride = null; + boolean applicationMayFail; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { super("test_node", settings, clusterSettings, threadPool); @@ -509,6 +512,15 @@ protected long currentTimeInMillis() { } return super.currentTimeInMillis(); } + + @Override + protected boolean applicationMayFail() { + return this.applicationMayFail; + } + + void allowClusterStateApplicationFailure() { + this.applicationMayFail = true; + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 0bf7e2ed2167f..cf5f719589dca 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1170,6 +1170,10 @@ void applyInitialConfiguration() { private boolean isNotUsefullyBootstrapped() { return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false; } + + void allowClusterStateApplicationFailure() { + clusterApplierService.allowClusterStateApplicationFailure(); + } } private List provideSeedHosts(SeedHostsProvider.HostsResolver ignored) { @@ -1280,6 +1284,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService { private final String nodeName; private final DeterministicTaskQueue deterministicTaskQueue; ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; + private boolean applicationMayFail; DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, DeterministicTaskQueue deterministicTaskQueue, Function runnableWrapper) { @@ -1324,6 +1329,15 @@ public void onNewClusterState(String source, Supplier clusterState protected void connectToNodesAndWait(ClusterState newClusterState) { // don't do anything, and don't block } + + @Override + protected boolean applicationMayFail() { + return this.applicationMayFail; + } + + void allowClusterStateApplicationFailure() { + this.applicationMayFail = true; + } } protected DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {