Skip to content

Commit

Permalink
Send unhandled exceptions as-is and send null for known exceptions
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Mar 1, 2024
1 parent f71f426 commit c8fc6d1
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
* @param <T> Response type of the transport action.
* @param <V> Data type of shard level response.
*/
public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V extends BaseShardResponse>
extends AsyncShardFetch<T> {
public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V extends BaseShardResponse> extends AsyncShardFetch<T> {

@SuppressWarnings("unchecked")
AsyncShardBatchFetch(
Expand All @@ -50,8 +49,17 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V extends
Consumer<ShardId> handleFailedShard
) {
super(logger, type, shardAttributesMap, action, batchId);
this.cache = new ShardBatchCache<>(logger, type, shardAttributesMap, "BatchID=[" + batchId + "]"
, clazz, responseConstructor, shardsBatchDataGetter, emptyResponseBuilder, handleFailedShard);
this.cache = new ShardBatchCache<>(
logger,
type,
shardAttributesMap,
"BatchID=[" + batchId + "]",
clazz,
responseConstructor,
shardsBatchDataGetter,
emptyResponseBuilder,
handleFailedShard
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* @opensearch.internal
*/
public abstract class BaseShardResponse extends TransportResponse {
public BaseShardResponse(){}
public BaseShardResponse() {}

public abstract boolean isEmpty();

Expand All @@ -32,6 +32,5 @@ public BaseShardResponse(StreamInput in) throws IOException {
}

@Override
public void writeTo(StreamOutput out) throws IOException {
}
public void writeTo(StreamOutput out) throws IOException {}
}
28 changes: 15 additions & 13 deletions server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,17 @@ public class ShardBatchCache<T extends BaseNodeResponse, V extends BaseShardResp
private final Set<ShardId> failedShards;
private final Consumer<ShardId> handleFailedShard;

public ShardBatchCache(Logger logger, String type,
Map<ShardId, ShardAttributes> shardToCustomDataPath, String logKey, Class<V> clazz,
BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor, Function<T,
Map<ShardId, V>> shardsBatchDataGetter, Supplier<V> emptyResponseBuilder, Consumer<ShardId> handleFailedShard) {
public ShardBatchCache(
Logger logger,
String type,
Map<ShardId, ShardAttributes> shardToCustomDataPath,
String logKey,
Class<V> clazz,
BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor,
Function<T, Map<ShardId, V>> shardsBatchDataGetter,
Supplier<V> emptyResponseBuilder,
Consumer<ShardId> handleFailedShard
) {
super(logger, logKey, type);
this.batchSize = shardToCustomDataPath.size();
fillShardIdKeys(shardToCustomDataPath.keySet());
Expand Down Expand Up @@ -134,7 +141,7 @@ public void putData(DiscoveryNode node, T response) {
private List<ShardId> filterFailedShards(Map<ShardId, V> batchResponse) {
logger.trace("filtering failed shards");
List<ShardId> failedShards = new ArrayList<>();
for (Iterator<ShardId> it = batchResponse.keySet().iterator(); it.hasNext(); ) {
for (Iterator<ShardId> it = batchResponse.keySet().iterator(); it.hasNext();) {
ShardId shardId = it.next();
if (batchResponse.get(shardId) != null) {
if (batchResponse.get(shardId).getException() != null) {
Expand All @@ -145,8 +152,7 @@ private List<ShardId> filterFailedShards(Map<ShardId, V> batchResponse) {
if (shardException instanceof OpenSearchRejectedExecutionException
|| shardException instanceof ReceiveTimeoutTransportException
|| shardException instanceof OpenSearchTimeoutException) {
logger.trace("got unhandled retryable exception for shard {} {}", shardId.toString(),
shardException.toString());
logger.trace("got unhandled retryable exception for shard {} {}", shardId.toString(), shardException.toString());
failedShards.add(shardId);
handleFailedShard.accept(shardId);
// remove this failed entry. So, while storing the data, we don't need to re-process it.
Expand Down Expand Up @@ -179,8 +185,7 @@ private HashMap<ShardId, V> getBatchData(NodeEntry<V> nodeEntry) {
shardData.put(arrayToShardId.get(shardIdIndex), emptyResponseBuilder.get());
} else if (nodeShardEntries[shardIdIndex] != null) {
// ignore null responses here
shardData.put(arrayToShardId.get(shardIdIndex),
nodeShardEntries[shardIdIndex]);
shardData.put(arrayToShardId.get(shardIdIndex), nodeShardEntries[shardIdIndex]);
}
}
return shardData;
Expand Down Expand Up @@ -233,7 +238,6 @@ boolean[] getEmptyShardResponse() {
return emptyShardResponse;
}


private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
for (ShardId shardId : shardDataFromNode.keySet()) {
if (shardDataFromNode.get(shardId) != null) {
Expand All @@ -243,12 +247,10 @@ private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integ
} else if (shardDataFromNode.get(shardId).getException() == null) {
this.shardData[shardIdKey.get(shardId)] = shardDataFromNode.get(shardId);
}
//if exception is not null, we got unhandled failure for the shard which needs to be ignored
// if exception is not null, we got unhandled failure for the shard which needs to be ignored
}
}
}
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
* @opensearch.internal
*/
public class TransportNodesGatewayStartedShardHelper {

public static final String INDEX_NOT_FOUND = "node doesn't have meta data for index";
public static TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard getShardInfoOnLocalNode(
Logger logger,
final ShardId shardId,
Expand Down Expand Up @@ -67,7 +69,7 @@ public static TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShar
customDataPath = new IndexSettings(metadata, settings).customDataPath();
} else {
logger.trace("{} node doesn't have meta data for the requests index", shardId);
throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex());
throw new OpenSearchException(INDEX_NOT_FOUND + " " + shardId.getIndex());
}
}
// we don't have an open shard on the store, validate the files on disk are openable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.gateway;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionType;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
Expand Down Expand Up @@ -40,6 +39,7 @@
import java.util.Map;
import java.util.Objects;

import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.INDEX_NOT_FOUND;
import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode;

/**
Expand Down Expand Up @@ -153,10 +153,13 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) {
)
);
} catch (Exception e) {
shardsOnNode.put(
shardId,
new NodeGatewayStartedShard(null, false, null, new OpenSearchException("failed to load started shards", e))
);
// should return null in case of known exceptions being returned from getShardInfoOnLocalNode method.
if (e instanceof IllegalStateException || e.getMessage().contains(INDEX_NOT_FOUND)) {
shardsOnNode.put(shardId, null);
} else {
// return actual exception as it is for unknown exceptions
shardsOnNode.put(shardId, new NodeGatewayStartedShard(null, false, null, e));
}
}
}
return new NodeGatewayStartedShardsBatch(clusterService.localNode(), shardsOnNode);
Expand Down

0 comments on commit c8fc6d1

Please sign in to comment.