From c8fc6d15de4de5287989337552d231eb0c3f2829 Mon Sep 17 00:00:00 2001
From: Aman Khare
Date: Fri, 1 Mar 2024 14:21:57 +0530
Subject: [PATCH] Send unhandled exception as it is and send null for known
 exceptions

Signed-off-by: Aman Khare
---
 .../gateway/AsyncShardBatchFetch.java         | 16 ++++++++---
 .../opensearch/gateway/BaseShardResponse.java |  5 ++--
 .../opensearch/gateway/ShardBatchCache.java   | 28 ++++++++++---------
 ...ansportNodesGatewayStartedShardHelper.java |  4 ++-
 ...ortNodesListGatewayStartedShardsBatch.java | 13 +++++----
 5 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
index c44d06b813c71..7d9dcd909e3df 100644
--- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
@@ -33,8 +33,7 @@
  * @param Response type of the transport action.
  * @param Data type of shard level response.
  */
-public abstract class AsyncShardBatchFetch
-    extends AsyncShardFetch {
+public abstract class AsyncShardBatchFetch extends AsyncShardFetch {
 
     @SuppressWarnings("unchecked")
     AsyncShardBatchFetch(
@@ -50,8 +49,17 @@ public abstract class AsyncShardBatchFetch
         handleFailedShard
     ) {
         super(logger, type, shardAttributesMap, action, batchId);
-        this.cache = new ShardBatchCache<>(logger, type, shardAttributesMap, "BatchID=[" + batchId + "]"
-            , clazz, responseConstructor, shardsBatchDataGetter, emptyResponseBuilder, handleFailedShard);
+        this.cache = new ShardBatchCache<>(
+            logger,
+            type,
+            shardAttributesMap,
+            "BatchID=[" + batchId + "]",
+            clazz,
+            responseConstructor,
+            shardsBatchDataGetter,
+            emptyResponseBuilder,
+            handleFailedShard
+        );
     }
 
     /**
diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardResponse.java b/server/src/main/java/org/opensearch/gateway/BaseShardResponse.java
index 7c4ddf28705cb..5d5585678d0a5 100644
--- a/server/src/main/java/org/opensearch/gateway/BaseShardResponse.java
+++ b/server/src/main/java/org/opensearch/gateway/BaseShardResponse.java
@@ -21,7 +21,7 @@
  * @opensearch.internal
  */
 public abstract class BaseShardResponse extends TransportResponse {
-    public BaseShardResponse(){}
+    public BaseShardResponse() {}
 
     public abstract boolean isEmpty();
 
@@ -32,6 +32,5 @@ public BaseShardResponse(StreamInput in) throws IOException {
     }
 
     @Override
-    public void writeTo(StreamOutput out) throws IOException {
-    }
+    public void writeTo(StreamOutput out) throws IOException {}
 }
diff --git a/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java b/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
index 5749bf5a32a86..77880092374bd 100644
--- a/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
+++ b/server/src/main/java/org/opensearch/gateway/ShardBatchCache.java
@@ -54,10 +54,17 @@ public class ShardBatchCache
     failedShards;
     private final Consumer handleFailedShard;
 
-    public ShardBatchCache(Logger logger, String type,
-        Map shardToCustomDataPath, String logKey, Class clazz,
-        BiFunction, T> responseConstructor, Function> shardsBatchDataGetter, Supplier emptyResponseBuilder, Consumer handleFailedShard) {
+    public ShardBatchCache(
+        Logger logger,
+        String type,
+        Map shardToCustomDataPath,
+        String logKey,
+        Class clazz,
+        BiFunction, T> responseConstructor,
+        Function> shardsBatchDataGetter,
+        Supplier emptyResponseBuilder,
+        Consumer handleFailedShard
+    ) {
         super(logger, logKey, type);
         this.batchSize = shardToCustomDataPath.size();
         fillShardIdKeys(shardToCustomDataPath.keySet());
@@ -134,7 +141,7 @@ public void putData(DiscoveryNode node, T response) {
     private List filterFailedShards(Map batchResponse) {
         logger.trace("filtering failed shards");
         List failedShards = new ArrayList<>();
-        for (Iterator it = batchResponse.keySet().iterator(); it.hasNext(); ) {
+        for (Iterator it = batchResponse.keySet().iterator(); it.hasNext();) {
             ShardId shardId = it.next();
             if (batchResponse.get(shardId) != null) {
                 if (batchResponse.get(shardId).getException() != null) {
@@ -145,8 +152,7 @@ private List filterFailedShards(Map batchResponse) {
                     if (shardException instanceof OpenSearchRejectedExecutionException
                         || shardException instanceof ReceiveTimeoutTransportException
                         || shardException instanceof OpenSearchTimeoutException) {
-                        logger.trace("got unhandled retryable exception for shard {} {}", shardId.toString(),
-                            shardException.toString());
+                        logger.trace("got unhandled retryable exception for shard {} {}", shardId.toString(), shardException.toString());
                         failedShards.add(shardId);
                         handleFailedShard.accept(shardId);
                         // remove this failed entry. So, while storing the data, we don't need to re-process it.
@@ -179,8 +185,7 @@ private HashMap getBatchData(NodeEntry nodeEntry) {
                 shardData.put(arrayToShardId.get(shardIdIndex), emptyResponseBuilder.get());
             } else if (nodeShardEntries[shardIdIndex] != null) {
                 // ignore null responses here
-                shardData.put(arrayToShardId.get(shardIdIndex),
-                    nodeShardEntries[shardIdIndex]);
+                shardData.put(arrayToShardId.get(shardIdIndex), nodeShardEntries[shardIdIndex]);
             }
         }
         return shardData;
@@ -233,7 +238,6 @@ boolean[] getEmptyShardResponse() {
         return emptyShardResponse;
     }
 
-
     private void fillShardData(Map shardDataFromNode, Map shardIdKey) {
         for (ShardId shardId : shardDataFromNode.keySet()) {
            if (shardDataFromNode.get(shardId) != null) {
@@ -243,12 +247,10 @@ private void fillShardData(Map shardDataFromNode, Map
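
Note on the behaviour described in the commit title (illustration only, not part of the patch): the hunks above keep treating OpenSearchRejectedExecutionException, ReceiveTimeoutTransportException, and OpenSearchTimeoutException as known retryable failures whose shard entry is dropped and retried, while any other exception is passed back unchanged. Below is a rough, stand-alone Java sketch of that split; RejectedExecutionException and TimeoutException from java.util.concurrent and a String shard key are stand-ins for the OpenSearch-specific types, and the class and method names are hypothetical, not the actual gateway code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.TimeoutException;

    // Stand-alone sketch of the "known vs. unhandled exception" split from the commit title.
    // RejectedExecutionException / TimeoutException stand in for the OpenSearch-specific
    // retryable types checked in filterFailedShards(); a String stands in for ShardId.
    public final class ShardFailureSketch {

        // Mirrors the instanceof checks in the patch: these failures are considered retryable.
        static boolean isKnownRetryable(Exception e) {
            return e instanceof RejectedExecutionException || e instanceof TimeoutException;
        }

        // Returns the value to cache for a shard: the response itself on success, null for a
        // known retryable failure (the shard will be fetched again later), and rethrows any
        // unhandled exception so the caller receives it as it is.
        static <V> V classify(String shardId, V response, Exception failure) throws Exception {
            if (failure == null) {
                return response;
            }
            if (isKnownRetryable(failure)) {
                System.out.println("retryable failure for shard " + shardId + ": " + failure);
                return null;
            }
            throw failure;
        }

        public static void main(String[] args) throws Exception {
            Map<String, String> cache = new HashMap<>();
            cache.put("[index][0]", classify("[index][0]", "shard-metadata", null));
            cache.put("[index][1]", classify("[index][1]", null, new TimeoutException("node timed out")));
            // Prints something like {[index][0]=shard-metadata, [index][1]=null}; the null entry
            // marks a shard that will be retried instead of surfacing the known exception.
            System.out.println(cache);
        }
    }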