Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ability to minimize round-trips in CCS #37828

Merged
merged 33 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
337fd75
Adapt SearchRequest constructor
javanna Jan 21, 2019
f100b46
add default from and size public constants
javanna Jan 21, 2019
f3daf69
[TEST] add search handler to test transport
javanna Jan 21, 2019
e05725e
add comment on registering same cluster multiple times
javanna Jan 21, 2019
eae918b
wip
javanna Jan 21, 2019
2703d74
add search request ccs_execution_mode option
javanna Jan 22, 2019
c9b737b
done
javanna Jan 23, 2019
a6404e3
import
javanna Jan 24, 2019
0b064a9
progress
javanna Jan 24, 2019
650e876
fix
javanna Jan 24, 2019
74d52cb
rename execution mode to reduce mode
javanna Jan 24, 2019
c5d9dc5
rename reduce modes
javanna Jan 24, 2019
589d69e
last renames and update docs
javanna Jan 24, 2019
56d845e
update spec
javanna Jan 24, 2019
01e9b27
add auto ccs_reduce_mode
javanna Jan 24, 2019
0e77e76
fix failing xpack qa test and expand others
javanna Jan 24, 2019
bf3fdb7
unify listeners and extract method that converts auto to an effective…
javanna Jan 24, 2019
58e3327
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 25, 2019
ee25840
adapt and expand unit tests
javanna Jan 25, 2019
e554e2b
extract ccsRemoteReduce method and add unit tests for it
javanna Jan 25, 2019
9fbff36
fix failing test
javanna Jan 25, 2019
da6c3cb
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 28, 2019
8c67836
Rename ccs_reduce_mode to ccs_minimize_roundtrips and make it a boolean.
javanna Jan 29, 2019
0f1d784
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 29, 2019
2bb80c4
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 30, 2019
9574278
add assert that scrollId is never set in search response
javanna Jan 30, 2019
f735784
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 30, 2019
27ae7dd
remove ccs_reduction from _clusters section
javanna Jan 30, 2019
4d8266f
fix selection between the two modes which got lost with the recent up…
javanna Jan 30, 2019
86b91cf
revert minor change
javanna Jan 30, 2019
2f1bbaf
fix failing test
javanna Jan 30, 2019
91d5df8
fix parameter name
javanna Jan 30, 2019
0165ec1
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ private static void addSearchRequestParams(Params params, SearchRequest searchRe
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT));
if (searchRequest.getCCSReduceMode() != null) {
params.putParam("ccs_reduce_mode", searchRequest.getCCSReduceMode().toString());
}
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.CCSReduceMode;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1239,7 +1240,7 @@ public void testMultiSearch() throws IOException {
requests.add(searchRequest);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())),
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null,
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, null,
xContentRegistry(), true);
assertEquals(requests, multiSearchRequest.requests());
}
Expand Down Expand Up @@ -1862,6 +1863,10 @@ private static void setRandomSearchParams(SearchRequest searchRequest,
searchRequest.scroll(randomTimeValue());
expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
}
if (randomBoolean()) {
searchRequest.setCCSReduceMode(randomFrom(CCSReduceMode.values()));
expectedParams.put("ccs_reduce_mode", searchRequest.getCCSReduceMode().toString());
}
}

static void setRandomIndicesOptions(Consumer<IndicesOptions> setter, Supplier<IndicesOptions> getter,
Expand Down
45 changes: 45 additions & 0 deletions docs/reference/modules/cross-cluster-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ GET /cluster_one:twitter/_search
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 2,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0,
"skipped": 0
},
"_clusters": {
"ccs_reduce_mode": "remote",
"total": 1,
"successful": 1,
"skipped": 0
Expand Down Expand Up @@ -130,13 +132,15 @@ will be prefixed with their remote cluster name:
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
"failed": 0,
"skipped": 0
},
"_clusters": {
"ccs_reduce_mode": "remote",
"total": 2,
"successful": 2,
"skipped": 0
Expand Down Expand Up @@ -222,13 +226,15 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
"failed": 0,
"skipped": 0
},
"_clusters": { <1>
"ccs_reduce_mode": "remote",
"total": 3,
"successful": 2,
"skipped": 1
Expand Down Expand Up @@ -273,3 +279,42 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/]
<1> The `clusters` section indicates that one cluster was unavailable and got skipped

[float]
[[ccs-reduce-mode]]
=== Reduce mode

When searching across remote clusters, there are two possible reduce modes:

- `remote`: the coordinating node sends one search request to each cluster.
Each cluster performs the search independently, reducing and fetching results
which are then returned to the caller. Once the CCS node has received all the
responses, it performs another reduction and returns the relevant results back
to the user. The `remote` reduce mode is beneficial when there is network
latency between the coordinating node and the remote clusters involved, which
is typically the case. A single request is sent to each remote cluster, at the
cost of retrieving `from` + `size` already fetched results.

- `local`: the coordinating node sends a <<search-shards,search shards>>
request to each remote cluster, in order to collect information about their
corresponding remote indices involved in the search request and the shards
where their data is located. Once each cluster has responded to such request,
the search executes as if all shards were part of the same cluster. The
coordinating node sends one request to each shard involved, each shard
executes the query and returns its own results which are then reduced (and
fetched, depending on the <<search-request-search-type, search type>>) by the
coordinating node. The `local` reduce mode is beneficial whenever there is very
low network latency between the coordinating node and the remote clusters
involved, as it treats all shards the same, at the cost of sending many
requests to each remote cluster.

