diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
index 24ec4043531ee..c44d06b813c71 100644
--- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
@@ -12,42 +12,83 @@
 import org.opensearch.action.support.nodes.BaseNodeResponse;
 import org.opensearch.action.support.nodes.BaseNodesResponse;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.indices.store.ShardAttributes;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
  * Implementation of AsyncShardFetchAbstract with batching support.
+ * The cache is created using the ShardBatchCache class, which handles the caching strategy for a batch of
+ * shards. The other required functions are stored as well, so the cache can store and retrieve data for both
+ * primaries and replicas.
+ *
  * @param <T> Response type of the transport action.
  * @param <V> Data type of shard level response.
  */
 public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V>
-    extends AsyncShardFetch<T>{
+    extends AsyncShardFetch<T> {
 
     @SuppressWarnings("unchecked")
     AsyncShardBatchFetch(
         Logger logger,
         String type,
-        Map<ShardId, ShardAttributes> shardToCustomDataPath,
+        Map<ShardId, ShardAttributes> shardAttributesMap,
         AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
         String batchId,
         Class<V> clazz,
-        BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseGetter,
+        BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor,
         Function<T, Map<ShardId, V>> shardsBatchDataGetter,
-        Supplier<V> emptyResponseBuilder
+        Supplier<V> emptyResponseBuilder,
+        Consumer<ShardId> handleFailedShard
     ) {
-        super(logger, type, shardToCustomDataPath, action, batchId);
-        this.shardCache = new ShardBatchCache<>(logger, type, shardToCustomDataPath, "BatchID=[" + batchId+ "]"
-            , clazz, responseGetter, shardsBatchDataGetter, emptyResponseBuilder);
+        super(logger, type, shardAttributesMap, action, batchId);
+        this.cache = new ShardBatchCache<>(logger, type, shardAttributesMap, "BatchID=[" + batchId + "]"
+            , clazz, responseConstructor, shardsBatchDataGetter, emptyResponseBuilder, handleFailedShard);
+    }
+
+    /**
+     * Fetch the data for a batch of shards; this reuses the existing {@link AsyncShardFetch} fetchData method.
+     * Based on the shards that failed in the last round, it makes sure a reroute is triggered for them.
+     *
+     * @param nodes all the nodes where the transport call should be sent
+     * @param ignoreNodes map of shard to nodes that should be ignored, updated with failures received from
+     *                    transport actions
+     * @return data received from the transport actions
+     */
+    public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {
+        List<ShardId> failedShards = cleanUpFailedShards();
+        if (failedShards.isEmpty() == false) {
+            // trigger a reroute if any shards failed, to make sure they are picked up in the next run
+            logger.trace("triggering another reroute for failed shards in {}", reroutingKey);
+            reroute("shards-failed", "shards failed in " + reroutingKey);
+        }
+        return super.fetchData(nodes, ignoreNodes);
+    }
+
+    /**
+     * Remove the failed shards from shardAttributesMap so they are not sent in the next fetching round.
+     *
+     * @return the failed shards, so a reroute can be triggered.
+     */
+    private List<ShardId> cleanUpFailedShards() {
+        List<ShardId> failedShards = cache.getFailedShards();
+        if (failedShards != null && failedShards.isEmpty() == false) {
+            shardAttributesMap.keySet().removeIf(failedShards::contains);
+        }
+        return failedShards;
     }
 
     /**
      * Remove a shard from the cache maintaining a full batch of shards. This is needed to clear the shard once it's
      * assigned or failed.
+     *
      * @param shardId shardId to be removed from the batch.
      */
     public void clearShard(ShardId shardId) {
diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
index 50774f7e0cb1c..bcadc5be1d1e2 100644
--- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
@@ -89,7 +89,7 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
     private final Map<String, NodeEntry<T>> cache = new HashMap<>();
     private final AtomicLong round = new AtomicLong();
     private boolean closed;
-    private final String reroutingKey;
+    protected final String reroutingKey;
     private final Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
 
     private final boolean enableBatchMode;
diff --git a/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java b/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
index c19232d0101ff..5749bf5a32a86 100644
--- a/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
+++ b/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
@@ -29,39 +29,48 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
- * Cache implementation of transport actions returning batch of shards related data in the response.
+ * Cache implementation for transport actions that return data for a batch of shards in the response. The cache
+ * uses a dedicated NodeEntry class that stores the data in array form. To keep the class generic for primaries
+ * and replicas, all required functions are supplied and stored at object creation.
  *
  * @param <T> Response type of transport action.
  * @param <V> Data type of shard level response.
  */
 public class ShardBatchCache<T extends BaseNodeResponse, V> extends BaseShardCache<T> {
-    private final Map<String, NodeEntry<V>> cache = new HashMap<>();
-    private final Map<ShardId, Integer> shardIdToArray = new HashMap<>(); // used for mapping array index for a shard
-    private final AtomicInteger shardIdIndex = new AtomicInteger();
+    private final Map<String, NodeEntry<V>> cache;
+    private final Map<ShardId, Integer> shardIdToArray; // used for mapping array index for a shard
+    private final AtomicInteger shardIdIndex;
     private final int batchSize;
     private final Class<V> shardResponseClass;
     private final BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor;
-    private final Map<Integer, ShardId> arrayToShardId = new HashMap<>();
+    private final Map<Integer, ShardId> arrayToShardId;
     private final Function<T, Map<ShardId, V>> shardsBatchDataGetter;
     private final Supplier<V> emptyResponseBuilder;
     private final Set<ShardId> failedShards;
+    private final Consumer<ShardId> handleFailedShard;
 
     public ShardBatchCache(Logger logger, String type, Map<ShardId, ShardAttributes> shardToCustomDataPath, String logKey, Class<V> clazz,
-                           BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseGetter, Function<T, Map<ShardId, V>> shardsBatchDataGetter, Supplier<V> emptyResponseBuilder) {
+                           BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor, Function<T, Map<ShardId, V>> shardsBatchDataGetter, Supplier<V> emptyResponseBuilder, Consumer<ShardId> handleFailedShard) {
         super(logger, logKey, type);
         this.batchSize = shardToCustomDataPath.size();
         fillShardIdKeys(shardToCustomDataPath.keySet());
         this.shardResponseClass = clazz;
-        this.responseConstructor = responseGetter;
+        this.responseConstructor = responseConstructor;
         this.shardsBatchDataGetter = shardsBatchDataGetter;
         this.emptyResponseBuilder = emptyResponseBuilder;
         failedShards = new HashSet<>();
+        cache = new HashMap<>();
+        shardIdToArray = new HashMap<>();
+        arrayToShardId = new HashMap<>();
+        shardIdIndex = new AtomicInteger();
+        this.handleFailedShard = handleFailedShard;
     }
 
     @Override
@@ -139,6 +148,7 @@ private List<ShardId> filterFailedShards(Map<ShardId, V> batchResponse) {
                     logger.trace("got unhandled retryable exception for shard {} {}", shardId.toString(), shardException.toString());
                     failedShards.add(shardId);
+                    handleFailedShard.accept(shardId);
                     // remove this failed entry. So, while storing the data, we don't need to re-process it.
                     it.remove();
                 }
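
For illustration, here is a minimal, self-contained sketch of the cleanup-then-reroute flow that the new fetchData override implements. This is not OpenSearch code: ShardId is simplified to a String, the cache and reroute are modeled with plain collections, and the class and member names (BatchFetchFlow, markFailed, rerouteLog) are hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of AsyncShardBatchFetch#fetchData: drain the failures recorded by the
// cache, drop those shards from the next fetching round, then trigger one reroute.
class BatchFetchFlow {
    private final Map<String, String> shardAttributesMap = new HashMap<>(); // shardId -> custom data path
    private final Set<String> failedShardsInCache = new HashSet<>();        // stands in for cache.getFailedShards()
    private final List<String> rerouteLog = new ArrayList<>();              // records reroute requests

    BatchFetchFlow(Map<String, String> shards) {
        shardAttributesMap.putAll(shards);
    }

    // Stands in for the handleFailedShard consumer the cache invokes on a retryable failure.
    void markFailed(String shardId) {
        failedShardsInCache.add(shardId);
    }

    // Returns the shards that would be fetched this round.
    Set<String> fetchData() {
        List<String> failedShards = new ArrayList<>(failedShardsInCache);
        failedShardsInCache.clear();
        if (failedShards.isEmpty() == false) {
            shardAttributesMap.keySet().removeIf(failedShards::contains);
            rerouteLog.add("shards-failed: " + failedShards);
        }
        return shardAttributesMap.keySet();
    }

    List<String> rerouteLog() {
        return rerouteLog;
    }
}

Calling fetchData() after markFailed("shard-1") drops shard-1 from the round and records exactly one reroute request, mirroring how the production code keeps failed shards out of the next fetch while still prompting the allocator to retry them.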
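The array-based storage that the ShardBatchCache javadoc describes can also be sketched in isolation. The following toy class (hypothetical names, String shard ids, a plain Object[] in place of the real NodeEntry) shows the shardIdToArray / arrayToShardId bookkeeping: each shard in the batch gets a stable slot, so per-node responses can live in a flat array rather than one map per node.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Toy version of the shardId <-> array-index bookkeeping in ShardBatchCache.
class ShardSlotIndex<V> {
    private final Map<String, Integer> shardIdToArray = new HashMap<>(); // shard -> slot
    private final Map<Integer, String> arrayToShardId = new HashMap<>(); // slot -> shard
    private final AtomicInteger shardIdIndex = new AtomicInteger();      // next free slot
    private final Object[] slots;                                        // one slot per shard in the batch

    ShardSlotIndex(Iterable<String> shardIds, int batchSize) {
        slots = new Object[batchSize];
        for (String shardId : shardIds) {
            int idx = shardIdIndex.getAndIncrement();
            shardIdToArray.put(shardId, idx);
            arrayToShardId.put(idx, shardId);
        }
    }

    void store(String shardId, V data) {
        slots[shardIdToArray.get(shardId)] = data;
    }

    @SuppressWarnings("unchecked")
    V get(String shardId) {
        return (V) slots[shardIdToArray.get(shardId)];
    }

    String shardAt(int slot) {
        return arrayToShardId.get(slot);
    }
}

The two maps translate between a shard and its slot in both directions, which the cache needs when turning a node-level response back into per-shard data; the flat array keeps per-shard lookups O(1) without allocating a map per node.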