Skip to content

Commit

Permalink
Convert IndicesShardStoreResponse to use Map (#86281)
Browse files Browse the repository at this point in the history
The IndicesShardStoreResponse class uses ImmutableOpenMap and
ImmutableOpenIntMap to represent the per index -> shard -> status
structure. This commit converts these to Java Maps.

relates #86239
  • Loading branch information
rjernst authored Apr 29, 2022
1 parent bc5cfcf commit 1c836da
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
Expand Down Expand Up @@ -76,7 +74,7 @@ public void testBasic() throws Exception {
// all shards
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index).shardStatuses("all")).get();
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
assertThat(shardStores.size(), equalTo(2));
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.entrySet()) {
for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.getValue()) {
Expand All @@ -98,7 +96,7 @@ public void testBasic() throws Exception {
List<ShardRouting> unassignedShards = clusterState.routingTable().index(index).shardsWithState(ShardRoutingState.UNASSIGNED);
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index)).get();
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size()));
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses.entrySet()) {
assertThat("must report for one store", storesStatus.getValue().size(), equalTo(1));
Expand All @@ -125,8 +123,7 @@ public void testIndices() throws Exception {
.indices()
.shardStores(Requests.indicesShardStoresRequest().shardStatuses("all"))
.get();
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> shardStatuses = response
.getStoreStatuses();
Map<String, Map<Integer, List<IndicesShardStoresResponse.StoreStatus>>> shardStatuses = response.getStoreStatuses();
assertThat(shardStatuses.containsKey(index1), equalTo(true));
assertThat(shardStatuses.containsKey(index2), equalTo(true));
assertThat(shardStatuses.get(index1).size(), equalTo(2));
Expand Down Expand Up @@ -181,7 +178,7 @@ public void testCorruptedShards() throws Exception {

assertBusy(() -> { // IndicesClusterStateService#failAndRemoveShard() called asynchronously but we need it to have completed here.
IndicesShardStoresResponse rsp = client().admin().indices().prepareShardStores(index).setShardStatuses("all").get();
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
assertNotNull(shardStatuses);
assertThat(shardStatuses.size(), greaterThan(0));
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStatus : shardStatuses.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
Expand Down Expand Up @@ -286,7 +285,7 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
boolean useStaleReplica = randomBoolean(); // if true, use stale replica, otherwise a completely empty copy
logger.info("--> explicitly promote old primary shard");
final String idxName = "test";
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin()
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin()
.indices()
.prepareShardStores(idxName)
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -110,12 +108,11 @@ private void verifyThenSubmitUpdate(
IndicesShardStoresAction.NAME,
new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)),
new ActionListenerResponseHandler<>(ActionListener.wrap(response -> {
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> status = response
.getStoreStatuses();
Map<String, Map<Integer, List<IndicesShardStoresResponse.StoreStatus>>> status = response.getStoreStatuses();
Exception e = null;
for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
final String index = entry.getKey();
final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
final Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
if (indexStatus == null) {
// The index in the stale primary allocation request was green and hence filtered out by the store status
// request. We ignore it here since the relevant exception will be thrown by the reroute action later on.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,10 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
}
}

private final ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses;
private final Map<String, Map<Integer, List<StoreStatus>>> storeStatuses;
private final List<Failure> failures;

public IndicesShardStoresResponse(
ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses,
List<Failure> failures
) {
public IndicesShardStoresResponse(Map<String, Map<Integer, List<StoreStatus>>> storeStatuses, List<Failure> failures) {
this.storeStatuses = storeStatuses;
this.failures = failures;
}
Expand All @@ -264,7 +261,7 @@ public IndicesShardStoresResponse(StreamInput in) throws IOException {
* Returns {@link StoreStatus}s
* grouped by their index names and shard ids.
*/
public ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> getStoreStatuses() {
public Map<String, Map<Integer, List<StoreStatus>>> getStoreStatuses() {
return storeStatuses;
}

Expand Down Expand Up @@ -299,7 +296,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

builder.startObject(Fields.INDICES);
for (Map.Entry<String, ImmutableOpenIntMap<List<StoreStatus>>> indexShards : storeStatuses.entrySet()) {
for (Map.Entry<String, Map<Integer, List<StoreStatus>>> indexShards : storeStatuses.entrySet()) {
builder.startObject(indexShards.getKey());

builder.startObject(Fields.SHARDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.Failure;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus.AllocationStatus;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
Expand All @@ -31,8 +34,6 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Tuple;
Expand All @@ -47,8 +48,10 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -206,81 +209,50 @@ protected synchronized void processAsyncFetch(
}

void finish() {
ImmutableOpenMap.Builder<
String,
ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>>> indicesStoreStatusesBuilder =
ImmutableOpenMap.builder();

java.util.List<IndicesShardStoresResponse.Failure> failureBuilder = new ArrayList<>();
Map<String, Map<Integer, List<StoreStatus>>> indicesStatuses = new HashMap<>();
List<Failure> failures = new ArrayList<>();
for (Response fetchResponse : fetchResponses) {
ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexStoreStatuses =
indicesStoreStatusesBuilder.get(fetchResponse.shardId.getIndexName());
final ImmutableOpenIntMap.Builder<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexShardsBuilder;
if (indexStoreStatuses == null) {
indexShardsBuilder = ImmutableOpenIntMap.builder();
} else {
indexShardsBuilder = ImmutableOpenIntMap.builder(indexStoreStatuses);
}
java.util.List<IndicesShardStoresResponse.StoreStatus> storeStatuses = indexShardsBuilder.get(
fetchResponse.shardId.id()
);
if (storeStatuses == null) {
storeStatuses = new ArrayList<>();
}
for (NodeGatewayStartedShards response : fetchResponse.responses) {
if (shardExistsInNode(response)) {
IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(
fetchResponse.shardId.getIndexName(),
fetchResponse.shardId.id(),
response.getNode()
);
storeStatuses.add(
new IndicesShardStoresResponse.StoreStatus(
response.getNode(),
response.allocationId(),
allocationStatus,
response.storeException()
)
);
var indexName = fetchResponse.shardId.getIndexName();
var shardId = fetchResponse.shardId.id();
var indexStatuses = indicesStatuses.computeIfAbsent(indexName, k -> new HashMap<>());
var storeStatuses = indexStatuses.computeIfAbsent(shardId, k -> new ArrayList<>());

for (NodeGatewayStartedShards r : fetchResponse.responses) {
if (shardExistsInNode(r)) {
var allocationStatus = getAllocationStatus(indexName, shardId, r.getNode());
storeStatuses.add(new StoreStatus(r.getNode(), r.allocationId(), allocationStatus, r.storeException()));
}
}
CollectionUtil.timSort(storeStatuses);
indexShardsBuilder.put(fetchResponse.shardId.id(), storeStatuses);
indicesStoreStatusesBuilder.put(fetchResponse.shardId.getIndexName(), indexShardsBuilder.build());

for (FailedNodeException failure : fetchResponse.failures) {
failureBuilder.add(
new IndicesShardStoresResponse.Failure(
failure.nodeId(),
fetchResponse.shardId.getIndexName(),
fetchResponse.shardId.id(),
failure.getCause()
)
);
failures.add(new Failure(failure.nodeId(), indexName, shardId, failure.getCause()));
}
}
listener.onResponse(
new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), Collections.unmodifiableList(failureBuilder))
);
// make the status structure immutable
indicesStatuses.replaceAll((k, v) -> {
v.replaceAll((s, l) -> {
CollectionUtil.timSort(l);
return List.copyOf(l);
});
return Map.copyOf(v);
});
listener.onResponse(new IndicesShardStoresResponse(Map.copyOf(indicesStatuses), List.copyOf(failures)));
}

private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationStatus(
String index,
int shardID,
DiscoveryNode node
) {
private AllocationStatus getAllocationStatus(String index, int shardID, DiscoveryNode node) {
for (ShardRouting shardRouting : routingNodes.node(node.getId())) {
ShardId shardId = shardRouting.shardId();
if (shardId.id() == shardID && shardId.getIndexName().equals(index)) {
if (shardRouting.primary()) {
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY;
return AllocationStatus.PRIMARY;
} else if (shardRouting.assignedToNode()) {
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA;
return AllocationStatus.REPLICA;
} else {
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
return AllocationStatus.UNUSED;
}
}
}
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
return AllocationStatus.UNUSED;
}

/**
Expand Down
Loading

0 comments on commit 1c836da

Please sign in to comment.