Skip to content

Commit

Permalink
Merge creating indices and shards methods in IndicesClusterStateServi…
Browse files Browse the repository at this point in the history
…ce (elastic#88184)

Looping over the local routing node can be quite expensive for e.g.
large warm/cold/frozen nodes. We can save one full iteration as well
as a couple of map lookups by dealing with index and shard creation+updates
in a single loop.
  • Loading branch information
original-brownbear authored Jun 29, 2022
1 parent 17305fc commit 100f5ac
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {

updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache

createIndices(state);

createOrUpdateShards(state);
createIndicesAndUpdateShards(state);
}

/**
Expand Down Expand Up @@ -469,19 +467,26 @@ private void removeShards(final ClusterState state) {
}
}

private void createIndices(final ClusterState state) {
// we only create indices for shards that are allocated
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
private void createIndicesAndUpdateShards(final ClusterState state) {
DiscoveryNodes nodes = state.nodes();
RoutingNode localRoutingNode = state.getRoutingNodes().node(nodes.getLocalNodeId());
if (localRoutingNode == null) {
return;
}
// create map of indices to create with shards to fail if index creation fails

RoutingTable routingTable = state.routingTable();

// create map of indices to create with shards to fail if index creation fails or create or update shards if an existing index
// service is found
final Map<Index, List<ShardRouting>> indicesToCreate = new HashMap<>();
for (ShardRouting shardRouting : localRoutingNode) {
if (failedShardsCache.containsKey(shardRouting.shardId()) == false) {
final Index index = shardRouting.index();
if (indicesService.indexService(index) == null) {
final var indexService = indicesService.indexService(index);
if (indexService == null) {
indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting);
} else {
createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService);
}
}
}
Expand All @@ -506,10 +511,31 @@ private void createIndices(final ClusterState state) {
for (ShardRouting shardRouting : entry.getValue()) {
sendFailShard(shardRouting, failShardReason, e, state);
}
continue;
}
// we succeeded in creating the index service, so now we can create the missing shards assigned to this node
for (ShardRouting shardRouting : entry.getValue()) {
createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService);
}
}
}

private void createOrUpdateShard(
ClusterState state,
DiscoveryNodes nodes,
RoutingTable routingTable,
ShardRouting shardRouting,
AllocatedIndex<? extends Shard> indexService
) {
Shard shard = indexService.getShardOrNull(shardRouting.shardId().id());
if (shard == null) {
assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
createShard(nodes, routingTable, shardRouting, state);
} else {
updateShard(nodes, shardRouting, shard, routingTable, state);
}
}

private void updateIndices(ClusterChangedEvent event) {
if (event.metadataChanged() == false) {
return;
Expand Down Expand Up @@ -550,31 +576,6 @@ private void updateIndices(ClusterChangedEvent event) {
}
}

private void createOrUpdateShards(final ClusterState state) {
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (localRoutingNode == null) {
return;
}

DiscoveryNodes nodes = state.nodes();
RoutingTable routingTable = state.routingTable();

for (final ShardRouting shardRouting : localRoutingNode) {
ShardId shardId = shardRouting.shardId();
if (failedShardsCache.containsKey(shardId) == false) {
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";
Shard shard = indexService.getShardOrNull(shardId.id());
if (shard == null) {
assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
createShard(nodes, routingTable, shardRouting, state);
} else {
updateShard(nodes, shardRouting, shard, routingTable, state);
}
}
}
}

private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* Recoveries are started on data nodes as a result of data node discovering shard assignments to themselves in the cluster state. The
* master node sets up these shard allocations in the cluster state (see {@link org.elasticsearch.cluster.routing.ShardRouting}).
* If a data node finds shard allocations that require recovery on itself, it will execute the required recoveries by executing the
* logic starting at {@link org.elasticsearch.indices.cluster.IndicesClusterStateService#createOrUpdateShards}. As the data nodes execute
* the steps of the recovery state machine they report back success or failure to do so to the master node via the transport actions in
* {@link org.elasticsearch.cluster.action.shard.ShardStateAction}, which will then update the shard routing in the cluster state
* logic starting at {@link org.elasticsearch.indices.cluster.IndicesClusterStateService#createIndicesAndUpdateShards}. As the data nodes
* execute the steps of the recovery state machine they report back success or failure to do so to the master node via the transport
* actions in {@link org.elasticsearch.cluster.action.shard.ShardStateAction}, which will then update the shard routing in the cluster state
* accordingly to reflect the status of the recovered shards or to handle failures in the recovery process. Recoveries can have various
* kinds of sources that are modeled via the {@link org.elasticsearch.cluster.routing.RecoverySource} that is communicated to the recovery
* target by {@link org.elasticsearch.cluster.routing.ShardRouting#recoverySource()} for each shard routing. These sources and their state
Expand Down

0 comments on commit 100f5ac

Please sign in to comment.