By default `remote` is used whenever possible. In case a scroll is provided,
or inner hits are requested as part of field collapsing, `local` is used
instead. The <<search-request-body, search API>> supports the `ccs_reduce_mode`
parameter which allows to select the reduce mode if needed. The reduce mode
that was used for a cross-cluster search request is returned as part of the
`_clusters` section.

Note that all the communication between the nodes, regardless of which cluster
they belong to and the selected reduce mode, happens through the
<<modules-transport,transport layer>>.
4 changes: 4 additions & 0 deletions docs/reference/search/request-body.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ And here is a sample response:
reduce the memory overhead per search request if the potential number of
shards in the request can be large.

`ccs_reduce_mode`::

The cross-cluster search reduce mode. Can be `local` or `remote`.
See <<ccs-reduce-mode>> for more.


Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public MultiSearchTemplateRequest add(SearchTemplateRequest request) {
return this;
}


/**
* Returns the amount of search requests specified in this multi search requests are allowed to be ran concurrently.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.script.mustache;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.CCSReduceMode;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -50,7 +51,7 @@ protected MultiSearchTemplateResponse createTestInstance() {
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
Expand All @@ -59,22 +60,28 @@ protected MultiSearchTemplateResponse createTestInstance() {
}
return new MultiSearchTemplateResponse(items, overallTookInMillis);
}


private static SearchResponse.Clusters randomClusters() {
int totalClusters = randomIntBetween(0, 10);
int successfulClusters = randomIntBetween(0, totalClusters);
int skippedClusters = totalClusters - successfulClusters;
CCSReduceMode executionMode = randomFrom(CCSReduceMode.values());
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters, executionMode);
}

private static MultiSearchTemplateResponse createTestInstanceWithFailures() {
int numItems = randomIntBetween(0, 128);
long overallTookInMillis = randomNonNegativeLong();
MultiSearchTemplateResponse.Item[] items = new MultiSearchTemplateResponse.Item[numItems];
for (int i = 0; i < numItems; i++) {
if (randomBoolean()) {
// Creating a minimal response is OK, because SearchResponse self
// is tested elsewhere.
// Creating a minimal response is OK, because SearchResponse is tested elsewhere.
long tookInMillis = randomNonNegativeLong();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
Expand Down Expand Up @@ -133,6 +140,5 @@ public void testFromXContentWithFailures() throws IOException {
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilterWhenResultHasErrors(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
Expand All @@ -32,9 +33,12 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.CCSReduceMode;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
Expand All @@ -49,6 +53,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -107,6 +113,14 @@ private static MockTransportService startTransport(
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new,
(request, channel, task) -> {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
channel.sendResponse(searchResponse);
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
Expand Down Expand Up @@ -151,6 +165,7 @@ public void testSearchSkipUnavailable() throws IOException {
assertEquals(2, response.getClusters().getTotal());
assertEquals(2, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(CCSReduceMode.REMOTE, response.getClusters().getCCSReduceMode());
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
}
Expand All @@ -159,6 +174,7 @@ public void testSearchSkipUnavailable() throws IOException {
assertEquals(1, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(CCSReduceMode.REMOTE, response.getClusters().getCCSReduceMode());
assertEquals(0, response.getHits().getTotalHits().value);
}

Expand All @@ -168,6 +184,7 @@ public void testSearchSkipUnavailable() throws IOException {
assertEquals(2, response.getClusters().getTotal());
assertEquals(2, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(CCSReduceMode.LOCAL, response.getClusters().getCCSReduceMode());
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
String scrollId = response.getScrollId();
Expand All @@ -186,6 +203,7 @@ public void testSearchSkipUnavailable() throws IOException {
assertEquals(2, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(CCSReduceMode.REMOTE, response.getClusters().getCCSReduceMode());
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
}
Expand All @@ -194,6 +212,7 @@ public void testSearchSkipUnavailable() throws IOException {
assertEquals(1, response.getClusters().getTotal());
assertEquals(0, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(CCSReduceMode.REMOTE, response.getClusters().getCCSReduceMode());
assertEquals(0, response.getHits().getTotalHits().value);
}

Expand All @@ -203,6 +222,7 @@ public void testSearchSkipUnavailable() throws IOException {
assertEquals(2, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(CCSReduceMode.LOCAL, response.getClusters().getCCSReduceMode());
assertEquals(10, response.getHits().getTotalHits().value);
assertEquals(10, response.getHits().getHits().length);
String scrollId = response.getScrollId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
terms:
field: f1.keyword

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: {num}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- gte: { hits.hits.0._seq_no: 0 }
Expand All @@ -59,6 +64,10 @@
terms:
field: f1.keyword

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: { _shards.total: 5 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
Expand All @@ -76,6 +85,10 @@
terms:
field: f1.keyword

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: { _shards.total: 3 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
Expand All @@ -93,6 +106,7 @@
terms:
field: f1.keyword

- is_false: _clusters
- match: { _shards.total: 2 }
- match: { hits.total: 5}
- match: { hits.hits.0._index: "test_index"}
Expand Down Expand Up @@ -122,6 +136,10 @@
rest_total_hits_as_int: true
index: test_remote_cluster:test_index

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
Expand All @@ -148,6 +166,10 @@
rest_total_hits_as_int: true
index: "*:test_index"

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: { _shards.total: 6 }
- match: { hits.total: 12 }

Expand All @@ -159,6 +181,10 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: { _shards.total: 3 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
Expand All @@ -172,6 +198,10 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: { _shards.total: 4 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
Expand All @@ -185,6 +215,10 @@
rest_total_hits_as_int: true
index: "my_remote_cluster:single_doc_index"

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: {_clusters.ccs_reduce_mode: "remote"}
- match: { _shards.total: 1 }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
Loading