From 5409734a32db2f845f2a94a5b7672224eac34709 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 15 Mar 2023 22:23:11 -0700 Subject: [PATCH] [Segment Replication] Override segment replication handler for duplicate request from replica (#6693) (#6699) * [Segment Replication] Override segment replication handler for new request from same replica * Spotless and use map.compute --------- (cherry picked from commit 6bbe31a34a60faa44d4c3af8c6c89427f7262e81) Signed-off-by: Suraj Singh Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../replication/OngoingSegmentReplications.java | 17 +++++++---------- .../OngoingSegmentReplicationsTests.java | 4 ++-- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 60078c082a7e3..bf9e4607b695a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -147,16 +147,13 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { + if (segrepHandler != null) { + logger.warn("Override handler for allocation id {}", request.getTargetAllocationId()); + cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry"); + } + return createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter); + }); return copyState; } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 838dabef1cf75..8cb7a5f6d8929 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication; import org.junit.Assert; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -277,7 +276,8 @@ public void testShardAlreadyReplicatingToNode() throws IOException { listener.onResponse(null); }; replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); }); + CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertEquals(1, copyState.refCount()); } public void testStartReplicationWithNoFilesToFetch() throws IOException {