Skip to content

Commit

Permalink
Add support for rest compatibility headers to the HLRC (elastic#78490)
Browse files Browse the repository at this point in the history
This adds support for the headers necessary for REST version compatibility to the High Level Rest
Client (HLRC).

Compatibility mode can be turned on either with the .setAPICompatibilityMode(true) when creating
the client, or by setting the ELASTIC_CLIENT_APIVERSIONING to true similar to our other
Elasticsearch clients.

Resolves elastic#77859
  • Loading branch information
dakrone committed Oct 7, 2021
1 parent 2dc4ee3 commit 1287d0c
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.client;

import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -67,17 +68,17 @@
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.rankeval.RankEvalResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
Expand Down Expand Up @@ -130,18 +131,18 @@
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongRareTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongRareTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringRareTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringRareTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
Expand Down Expand Up @@ -253,11 +254,16 @@
public class RestHighLevelClient implements Closeable {

private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class);
/**
* Environment variable determining whether to send the 7.x compatibility header
*/
public static final String API_VERSIONING_ENV_VARIABLE = "ELASTIC_CLIENT_APIVERSIONING";

// To be called using performClientRequest and performClientRequestAsync to ensure version compatibility check
private final RestClient client;
private final NamedXContentRegistry registry;
private final CheckedConsumer<RestClient, IOException> doClose;
private final boolean useAPICompatibility;

/** Do not access directly but through getVersionValidationFuture() */
private volatile ListenableFuture<Optional<String>> versionValidationFuture;
Expand Down Expand Up @@ -310,11 +316,28 @@ protected RestHighLevelClient(RestClientBuilder restClientBuilder, List<NamedXCo
*/
protected RestHighLevelClient(RestClient restClient, CheckedConsumer<RestClient, IOException> doClose,
List<NamedXContentRegistry.Entry> namedXContentEntries) {
this(restClient, doClose, namedXContentEntries, null);
}

/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClient} that it should use to perform requests and
* a list of entries that allow to parse custom response sections added to Elasticsearch through plugins.
* This constructor can be called by subclasses in case an externally created low-level REST client needs to be provided.
* The consumer argument allows to control what needs to be done when the {@link #close()} method is called.
* Also subclasses can provide parsers for custom response sections added to Elasticsearch through plugins.
*/
protected RestHighLevelClient(RestClient restClient, CheckedConsumer<RestClient, IOException> doClose,
List<NamedXContentRegistry.Entry> namedXContentEntries, Boolean useAPICompatibility) {
this.client = Objects.requireNonNull(restClient, "restClient must not be null");
this.doClose = Objects.requireNonNull(doClose, "doClose consumer must not be null");
this.registry = new NamedXContentRegistry(
Stream.of(getDefaultNamedXContents().stream(), getProvidedNamedXContents().stream(), namedXContentEntries.stream())
.flatMap(Function.identity()).collect(toList()));
Stream.of(getDefaultNamedXContents().stream(), getProvidedNamedXContents().stream(), namedXContentEntries.stream())
.flatMap(Function.identity()).collect(toList()));
if (useAPICompatibility == null && "true".equals(System.getenv(API_VERSIONING_ENV_VARIABLE))) {
this.useAPICompatibility = true;
} else {
this.useAPICompatibility = Boolean.TRUE.equals(useAPICompatibility);
}
}

/**
Expand Down Expand Up @@ -2016,7 +2039,82 @@ protected static boolean convertExistsResponse(Response response) {
return response.getStatusLine().getStatusCode() == 200;
}

private enum EntityType {
JSON() {
@Override
public String header() {
return "application/json";
}
@Override
public String compatibleHeader() {
return "application/vnd.elasticsearch+json; compatible-with=7";
}
},
NDJSON() {
@Override
public String header() {
return "application/x-ndjson";
}
@Override
public String compatibleHeader() {
return "application/vnd.elasticsearch+x-ndjson; compatible-with=7";
}
},
STAR() {
@Override
public String header() {
return "application/*";
}
@Override
public String compatibleHeader() {
return "application/vnd.elasticsearch+json; compatible-with=7";
}
},
YAML() {
@Override
public String header() {
return "application/yaml";
}
@Override
public String compatibleHeader() {
return "application/vnd.elasticsearch+yaml; compatible-with=7";
}
},
SMILE() {
@Override
public String header() {
return "application/smile";
}
@Override
public String compatibleHeader() {
return "application/vnd.elasticsearch+smile; compatible-with=7";
}
},
CBOR() {
@Override
public String header() {
return "application/cbor";
}
@Override
public String compatibleHeader() {
return "application/vnd.elasticsearch+cbor; compatible-with=7";
}
};

public abstract String header();
public abstract String compatibleHeader();

@Override
public String toString() {
return header();
}
}

private Cancellable performClientRequestAsync(Request request, ResponseListener listener) {
// Add compatibility request headers if compatibility mode has been enabled
if (this.useAPICompatibility) {
modifyRequestForCompatibility(request);
}

ListenableFuture<Optional<String>> versionCheck = getVersionValidationFuture();

Expand Down Expand Up @@ -2068,7 +2166,71 @@ public void onFailure(Exception e) {
return result;
};


/**
* Go through all the request's existing headers, looking for {@code headerName} headers and if they exist,
* changing them to use version compatibility. If no request headers are changed, modify the entity type header if appropriate
*/
boolean addCompatibilityFor(RequestOptions.Builder newOptions, Header entityHeader, String headerName) {
// Modify any existing "Content-Type" headers on the request to use the version compatibility, if available
boolean contentTypeModified = false;
for (Header header : new ArrayList<>(newOptions.getHeaders())) {
if (headerName.equalsIgnoreCase(header.getName()) == false) {
continue;
}
contentTypeModified = contentTypeModified || modifyHeader(newOptions, header, headerName);
}

// If there were no request-specific headers, modify the request entity's header to be compatible
if (entityHeader != null && contentTypeModified == false) {
contentTypeModified = modifyHeader(newOptions, entityHeader, headerName);
}

return contentTypeModified;
}

/**
* Modify the given header to be version compatible, if necessary.
* Returns true if a modification was made, false otherwise.
*/
boolean modifyHeader(RequestOptions.Builder newOptions, Header header, String headerName) {
for (EntityType type : EntityType.values()) {
final String headerValue = header.getValue();
if (headerValue.startsWith(type.header())) {
String newHeaderValue = headerValue.replace(type.header(), type.compatibleHeader());
newOptions.removeHeader(header.getName());
newOptions.addHeader(headerName, newHeaderValue);
return true;
}
}
return false;
}

/**
* Make all necessary changes to support API compatibility for the given request. This includes
* modifying the "Content-Type" and "Accept" headers if present, or modifying the header based
* on the request's entity type.
*/
void modifyRequestForCompatibility(Request request) {
final Header entityHeader = request.getEntity() == null ? null : request.getEntity().getContentType();
final RequestOptions.Builder newOptions = request.getOptions().toBuilder();

addCompatibilityFor(newOptions, entityHeader, "Content-Type");
if (request.getOptions().containsHeader("Accept")) {
addCompatibilityFor(newOptions, entityHeader, "Accept");
} else {
// There is no entity, and no existing accept header, but we still need one
// with compatibility, so use the compatible JSON (default output) format
newOptions.addHeader("Accept", EntityType.JSON.compatibleHeader());
}
request.setOptions(newOptions);
}

private Response performClientRequest(Request request) throws IOException {
// Add compatibility request headers if compatibility mode has been enabled
if (this.useAPICompatibility) {
modifyRequestForCompatibility(request);
}

Optional<String> versionValidation;
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.client;

import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.CheckedConsumer;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Helper to build a {@link RestHighLevelClient}, allowing setting the low-level client that
* should be used as well as whether API compatibility should be used.
*/

public class RestHighLevelClientBuilder {
private final RestClient restClient;
private CheckedConsumer<RestClient, IOException> closeHandler = RestClient::close;
private List<NamedXContentRegistry.Entry> namedXContentEntries = Collections.emptyList();
private Boolean apiCompatibilityMode = null;

public RestHighLevelClientBuilder(RestClient restClient) {
this.restClient = restClient;
}

public RestHighLevelClientBuilder closeHandler(CheckedConsumer<RestClient, IOException> closeHandler) {
this.closeHandler = closeHandler;
return this;
}

public RestHighLevelClientBuilder namedXContentEntries(List<NamedXContentRegistry.Entry> namedXContentEntries) {
this.namedXContentEntries = namedXContentEntries;
return this;
}

public RestHighLevelClientBuilder setApiCompatibilityMode(Boolean enabled) {
this.apiCompatibilityMode = enabled;
return this;
}

public RestHighLevelClient build() {
return new RestHighLevelClient(this.restClient, this.closeHandler, this.namedXContentEntries, this.apiCompatibilityMode);
}
}
Loading

0 comments on commit 1287d0c

Please sign in to comment.