diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/termsenum/CCSTermsEnumIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/termsenum/CCSTermsEnumIT.java new file mode 100644 index 0000000000000..6b6223a1e2caa --- /dev/null +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/termsenum/CCSTermsEnumIT.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.termsenum; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class CCSTermsEnumIT extends AbstractMultiClustersTestCase { + + @Override + protected Collection remoteClusterAlias() { + return List.of("remote_cluster"); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + final List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(LocalStateCompositeXPackPlugin.class); + return plugins; + } + + public void testBasic() { + Settings indexSettings = Settings.builder().put("index.number_of_replicas", 0).build(); + final Client localClient = client(LOCAL_CLUSTER); + final Client remoteClient = client("remote_cluster"); + String localIndex = "local_test"; + assertAcked(localClient.admin().indices().prepareCreate(localIndex).setSettings(indexSettings)); + localClient.prepareIndex(localIndex).setSource("foo", "foo").get(); + localClient.prepareIndex(localIndex).setSource("foo", "foobar").get(); + localClient.admin().indices().prepareRefresh(localIndex).get(); + + String remoteIndex = "remote_test"; + assertAcked(remoteClient.admin().indices().prepareCreate(remoteIndex).setSettings(indexSettings)); + remoteClient.prepareIndex(remoteIndex).setSource("foo", "bar").get(); + remoteClient.prepareIndex(remoteIndex).setSource("foo", "foobar").get(); + remoteClient.prepareIndex(remoteIndex).setSource("foo", "zar").get(); + remoteClient.admin().indices().prepareRefresh(remoteIndex).get(); + + // _terms_enum on a remote cluster + TermsEnumRequest req = new TermsEnumRequest("remote_cluster:remote_test") + .field("foo.keyword"); + TermsEnumResponse response = client().execute(TermsEnumAction.INSTANCE, req).actionGet(); + assertTrue(response.isComplete()); + assertThat(response.getTerms().size(), equalTo(3)); + assertThat(response.getTerms().get(0), equalTo("bar")); + assertThat(response.getTerms().get(1), equalTo("foobar")); + assertThat(response.getTerms().get(2), equalTo("zar")); + + // _terms_enum on mixed clusters (local + remote) + req = new TermsEnumRequest("remote_cluster:remote_test", "local_test") + .field("foo.keyword"); + response = client().execute(TermsEnumAction.INSTANCE, req).actionGet(); + assertTrue(response.isComplete()); + assertThat(response.getTerms().size(), equalTo(4)); + assertThat(response.getTerms().get(0), equalTo("bar")); + assertThat(response.getTerms().get(1), equalTo("foo")); + assertThat(response.getTerms().get(2), equalTo("foobar")); + assertThat(response.getTerms().get(3), equalTo("zar")); + + req = new TermsEnumRequest("remote_cluster:remote_test", "local_test") + .field("foo.keyword") + .searchAfter("foobar"); + response = client().execute(TermsEnumAction.INSTANCE, req).actionGet(); + assertTrue(response.isComplete()); + assertThat(response.getTerms().size(), equalTo(1)); + assertThat(response.getTerms().get(0), equalTo("zar")); + + req = new TermsEnumRequest("remote_cluster:remote_test", "local_test") + .field("foo.keyword") + .searchAfter("bar"); + response = client().execute(TermsEnumAction.INSTANCE, req).actionGet(); + assertTrue(response.isComplete()); + assertThat(response.getTerms().size(), equalTo(3)); + assertThat(response.getTerms().get(0), equalTo("foo")); + assertThat(response.getTerms().get(1), equalTo("foobar")); + assertThat(response.getTerms().get(2), equalTo("zar")); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java index 280584c939408..08fe633a3fedb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.TransportRequest; @@ -20,7 +21,7 @@ import java.util.Set; /** - * Internal terms enum request executed directly against a specific node, querying potentially many + * Internal terms enum request executed directly against a specific node, querying potentially many * shards in one request */ public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest { @@ -36,12 +37,27 @@ public class NodeTermsEnumRequest extends TransportRequest implements IndicesReq private final QueryBuilder indexFilter; private Set shardIds; private String nodeId; - + + public NodeTermsEnumRequest(final String nodeId, + final Set shardIds, + TermsEnumRequest request, + long taskStartTimeMillis) { + this.field = request.field(); + this.string = request.string(); + this.searchAfter = request.searchAfter(); + this.caseInsensitive = request.caseInsensitive(); + this.size = request.size(); + this.timeout = request.timeout().getMillis(); + this.taskStartedTimeMillis = taskStartTimeMillis; + this.indexFilter = request.indexFilter(); + this.nodeId = nodeId; + this.shardIds = shardIds; + } public NodeTermsEnumRequest(StreamInput in) throws IOException { super(in); field = in.readString(); - string = in.readString(); + string = in.readOptionalString(); searchAfter = in.readOptionalString(); caseInsensitive = in.readBoolean(); size = in.readVInt(); @@ -56,27 +72,38 @@ public NodeTermsEnumRequest(StreamInput in) throws IOException { } } - public NodeTermsEnumRequest(final String nodeId, final Set shardIds, TermsEnumRequest request) { - this.field = request.field(); - this.string = request.string(); - this.searchAfter = request.searchAfter(); - this.caseInsensitive = request.caseInsensitive(); - this.size = request.size(); - this.timeout = request.timeout().getMillis(); - this.taskStartedTimeMillis = request.taskStartTimeMillis; - this.indexFilter = request.indexFilter(); - this.nodeId = nodeId; - this.shardIds = shardIds; + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(field); + out.writeOptionalString(string); + out.writeOptionalString(searchAfter); + out.writeBoolean(caseInsensitive); + out.writeVInt(size); + // Adjust the amount of permitted time the shard has remaining to gather terms. + long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis; + long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode); + // TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0? + out.writeVLong(remainingTimeForShardToUse); + out.writeVLong(taskStartedTimeMillis); + out.writeOptionalNamedWriteable(indexFilter); + out.writeString(nodeId); + out.writeVInt(shardIds.size()); + for (ShardId shardId : shardIds) { + shardId.writeTo(out); + } } public String field() { return field; } + @Nullable public String string() { return string; } + @Nullable public String searchAfter() { return searchAfter; } @@ -84,8 +111,8 @@ public String searchAfter() { public long taskStartedTimeMillis() { return this.taskStartedTimeMillis; } - - /** + + /** * The time this request was materialized on a node */ long nodeStartedTimeMillis() { @@ -94,12 +121,12 @@ long nodeStartedTimeMillis() { nodeStartedTimeMillis = System.currentTimeMillis(); } return this.nodeStartedTimeMillis; - } - + } + public void startTimerOnDataNode() { nodeStartedTimeMillis = System.currentTimeMillis(); } - + public Set shardIds() { return Collections.unmodifiableSet(shardIds); } @@ -119,28 +146,6 @@ public String nodeId() { return nodeId; } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(field); - out.writeString(string); - out.writeOptionalString(searchAfter); - out.writeBoolean(caseInsensitive); - out.writeVInt(size); - // Adjust the amount of permitted time the shard has remaining to gather terms. - long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis; - long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode); - // TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0? - out.writeVLong(remainingTimeForShardToUse); - out.writeVLong(taskStartedTimeMillis); - out.writeOptionalNamedWriteable(indexFilter); - out.writeString(nodeId); - out.writeVInt(shardIds.size()); - for (ShardId shardId : shardIds) { - shardId.writeTo(out); - } - } - public QueryBuilder indexFilter() { return indexFilter; } @@ -162,5 +167,4 @@ public IndicesOptions indicesOptions() { public boolean remove(ShardId shardId) { return shardIds.remove(shardId); } - } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java index 728a6504c0eb3..2a38a7284d06a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java @@ -41,6 +41,14 @@ class NodeTermsEnumResponse extends TransportResponse { this.complete = complete; } + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(terms); + out.writeOptionalString(error); + out.writeBoolean(complete); + out.writeString(nodeId); + } + public List terms() { return this.terms; } @@ -52,17 +60,8 @@ public String getError() { public String getNodeId() { return nodeId; } - - public boolean getComplete() { - return complete; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeCollection(terms); - out.writeOptionalString(error); - out.writeBoolean(complete); - out.writeString(nodeId); + public boolean isComplete() { + return complete; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequest.java index ddfb441b9d54a..da42a0b3fdbaa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequest.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -30,35 +31,58 @@ public class TermsEnumRequest extends BroadcastRequest impleme public static TimeValue DEFAULT_TIMEOUT = new TimeValue(1000); private String field; - private String string; - private String searchAfter; + private String string = null; + private String searchAfter = null; private int size = DEFAULT_SIZE; private boolean caseInsensitive; - long taskStartTimeMillis; private QueryBuilder indexFilter; public TermsEnumRequest() { this(Strings.EMPTY_ARRAY); } + /** + * Constructs a new term enum request against the provided indices. No indices provided means it will + * run against all indices. + */ + public TermsEnumRequest(String... indices) { + super(indices); + indicesOptions(IndicesOptions.fromOptions(false, false, true, false)); + timeout(DEFAULT_TIMEOUT); + } + + public TermsEnumRequest(TermsEnumRequest clone) { + this.field = clone.field; + this.string = clone.string; + this.searchAfter = clone.searchAfter; + this.caseInsensitive = clone.caseInsensitive; + this.size = clone.size; + this.indexFilter = clone.indexFilter; + indices(clone.indices); + indicesOptions(clone.indicesOptions()); + timeout(clone.timeout()); + setParentTask(clone.getParentTask()); + } + public TermsEnumRequest(StreamInput in) throws IOException { super(in); field = in.readString(); - string = in.readString(); + string = in.readOptionalString(); searchAfter = in.readOptionalString(); caseInsensitive = in.readBoolean(); size = in.readVInt(); indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class); } - /** - * Constructs a new term enum request against the provided indices. No indices provided means it will - * run against all indices. - */ - public TermsEnumRequest(String... indices) { - super(indices); - indicesOptions(IndicesOptions.fromOptions(false, false, true, false)); - timeout(DEFAULT_TIMEOUT); + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(field); + out.writeOptionalString(string); + out.writeOptionalString(searchAfter); + out.writeBoolean(caseInsensitive); + out.writeVInt(size); + out.writeOptionalNamedWriteable(indexFilter); } @Override @@ -81,8 +105,9 @@ public ActionRequestValidationException validate() { /** * The field to look inside for values */ - public void field(String field) { + public TermsEnumRequest field(String field) { this.field = field; + return this; } /** @@ -95,13 +120,15 @@ public String field() { /** * The string required in matching field values */ - public void string(String string) { + public TermsEnumRequest string(String string) { this.string = string; + return this; } /** * The string required in matching field values */ + @Nullable public String string() { return string; } @@ -109,6 +136,7 @@ public String string() { /** * The string after which to find matching field values (enables pagination of previous request) */ + @Nullable public String searchAfter() { return searchAfter; } @@ -116,8 +144,9 @@ public String searchAfter() { /** * The string after which to find matching field values (enables pagination of previous request) */ - public void searchAfter(String searchAfter) { + public TermsEnumRequest searchAfter(String searchAfter) { this.searchAfter = searchAfter; + return this; } /** @@ -130,15 +159,17 @@ public int size() { /** * The number of terms to return */ - public void size(int size) { + public TermsEnumRequest size(int size) { this.size = size; + return this; } /** * If case insensitive matching is required */ - public void caseInsensitive(boolean caseInsensitive) { + public TermsEnumRequest caseInsensitive(boolean caseInsensitive) { this.caseInsensitive = caseInsensitive; + return this; } /** @@ -151,25 +182,15 @@ public boolean caseInsensitive() { /** * Allows to filter shards if the provided {@link QueryBuilder} rewrites to `match_none`. */ - public void indexFilter(QueryBuilder indexFilter) { + public TermsEnumRequest indexFilter(QueryBuilder indexFilter) { this.indexFilter = indexFilter; + return this; } public QueryBuilder indexFilter() { return indexFilter; } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(field); - out.writeString(string); - out.writeOptionalString(searchAfter); - out.writeBoolean(caseInsensitive); - out.writeVInt(size); - out.writeOptionalNamedWriteable(indexFilter); - } - @Override public String toString() { return "[" + Arrays.toString(indices) + "] field[" + field + "], string[" + string + "] " + " size=" + size + " timeout=" diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumResponse.java index cf36745015920..b39171633142f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumResponse.java @@ -81,6 +81,10 @@ public List getTerms() { return terms; } + public boolean isComplete() { + return complete; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java index b92b9326e1f70..45b116c41dd4d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java @@ -9,13 +9,17 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -52,6 +56,8 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestHandler; @@ -65,9 +71,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -76,15 +80,17 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; public class TransportTermsEnumAction extends HandledTransportAction { - protected final ClusterService clusterService; - protected final TransportService transportService; + private final ClusterService clusterService; + private final TransportService transportService; + private final RemoteClusterService remoteClusterService; private final SearchService searchService; private final IndicesService indicesService; private final ScriptService scriptService; - protected final IndexNameExpressionResolver indexNameExpressionResolver; + private final IndexNameExpressionResolver indexNameExpressionResolver; final String transportShardAction; private final String shardExecutor; @@ -92,9 +98,9 @@ public class TransportTermsEnumAction extends HandledTransportAction listener) { - request.taskStartTimeMillis = task.getStartTime(); new AsyncBroadcastAction(task, request, listener).start(); } - protected NodeTermsEnumRequest newNodeRequest(final String nodeId, final Set shardIds, TermsEnumRequest request) { + protected NodeTermsEnumRequest newNodeRequest(final String nodeId, + final Set shardIds, + TermsEnumRequest request, + long taskStartMillis) { // Given we look terms up in the terms dictionary alias filters is another aspect of search (like DLS) that we // currently do not support. // final ClusterState clusterState = clusterService.state(); // final Set indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices()); // final AliasFilter aliasFilter = searchService.buildAliasFilter(clusterState, shard.getIndexName(), indicesAndAliases); - return new NodeTermsEnumRequest(nodeId, shardIds, request); + return new NodeTermsEnumRequest(nodeId, shardIds, request, taskStartMillis); } protected NodeTermsEnumResponse readShardResponse(StreamInput in) throws IOException { @@ -171,7 +180,7 @@ protected Map> getNodeBundles(ClusterState clusterState, Te if (fastNodeBundles.containsKey(nodeId)) { bundle = fastNodeBundles.get(nodeId); } else { - bundle = new HashSet(); + bundle = new HashSet<>(); fastNodeBundles.put(nodeId, bundle); } if (bundle != null) { @@ -190,31 +199,24 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, TermsEnumR return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices); } - protected TermsEnumResponse newResponse( + protected TermsEnumResponse mergeResponses( TermsEnumRequest request, - AtomicReferenceArray nodesResponses, + AtomicReferenceArray atomicResponses, boolean complete, Map> nodeBundles ) { int successfulShards = 0; int failedShards = 0; List shardFailures = null; - Map combinedResults = new HashMap(); - for (int i = 0; i < nodesResponses.length(); i++) { - Object nodeResponse = nodesResponses.get(i); - if (nodeResponse == null) { - // simply ignore non active shards - } else if (nodeResponse instanceof BroadcastShardOperationFailedException) { - complete = false; - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) nodeResponse)); - } else { - NodeTermsEnumResponse str = (NodeTermsEnumResponse) nodeResponse; + List> termsList = new ArrayList<>(); + for (int i = 0; i < atomicResponses.length(); i++) { + Object atomicResponse = atomicResponses.get(i); + if (atomicResponse == null) { + // simply ignore non active operations + } else if (atomicResponse instanceof NodeTermsEnumResponse) { + NodeTermsEnumResponse str = (NodeTermsEnumResponse) atomicResponse; // Only one node response has to be incomplete for the entire result to be labelled incomplete. - if (str.getComplete() == false) { + if (str.isComplete() == false) { complete = false; } @@ -238,34 +240,79 @@ protected TermsEnumResponse newResponse( } else { successfulShards += shards.size(); } - for (TermCount term : str.terms()) { - TermCount existingTc = combinedResults.get(term.getTerm()); - if (existingTc == null) { - combinedResults.put(term.getTerm(), term); - } else { - // add counts - existingTc.addToDocCount(term.getDocCount()); - } + termsList.add(str.terms()); + } else if (atomicResponse instanceof RemoteClusterTermsEnumResponse) { + RemoteClusterTermsEnumResponse rc = (RemoteClusterTermsEnumResponse) atomicResponse; + // Only one node response has to be incomplete for the entire result to be labelled incomplete. + if (rc.resp.isComplete() == false || rc.resp.getFailedShards() > 0) { + complete = false; + } + successfulShards = rc.resp.getSuccessfulShards(); + failedShards = rc.resp.getFailedShards(); + for (DefaultShardOperationFailedException exc : rc.resp.getShardFailures()) { + shardFailures.add(new DefaultShardOperationFailedException(rc.clusterAlias + ":" + exc.index(), + exc.shardId(), exc.getCause())); } + List terms = rc.resp.getTerms().stream() + .map(a -> new TermCount(a, 1)) + .collect(Collectors.toList()); + termsList.add(terms); + } else { + throw new AssertionError("Unknown atomic response type: " + atomicResponse.getClass().getName()); } } - int size = Math.min(request.size(), combinedResults.size()); - List terms = new ArrayList<>(size); - TermCount[] sortedCombinedResults = combinedResults.values().toArray(new TermCount[0]); - // Sort alphabetically - Arrays.sort(sortedCombinedResults, new Comparator() { - public int compare(TermCount t1, TermCount t2) { - return t1.getTerm().compareTo(t2.getTerm()); + + List ans = termsList.size() == 1 ? termsList.get(0).stream() + .map(TermCount::getTerm) + .collect(Collectors.toList()) : mergeResponses(termsList, request.size()); + return new TermsEnumResponse(ans, (failedShards + successfulShards), successfulShards, failedShards, shardFailures, complete); + } + + private List mergeResponses(List> termsList, int size) { + final PriorityQueue pq = new PriorityQueue<>(termsList.size()) { + @Override + protected boolean lessThan(TermCountIterator a, TermCountIterator b) { + return a.compareTo(b) < 0; + } + }; + + for (List terms : termsList) { + Iterator it = terms.iterator(); + if (it.hasNext()) { + pq.add(new TermCountIterator(it)); } - }); + } - for (TermCount term : sortedCombinedResults) { - terms.add(term.getTerm()); - if (terms.size() == size) { - break; + TermCount lastTerm = null; + final List ans = new ArrayList<>(); + while (pq.size() != 0) { + TermCountIterator it = pq.top(); + String term = it.term(); + long docCount = it.docCount(); + if (lastTerm != null && lastTerm.getTerm().compareTo(term) != 0) { + ans.add(lastTerm.getTerm()); + if (ans.size() == size) { + break; + } + lastTerm = null; + } + if (lastTerm == null) { + lastTerm = new TermCount(term, 0); + } + lastTerm.addToDocCount(docCount); + if (it.hasNext()) { + String itTerm = it.term(); + it.next(); + assert itTerm.compareTo(it.term()) <= 0; + pq.updateTop(); + } else { + pq.pop(); } } - return new TermsEnumResponse(terms, (failedShards + successfulShards), successfulShards, failedShards, shardFailures, complete); + if (lastTerm != null && ans.size() < size) { + ans.add(lastTerm.getTerm()); + } + return ans; } protected NodeTermsEnumResponse dataNodeOperation(NodeTermsEnumRequest request, Task task) throws IOException { @@ -421,8 +468,9 @@ protected class AsyncBroadcastAction { private final DiscoveryNodes nodes; private final int expectedOps; private final AtomicInteger counterOps = new AtomicInteger(); - private final AtomicReferenceArray nodesResponses; - private Map> nodeBundles; + private final AtomicReferenceArray atomicResponses; + private final Map> nodeBundles; + private final Map remoteClusterIndices; protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListener listener) { this.task = task; @@ -435,8 +483,13 @@ protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListen if (blockException != null) { throw blockException; } + + this.remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(), request.indices()); + OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + // update to concrete indices - String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request); + String[] concreteIndices = localIndices == null ? new String[0] : + indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices); blockException = checkRequestBlock(clusterState, request, concreteIndices); if (blockException != null) { throw blockException; @@ -445,16 +498,16 @@ protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListen nodes = clusterState.nodes(); logger.trace("resolving shards based on cluster state version [{}]", clusterState.version()); nodeBundles = getNodeBundles(clusterState, request, concreteIndices); - expectedOps = nodeBundles.size(); + expectedOps = nodeBundles.size() + remoteClusterIndices.size(); - nodesResponses = new AtomicReferenceArray<>(expectedOps); + atomicResponses = new AtomicReferenceArray<>(expectedOps); } public void start() { - if (nodeBundles.size() == 0) { + if (expectedOps == 0) { // no shards try { - listener.onResponse(newResponse(request, new AtomicReferenceArray<>(0), true, nodeBundles)); + listener.onResponse(mergeResponses(request, new AtomicReferenceArray<>(0), true, nodeBundles)); } catch (Exception e) { listener.onFailure(e); } @@ -462,19 +515,24 @@ public void start() { return; } // count the local operations, and perform the non local ones - int nodeIndex = -1; + int numOps = 0; for (final String nodeId : nodeBundles.keySet()) { if (checkForEarlyFinish()) { return; } - nodeIndex++; Set shardIds = nodeBundles.get(nodeId); if (shardIds.size() > 0) { - performOperation(nodeId, shardIds, nodeIndex); + performOperation(nodeId, shardIds, numOps); } else { // really, no shards active in this group - onNoOperation(nodeId); + onNodeFailure(nodeId, numOps, null); } + ++ numOps; + } + // handle remote clusters + for (String clusterAlias : remoteClusterIndices.keySet()) { + performRemoteClusterOperation(clusterAlias, remoteClusterIndices.get(clusterAlias), numOps); + ++ numOps; } } @@ -488,18 +546,18 @@ boolean checkForEarlyFinish() { return false; } - protected void performOperation(final String nodeId, final Set shardIds, final int nodeIndex) { + protected void performOperation(final String nodeId, final Set shardIds, final int opsIndex) { if (shardIds.size() == 0) { // no more active shards... (we should not really get here, just safety) - onNoOperation(nodeId); + onNodeFailure(nodeId, opsIndex, null); } else { try { - final NodeTermsEnumRequest nodeRequest = newNodeRequest(nodeId, shardIds, request); + final NodeTermsEnumRequest nodeRequest = newNodeRequest(nodeId, shardIds, request, task.getStartTime()); nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); DiscoveryNode node = nodes.get(nodeId); if (node == null) { // no node connected, act as failure - onNoOperation(nodeId); + onNodeFailure(nodeId, opsIndex, null); } else if (checkForEarlyFinish() == false) { transportService.sendRequest( node, @@ -513,25 +571,50 @@ public NodeTermsEnumResponse read(StreamInput in) throws IOException { @Override public void handleResponse(NodeTermsEnumResponse response) { - onOperation(nodeId, nodeIndex, response); + onNodeResponse(nodeId, opsIndex, response); } @Override - public void handleException(TransportException e) { - onNoOperation(nodeId); + public void handleException(TransportException exc) { + onNodeFailure(nodeId, opsIndex, exc); } } ); } - } catch (Exception e) { - onNoOperation(nodeId); + } catch (Exception exc) { + onNodeFailure(nodeId, opsIndex, exc); } } } - protected void onOperation(String nodeId, int nodeIndex, NodeTermsEnumResponse response) { + void performRemoteClusterOperation(final String clusterAlias, + final OriginalIndices remoteIndices, + final int opsIndex) { + try { + TermsEnumRequest req = new TermsEnumRequest(request) + .indices(remoteIndices.indices()); + + Client remoteClient = remoteClusterService.getRemoteClusterClient(transportService.getThreadPool(), clusterAlias); + remoteClient.execute(TermsEnumAction.INSTANCE, req, new ActionListener<>() { + @Override + public void onResponse(TermsEnumResponse termsEnumResponse) { + onRemoteClusterResponse(clusterAlias, opsIndex, + new RemoteClusterTermsEnumResponse(clusterAlias, termsEnumResponse)); + } + + @Override + public void onFailure(Exception exc) { + onRemoteClusterFailure(clusterAlias, opsIndex, exc); + } + }); + } catch (Exception exc) { + onRemoteClusterFailure(clusterAlias, opsIndex, null); + } + } + + private void onNodeResponse(String nodeId, int opsIndex, NodeTermsEnumResponse response) { logger.trace("received response for node {}", nodeId); - nodesResponses.set(nodeIndex, response); + atomicResponses.set(opsIndex, response); if (expectedOps == counterOps.incrementAndGet()) { finishHim(true); } else { @@ -539,7 +622,29 @@ protected void onOperation(String nodeId, int nodeIndex, NodeTermsEnumResponse r } } - void onNoOperation(String nodeId) { + private void onRemoteClusterResponse(String clusterAlias, + int opsIndex, + RemoteClusterTermsEnumResponse response) { + logger.trace("received response for cluster {}", clusterAlias); + atomicResponses.set(opsIndex, response); + if (expectedOps == counterOps.incrementAndGet()) { + finishHim(true); + } else { + checkForEarlyFinish(); + } + } + + private void onNodeFailure(String nodeId, int opsIndex, Exception exc) { + logger.trace("received failure {} for node {}", exc, nodeId); + // TODO: Handle exceptions in the atomic response array + if (expectedOps == counterOps.incrementAndGet()) { + finishHim(true); + } + } + + private void onRemoteClusterFailure(String clusterAlias, int opsIndex, Exception exc) { + logger.trace("received failure {} for cluster {}", exc, clusterAlias); + // TODO: Handle exceptions in the atomic response array if (expectedOps == counterOps.incrementAndGet()) { finishHim(true); } @@ -551,7 +656,7 @@ protected synchronized void finishHim(boolean complete) { return; } try { - listener.onResponse(newResponse(request, nodesResponses, complete, nodeBundles)); + listener.onResponse(mergeResponses(request, atomicResponses, complete, nodeBundles)); } catch (Exception e) { listener.onFailure(e); } finally { @@ -614,4 +719,47 @@ private void asyncNodeOperation(NodeTermsEnumRequest request, Task task, ActionL .execute(ActionRunnable.supply(listener, () -> dataNodeOperation(request, task))); } } + + private static class RemoteClusterTermsEnumResponse { + final String clusterAlias; + final TermsEnumResponse resp; + + private RemoteClusterTermsEnumResponse(String clusterAlias, TermsEnumResponse resp) { + this.clusterAlias = clusterAlias; + this.resp = resp; + } + } + + private static class TermCountIterator implements Iterator, Comparable { + private final Iterator iterator; + private TermCount current; + + private TermCountIterator(Iterator iterator) { + this.iterator = iterator; + this.current = iterator.next(); + } + + public String term() { + return current.getTerm(); + } + + public long docCount() { + return current.getDocCount(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public TermCount next() { + return current = iterator.next(); + } + + @Override + public int compareTo(TermCountIterator o) { + return current.getTerm().compareTo(o.term()); + } + } } diff --git a/x-pack/qa/multi-cluster-search-security/build.gradle b/x-pack/qa/multi-cluster-search-security/build.gradle index 7dd1b3816632a..8adab3fe43744 100644 --- a/x-pack/qa/multi-cluster-search-security/build.gradle +++ b/x-pack/qa/multi-cluster-search-security/build.gradle @@ -12,7 +12,7 @@ dependencies { restResources { restApi { include '_common', 'bulk', 'field_caps', 'security', 'search', 'clear_scroll', 'scroll', 'async_search', 'cluster', - 'indices', 'open_point_in_time', 'close_point_in_time' + 'indices', 'open_point_in_time', 'close_point_in_time', 'terms_enum' } } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml index 0e7d124ff4d72..22441992a4f8b 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml @@ -35,11 +35,14 @@ - match: {indices.8.name: my_remote_cluster:secured_via_alias} - match: {indices.8.attributes.0: open} - match: {indices.9.name: my_remote_cluster:shared_index} + - match: {indices.9.attributes.0: open} - match: {indices.10.name: my_remote_cluster:single_doc_index} - - match: {indices.11.attributes.0: open} - - match: {indices.11.name: my_remote_cluster:test_index} - - match: {indices.11.aliases.0: aliased_test_index} - - match: {indices.11.attributes.0: open} + - match: {indices.10.attributes.0: open} + - match: {indices.11.name: my_remote_cluster:terms_enum_index } + - match: {indices.11.attributes.0: open } + - match: {indices.12.name: my_remote_cluster:test_index} + - match: {indices.12.aliases.0: aliased_test_index} + - match: {indices.12.attributes.0: open} - match: {aliases.0.name: my_remote_cluster:.security} - match: {aliases.0.indices.0: .security-7} - match: {aliases.1.name: my_remote_cluster:aliased_closed_index} diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index e7688734d8093..a90ed83c02425 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -350,3 +350,5 @@ teardown: id: "$id" - match: { acknowledged: true } + + diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/120_terms_enum.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/120_terms_enum.yml new file mode 100644 index 0000000000000..291b6a8bf1030 --- /dev/null +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/120_terms_enum.yml @@ -0,0 +1,138 @@ +--- +setup: + - skip: + features: headers + + - do: + cluster.health: + wait_for_status: yellow + + - do: + security.put_role: + name: "terms_enum_all_role" + body: > + { + "cluster": ["all"], + "indices": [ + { + "names": ["my_remote_cluster:terms_enum_index"], + "privileges": ["read"] + } + ] + } + + - do: + security.put_user: + username: "joe_all" + body: > + { + "password": "s3krit-password", + "roles" : [ "terms_enum_all_role" ] + } + + - do: + security.put_role: + name: "terms_enum_none_role" + body: > + { + "cluster": ["all"], + "indices": [ + { + "names": ["my_remote_cluster:terms_enum_index"], + "privileges": ["read"] + } + ] + } + + - do: + security.put_user: + username: "joe_none" + body: > + { + "password": "s3krit-password", + "roles" : [ "terms_enum_none_role" ] + } + + - do: + security.put_role: + name: "terms_enum_fls_role" + body: > + { + "cluster": ["all"], + "indices": [ + { + "names": ["my_remote_cluster:terms_enum_index"], + "privileges": ["read"] + } + ] + } + + - do: + security.put_user: + username: "joe_fls" + body: > + { + "password": "s3krit-password", + "roles" : [ "terms_enum_fls_role" ] + } + +--- +teardown: + - do: + security.delete_user: + username: "joe_all" + ignore: 404 + - do: + security.delete_user: + username: "joe_none" + ignore: 404 + - do: + security.delete_user: + username: "joe_fls" + ignore: 404 + - do: + security.delete_role: + name: "terms_enum_all_role" + ignore: 404 + - do: + security.delete_role: + name: "terms_enum_none_role" + ignore: 404 + - do: + security.delete_role: + name: "terms_enum_fls_role" + ignore: 404 + +--- +"Test terms enum on a remote cluster": + - do: + headers: { Authorization: "Basic am9lX2FsbDpzM2tyaXQtcGFzc3dvcmQ=" } # joe_all sees all docs + terms_enum: + index: my_remote_cluster:terms_enum_index + body: {"field": "foo", size: 2} + - length: { terms: 2 } + - match: { terms.0: "foo" } + - match: { terms.1: "foobar" } + + - do: + headers: { Authorization: "Basic am9lX2FsbDpzM2tyaXQtcGFzc3dvcmQ=" } # joe_all sees all docs + terms_enum: + index: my_remote_cluster:terms_enum_index + body: { "field": "foo", "search_after": "foobar" } + - length: { terms: 1 } + - match: { terms.0: "zar" } + - match: { complete: true } + + - do: + headers: { Authorization: "Basic am9lX25vbmU6czNrcml0LXBhc3N3b3Jk" } # joe_none can't see docs + terms_enum: + index: my_remote_cluster:terms_enum_index + body: {"field": "foo"} + - length: {terms: 0} + + - do: + headers: { Authorization: "Basic am9lX2ZsczpzM2tyaXQtcGFzc3dvcmQ=" } # joe_fls can't see field + terms_enum: + index: my_remote_cluster:terms_enum_index + body: {"field": "foo"} + - length: {terms: 0} diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml index f52b2f069e8c0..b8feed5c68d7f 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml @@ -68,6 +68,39 @@ setup: ] } + - do: + security.put_role: + name: "terms_enum_all_role" + body: > + { + "cluster": ["monitor"], + "indices": [ + { "names": ["terms_enum_index"], "privileges": ["read"], "query": "{\"term\": {\"ck\": \"const\"}}" } + ] + } + + - do: + security.put_role: + name: "terms_enum_none_role" + body: > + { + "cluster": ["monitor"], + "indices": [ + { "names": ["terms_enum_index"], "privileges": ["read"], "query": "{\"term\": {\"ck\": \"dynamic\"}}" } + ] + } + + - do: + security.put_role: + name: "terms_enum_fls_role" + body: > + { + "cluster": ["monitor"], + "indices": [ + { "names": ["terms_enum_index"], "privileges": ["read"], "field_security" : {"grant" : [ "*"],"except": [ "foo" ]} } + ] + } + --- "Index data and search on the remote cluster": - skip: @@ -335,3 +368,30 @@ setup: - '{"public": true, "name": "doc 1", "secret": "sesame"}' - '{"index": {"_index": "shared_index", "_id": 2}}' - '{"public": false, "name": "doc 2", "secret": "sesame"}' + + - do: + indices.create: + index: terms_enum_index + body: + settings: + index: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + ck: + type: constant_keyword + value: const + foo: + type: keyword + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "terms_enum_index"}}' + - '{"foo": "zar"}' + - '{"index": {"_index": "terms_enum_index"}}' + - '{"foo": "foo"}' + - '{"index": {"_index": "terms_enum_index"}}' + - '{"foo": "foobar"}'