-
Notifications
You must be signed in to change notification settings - Fork 25k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix search states of CCS requests in mixed cluster (#71265)
Previously, the search states are stored in ReaderContext on data nodes. Since 7.10, we send them to the coordinating node in a QuerySearchResult of a ShardSearchRequest and the coordinating node then sends them back in ShardFetchSearchRequest. We must keep the search states in data nodes unless they are sent back in the fetch phase. We used the channel version to determine this guarantee. However, it's not correct in CCS requests in mixed clusters. 1. The coordinating node of the local cluster on the old version sends a ShardSearchRequest to a proxy node of the remote cluster on the new version. That proxy node delivers the request to the data node. In this case, the channel version between the data node and the proxy node is >= 7.10, but we won't receive the search states in the fetch phase as they are stripped out in the channel between the old coordinating node and the new proxy. ``` [coordinating node v7.9] --> [proxy node v7.10] --> [data node on v7.10] ``` 2. The coordinating node of the local on the new version sends a ShardSearchRequest to a proxy node of the remote cluster on the new version. However, the coordinating node sends a ShardFetchSearchRequest to another proxy node of the remote cluster that is still on an old version. The search states then are stripped out and never reach the data node. ``` -> query phase: [coordinating node v7.10] --> [proxy node v7.10] --> [data node on v7.10] -> fetch phase: [coordinating node v7.10] --> [proxy node v7.9] --> [data node on v7.10] ``` This commit fixes the first issue by explicitly serializing the channel version in a ShardSearchRequest and the second by continue storing the search states in ReaderContext unless all nodes are upgraded. Relates #52741
- Loading branch information
Showing
13 changed files
with
461 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
import org.elasticsearch.gradle.Version | ||
import org.elasticsearch.gradle.info.BuildParams | ||
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask | ||
|
||
apply plugin: 'elasticsearch.testclusters' | ||
apply plugin: 'elasticsearch.standalone-test' | ||
apply from: "$rootDir/gradle/bwc-test.gradle" | ||
apply plugin: 'elasticsearch.rest-resources' | ||
|
||
dependencies { | ||
testImplementation project(':client:rest-high-level') | ||
} | ||
|
||
for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { | ||
String baseName = "v${bwcVersion}" | ||
String bwcVersionStr = "${bwcVersion}" | ||
|
||
/** | ||
* We execute tests 3 times. | ||
* - The local cluster is unchanged and it consists of an old version node and a new version node. | ||
* - Nodes in the remote cluster are upgraded one by one in three steps. | ||
* - Only node-0 and node-2 of the remote cluster can accept remote connections. This can creates a test | ||
* scenario where a query request and fetch request are sent via **proxy nodes** that have different version. | ||
*/ | ||
testClusters { | ||
"${baseName}-local" { | ||
numberOfNodes = 2 | ||
versions = [bwcVersionStr, project.version] | ||
setting 'cluster.remote.node.attr', 'gateway' | ||
} | ||
"${baseName}-remote" { | ||
numberOfNodes = 3 | ||
versions = [bwcVersionStr, project.version] | ||
firstNode.setting 'node.attr.gateway', 'true' | ||
lastNode.setting 'node.attr.gateway', 'true' | ||
} | ||
} | ||
|
||
tasks.withType(StandaloneRestIntegTestTask).matching { it.name.startsWith("${baseName}#") }.configureEach { | ||
useCluster testClusters."${baseName}-local" | ||
useCluster testClusters."${baseName}-remote" | ||
systemProperty 'tests.upgrade_from_version', bwcVersionStr.replace('-SNAPSHOT', '') | ||
|
||
doFirst { | ||
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}-local".allHttpSocketURI.join(",")}") | ||
nonInputProperties.systemProperty('tests.rest.remote_cluster', "${-> testClusters."${baseName}-remote".allHttpSocketURI.join(",")}") | ||
} | ||
} | ||
|
||
tasks.register("${baseName}#oneThirdUpgraded", StandaloneRestIntegTestTask) { | ||
dependsOn "processTestResources" | ||
mustRunAfter("precommit") | ||
doFirst { | ||
testClusters."${baseName}-local".nextNodeToNextVersion() | ||
testClusters."${baseName}-remote".nextNodeToNextVersion() | ||
} | ||
} | ||
|
||
tasks.register("${baseName}#twoThirdUpgraded", StandaloneRestIntegTestTask) { | ||
dependsOn "${baseName}#oneThirdUpgraded" | ||
doFirst { | ||
testClusters."${baseName}-remote".nextNodeToNextVersion() | ||
} | ||
} | ||
|
||
tasks.register("${baseName}#fullUpgraded", StandaloneRestIntegTestTask) { | ||
dependsOn "${baseName}#twoThirdUpgraded" | ||
doFirst { | ||
testClusters."${baseName}-remote".nextNodeToNextVersion() | ||
} | ||
} | ||
|
||
tasks.register(bwcTaskName(bwcVersion)) { | ||
dependsOn tasks.named("${baseName}#fullUpgraded") | ||
} | ||
} |
254 changes: 254 additions & 0 deletions
254
...lling-upgrade-remote-cluster/src/test/java/org/elasticsearch/upgrades/SearchStatesIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,254 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.upgrades; | ||
|
||
import org.apache.http.HttpHost; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; | ||
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.action.search.SearchResponse; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.client.RequestOptions; | ||
import org.elasticsearch.client.Response; | ||
import org.elasticsearch.client.RestClient; | ||
import org.elasticsearch.client.RestHighLevelClient; | ||
import org.elasticsearch.client.indices.CreateIndexRequest; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.xcontent.DeprecationHandler; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.common.xcontent.json.JsonXContent; | ||
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; | ||
import org.elasticsearch.test.rest.ESRestTestCase; | ||
import org.elasticsearch.test.rest.yaml.ObjectPath; | ||
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.hamcrest.Matchers.empty; | ||
import static org.hamcrest.Matchers.hasSize; | ||
import static org.hamcrest.Matchers.not; | ||
|
||
/** | ||
* This test ensure that we keep the search states of a CCS request correctly when the local and remote clusters | ||
* have different but compatible versions. See SearchService#createAndPutReaderContext | ||
*/ | ||
public class SearchStatesIT extends ESRestTestCase { | ||
|
||
private static final Logger LOGGER = LogManager.getLogger(SearchStatesIT.class); | ||
private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version")); | ||
private static final String CLUSTER_ALIAS = "remote_cluster"; | ||
|
||
static class Node { | ||
final String id; | ||
final String name; | ||
final Version version; | ||
final String transportAddress; | ||
final String httpAddress; | ||
final Map<String, Object> attributes; | ||
|
||
Node(String id, String name, Version version, String transportAddress, String httpAddress, Map<String, Object> attributes) { | ||
this.id = id; | ||
this.name = name; | ||
this.version = version; | ||
this.transportAddress = transportAddress; | ||
this.httpAddress = httpAddress; | ||
this.attributes = attributes; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "Node{" + | ||
"id='" + id + '\'' + | ||
", name='" + name + '\'' + | ||
", version=" + version + | ||
", transportAddress='" + transportAddress + '\'' + | ||
", httpAddress='" + httpAddress + '\'' + | ||
", attributes=" + attributes + | ||
'}'; | ||
} | ||
} | ||
|
||
static List<Node> getNodes(RestClient restClient) throws IOException { | ||
Response response = restClient.performRequest(new Request("GET", "_nodes")); | ||
ObjectPath objectPath = ObjectPath.createFromResponse(response); | ||
final Map<String, Object> nodeMap = objectPath.evaluate("nodes"); | ||
final List<Node> nodes = new ArrayList<>(); | ||
for (String id : nodeMap.keySet()) { | ||
final String name = objectPath.evaluate("nodes." + id + ".name"); | ||
final Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version")); | ||
final String transportAddress = objectPath.evaluate("nodes." + id + ".transport.publish_address"); | ||
final String httpAddress = objectPath.evaluate("nodes." + id + ".http.publish_address"); | ||
final Map<String, Object> attributes = objectPath.evaluate("nodes." + id + ".attributes"); | ||
nodes.add(new Node(id, name, version, transportAddress, httpAddress, attributes)); | ||
} | ||
return nodes; | ||
} | ||
|
||
static List<HttpHost> parseHosts(String props) { | ||
final String address = System.getProperty(props); | ||
assertNotNull("[" + props + "] is not configured", address); | ||
String[] stringUrls = address.split(","); | ||
List<HttpHost> hosts = new ArrayList<>(stringUrls.length); | ||
for (String stringUrl : stringUrls) { | ||
int portSeparator = stringUrl.lastIndexOf(':'); | ||
if (portSeparator < 0) { | ||
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]"); | ||
} | ||
String host = stringUrl.substring(0, portSeparator); | ||
int port = Integer.parseInt(stringUrl.substring(portSeparator + 1)); | ||
hosts.add(new HttpHost(host, port, "http")); | ||
} | ||
assertThat("[" + props + "] is empty", hosts, not(empty())); | ||
return hosts; | ||
} | ||
|
||
public static void configureRemoteClusters(List<Node> remoteNodes) throws Exception { | ||
assertThat(remoteNodes, hasSize(3)); | ||
final String remoteClusterSettingPrefix = "cluster.remote." + CLUSTER_ALIAS + "."; | ||
try (RestHighLevelClient localClient = newLocalClient()) { | ||
final Settings remoteConnectionSettings; | ||
if (randomBoolean()) { | ||
final List<String> seeds = remoteNodes.stream() | ||
.filter(n -> n.attributes.containsKey("gateway")) | ||
.map(n -> n.transportAddress) | ||
.collect(Collectors.toList()); | ||
assertThat(seeds, hasSize(2)); | ||
LOGGER.info("--> use sniff mode with seed [{}], remote nodes [{}]", seeds, remoteNodes); | ||
remoteConnectionSettings = Settings.builder() | ||
.putNull(remoteClusterSettingPrefix + "proxy_address") | ||
.put(remoteClusterSettingPrefix + "mode", "sniff") | ||
.putList(remoteClusterSettingPrefix + "seeds", seeds) | ||
.build(); | ||
} else { | ||
final Node proxyNode = randomFrom(remoteNodes); | ||
LOGGER.info("--> use proxy node [{}], remote nodes [{}]", proxyNode, remoteNodes); | ||
remoteConnectionSettings = Settings.builder() | ||
.putNull(remoteClusterSettingPrefix + "seeds") | ||
.put(remoteClusterSettingPrefix + "mode", "proxy") | ||
.put(remoteClusterSettingPrefix + "proxy_address", proxyNode.transportAddress) | ||
.build(); | ||
} | ||
assertTrue( | ||
localClient.cluster() | ||
.putSettings(new ClusterUpdateSettingsRequest().persistentSettings(remoteConnectionSettings), RequestOptions.DEFAULT) | ||
.isAcknowledged() | ||
); | ||
assertBusy(() -> { | ||
final Response resp = localClient.getLowLevelClient().performRequest(new Request("GET", "/_remote/info")); | ||
assertOK(resp); | ||
final ObjectPath objectPath = ObjectPath.createFromResponse(resp); | ||
assertNotNull(objectPath.evaluate(CLUSTER_ALIAS)); | ||
assertTrue(objectPath.evaluate(CLUSTER_ALIAS + ".connected")); | ||
}, 60, TimeUnit.SECONDS); | ||
} | ||
} | ||
|
||
static RestHighLevelClient newLocalClient() { | ||
final List<HttpHost> hosts = parseHosts("tests.rest.cluster"); | ||
final int index = random().nextInt(hosts.size()); | ||
LOGGER.info("Using client node {}", index); | ||
return new RestHighLevelClient(RestClient.builder(hosts.get(index))); | ||
} | ||
|
||
static RestHighLevelClient newRemoteClient() { | ||
return new RestHighLevelClient(RestClient.builder(randomFrom(parseHosts("tests.rest.remote_cluster")))); | ||
} | ||
|
||
static int indexDocs(RestHighLevelClient client, String index, int numDocs) throws IOException { | ||
for (int i = 0; i < numDocs; i++) { | ||
client.index(new IndexRequest(index).id("id_" + i).source("f", i), RequestOptions.DEFAULT); | ||
} | ||
client.indices().refresh(new RefreshRequest(index), RequestOptions.DEFAULT); | ||
return numDocs; | ||
} | ||
|
||
void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs) { | ||
try (RestHighLevelClient localClient = newLocalClient()) { | ||
Request request = new Request("POST", "/_search"); | ||
final int expectedDocs; | ||
if (randomBoolean()) { | ||
request.addParameter("index", remoteIndex); | ||
expectedDocs = remoteNumDocs; | ||
} else { | ||
request.addParameter("index", localIndex + "," + remoteIndex); | ||
expectedDocs = localNumDocs + remoteNumDocs; | ||
} | ||
if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_0_0)) { | ||
request.addParameter("ccs_minimize_roundtrips", Boolean.toString(randomBoolean())); | ||
} | ||
int size = between(1, 100); | ||
request.setJsonEntity("{\"sort\": \"f\", \"size\": " + size + "}"); | ||
Response response = localClient.getLowLevelClient().performRequest(request); | ||
try (XContentParser parser = JsonXContent.jsonXContent.createParser( | ||
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, | ||
response.getEntity().getContent())) { | ||
SearchResponse searchResponse = SearchResponse.fromXContent(parser); | ||
ElasticsearchAssertions.assertNoFailures(searchResponse); | ||
ElasticsearchAssertions.assertHitCount(searchResponse, expectedDocs); | ||
} | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
} | ||
|
||
public void testBWCSearchStates() throws Exception { | ||
String localIndex = "test_bwc_search_states_index"; | ||
String remoteIndex = "test_bwc_search_states_remote_index"; | ||
try (RestHighLevelClient localClient = newLocalClient(); | ||
RestHighLevelClient remoteClient = newRemoteClient()) { | ||
localClient.indices().create(new CreateIndexRequest(localIndex) | ||
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))), | ||
RequestOptions.DEFAULT); | ||
int localNumDocs = indexDocs(localClient, localIndex, between(10, 100)); | ||
|
||
remoteClient.indices().create(new CreateIndexRequest(remoteIndex) | ||
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))), | ||
RequestOptions.DEFAULT); | ||
int remoteNumDocs = indexDocs(remoteClient, remoteIndex, between(10, 100)); | ||
|
||
configureRemoteClusters(getNodes(remoteClient.getLowLevelClient())); | ||
int iterations = between(1, 20); | ||
for (int i = 0; i < iterations; i++) { | ||
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs); | ||
} | ||
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT); | ||
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.