Skip to content

Commit

Permalink
Handle shard failures by triggering a reroute
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Mar 1, 2024
1 parent 222f68b commit f71f426
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,83 @@
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Implementation of AsyncShardFetchAbstract with batching support.
* cache will be created using ShardBatchCache class as that can handle the caching strategy correctly for a
* batch of shards. Other necessary functions are also stored so cache can store or get the data for both primary
* and replicas.
*
* @param <T> Response type of the transport action.
* @param <V> Data type of shard level response.
*/
public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V extends BaseShardResponse>
extends AsyncShardFetch<T>{
extends AsyncShardFetch<T> {

/**
 * Builds the batch fetcher and its batch-aware cache.
 *
 * @param logger logger used by both the fetcher and the cache
 * @param type action type, used for logging
 * @param shardAttributesMap attributes (e.g. custom data path) for every shard in this batch
 * @param action transport action used to list shard data from nodes
 * @param batchId identifier of this batch, used as the rerouting key
 * @param clazz concrete class of the shard-level response, needed to build typed arrays in the cache
 * @param responseConstructor builds a node-level response from per-shard data for one node
 * @param shardsBatchDataGetter extracts per-shard data out of a node-level response
 * @param emptyResponseBuilder supplies an empty shard-level response for shards with no data
 * @param handleFailedShard callback invoked by the cache when a shard fails with a retryable exception
 */
@SuppressWarnings("unchecked")
AsyncShardBatchFetch(
    Logger logger,
    String type,
    Map<ShardId, ShardAttributes> shardAttributesMap,
    AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
    String batchId,
    Class<V> clazz,
    BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor,
    Function<T, Map<ShardId, V>> shardsBatchDataGetter,
    Supplier<V> emptyResponseBuilder,
    Consumer<ShardId> handleFailedShard
) {
    super(logger, type, shardAttributesMap, action, batchId);
    // Batch-aware cache; handleFailedShard lets the cache notify us so failed shards
    // can be removed from the batch and rerouted in the next fetch round.
    this.cache = new ShardBatchCache<>(
        logger,
        type,
        shardAttributesMap,
        "BatchID=[" + batchId + "]",
        clazz,
        responseConstructor,
        shardsBatchDataGetter,
        emptyResponseBuilder,
        handleFailedShard
    );
}

/**
 * Fetch the data for a batch of shards, this uses the already written {@link AsyncShardFetch} fetchData method.
 * Based on the shards failed in last round, it makes sure to trigger a reroute for them.
 *
 * @param nodes all the nodes where transport call should be sent
 * @param ignoreNodes per-shard set of node ids to skip for this fetch round (previously failed nodes)
 * @return data received from the transport actions
 */
public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {
    List<ShardId> failedShards = cleanUpFailedShards();
    // cleanUpFailedShards propagates cache.getFailedShards(), which may be null when
    // nothing has been recorded yet — guard before dereferencing to avoid an NPE.
    if (failedShards != null && failedShards.isEmpty() == false) {
        // trigger a reroute if there are any shards failed, to make sure they're picked up in next run
        logger.trace("triggering another reroute for failed shards in {}", reroutingKey);
        reroute("shards-failed", "shards failed in " + reroutingKey);
    }
    return super.fetchData(nodes, ignoreNodes);
}

/**
 * Drop every failed shard from {@code shardAttributesMap} so the next fetching round
 * no longer asks nodes about them.
 *
 * @return the shards that failed (possibly {@code null} or empty), so the caller can trigger a reroute
 */
private List<ShardId> cleanUpFailedShards() {
    List<ShardId> failed = cache.getFailedShards();
    if (failed == null || failed.isEmpty()) {
        return failed;
    }
    shardAttributesMap.keySet().removeAll(failed);
    return failed;
}

/**
* Remove a shard from the cache maintaining a full batch of shards. This is needed to clear the shard once it's
* assigned or failed.
*
* @param shardId shardId to be removed from the batch.
*/
public void clearShard(ShardId shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
private final Map<String, NodeEntry<T>> cache = new HashMap<>();
private final AtomicLong round = new AtomicLong();
private boolean closed;
private final String reroutingKey;
protected final String reroutingKey;
private final Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

private final boolean enableBatchMode;
Expand Down
26 changes: 18 additions & 8 deletions server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,39 +29,48 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Cache implementation of transport actions returning batch of shards related data in the response.
* Cache implementation of transport actions returning batch of shards data in the response. Cache uses a specific
* NodeEntry class that stores the data in array format. To keep the class generic for primary or replica, all
* functions are stored during object creation.
*
* @param <T> Response type of transport action.
* @param <V> Data type of shard level response.
*/
public class ShardBatchCache<T extends BaseNodeResponse, V extends BaseShardResponse> extends BaseShardCache<T> {
private final Map<String, NodeEntry<V>> cache = new HashMap<>();
private final Map<ShardId, Integer> shardIdToArray = new HashMap<>(); // used for mapping array index for a shard
private final AtomicInteger shardIdIndex = new AtomicInteger();
private final Map<String, NodeEntry<V>> cache;
private final Map<ShardId, Integer> shardIdToArray; // used for mapping array index for a shard
private final AtomicInteger shardIdIndex;
private final int batchSize;
private final Class<V> shardResponseClass;
private final BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor;
private final Map<Integer, ShardId> arrayToShardId = new HashMap<>();
private final Map<Integer, ShardId> arrayToShardId;
private final Function<T, Map<ShardId, V>> shardsBatchDataGetter;
private final Supplier<V> emptyResponseBuilder;
private final Set<ShardId> failedShards;
private final Consumer<ShardId> handleFailedShard;

/**
 * Builds the cache for one batch of shards. Shard-level data is stored in arrays
 * indexed via {@code shardIdToArray}/{@code arrayToShardId}, so the batch size is
 * fixed at construction time from {@code shardToCustomDataPath}.
 *
 * @param logger logger for trace output
 * @param type action type, used for logging
 * @param shardToCustomDataPath attributes of every shard in the batch; its size fixes the batch size
 * @param logKey key identifying this batch in log messages
 * @param clazz concrete shard-level response class, used to build typed arrays
 * @param responseConstructor builds a node-level response from per-shard data
 * @param shardsBatchDataGetter extracts per-shard data from a node-level response
 * @param emptyResponseBuilder supplies an empty shard-level response
 * @param handleFailedShard callback invoked when a shard fails with a retryable exception
 */
public ShardBatchCache(Logger logger, String type,
                       Map<ShardId, ShardAttributes> shardToCustomDataPath, String logKey, Class<V> clazz,
                       BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor, Function<T,
    Map<ShardId, V>> shardsBatchDataGetter, Supplier<V> emptyResponseBuilder, Consumer<ShardId> handleFailedShard) {
    super(logger, logKey, type);
    this.batchSize = shardToCustomDataPath.size();
    this.shardResponseClass = clazz;
    this.responseConstructor = responseConstructor;
    this.shardsBatchDataGetter = shardsBatchDataGetter;
    this.emptyResponseBuilder = emptyResponseBuilder;
    this.handleFailedShard = handleFailedShard;
    failedShards = new HashSet<>();
    cache = new HashMap<>();
    shardIdToArray = new HashMap<>();
    arrayToShardId = new HashMap<>();
    shardIdIndex = new AtomicInteger();
    // Initialize all index maps before filling them: fillShardIdKeys presumably populates
    // shardIdToArray/arrayToShardId (NOTE(review): body not visible here — confirm), so
    // calling it before the maps exist would NPE.
    fillShardIdKeys(shardToCustomDataPath.keySet());
}

@Override
Expand Down Expand Up @@ -139,6 +148,7 @@ private List<ShardId> filterFailedShards(Map<ShardId, V> batchResponse) {
logger.trace("got unhandled retryable exception for shard {} {}", shardId.toString(),
shardException.toString());
failedShards.add(shardId);
handleFailedShard.accept(shardId);
// remove this failed entry. So, while storing the data, we don't need to re-process it.
it.remove();
}
Expand Down

0 comments on commit f71f426

Please sign in to comment.