Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assert no exceptions during state application #47090

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
public interface ClusterStateApplier {

/**
* Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied
* Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied. The cluster state to be applied is already
* committed when this method is called, so an applier must therefore be prepared to deal with any state it receives without throwing
* an exception. Throwing an exception from an applier is very bad because it will stop the application of this state before it has
* reached all the other appliers, and will likely result in another attempt to apply the same (or very similar) cluster state which
* might continue until this node is removed from the cluster.
*/
void applyClusterState(ClusterChangedEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private static boolean assertNotCalledFromClusterStateApplier(String reason) {
return true;
}

protected void runTask(UpdateTask task) {
private void runTask(UpdateTask task) {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
return;
Expand Down Expand Up @@ -447,6 +447,9 @@ protected void runTask(UpdateTask task) {
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
}
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
// continue we will retry with the same cluster state but that might not help.
assert applicationMayFail();
task.listener.onFailure(task.source, e);
}
}
Expand Down Expand Up @@ -661,4 +664,8 @@ protected long currentTimeInMillis() {
return threadPool.relativeTimeInMillis();
}

// overridden by tests that need to check behaviour in the event of an application failure
protected boolean applicationMayFail() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ public synchronized Settings applySettings(Settings newSettings) {
} catch (Exception ex) {
logger.warn("failed to apply settings", ex);
throw ex;
} finally {
}
return lastSettingsApplied = newSettings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,23 +586,32 @@ private void updateIndices(ClusterChangedEvent event) {
final IndexMetaData newIndexMetaData = state.metaData().index(index);
assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices";
if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) {
indexService.updateMetaData(currentIndexMetaData, newIndexMetaData);
String reason = null;
try {
reason = "metadata update failed";
try {
indexService.updateMetaData(currentIndexMetaData, newIndexMetaData);
} catch (Exception e) {
assert false : e;
throw e;
}

reason = "mapping update failed";
if (indexService.updateMapping(currentIndexMetaData, newIndexMetaData) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetaData.getIndex().getName(),
newIndexMetaData.getIndexUUID(), state.nodes().getLocalNodeId())
);
}
} catch (Exception e) {
indicesService.removeIndex(indexService.index(), FAILURE, "removing index (mapping update failed)");
indicesService.removeIndex(indexService.index(), FAILURE, "removing index (" + reason + ")");

// fail shards that would be created or updated by createOrUpdateShards
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (localRoutingNode != null) {
for (final ShardRouting shardRouting : localRoutingNode) {
if (shardRouting.index().equals(index) && failedShardsCache.containsKey(shardRouting.shardId()) == false) {
sendFailShard(shardRouting, "failed to update mapping for index", e, state);
sendFailShard(shardRouting, "failed to update index (" + reason + ")", e, state);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ public void testAckListenerReceivesNackFromFollower() {
final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);

follower0.allowClusterStateApplicationFailure();
follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
Expand All @@ -604,6 +605,7 @@ public void testAckListenerReceivesNackFromLeader() {
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
final long startingTerm = leader.coordinator.getCurrentTerm();

leader.allowClusterStateApplicationFailure();
leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ public void testClusterStateApplierBubblesUpExceptionsInApplier() throws Interru
clusterApplierService.addStateApplier(event -> {
throw new RuntimeException("dummy exception");
});
clusterApplierService.allowClusterStateApplicationFailure();

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
Expand Down Expand Up @@ -387,6 +388,7 @@ public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws
AtomicReference<Throwable> error = new AtomicReference<>();
clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
v -> {});
clusterApplierService.allowClusterStateApplicationFailure();

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state())
Expand Down Expand Up @@ -497,6 +499,7 @@ static class TimedClusterApplierService extends ClusterApplierService {

final ClusterSettings clusterSettings;
volatile Long currentTimeOverride = null;
boolean applicationMayFail;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super("test_node", settings, clusterSettings, threadPool);
Expand All @@ -507,6 +510,15 @@ static class TimedClusterApplierService extends ClusterApplierService {
protected long currentTimeInMillis() {
return Objects.requireNonNullElseGet(currentTimeOverride, super::currentTimeInMillis);
}

@Override
protected boolean applicationMayFail() {
return this.applicationMayFail;
}

void allowClusterStateApplicationFailure() {
this.applicationMayFail = true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,10 @@ void applyInitialConfiguration() {
private boolean isNotUsefullyBootstrapped() {
return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
}

void allowClusterStateApplicationFailure() {
clusterApplierService.allowClusterStateApplicationFailure();
}
}

private List<TransportAddress> provideSeedHosts(SeedHostsProvider.HostsResolver ignored) {
Expand Down Expand Up @@ -1282,6 +1286,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
private final String nodeName;
private final DeterministicTaskQueue deterministicTaskQueue;
ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
private boolean applicationMayFail;

DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
Expand Down Expand Up @@ -1326,6 +1331,15 @@ public void onNewClusterState(String source, Supplier<ClusterState> clusterState
protected void connectToNodesAndWait(ClusterState newClusterState) {
// don't do anything, and don't block
}

@Override
protected boolean applicationMayFail() {
return this.applicationMayFail;
}

void allowClusterStateApplicationFailure() {
this.applicationMayFail = true;
}
}

protected DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {
Expand Down