diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index f204cf162a6af..627ad975f6d3e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -41,9 +41,9 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; @@ -309,10 +309,19 @@ private void executeRequest(Task task, SearchRequest searchRequest, skippedClusters, remoteClusterIndices, remoteClusterService, threadPool, ActionListener.wrap( searchShardsResponses -> { - List remoteShardIterators = new ArrayList<>(); - Map remoteAliasFilters = new HashMap<>(); - BiFunction clusterNodeLookup = processRemoteShards( - searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + final BiFunction clusterNodeLookup = + getRemoteClusterNodeLookup(searchShardsResponses); + final Map remoteAliasFilters; + final List remoteShardIterators; + if (searchContext != null) { + remoteAliasFilters = searchContext.aliasFilter(); + remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses, + searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices); + } else { + remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses); + remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices, + remoteAliasFilters); + } int localClusters = localIndices == null ? 0 : 1; int totalClusters = remoteClusterIndices.size() + localClusters; int successfulClusters = searchShardsResponses.size() + localClusters; @@ -498,23 +507,29 @@ private void executeLocalSearch(Task task, SearchTimeProvider timeProvider, Sear searchContext, searchAsyncActionProvider); } - static BiFunction processRemoteShards(Map searchShardsResponses, - Map remoteIndicesByCluster, - List remoteShardIterators, - Map aliasFilterMap) { + static BiFunction getRemoteClusterNodeLookup(Map searchShardsResp) { Map> clusterToNode = new HashMap<>(); - for (Map.Entry entry : searchShardsResponses.entrySet()) { + for (Map.Entry entry : searchShardsResp.entrySet()) { String clusterAlias = entry.getKey(); - ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); - HashMap idToDiscoveryNode = new HashMap<>(); - clusterToNode.put(clusterAlias, idToDiscoveryNode); - for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) { - idToDiscoveryNode.put(remoteNode.getId(), remoteNode); + for (DiscoveryNode remoteNode : entry.getValue().getNodes()) { + clusterToNode.computeIfAbsent(clusterAlias, k -> new HashMap<>()).put(remoteNode.getId(), remoteNode); + } + } + return (clusterAlias, nodeId) -> { + Map clusterNodes = clusterToNode.get(clusterAlias); + if (clusterNodes == null) { + throw new IllegalArgumentException("unknown remote cluster: " + clusterAlias); } + return clusterNodes.get(nodeId); + }; + } + + static Map getRemoteAliasFilters(Map searchShardsResp) { + final Map aliasFilterMap = new HashMap<>(); + for (Map.Entry entry : searchShardsResp.entrySet()) { + ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); final Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters(); for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { - //add the cluster name to the remote index names for indices disambiguation - //this ends up in the hits returned with the search response ShardId shardId = clusterSearchShardsGroup.getShardId(); final AliasFilter aliasFilter; if (indicesAndFilters == null) { @@ -523,10 +538,26 @@ static BiFunction processRemoteShards(Map getRemoteShardsIterator(Map searchShardsResponses, + Map remoteIndicesByCluster, + Map aliasFilterMap) { + final List remoteShardIterators = new ArrayList<>(); + for (Map.Entry entry : searchShardsResponses.entrySet()) { + for (ClusterSearchShardsGroup clusterSearchShardsGroup : entry.getValue().getGroups()) { + //add the cluster name to the remote index names for indices disambiguation + //this ends up in the hits returned with the search response + ShardId shardId = clusterSearchShardsGroup.getShardId(); + AliasFilter aliasFilter = aliasFilterMap.get(shardId.getIndex().getUUID()); + String[] aliases = aliasFilter.getAliases(); + String clusterAlias = entry.getKey(); + String[] finalIndices = aliases.length == 0 ? new String[]{shardId.getIndexName()} : aliases; final OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias); assert originalIndices != null : "original indices are null for clusterAlias: " + clusterAlias; SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, @@ -535,13 +566,27 @@ static BiFunction processRemoteShards(Map { - Map clusterNodes = clusterToNode.get(clusterAlias); - if (clusterNodes == null) { - throw new IllegalArgumentException("unknown remote cluster: " + clusterAlias); - } - return clusterNodes.get(nodeId); - }; + return remoteShardIterators; + } + + static List getRemoteShardsIteratorFromPointInTime(Map searchShardsResponses, + SearchContextId searchContextId, + TimeValue searchContextKeepAlive, + Map remoteClusterIndices) { + final List remoteShardIterators = new ArrayList<>(); + for (Map.Entry entry : searchShardsResponses.entrySet()) { + for (ClusterSearchShardsGroup group : entry.getValue().getGroups()) { + final ShardId shardId = group.getShardId(); + final String clusterAlias = entry.getKey(); + final SearchContextIdForNode perNode = searchContextId.shards().get(shardId); + assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias(); + final List targetNodes = List.of(perNode.getNode()); + SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, targetNodes, + remoteClusterIndices.get(clusterAlias), perNode.getSearchContextId(), searchContextKeepAlive); + remoteShardIterators.add(shardIterator); + } + } + return remoteShardIterators; } private Index[] resolveLocalIndices(OriginalIndices localIndices, @@ -568,39 +613,34 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea final List localShardIterators; final Map aliasFilter; final Map> indexRoutings; - final Executor asyncSearchExecutor; - boolean preFilterSearchShards; + final String[] concreteLocalIndices; if (searchContext != null) { assert searchRequest.pointInTimeBuilder() != null; aliasFilter = searchContext.aliasFilter(); indexRoutings = Map.of(); - asyncSearchExecutor = asyncSearchExecutor(localIndices.indices(), clusterState); - localShardIterators = getSearchShardsFromSearchContexts(clusterState, localIndices, searchRequest.getLocalClusterAlias(), - searchContext, searchRequest.pointInTimeBuilder().getKeepAlive()); - preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, localIndices.indices(), - localShardIterators.size() + remoteShardIterators.size()); + concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices(); + localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices, + searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive()); } else { final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap); - final String[] concreteIndices = new String[indices.length]; + concreteLocalIndices = new String[indices.length]; for (int i = 0; i < indices.length; i++) { - concreteIndices[i] = indices[i].getName(); + concreteLocalIndices[i] = indices[i].getName(); } - asyncSearchExecutor = asyncSearchExecutor(concreteIndices, clusterState); Map nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardRoutings = clusterService.operationRouting().searchShards(clusterState, - concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); + concreteLocalIndices, routingMap, searchRequest.preference(), + searchService.getResponseCollectorService(), nodeSearchCounts); localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false) .map(it -> new SearchShardIterator( searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) .collect(Collectors.toList()); aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); indexRoutings = routingMap; - preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteIndices, - localShardIterators.size() + remoteShardIterators.size()); } final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators); @@ -630,6 +670,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea final DiscoveryNodes nodes = clusterState.nodes(); BiFunction connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(), nodes::get, remoteConnections, searchTransportService::getConnection); + final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState); + final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices, + localShardIterators.size() + remoteShardIterators.size()); searchAsyncActionProvider.asyncSearchAction( task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState, Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, indexRoutings, listener, @@ -873,27 +916,21 @@ static Map getIndicesFromSearchContexts(SearchContextId .collect(Collectors.toMap(Map.Entry::getKey, e -> new OriginalIndices(e.getValue().toArray(String[]::new), indicesOptions))); } - static List getSearchShardsFromSearchContexts(ClusterState clusterState, - OriginalIndices originalIndices, - String localClusterAlias, - SearchContextId searchContext, - TimeValue keepAlive) { + static List getLocalLocalShardsIteratorFromPointInTime(ClusterState clusterState, + OriginalIndices originalIndices, + String localClusterAlias, + SearchContextId searchContext, + TimeValue keepAlive) { final List iterators = new ArrayList<>(searchContext.shards().size()); for (Map.Entry entry : searchContext.shards().entrySet()) { - final ShardId shardId = entry.getKey(); - final ShardIterator shards = OperationRouting.getShards(clusterState, shardId); - final List matchingNodeFirst = new ArrayList<>(shards.size()); - final String nodeId = entry.getValue().getNode(); - // always search the matching node first even when its shard was relocated to another node - // because the point in time should keep the corresponding search context open. - matchingNodeFirst.add(nodeId); - for (ShardRouting shard : shards) { - if (shard.currentNodeId().equals(nodeId) == false) { - matchingNodeFirst.add(shard.currentNodeId()); - } + final SearchContextIdForNode perNode = entry.getValue(); + if (Strings.isEmpty(perNode.getClusterAlias())) { + final ShardId shardId = entry.getKey(); + OperationRouting.getShards(clusterState, shardId); + final List targetNodes = List.of(perNode.getNode()); + iterators.add(new SearchShardIterator(localClusterAlias, shardId, targetNodes, originalIndices, + perNode.getSearchContextId(), keepAlive)); } - iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirst, originalIndices, - entry.getValue().getSearchContextId(), keepAlive)); } return iterators; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 91ba7af424b47..4511a9a145b73 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -168,16 +168,17 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r if (scroll != null) { searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll"))); } - searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions())); - searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); checkRestTotalHits(request, searchRequest); if (searchRequest.pointInTimeBuilder() != null) { - preparePointInTime(searchRequest, namedWriteableRegistry); + preparePointInTime(searchRequest, request, namedWriteableRegistry); + } else { + searchRequest.setCcsMinimizeRoundtrips( + request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); } } @@ -293,7 +294,7 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil } } - static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) { + static void preparePointInTime(SearchRequest request, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry) { assert request.pointInTimeBuilder() != null; ActionRequestValidationException validationException = null; if (request.indices().length > 0) { @@ -308,6 +309,11 @@ static void preparePointInTime(SearchRequest request, NamedWriteableRegistry nam if (request.preference() != null) { validationException = addValidationError("[preference] cannot be used with point in time", validationException); } + if (restRequest.paramAsBoolean("ccs_minimize_roundtrips", false)) { + validationException = + addValidationError("[ccs_minimize_roundtrips] cannot be used with point in time", validationException); + request.setCcsMinimizeRoundtrips(false); + } ExceptionsHelper.reThrowIfNotNull(validationException); final IndicesOptions indicesOptions = request.indicesOptions(); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index f4c5b0a35d5b4..880c609d74d4f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -208,7 +208,6 @@ public void testProcessRemoteShards() { null)) { RemoteClusterService service = transportService.getRemoteClusterService(); assertFalse(service.isCrossClusterSearchEnabled()); - List iteratorList = new ArrayList<>(); Map searchShardsResponseMap = new HashMap<>(); DiscoveryNode[] nodes = new DiscoveryNode[] { new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), @@ -246,9 +245,9 @@ public void testProcessRemoteShards() { new OriginalIndices(new String[]{"fo*", "ba*"}, SearchRequest.DEFAULT_INDICES_OPTIONS)); remoteIndicesByCluster.put("test_cluster_2", new OriginalIndices(new String[]{"x*"}, SearchRequest.DEFAULT_INDICES_OPTIONS)); - Map remoteAliases = new HashMap<>(); - TransportSearchAction.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList, - remoteAliases); + Map remoteAliases = TransportSearchAction.getRemoteAliasFilters(searchShardsResponseMap); + List iteratorList = + TransportSearchAction.getRemoteShardsIterator(searchShardsResponseMap, remoteIndicesByCluster, remoteAliases); assertEquals(4, iteratorList.size()); for (SearchShardIterator iterator : iteratorList) { if (iterator.shardId().getIndexName().endsWith("foo")) { diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index 23ab89d1d05c7..47ae97e427c8f 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -89,7 +89,6 @@ public void setupSuiteScopeCluster() throws InterruptedException { indexRandom(true, true, reqs); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61790") public void testMaxMinAggregation() throws Exception { int step = numShards > 2 ? randomIntBetween(2, numShards) : 2; int numFailures = randomBoolean() ? randomIntBetween(0, numShards) : 0; @@ -134,7 +133,6 @@ public void testMaxMinAggregation() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61790") public void testTermsAggregation() throws Exception { int step = numShards > 2 ? randomIntBetween(2, numShards) : 2; int numFailures = randomBoolean() ? randomIntBetween(0, numShards) : 0; diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/CCSPointInTimeIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/CCSPointInTimeIT.java new file mode 100644 index 0000000000000..6c99f772075d2 --- /dev/null +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/CCSPointInTimeIT.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.search; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; + +public class CCSPointInTimeIT extends AbstractMultiClustersTestCase { + + @Override + protected Collection remoteClusterAlias() { + return List.of("remote_cluster"); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + final List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(LocalStateCompositeXPackPlugin.class); + return plugins; + } + + void indexDocs(Client client, String index, int numDocs) { + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + client.prepareIndex(index).setId(id).setSource("value", i).get(); + } + client.admin().indices().prepareRefresh(index).get(); + } + + public void testBasic() { + final Client localClient = client(LOCAL_CLUSTER); + final Client remoteClient = client("remote_cluster"); + int localNumDocs = randomIntBetween(10, 50); + assertAcked(localClient.admin().indices().prepareCreate("local_test")); + indexDocs(localClient, "local_test", localNumDocs); + + int remoteNumDocs = randomIntBetween(10, 50); + assertAcked(remoteClient.admin().indices().prepareCreate("remote_test")); + indexDocs(remoteClient, "remote_test", remoteNumDocs); + boolean includeLocalIndex = randomBoolean(); + List indices = new ArrayList<>(); + if (includeLocalIndex) { + indices.add( randomFrom("*", "local_*", "local_test")); + } + indices.add(randomFrom("*:*", "remote_cluster:*", "remote_cluster:remote_test")); + String pitId = openPointInTime(indices.toArray(new String[0]), TimeValue.timeValueMinutes(2)); + try { + if (randomBoolean()) { + localClient.prepareIndex("local_test").setId("local_new").setSource().get(); + localClient.admin().indices().prepareRefresh().get(); + } + if (randomBoolean()) { + remoteClient.prepareIndex("remote_test").setId("remote_new").setSource().get(); + remoteClient.admin().indices().prepareRefresh().get(); + } + SearchResponse resp = localClient.prepareSearch() + .setPreference(null) + .setQuery(new MatchAllQueryBuilder()) + .setSearchContext(pitId, TimeValue.timeValueMinutes(2)) + .setSize(1000) + .get(); + assertNoFailures(resp); + assertHitCount(resp, (includeLocalIndex ? localNumDocs : 0) + remoteNumDocs); + } finally { + closePointInTime(pitId); + } + } + + private String openPointInTime(String[] indices, TimeValue keepAlive) { + OpenPointInTimeRequest request = new OpenPointInTimeRequest( + indices, + OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, + keepAlive, + null, + null + ); + final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); + return response.getSearchContextId(); + } + + private void closePointInTime(String readerId) { + client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(readerId)).actionGet(); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java index bf04039687816..04beae9ad83e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Objects; public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject { private static final ParseField ID = new ParseField("id"); @@ -21,7 +22,7 @@ public final class OpenPointInTimeResponse extends ActionResponse implements ToX private final String searchContextId; public OpenPointInTimeResponse(String searchContextId) { - this.searchContextId = searchContextId; + this.searchContextId = Objects.requireNonNull(searchContextId); } public OpenPointInTimeResponse(StreamInput in) throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java index d6cb1ee75916f..5ebb4731ec51a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java @@ -72,6 +72,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen .preference(request.preference()) .routing(request.routing()) .allowPartialSearchResults(false); + searchRequest.setCcsMinimizeRoundtrips(false); transportSearchAction.executeRequest( task, searchRequest, @@ -91,7 +92,10 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen new ActionListenerResponseHandler(phaseListener, ShardOpenReaderResponse::new) ); }, - ActionListener.map(listener, r -> new OpenPointInTimeResponse(r.pointInTimeId())) + ActionListener.map(listener, r -> { + assert r.pointInTimeId() != null : r; + return new OpenPointInTimeResponse(r.pointInTimeId()); + }) ); } diff --git a/x-pack/qa/multi-cluster-search-security/build.gradle b/x-pack/qa/multi-cluster-search-security/build.gradle index 9e7196a4db650..aa745149953fd 100644 --- a/x-pack/qa/multi-cluster-search-security/build.gradle +++ b/x-pack/qa/multi-cluster-search-security/build.gradle @@ -10,7 +10,7 @@ dependencies { restResources { restApi { - includeXpack 'security', 'async_search', 'indices' + includeXpack 'security', 'async_search', 'indices', 'open_point_in_time', 'close_point_in_time' } } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml index f777187efdec4..3d9dfefc11df2 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml @@ -30,13 +30,15 @@ - match: {indices.5.attributes.0: open} - match: {indices.6.name: my_remote_cluster:field_caps_index_3} - match: {indices.6.attributes.0: open} - - match: {indices.7.name: my_remote_cluster:secured_via_alias} - - match: {indices.7.attributes.0: open} - - match: {indices.8.name: my_remote_cluster:single_doc_index} + - match: {indices.7.name: my_remote_cluster:point_in_time_index } + - match: {indices.7.attributes.0: open } + - match: {indices.8.name: my_remote_cluster:secured_via_alias} - match: {indices.8.attributes.0: open} - - match: {indices.9.name: my_remote_cluster:test_index} - - match: {indices.9.aliases.0: aliased_test_index} - - match: {indices.9.attributes.0: open} + - match: {indices.9.name: my_remote_cluster:single_doc_index} + - match: {indices.10.attributes.0: open} + - match: {indices.10.name: my_remote_cluster:test_index} + - match: {indices.10.aliases.0: aliased_test_index} + - match: {indices.10.attributes.0: open} - match: {aliases.0.name: my_remote_cluster:.security} - match: {aliases.0.indices.0: .security-7} - match: {aliases.1.name: my_remote_cluster:aliased_closed_index} diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/80_point_in_time.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/80_point_in_time.yml new file mode 100644 index 0000000000000..0aac5ce25e56d --- /dev/null +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/80_point_in_time.yml @@ -0,0 +1,113 @@ +--- +setup: + - skip: + features: headers + + - do: + cluster.health: + wait_for_status: yellow + - do: + security.put_user: + username: "joe" + body: > + { + "password": "s3krit", + "roles" : [ "x_cluster_role" ] + } + - do: + security.put_role: + name: "x_cluster_role" + body: > + { + "cluster": [], + "indices": [ + { + "names": ["local_pit", "my_remote_cluster:point_in_time_index"], + "privileges": ["read"] + } + ] + } + + - do: + security.put_user: + username: "remote" + body: > + { + "password": "s3krit", + "roles" : [ "remote_ccs" ] + } + - do: + security.put_role: + name: "remote_ccs" + body: > + { + } +--- +teardown: + - do: + security.delete_user: + username: "joe" + ignore: 404 + - do: + security.delete_role: + name: "x_cluster_role" + ignore: 404 +--- +"Search with point in time": + + - do: + indices.create: + index: local_pit + body: + settings: + index: + number_of_shards: 2 + number_of_replicas: 0 + mappings: + properties: + created_at: + type: date + format: "yyyy-MM-dd" + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "local_pit"}}' + - '{"f": "l1", "created_at" : "2020-01-01"}' + - '{"index": {"_index": "local_pit"}}' + - '{"f": "l2", "created_at" : "2021-01-02"}' + + - do: + headers: { Authorization: "Basic am9lOnMza3JpdA==" } + open_point_in_time: + index: my_remote_cluster:point_in_time_index,local_pit + keep_alive: 5m + - set: {id: pit_id} + + - do: + headers: { Authorization: "Basic am9lOnMza3JpdA==" } + search: + rest_total_hits_as_int: true + sort: created_at + body: + query: + range: + created_at: + gte: "2020-01-03" + pit: + id: "$pit_id" + keep_alive: 1m + + - match: { hits.total: 3 } + - match: { hits.hits.0._index: "my_remote_cluster:point_in_time_index" } + - match: { hits.hits.0._source.f: "r3" } + - match: { hits.hits.1._index: "my_remote_cluster:point_in_time_index" } + - match: { hits.hits.1._source.f: "r4" } + - match: { hits.hits.2._index: "local_pit" } + - match: { hits.hits.2._source.f: "l2" } + + - do: + headers: { Authorization: "Basic am9lOnMza3JpdA==" } + close_point_in_time: + body: + id: "$pit_id" diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml index a87eb9bd63bb8..46ffe74079419 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml @@ -23,7 +23,7 @@ setup: "indices": [ { "names": ["single_doc_index", "secure_alias", "test_index", "aliased_test_index", "field_caps_index_1", - "field_caps_index_3"], + "field_caps_index_3", "point_in_time_index"], "privileges": ["read", "read_cross_cluster"] } ] @@ -46,7 +46,7 @@ setup: "indices": [ { "names": ["single_doc_index", "secure_alias", "test_index", "aliased_test_index", "field_caps_index_1", - "field_caps_index_3"], + "field_caps_index_3", "point_in_time_index"], "privileges": ["read", "read_cross_cluster"] } ] @@ -272,3 +272,30 @@ setup: "roles" : [ ] } - match: { created: false } + + - do: + indices.create: + index: point_in_time_index + body: + settings: + index: + number_of_shards: 2 + number_of_replicas: 0 + mappings: + properties: + created_at: + type: date + format: "yyyy-MM-dd" + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r1", "created_at" : "2020-01-01"}' + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r2", "created_at" : "2020-01-02"}' + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r3", "created_at" : "2020-01-03"}' + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r4", "created_at" : "2020-01-04"}' +