Skip to content

Commit

Permalink
support realtime TermVector and MultiTermVector requests with segment…
Browse files Browse the repository at this point in the history
… replication.

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Aug 28, 2023
1 parent 569d5c2 commit 4012c16
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ static boolean isSegmentReplicationEnabled(ClusterState state, String indexName)
/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
public static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class MultiTermVectorsRequest extends ActionRequest
RealtimeRequest {

String preference;

boolean realtime = true;
List<TermVectorsRequest> requests = new ArrayList<>();

final Set<String> ids = new HashSet<>();
Expand Down Expand Up @@ -186,9 +188,18 @@ public int size() {

@Override
public MultiTermVectorsRequest realtime(boolean realtime) {
this.realtime = realtime;

Check warning on line 191 in server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java#L191

Added line #L191 was not covered by tests
for (TermVectorsRequest request : requests) {
request.realtime(realtime);
}
return this;
}

public MultiTermVectorsRequest preference(String preference) {
this.preference = preference;

Check warning on line 199 in server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java#L199

Added line #L199 was not covered by tests
for (TermVectorsRequest request : requests) {
request.preference(preference);
}
return this;

Check warning on line 203 in server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/termvectors/MultiTermVectorsRequest.java#L201-L203

Added lines #L201 - L203 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
// TODO: change to String[]
private Set<String> selectedFields;

private boolean realtime = true;
boolean realtime = true;

private Map<String, String> perFieldAnalyzer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.AtomicArray;
Expand All @@ -51,6 +52,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting;

/**
* Performs the multi term get operation.
*
Expand Down Expand Up @@ -123,6 +126,9 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final
.shardId(clusterState, concreteSingleIndex, termVectorsRequest.id(), termVectorsRequest.routing());
MultiTermVectorsShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) {
request.preference(Preference.PRIMARY.type());

Check warning on line 130 in server/src/main/java/org/opensearch/action/termvectors/TransportMultiTermVectorsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/termvectors/TransportMultiTermVectorsAction.java#L130

Added line #L130 was not covered by tests
}
shardRequest = new MultiTermVectorsShardRequest(shardId.getIndexName(), shardId.id());
shardRequest.preference(request.preference);
shardRequests.put(shardId, shardRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -53,6 +54,8 @@

import java.io.IOException;

import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting;

/**
* Performs the get operation.
*
Expand Down Expand Up @@ -87,15 +90,25 @@ public TransportTermVectorsAction(

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {

final String preference;
// route realtime TermVector requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) {
preference = Preference.PRIMARY.type();

Check warning on line 98 in server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java#L98

Added line #L98 was not covered by tests
} else {
preference = request.request().preference();
}

if (request.request().doc() != null && request.request().routing() == null) {
// artificial document without routing specified, ignore its "id" and use either random shard or according to preference
GroupShardsIterator<ShardIterator> groupShardsIter = clusterService.operationRouting()
.searchShards(state, new String[] { request.concreteIndex() }, null, request.request().preference());
.searchShards(state, new String[] { request.concreteIndex() }, null, preference);

Check warning on line 106 in server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/termvectors/TransportTermVectorsAction.java#L106

Added line #L106 was not covered by tests
return groupShardsIter.iterator().next();
}

return clusterService.operationRouting()
.getShards(state, request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference());
.getShards(state, request.concreteIndex(), request.request().id(), request.request().routing(), preference);
}

@Override
Expand Down

0 comments on commit 4012c16

Please sign in to comment.