From 4012c1672f26a72e7ee054c8144035a43838d6d5 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 28 Aug 2023 16:58:41 +0000 Subject: [PATCH] support realtime TermVector and MultiTermVector requests with segment replication. Signed-off-by: Rishikesh1159 --- .../action/get/TransportGetAction.java | 2 +- .../termvectors/MultiTermVectorsRequest.java | 11 +++++++++++ .../action/termvectors/TermVectorsRequest.java | 2 +- .../TransportMultiTermVectorsAction.java | 6 ++++++ .../termvectors/TransportTermVectorsAction.java | 17 +++++++++++++++-- 5 files changed, 34 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 583815b91ae68..401bb390a9df1 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -104,7 +104,7 @@ static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) /** * Returns true if GET request should be routed to primary shards, else false. */ - protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { + public static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null; } diff --git a/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java b/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java index c055564c3fcbe..b163f7a255300 100644 --- a/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java +++ b/server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java @@ -63,6 +63,8 @@ public class MultiTermVectorsRequest extends ActionRequest RealtimeRequest { String preference; + + boolean realtime = true; List requests = new ArrayList<>(); final Set ids = new HashSet<>(); @@ -186,9 +188,18 @@ public int size() { @Override public MultiTermVectorsRequest realtime(boolean realtime) { + this.realtime = realtime; for (TermVectorsRequest request : requests) { request.realtime(realtime); } return this; } + + public MultiTermVectorsRequest preference(String preference) { + this.preference = preference; + for (TermVectorsRequest request : requests) { + request.preference(preference); + } + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java b/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java index 825b0b4982880..744fd1c4acff8 100644 --- a/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java +++ b/server/src/main/java/org/opensearch/action/termvectors/TermVectorsRequest.java @@ -109,7 +109,7 @@ public class TermVectorsRequest extends SingleShardRequest i // TODO: change to String[] private Set selectedFields; - private boolean realtime = true; + boolean realtime = true; private Map perFieldAnalyzer; diff --git a/server/src/main/java/org/opensearch/action/termvectors/TransportMultiTermVectorsAction.java b/server/src/main/java/org/opensearch/action/termvectors/TransportMultiTermVectorsAction.java index 0364f36106cb0..e4a5b8f339423 100644 --- a/server/src/main/java/org/opensearch/action/termvectors/TransportMultiTermVectorsAction.java +++ b/server/src/main/java/org/opensearch/action/termvectors/TransportMultiTermVectorsAction.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.AtomicArray; @@ -51,6 +52,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting; + /** * Performs the multi term get operation. * @@ -123,6 +126,9 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final .shardId(clusterState, concreteSingleIndex, termVectorsRequest.id(), termVectorsRequest.routing()); MultiTermVectorsShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { + if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) { + request.preference(Preference.PRIMARY.type()); + } shardRequest = new MultiTermVectorsShardRequest(shardId.getIndexName(), shardId.id()); shardRequest.preference(request.preference); shardRequests.put(shardId, shardRequest); diff --git a/server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java b/server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java index a76506b39f811..c3760c888a39f 100644 --- a/server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java +++ b/server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.routing.GroupShardsIterator; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -53,6 +54,8 @@ import java.io.IOException; +import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting; + /** * Performs the get operation. * @@ -87,15 +90,25 @@ public TransportTermVectorsAction( @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { + + final String preference; + // route realtime TermVector requests when segment replication is enabled to primary shards, + // iff there are no other preferences/routings enabled for routing to a specific shard + if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) { + preference = Preference.PRIMARY.type(); + } else { + preference = request.request().preference(); + } + if (request.request().doc() != null && request.request().routing() == null) { // artificial document without routing specified, ignore its "id" and use either random shard or according to preference GroupShardsIterator groupShardsIter = clusterService.operationRouting() - .searchShards(state, new String[] { request.concreteIndex() }, null, request.request().preference()); + .searchShards(state, new String[] { request.concreteIndex() }, null, preference); return groupShardsIter.iterator().next(); } return clusterService.operationRouting() - .getShards(state, request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference()); + .getShards(state, request.concreteIndex(), request.request().id(), request.request().routing(), preference); } @Override