Skip to content

Commit

Permalink
Emulate "fields" option on older versions (#75745)
Browse files Browse the repository at this point in the history
We introduced the "fields" option in search with version 7.10. With this
change we are trying to do a best-effort attempt at emulating this option
in mixed cluster or CCS scenarios where older nodes or clusters are targeted. 

This change emulates the fields behaviour by modifying the search request 
to include the respective parts of the "_source" field and then parses them back
into the "fields" section returned with the response. This will not be fully equivalent
to the post-7.10 "fields" functionality (e.g. we cannot get multi-field values which are
not part of “_source” from pre-7.10 nodes), but will include values present in “_source” 

In order to use the emulation, clients need to set the `enable_fields_emulation` flag on
the search request. Also the emulation behaviour will be limited to searches using the
default `query_then_fetch` search type. If both the “fields” option and source filtering
are mixed in the same search request, we will throw an error.
  • Loading branch information
Christoph Büscher authored Sep 14, 2021
1 parent e10bc3a commit 5a83dcd
Show file tree
Hide file tree
Showing 19 changed files with 991 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,10 @@
import org.elasticsearch.client.security.RefreshPolicy;
import org.elasticsearch.client.tasks.TaskId;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -63,6 +60,9 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.rankeval.RankEvalRequest;
Expand Down Expand Up @@ -422,6 +422,9 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
if (searchRequest.isCcsMinimizeRoundtrips() != SearchRequest.defaultCcsMinimizeRoundtrips(searchRequest)) {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
if (searchRequest.isFieldsOptionEmulationEnabled() != SearchRequest.DEFAULT_FIELDS_EMULATION_ENABLED) {
params.putParam("enable_fields_emulation", Boolean.toString(searchRequest.isFieldsOptionEmulationEnabled()));
}
if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
Expand Down Expand Up @@ -1353,6 +1353,8 @@ public void testMultiSearch() throws IOException {
});
// scroll is not supported in the current msearch api, so unset it:
searchRequest.scroll((Scroll) null);
// fields emulation is not supported in msearch api
searchRequest.setFieldsOptionEmulationEnabled(false);
// only expand_wildcards, ignore_unavailable and allow_no_indices can be
// specified from msearch api, so unset other options:
IndicesOptions randomlyGenerated = searchRequest.indicesOptions();
Expand Down Expand Up @@ -2141,6 +2143,13 @@ private static void setRandomSearchParams(SearchRequest searchRequest,
expectedParams.put("ccs_minimize_roundtrips", "false");
}
}
if (randomBoolean()) {
boolean enableFieldsEmulation = randomBoolean();
searchRequest.setFieldsOptionEmulationEnabled(enableFieldsEmulation);
if (enableFieldsEmulation) {
expectedParams.put("enable_fields_emulation", Boolean.toString(enableFieldsEmulation));
}
}
if (randomBoolean()) {
searchRequest.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE));
}
Expand Down
61 changes: 61 additions & 0 deletions qa/ccs-old-version-remote-cluster/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask

apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.bwc-test'
apply plugin: 'elasticsearch.rest-resources'

dependencies {
testImplementation project(':client:rest-high-level')
}

for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
String baseName = "v${bwcVersion}"
String bwcVersionStr = "${bwcVersion}"

testClusters {
"${baseName}-local" {
numberOfNodes = 2
versions = [project.version]
setting 'cluster.remote.node.attr', 'gateway'
setting 'xpack.security.enabled', 'false'
}
"${baseName}-remote" {
numberOfNodes = 2
versions = [bwcVersionStr]
firstNode.setting 'node.attr.gateway', 'true'
lastNode.setting 'node.attr.gateway', 'true'
setting 'xpack.security.enabled', 'false'
}
}

tasks.withType(StandaloneRestIntegTestTask).matching { it.name.startsWith("${baseName}#") }.configureEach {
useCluster testClusters."${baseName}-local"
useCluster testClusters."${baseName}-remote"
systemProperty 'tests.upgrade_from_version', bwcVersionStr.replace('-SNAPSHOT', '')

doFirst {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}-local".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.rest.remote_cluster', "${-> testClusters."${baseName}-remote".allHttpSocketURI.join(",")}")
}
}

tasks.register("${baseName}#testBWCEmulation", StandaloneRestIntegTestTask) {
dependsOn "processTestResources"
mustRunAfter("precommit")
}

