From 100f5acd345dd13d8c8d17b93392d4b217bc50a2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 29 Jun 2022 18:39:19 +0200 Subject: [PATCH] Merge creating indices and shards methods in IndicesClusterStateService (#88184) Looping over the local routing node can be quite expensive for e.g. large warm/cold/frozen nodes. We can save one full iteration as well as a couple of map lookups by dealing with index and shard creation+updates in a single loop. --- .../cluster/IndicesClusterStateService.java | 67 ++++++++++--------- .../indices/recovery/package-info.java | 6 +- 2 files changed, 37 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 7f5790b8e5366..4e67ace32dda6 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -227,9 +227,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) { updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache - createIndices(state); - - createOrUpdateShards(state); + createIndicesAndUpdateShards(state); } /** @@ -469,19 +467,26 @@ private void removeShards(final ClusterState state) { } } - private void createIndices(final ClusterState state) { - // we only create indices for shards that are allocated - RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + private void createIndicesAndUpdateShards(final ClusterState state) { + DiscoveryNodes nodes = state.nodes(); + RoutingNode localRoutingNode = state.getRoutingNodes().node(nodes.getLocalNodeId()); if (localRoutingNode == null) { return; } - // create map of indices to create with shards to fail if index creation fails + + RoutingTable routingTable = state.routingTable(); + + // create map of indices to create with shards to fail if index creation fails or create or update shards if an existing index + // service is found final Map> indicesToCreate = new HashMap<>(); for (ShardRouting shardRouting : localRoutingNode) { if (failedShardsCache.containsKey(shardRouting.shardId()) == false) { final Index index = shardRouting.index(); - if (indicesService.indexService(index) == null) { + final var indexService = indicesService.indexService(index); + if (indexService == null) { indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting); + } else { + createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService); } } } @@ -506,10 +511,31 @@ private void createIndices(final ClusterState state) { for (ShardRouting shardRouting : entry.getValue()) { sendFailShard(shardRouting, failShardReason, e, state); } + continue; + } + // we succeeded in creating the index service, so now we can create the missing shards assigned to this node + for (ShardRouting shardRouting : entry.getValue()) { + createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService); } } } + private void createOrUpdateShard( + ClusterState state, + DiscoveryNodes nodes, + RoutingTable routingTable, + ShardRouting shardRouting, + AllocatedIndex indexService + ) { + Shard shard = indexService.getShardOrNull(shardRouting.shardId().id()); + if (shard == null) { + assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards"; + createShard(nodes, routingTable, shardRouting, state); + } else { + updateShard(nodes, shardRouting, shard, routingTable, state); + } + } + private void updateIndices(ClusterChangedEvent event) { if (event.metadataChanged() == false) { return; @@ -550,31 +576,6 @@ private void updateIndices(ClusterChangedEvent event) { } } - private void createOrUpdateShards(final ClusterState state) { - RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); - if (localRoutingNode == null) { - return; - } - - DiscoveryNodes nodes = state.nodes(); - RoutingTable routingTable = state.routingTable(); - - for (final ShardRouting shardRouting : localRoutingNode) { - ShardId shardId = shardRouting.shardId(); - if (failedShardsCache.containsKey(shardId) == false) { - AllocatedIndex indexService = indicesService.indexService(shardId.getIndex()); - assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices"; - Shard shard = indexService.getShardOrNull(shardId.id()); - if (shard == null) { - assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards"; - createShard(nodes, routingTable, shardRouting, state); - } else { - updateShard(nodes, shardRouting, shard, routingTable, state); - } - } - } - } - private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) { assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/package-info.java b/server/src/main/java/org/elasticsearch/indices/recovery/package-info.java index 8bd340ba2621e..145d539d78e61 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/package-info.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/package-info.java @@ -14,9 +14,9 @@ * Recoveries are started on data nodes as a result of data node discovering shard assignments to themselves in the cluster state. The * master node sets up these shard allocations in the cluster state (see {@link org.elasticsearch.cluster.routing.ShardRouting}). * If a data node finds shard allocations that require recovery on itself, it will execute the required recoveries by executing the - * logic starting at {@link org.elasticsearch.indices.cluster.IndicesClusterStateService#createOrUpdateShards}. As the data nodes execute - * the steps of the recovery state machine they report back success or failure to do so to the master node via the transport actions in - * {@link org.elasticsearch.cluster.action.shard.ShardStateAction}, which will then update the shard routing in the cluster state + * logic starting at {@link org.elasticsearch.indices.cluster.IndicesClusterStateService#createIndicesAndUpdateShards}. As the data nodes + * execute the steps of the recovery state machine they report back success or failure to do so to the master node via the transport + * actions in {@link org.elasticsearch.cluster.action.shard.ShardStateAction}, which will then update the shard routing in the cluster state * accordingly to reflect the status of the recovered shards or to handle failures in the recovery process. Recoveries can have various * kinds of sources that are modeled via the {@link org.elasticsearch.cluster.routing.RecoverySource} that is communicated to the recovery * target by {@link org.elasticsearch.cluster.routing.ShardRouting#recoverySource()} for each shard routing. These sources and their state