Skip to content

Commit

Permalink
Assert no exceptions during state application (#47090)
Browse files Browse the repository at this point in the history
Today we log and swallow exceptions during cluster state application, but such
an exception should not occur. This commit adds assertions of this fact, and
updates the Javadocs to explain it.

Relates #47038
  • Loading branch information
DaveCTurner committed Sep 25, 2019
1 parent a439743 commit a8376f4
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
public interface ClusterStateApplier {

/**
* Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied
* Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied. The cluster state to be applied is already
* committed when this method is called, so an applier must therefore be prepared to deal with any state it receives without throwing
* an exception. Throwing an exception from an applier is very bad because it will stop the application of this state before it has
* reached all the other appliers, and will likely result in another attempt to apply the same (or very similar) cluster state which
* might continue until this node is removed from the cluster.
*/
void applyClusterState(ClusterChangedEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private static boolean assertNotCalledFromClusterStateApplier(String reason) {
return true;
}

protected void runTask(UpdateTask task) {
private void runTask(UpdateTask task) {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
return;
Expand Down Expand Up @@ -447,6 +447,9 @@ protected void runTask(UpdateTask task) {
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
}
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
// continue we will retry with the same cluster state but that might not help.
assert applicationMayFail();
task.listener.onFailure(task.source, e);
}
}
Expand Down Expand Up @@ -667,4 +670,8 @@ protected long currentTimeInMillis() {
return threadPool.relativeTimeInMillis();
}

// overridden by tests that need to check behaviour in the event of an application failure
protected boolean applicationMayFail() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ public synchronized Settings applySettings(Settings newSettings) {
} catch (Exception ex) {
logger.warn("failed to apply settings", ex);
throw ex;
} finally {
}
return lastSettingsApplied = newSettings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,23 +532,32 @@ private void updateIndices(ClusterChangedEvent event) {
final IndexMetaData newIndexMetaData = state.metaData().index(index);
assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices";
if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) {
indexService.updateMetaData(currentIndexMetaData, newIndexMetaData);
String reason = null;
try {
reason = "metadata update failed";
try {
indexService.updateMetaData(currentIndexMetaData, newIndexMetaData);
} catch (Exception e) {
assert false : e;
throw e;
}

reason = "mapping update failed";
if (indexService.updateMapping(currentIndexMetaData, newIndexMetaData) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetaData.getIndex().getName(),
newIndexMetaData.getIndexUUID(), state.nodes().getLocalNodeId())
);
}
} catch (Exception e) {
indicesService.removeIndex(indexService.index(), FAILURE, "removing index (mapping update failed)");
indicesService.removeIndex(indexService.index(), FAILURE, "removing index (" + reason + ")");

// fail shards that would be created or updated by createOrUpdateShards
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (localRoutingNode != null) {
for (final ShardRouting shardRouting : localRoutingNode) {
if (shardRouting.index().equals(index) && failedShardsCache.containsKey(shardRouting.shardId()) == false) {
sendFailShard(shardRouting, "failed to update mapping for index", e, state);
sendFailShard(shardRouting, "failed to update index (" + reason + ")", e, state);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ public void testAckListenerReceivesNackFromFollower() {
final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);

follower0.allowClusterStateApplicationFailure();
follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
Expand All @@ -605,6 +606,7 @@ public void testAckListenerReceivesNackFromLeader() {
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
final long startingTerm = leader.coordinator.getCurrentTerm();

leader.allowClusterStateApplicationFailure();
leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public void testClusterStateApplierBubblesUpExceptionsInApplier() throws Interru
clusterApplierService.addStateApplier(event -> {
throw new RuntimeException("dummy exception");
});
clusterApplierService.allowClusterStateApplicationFailure();

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
Expand Down Expand Up @@ -386,6 +387,7 @@ public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws
AtomicReference<Throwable> error = new AtomicReference<>();
clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
v -> {});
clusterApplierService.allowClusterStateApplicationFailure();

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state())
Expand Down Expand Up @@ -496,6 +498,7 @@ static class TimedClusterApplierService extends ClusterApplierService {

final ClusterSettings clusterSettings;
volatile Long currentTimeOverride = null;
boolean applicationMayFail;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super("test_node", settings, clusterSettings, threadPool);
Expand All @@ -509,6 +512,15 @@ protected long currentTimeInMillis() {
}
return super.currentTimeInMillis();
}

@Override
protected boolean applicationMayFail() {
return this.applicationMayFail;
}

void allowClusterStateApplicationFailure() {
this.applicationMayFail = true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,10 @@ void applyInitialConfiguration() {
private boolean isNotUsefullyBootstrapped() {
return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
}

void allowClusterStateApplicationFailure() {
clusterApplierService.allowClusterStateApplicationFailure();
}
}

private List<TransportAddress> provideSeedHosts(SeedHostsProvider.HostsResolver ignored) {
Expand Down Expand Up @@ -1280,6 +1284,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
private final String nodeName;
private final DeterministicTaskQueue deterministicTaskQueue;
ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
private boolean applicationMayFail;

DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
Expand Down Expand Up @@ -1324,6 +1329,15 @@ public void onNewClusterState(String source, Supplier<ClusterState> clusterState
protected void connectToNodesAndWait(ClusterState newClusterState) {
// don't do anything, and don't block
}

@Override
protected boolean applicationMayFail() {
return this.applicationMayFail;
}

void allowClusterStateApplicationFailure() {
this.applicationMayFail = true;
}
}

protected DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {
Expand Down

0 comments on commit a8376f4

Please sign in to comment.