tasks.register(bwcTaskName(bwcVersion)) {
dependsOn tasks.named("${baseName}#testBWCEmulation")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.upgrades;

import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.rest.AbstractCCSRestTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.hasSize;

/**
* This test ensure that we emulate the "fields" option when the local cluster supports it but the remote
* cluster is running an older compatible version.
*/
public class CCSFieldsOptionEmulationIT extends AbstractCCSRestTestCase {

private static final Logger LOGGER = LogManager.getLogger(CCSFieldsOptionEmulationIT.class);
private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
private static final String CLUSTER_ALIAS = "remote_cluster";

static int indexDocs(RestHighLevelClient client, String index, int numDocs, boolean expectWarnings) throws IOException {
for (int i = 0; i < numDocs; i++) {
Request indexDoc = new Request("PUT", index + "/type/" + i);
indexDoc.setJsonEntity("{\"field\": \"f" + i + "\", \"array\": [1, 2, 3] , \"obj\": { \"innerObj\" : \"foo\" } }");
if (expectWarnings) {
indexDoc.setOptions(expectWarnings(RestIndexAction.TYPES_DEPRECATION_MESSAGE));
}
client.getLowLevelClient().performRequest(indexDoc);
}
client.indices().refresh(new RefreshRequest(index), RequestOptions.DEFAULT);
return numDocs;
}

static RestHighLevelClient newLocalClient(Logger logger) {
final List<HttpHost> hosts = parseHosts("tests.rest.cluster");
final int index = random().nextInt(hosts.size());
logger.info("Using client node {}", index);
return new RestHighLevelClient(RestClient.builder(hosts.get(index)));
}

static RestHighLevelClient newRemoteClient() {
return new RestHighLevelClient(RestClient.builder(randomFrom(parseHosts("tests.rest.remote_cluster"))));
}

public void testFieldsOptionEmulation() throws Exception {
String localIndex = "test_bwc_fields_index";
String remoteIndex = "test_bwc_fields_remote_index";
try (RestHighLevelClient localClient = newLocalClient(LOGGER);
RestHighLevelClient remoteClient = newRemoteClient()) {
localClient.indices().create(new CreateIndexRequest(localIndex)
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))),
RequestOptions.DEFAULT);
int localNumDocs = indexDocs(localClient, localIndex, between(10, 20), true);

Builder remoteIndexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5));
remoteClient.indices().create(new CreateIndexRequest(remoteIndex)
.settings(remoteIndexSettings),
RequestOptions.DEFAULT);
boolean expectRemoteIndexWarnings = UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_0_0);
int remoteNumDocs = indexDocs(remoteClient, remoteIndex, between(10, 20), expectRemoteIndexWarnings);
int expectedHitCount = localNumDocs + remoteNumDocs;

List<Node> remoteNodes = getNodes(remoteClient.getLowLevelClient());
assertThat(remoteNodes, hasSize(2));
configureRemoteClusters(getNodes(remoteClient.getLowLevelClient()), CLUSTER_ALIAS, UPGRADE_FROM_VERSION, LOGGER);
RestClient lowLevelClient = localClient.getLowLevelClient();
for (String minimizeRoundTrips : new String[] { "true", "false" }) {
Request request = new Request("POST", "/_search");
request.addParameter("index", localIndex + "," + CLUSTER_ALIAS + ":" + remoteIndex);
request.addParameter("ccs_minimize_roundtrips", minimizeRoundTrips);
request.addParameter("enable_fields_emulation", "true");
request.setJsonEntity("{\"_source\": false, \"fields\": [\"*\"] , \"size\": " + expectedHitCount + "}");
Response response = lowLevelClient.performRequest(request);
try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.getEntity().getContent()
)
) {
SearchResponse searchResponse = SearchResponse.fromXContent(parser);
ElasticsearchAssertions.assertNoFailures(searchResponse);
ElasticsearchAssertions.assertHitCount(searchResponse, expectedHitCount);
SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
assertFalse("No source in hit expected but was: " + hit.toString(), hit.hasSource());
Map<String, DocumentField> fields = hit.getFields();
assertNotNull(fields);
assertNotNull("Field `field` not found, hit was: " + hit.toString(), fields.get("field"));
if (hit.getIndex().equals(localIndex) || UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_10_0)) {
assertNotNull("Field `field.keyword` not found, hit was: " + hit.toString(), fields.get("field.keyword"));
} else {
// we won't be able to get multi-field for remote indices below V7.10
assertNull(
"Field `field.keyword` should not be returned, hit was: " + hit.toString(),
fields.get("field.keyword")
);
}
DocumentField arrayField = fields.get("array");
assertNotNull("Field `array` not found, hit was: " + hit.toString(), arrayField);
assertEquals(3, ((List<?>) arrayField.getValues()).size());
assertNull("Object fields should be flattened by the fields API", fields.get("obj"));
assertEquals(1, fields.get("obj.innerObj").getValues().size());
assertEquals("foo", fields.get("obj.innerObj").getValue());
}
}
}
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);
}
}
}
Loading

0 comments on commit 5a83dcd

Please sign in to comment.