Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ability to minimize round-trips in CCS #37828

Merged
merged 33 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
337fd75
Adapt SearchRequest constructor
javanna Jan 21, 2019
f100b46
add default from and size public constants
javanna Jan 21, 2019
f3daf69
[TEST] add search handler to test transport
javanna Jan 21, 2019
e05725e
add comment on registering same cluster multiple times
javanna Jan 21, 2019
eae918b
wip
javanna Jan 21, 2019
2703d74
add search request ccs_execution_mode option
javanna Jan 22, 2019
c9b737b
done
javanna Jan 23, 2019
a6404e3
import
javanna Jan 24, 2019
0b064a9
progress
javanna Jan 24, 2019
650e876
fix
javanna Jan 24, 2019
74d52cb
rename execution mode to reduce mode
javanna Jan 24, 2019
c5d9dc5
rename reduce modes
javanna Jan 24, 2019
589d69e
last renames and update docs
javanna Jan 24, 2019
56d845e
update spec
javanna Jan 24, 2019
01e9b27
add auto ccs_reduce_mode
javanna Jan 24, 2019
0e77e76
fix failing xpack qa test and expand others
javanna Jan 24, 2019
bf3fdb7
unify listeners and extract method that converts auto to an effective…
javanna Jan 24, 2019
58e3327
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 25, 2019
ee25840
adapt and expand unit tests
javanna Jan 25, 2019
e554e2b
extract ccsRemoteReduce method and add unit tests for it
javanna Jan 25, 2019
9fbff36
fix failing test
javanna Jan 25, 2019
da6c3cb
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 28, 2019
8c67836
Rename ccs_reduce_mode to ccs_minimize_roundtrips and make it a boolean.
javanna Jan 29, 2019
0f1d784
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 29, 2019
2bb80c4
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 30, 2019
9574278
add assert that scrollId is never set in search response
javanna Jan 30, 2019
f735784
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 30, 2019
27ae7dd
remove ccs_reduction from _clusters section
javanna Jan 30, 2019
4d8266f
fix selection between the two modes which got lost with the recent up…
javanna Jan 30, 2019
86b91cf
revert minor change
javanna Jan 30, 2019
2f1bbaf
fix failing test
javanna Jan 30, 2019
91d5df8
fix parameter name
javanna Jan 30, 2019
0165ec1
Merge branch 'master' into feature/ccs_local_reduction
javanna Jan 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ private static void addSearchRequestParams(Params params, SearchRequest searchRe
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ public void testMultiSearch() throws IOException {
requests.add(searchRequest);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())),
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null,
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, null,
xContentRegistry(), true);
assertEquals(requests, multiSearchRequest.requests());
}
Expand Down Expand Up @@ -1862,6 +1862,10 @@ private static void setRandomSearchParams(SearchRequest searchRequest,
searchRequest.scroll(randomTimeValue());
expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
}
if (randomBoolean()) {
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
}
expectedParams.put("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}

static void setRandomIndicesOptions(Consumer<IndicesOptions> setter, Supplier<IndicesOptions> getter,
Expand Down
43 changes: 43 additions & 0 deletions docs/reference/modules/cross-cluster-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ GET /cluster_one:twitter/_search
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 2,
"_shards": {
"total": 1,
"successful": 1,
Expand Down Expand Up @@ -130,6 +131,7 @@ will be prefixed with their remote cluster name:
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
Expand Down Expand Up @@ -222,6 +224,7 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
Expand Down Expand Up @@ -273,3 +276,43 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/]
<1> The `clusters` section indicates that one cluster was unavailable and got skipped

[float]
[[ccs-reduction]]
=== CCS reduction phase

Cross-cluster search requests can be executed in two ways:

- the CCS coordinating node minimizes network round-trips by sending one search
request to each cluster. Each cluster performs the search independently,
reducing and fetching results. Once the CCS node has received all the
responses, it performs another reduction and returns the relevant results back
to the user. This strategy is beneficial when there is network latency between
the CCS coordinating node and the remote clusters involved, which is typically
the case. A single request is sent to each remote cluster, at the cost of
retrieving `from` + `size` already fetched results. This is the default
strategy, used whenever possible. In case a scroll is provided, or inner hits
are requested as part of field collapsing, this strategy is not supported hence
network round-trips cannot be minimized and the following strategy is used
instead.

- the CCS coordinating node sends a <<search-shards,search shards>> request to
each remote cluster, in order to collect information about their corresponding
remote indices involved in the search request and the shards where their data
is located. Once each cluster has responded to such request, the search
executes as if all shards were part of the same cluster. The coordinating node
sends one request to each shard involved, each shard executes the query and
returns its own results which are then reduced (and fetched, depending on the
<<search-request-search-type, search type>>) by the CCS coordinating node.
This strategy may be beneficial whenever there is very low network latency
between the CCS coordinating node and the remote clusters involved, as it
treats all shards the same, at the cost of sending many requests to each remote
cluster, which is problematic in presence of network latency.

The <<search-request-body, search API>> supports the `ccs_minimize_roundtrips`
parameter, which defaults to `true` and can be set to `false` in case
minimizing network round-trips is not desirable.

Note that all the communication between the nodes, regardless of which cluster
they belong to and the selected reduce mode, happens through the
<<modules-transport,transport layer>>.
5 changes: 5 additions & 0 deletions docs/reference/search/request-body.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ And here is a sample response:
reduce the memory overhead per search request if the potential number of
shards in the request can be large.

`ccs_minimize_roundtrips`::

Defaults to `true`. Set to `false` to disable minimizing network round-trips
between the coordinating node and the remote clusters when executing
cross-cluster search requests. See <<ccs-reduction>> for more.


Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public MultiSearchTemplateRequest add(SearchTemplateRequest request) {
return this;
}


/**
* Returns the amount of search requests specified in this multi search requests are allowed to be ran concurrently.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected MultiSearchTemplateResponse createTestInstance() {
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
Expand All @@ -59,22 +59,27 @@ protected MultiSearchTemplateResponse createTestInstance() {
}
return new MultiSearchTemplateResponse(items, overallTookInMillis);
}


private static SearchResponse.Clusters randomClusters() {
int totalClusters = randomIntBetween(0, 10);
int successfulClusters = randomIntBetween(0, totalClusters);
int skippedClusters = totalClusters - successfulClusters;
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
}

private static MultiSearchTemplateResponse createTestInstanceWithFailures() {
int numItems = randomIntBetween(0, 128);
long overallTookInMillis = randomNonNegativeLong();
MultiSearchTemplateResponse.Item[] items = new MultiSearchTemplateResponse.Item[numItems];
for (int i = 0; i < numItems; i++) {
if (randomBoolean()) {
// Creating a minimal response is OK, because SearchResponse self
// is tested elsewhere.
// Creating a minimal response is OK, because SearchResponse is tested elsewhere.
long tookInMillis = randomNonNegativeLong();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
Expand Down Expand Up @@ -133,6 +138,5 @@ public void testFromXContentWithFailures() throws IOException {
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilterWhenResultHasErrors(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
Expand All @@ -32,9 +33,11 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
Expand All @@ -49,6 +52,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -107,6 +112,14 @@ private static MockTransportService startTransport(
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new,
(request, channel, task) -> {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
channel.sendResponse(searchResponse);
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
terms:
field: f1.keyword

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: {num}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- gte: { hits.hits.0._seq_no: 0 }
Expand All @@ -59,6 +63,9 @@
terms:
field: f1.keyword

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
Expand All @@ -76,6 +83,9 @@
terms:
field: f1.keyword

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
Expand All @@ -93,6 +103,7 @@
terms:
field: f1.keyword

- is_false: _clusters
- match: { _shards.total: 2 }
- match: { hits.total: 5}
- match: { hits.hits.0._index: "test_index"}
Expand Down Expand Up @@ -122,6 +133,9 @@
rest_total_hits_as_int: true
index: test_remote_cluster:test_index

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
Expand All @@ -148,6 +162,9 @@
rest_total_hits_as_int: true
index: "*:test_index"

- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 6 }
- match: { hits.total: 12 }

Expand All @@ -159,6 +176,9 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
Expand All @@ -172,6 +192,9 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 4 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
Expand All @@ -185,6 +208,9 @@
rest_total_hits_as_int: true
index: "my_remote_cluster:single_doc_index"

- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 1 }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
ccs_minimize_roundtrips: false
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }

- match: { hits.total: 1 }
- match: { hits.hits.0._index: "skip_shards_index"}
- is_false: num_reduce_phases
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}
Expand All @@ -45,10 +47,12 @@
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
ccs_minimize_roundtrips: false
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2015-02-01", "lt": "2016-02-01"} } } }

- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
- is_false: num_reduce_phases
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
"type" : "boolean",
"description" : "Specify whether wildcard and prefix queries should be analyzed (default: false)"
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
},
"default_operator": {
"type" : "enum",
"options" : ["AND","OR"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},
Expand Down
Loading