diff --git a/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java b/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java index 4d57f12742e03..e133776affb9c 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java +++ b/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java @@ -39,16 +39,6 @@ public ResponseException(Response response) throws IOException { this.response = response; } - /** - * Wrap a {@linkplain ResponseException} with another one with the current - * stack trace. This is used during synchronous calls so that the caller - * ends up in the stack trace of the exception thrown. - */ - ResponseException(ResponseException e) throws IOException { - super(e.getMessage(), e); - this.response = e.getResponse(); - } - static String buildMessage(Response response) throws IOException { String message = String.format(Locale.ROOT, "method [%s], host [%s], URI [%s], status line [%s]", diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 175d524f02af5..f0478742ecc82 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -70,11 +70,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; @@ -103,7 +100,6 @@ public class RestClient implements Closeable { // We don't rely on default headers supported by HttpAsyncClient as those cannot be replaced. // These are package private for tests. final List
defaultHeaders; - private final long maxRetryTimeoutMillis; private final String pathPrefix; private final AtomicInteger lastNodeIndex = new AtomicInteger(0); private final ConcurrentMap blacklist = new ConcurrentHashMap<>(); @@ -112,10 +108,9 @@ public class RestClient implements Closeable { private volatile NodeTuple> nodeTuple; private final WarningsHandler warningsHandler; - RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, List nodes, String pathPrefix, + RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) { this.client = client; - this.maxRetryTimeoutMillis = maxRetryTimeoutMillis; this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); this.failureListener = failureListener; this.pathPrefix = pathPrefix; @@ -213,9 +208,64 @@ public List getNodes() { * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error */ public Response performRequest(Request request) throws IOException { - SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis); - performRequestAsyncNoCatch(request, listener); - return listener.get(); + InternalRequest internalRequest = new InternalRequest(request); + return performRequest(nextNodes(), internalRequest, null); + } + + private Response performRequest(final NodeTuple> nodeTuple, + final InternalRequest request, + Exception previousException) throws IOException { + RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); + HttpResponse httpResponse; + try { + httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); + } catch(Exception e) { + RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); + onFailure(context.node); + Exception cause = extractAndWrapCause(e); + addSuppressedException(previousException, cause); + if (nodeTuple.nodes.hasNext()) { + return performRequest(nodeTuple, request, cause); + } + if (cause instanceof IOException) { + throw (IOException) cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause); + } + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); + if (responseOrResponseException.responseException == null) { + return responseOrResponseException.response; + } + addSuppressedException(previousException, responseOrResponseException.responseException); + if (nodeTuple.nodes.hasNext()) { + return performRequest(nodeTuple, request, responseOrResponseException.responseException); + } + throw responseOrResponseException.responseException; + } + + private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { + RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); + int statusCode = httpResponse.getStatusLine().getStatusCode(); + Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse); + if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { + onResponse(node); + if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) { + throw new WarningFailureException(response); + } + return new ResponseOrResponseException(response); + } + ResponseException responseException = new ResponseException(response); + if (isRetryStatus(statusCode)) { + //mark host dead and retry against next one + onFailure(node); + return new ResponseOrResponseException(responseException); + } + //mark host alive and don't retry, as the error should be a request problem + onResponse(node); + throw responseException; } /** @@ -236,85 +286,31 @@ public Response performRequest(Request request) throws IOException { */ public void performRequestAsync(Request request, ResponseListener responseListener) { try { - performRequestAsyncNoCatch(request, responseListener); + FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); + InternalRequest internalRequest = new InternalRequest(request); + performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener); } catch (Exception e) { responseListener.onFailure(e); } } - void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException { - Map requestParams = new HashMap<>(request.getParameters()); - //ignore is a special parameter supported by the clients, shouldn't be sent to es - String ignoreString = requestParams.remove("ignore"); - Set ignoreErrorCodes; - if (ignoreString == null) { - if (HttpHead.METHOD_NAME.equals(request.getMethod())) { - //404 never causes error if returned for a HEAD request - ignoreErrorCodes = Collections.singleton(404); - } else { - ignoreErrorCodes = Collections.emptySet(); - } - } else { - String[] ignoresArray = ignoreString.split(","); - ignoreErrorCodes = new HashSet<>(); - if (HttpHead.METHOD_NAME.equals(request.getMethod())) { - //404 never causes error if returned for a HEAD request - ignoreErrorCodes.add(404); - } - for (String ignoreCode : ignoresArray) { - try { - ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); - } - } - } - URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams); - HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); - setHeaders(httpRequest, request.getOptions().getHeaders()); - FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener); - long startTime = System.nanoTime(); - performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes, - request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(), - request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener); - } - - private void performRequestAsync(final long startTime, final NodeTuple> nodeTuple, final HttpRequestBase request, - final Set ignoreErrorCodes, - final WarningsHandler thisWarningsHandler, - final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, + private void performRequestAsync(final NodeTuple> nodeTuple, + final InternalRequest request, final FailureTrackingResponseListener listener) { - final Node node = nodeTuple.nodes.next(); - //we stream the request body if the entity allows for it - final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request); - final HttpAsyncResponseConsumer asyncResponseConsumer = - httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer(); - final HttpClientContext context = HttpClientContext.create(); - context.setAuthCache(nodeTuple.authCache); - client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback() { + final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); + client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback() { @Override public void completed(HttpResponse httpResponse) { try { - RequestLogger.logResponse(logger, request, node.getHost(), httpResponse); - int statusCode = httpResponse.getStatusLine().getStatusCode(); - Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse); - if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { - onResponse(node); - if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) { - listener.onDefinitiveFailure(new WarningFailureException(response)); - } else { - listener.onSuccess(response); - } + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); + if (responseOrResponseException.responseException == null) { + listener.onSuccess(responseOrResponseException.response); } else { - ResponseException responseException = new ResponseException(response); - if (isRetryStatus(statusCode)) { - //mark host dead and retry against next one - onFailure(node); - retryIfPossible(responseException); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(responseOrResponseException.responseException); + performRequestAsync(nodeTuple, request, listener); } else { - //mark host alive and don't retry, as the error should be a request problem - onResponse(node); - listener.onDefinitiveFailure(responseException); + listener.onDefinitiveFailure(responseOrResponseException.responseException); } } } catch(Exception e) { @@ -325,31 +321,16 @@ public void completed(HttpResponse httpResponse) { @Override public void failed(Exception failure) { try { - RequestLogger.logFailedRequest(logger, request, node, failure); - onFailure(node); - retryIfPossible(failure); - } catch(Exception e) { - listener.onDefinitiveFailure(e); - } - } - - private void retryIfPossible(Exception exception) { - if (nodeTuple.nodes.hasNext()) { - //in case we are retrying, check whether maxRetryTimeout has been reached - long timeElapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); - long timeout = maxRetryTimeoutMillis - timeElapsedMillis; - if (timeout <= 0) { - IOException retryTimeoutException = new IOException( - "request retries exceeded max retry timeout [" + maxRetryTimeoutMillis + "]", exception); - listener.onDefinitiveFailure(retryTimeoutException); + RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); + onFailure(context.node); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(failure); + performRequestAsync(nodeTuple, request, listener); } else { - listener.trackFailure(exception); - request.reset(); - performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes, - thisWarningsHandler, httpAsyncResponseConsumerFactory, listener); + listener.onDefinitiveFailure(failure); } - } else { - listener.onDefinitiveFailure(exception); + } catch(Exception e) { + listener.onDefinitiveFailure(e); } } @@ -360,20 +341,6 @@ public void cancelled() { }); } - private void setHeaders(HttpRequest httpRequest, Collection
requestHeaders) { - // request headers override default headers, so we don't add default headers if they exist as request headers - final Set requestNames = new HashSet<>(requestHeaders.size()); - for (Header requestHeader : requestHeaders) { - httpRequest.addHeader(requestHeader); - requestNames.add(requestHeader.getName()); - } - for (Header defaultHeader : defaultHeaders) { - if (requestNames.contains(defaultHeader.getName()) == false) { - httpRequest.addHeader(defaultHeader); - } - } - } - /** * Returns a non-empty {@link Iterator} of nodes to be used for a request * that match the {@link NodeSelector}. @@ -383,7 +350,7 @@ private void setHeaders(HttpRequest httpRequest, Collection
requestHeade * that is closest to being revived. * @throws IOException if no nodes are available */ - private NodeTuple> nextNode() throws IOException { + private NodeTuple> nextNodes() throws IOException { NodeTuple> nodeTuple = this.nodeTuple; Iterable hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector); return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache); @@ -517,11 +484,10 @@ private static boolean isRetryStatus(int statusCode) { return false; } - private static Exception addSuppressedException(Exception suppressedException, Exception currentException) { + private static void addSuppressedException(Exception suppressedException, Exception currentException) { if (suppressedException != null) { currentException.addSuppressed(suppressedException); } - return currentException; } private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) { @@ -618,118 +584,8 @@ void onDefinitiveFailure(Exception exception) { * Tracks an exception, which caused a retry hence we should not return yet to the caller */ void trackFailure(Exception exception) { - this.exception = addSuppressedException(this.exception, exception); - } - } - - /** - * Listener used in any sync performRequest calls, it waits for a response or an exception back up to a timeout - */ - static class SyncResponseListener implements ResponseListener { - private final CountDownLatch latch = new CountDownLatch(1); - private final AtomicReference response = new AtomicReference<>(); - private final AtomicReference exception = new AtomicReference<>(); - - private final long timeout; - - SyncResponseListener(long timeout) { - assert timeout > 0; - this.timeout = timeout; - } - - @Override - public void onSuccess(Response response) { - Objects.requireNonNull(response, "response must not be null"); - boolean wasResponseNull = this.response.compareAndSet(null, response); - if (wasResponseNull == false) { - throw new IllegalStateException("response is already set"); - } - - latch.countDown(); - } - - @Override - public void onFailure(Exception exception) { - Objects.requireNonNull(exception, "exception must not be null"); - boolean wasExceptionNull = this.exception.compareAndSet(null, exception); - if (wasExceptionNull == false) { - throw new IllegalStateException("exception is already set"); - } - latch.countDown(); - } - - /** - * Waits (up to a timeout) for some result of the request: either a response, or an exception. - */ - Response get() throws IOException { - try { - //providing timeout is just a safety measure to prevent everlasting waits - //the different client timeouts should already do their jobs - if (latch.await(timeout, TimeUnit.MILLISECONDS) == false) { - throw new IOException("listener timeout after waiting for [" + timeout + "] ms"); - } - } catch (InterruptedException e) { - throw new RuntimeException("thread waiting for the response was interrupted", e); - } - - Exception exception = this.exception.get(); - Response response = this.response.get(); - if (exception != null) { - if (response != null) { - IllegalStateException e = new IllegalStateException("response and exception are unexpectedly set at the same time"); - e.addSuppressed(exception); - throw e; - } - /* - * Wrap and rethrow whatever exception we received, copying the type - * where possible so the synchronous API looks as much as possible - * like the asynchronous API. We wrap the exception so that the caller's - * signature shows up in any exception we throw. - */ - if (exception instanceof WarningFailureException) { - throw new WarningFailureException((WarningFailureException) exception); - } - if (exception instanceof ResponseException) { - throw new ResponseException((ResponseException) exception); - } - if (exception instanceof ConnectTimeoutException) { - ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof SocketTimeoutException) { - SocketTimeoutException e = new SocketTimeoutException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof ConnectionClosedException) { - ConnectionClosedException e = new ConnectionClosedException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof SSLHandshakeException) { - SSLHandshakeException e = new SSLHandshakeException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof ConnectException) { - ConnectException e = new ConnectException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof IOException) { - throw new IOException(exception.getMessage(), exception); - } - if (exception instanceof RuntimeException){ - throw new RuntimeException(exception.getMessage(), exception); - } - throw new RuntimeException("error while performing request", exception); - } - - if (response == null) { - throw new IllegalStateException("response not set and no exception caught either"); - } - return response; + addSuppressedException(this.exception, exception); + this.exception = exception; } } @@ -808,4 +664,153 @@ public void remove() { itr.remove(); } } + + private class InternalRequest { + private final Request request; + private final Map params; + private final Set ignoreErrorCodes; + private final HttpRequestBase httpRequest; + private final WarningsHandler warningsHandler; + + InternalRequest(Request request) { + this.request = request; + this.params = new HashMap<>(request.getParameters()); + //ignore is a special parameter supported by the clients, shouldn't be sent to es + String ignoreString = params.remove("ignore"); + this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod()); + URI uri = buildUri(pathPrefix, request.getEndpoint(), params); + this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); + setHeaders(httpRequest, request.getOptions().getHeaders()); + this.warningsHandler = request.getOptions().getWarningsHandler() == null ? + RestClient.this.warningsHandler : request.getOptions().getWarningsHandler(); + } + + private void setHeaders(HttpRequest httpRequest, Collection
requestHeaders) { + // request headers override default headers, so we don't add default headers if they exist as request headers + final Set requestNames = new HashSet<>(requestHeaders.size()); + for (Header requestHeader : requestHeaders) { + httpRequest.addHeader(requestHeader); + requestNames.add(requestHeader.getName()); + } + for (Header defaultHeader : defaultHeaders) { + if (requestNames.contains(defaultHeader.getName()) == false) { + httpRequest.addHeader(defaultHeader); + } + } + } + + RequestContext createContextForNextAttempt(Node node, AuthCache authCache) { + this.httpRequest.reset(); + return new RequestContext(this, node, authCache); + } + } + + private static class RequestContext { + private final Node node; + private final HttpAsyncRequestProducer requestProducer; + private final HttpAsyncResponseConsumer asyncResponseConsumer; + private final HttpClientContext context; + + RequestContext(InternalRequest request, Node node, AuthCache authCache) { + this.node = node; + //we stream the request body if the entity allows for it + this.requestProducer = HttpAsyncMethods.create(node.getHost(), request.httpRequest); + this.asyncResponseConsumer = + request.request.getOptions().getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer(); + this.context = HttpClientContext.create(); + context.setAuthCache(authCache); + } + } + + private static Set getIgnoreErrorCodes(String ignoreString, String requestMethod) { + Set ignoreErrorCodes; + if (ignoreString == null) { + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + //404 never causes error if returned for a HEAD request + ignoreErrorCodes = Collections.singleton(404); + } else { + ignoreErrorCodes = Collections.emptySet(); + } + } else { + String[] ignoresArray = ignoreString.split(","); + ignoreErrorCodes = new HashSet<>(); + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + //404 never causes error if returned for a HEAD request + ignoreErrorCodes.add(404); + } + for (String ignoreCode : ignoresArray) { + try { + ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); + } + } + } + return ignoreErrorCodes; + } + + private static class ResponseOrResponseException { + private final Response response; + private final ResponseException responseException; + + ResponseOrResponseException(Response response) { + this.response = Objects.requireNonNull(response); + this.responseException = null; + } + + ResponseOrResponseException(ResponseException responseException) { + this.responseException = Objects.requireNonNull(responseException); + this.response = null; + } + } + + /** + * Wrap the exception so the caller's signature shows up in the stack trace, taking care to copy the original type and message + * where possible so async and sync code don't have to check different exceptions. + */ + private static Exception extractAndWrapCause(Exception exception) { + if (exception instanceof InterruptedException) { + throw new RuntimeException("thread waiting for the response was interrupted", exception); + } + if (exception instanceof ExecutionException) { + ExecutionException executionException = (ExecutionException)exception; + Throwable t = executionException.getCause() == null ? executionException : executionException.getCause(); + if (t instanceof Error) { + throw (Error)t; + } + exception = (Exception)t; + } + if (exception instanceof ConnectTimeoutException) { + ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof SocketTimeoutException) { + SocketTimeoutException e = new SocketTimeoutException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof ConnectionClosedException) { + ConnectionClosedException e = new ConnectionClosedException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof SSLHandshakeException) { + SSLHandshakeException e = new SSLHandshakeException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof ConnectException) { + ConnectException e = new ConnectException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof IOException) { + return new IOException(exception.getMessage(), exception); + } + if (exception instanceof RuntimeException){ + return new RuntimeException(exception.getMessage(), exception); + } + return new RuntimeException("error while performing request", exception); + } } diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java index 84cc3ee1667b1..2337cbf1fd029 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java @@ -42,14 +42,12 @@ public final class RestClientBuilder { public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000; - public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS; public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; public static final int DEFAULT_MAX_CONN_TOTAL = 30; private static final Header[] EMPTY_HEADERS = new Header[0]; private final List nodes; - private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT_MILLIS; private Header[] defaultHeaders = EMPTY_HEADERS; private RestClient.FailureListener failureListener; private HttpClientConfigCallback httpClientConfigCallback; @@ -102,20 +100,6 @@ public RestClientBuilder setFailureListener(RestClient.FailureListener failureLi return this; } - /** - * Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request. - * {@link #DEFAULT_MAX_RETRY_TIMEOUT_MILLIS} if not specified. - * - * @throws IllegalArgumentException if {@code maxRetryTimeoutMillis} is not greater than 0 - */ - public RestClientBuilder setMaxRetryTimeoutMillis(int maxRetryTimeoutMillis) { - if (maxRetryTimeoutMillis <= 0) { - throw new IllegalArgumentException("maxRetryTimeoutMillis must be greater than 0"); - } - this.maxRetryTimeout = maxRetryTimeoutMillis; - return this; - } - /** * Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration * @@ -208,7 +192,7 @@ public CloseableHttpAsyncClient run() { return createHttpClient(); } }); - RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, + RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode); httpClient.start(); return restClient; diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java index 834748d65de34..1e16d94076a05 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java @@ -82,14 +82,6 @@ public void testBuild() throws IOException { assertNotNull(restClient); } - try { - RestClient.builder(new HttpHost("localhost", 9200)) - .setMaxRetryTimeoutMillis(randomIntBetween(Integer.MIN_VALUE, 0)); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals("maxRetryTimeoutMillis must be greater than 0", e.getMessage()); - } - try { RestClient.builder(new HttpHost("localhost", 9200)).setDefaultHeaders(null); fail("should have failed"); @@ -156,12 +148,9 @@ public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder reques builder.setDefaultHeaders(headers); } if (randomBoolean()) { - builder.setMaxRetryTimeoutMillis(randomIntBetween(1, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiOfLengthBetween(2, 5); + String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiLettersOfLengthBetween(2, 5); while (pathPrefix.length() < 20 && randomBoolean()) { - pathPrefix += "/" + randomAsciiOfLengthBetween(3, 6); + pathPrefix += "/" + randomAsciiLettersOfLengthBetween(3, 6); } builder.setPathPrefix(pathPrefix + (randomBoolean() ? "/" : "")); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java index 272859e8441e3..4cc16c45bab2f 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java @@ -199,7 +199,7 @@ public void onFailure(Exception exception) { * Test host selector against a real server and * test what happens after calling */ - public void testNodeSelector() throws IOException { + public void testNodeSelector() throws Exception { try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) { Request request = new Request("GET", "/200"); int rounds = between(1, 10); @@ -210,7 +210,7 @@ public void testNodeSelector() throws IOException { */ if (stoppedFirstHost) { try { - restClient.performRequest(request); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); fail("expected to fail to connect"); } catch (ConnectException e) { // Windows isn't consistent here. Sometimes the message is even null! @@ -219,7 +219,7 @@ public void testNodeSelector() throws IOException { } } } else { - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(httpHosts[0], response.getHost()); } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index 7dd1c4d842bff..f3df9bf3bfd37 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -22,25 +22,10 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.http.Header; import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.concurrent.FutureCallback; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.message.BasicHttpResponse; -import org.apache.http.message.BasicStatusLine; -import org.apache.http.nio.protocol.HttpAsyncRequestProducer; -import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import org.junit.After; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -49,7 +34,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode; import static org.elasticsearch.client.RestClientTestUtil.randomErrorRetryStatusCode; @@ -61,9 +45,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests for {@link RestClient} behaviour against multiple hosts: fail-over, blacklisting etc. @@ -75,39 +56,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { private List nodes; private HostsTrackingFailureListener failureListener; - @SuppressWarnings("unchecked") public RestClient createRestClient(NodeSelector nodeSelector) { - CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class); - when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), - any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { - @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { - HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; - final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); - final HttpHost httpHost = requestProducer.getTarget(); - HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; - assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class)); - final FutureCallback futureCallback = (FutureCallback) invocationOnMock.getArguments()[3]; - //return the desired status code or exception depending on the path - exec.execute(new Runnable() { - @Override - public void run() { - if (request.getURI().getPath().equals("/soe")) { - futureCallback.failed(new SocketTimeoutException(httpHost.toString())); - } else if (request.getURI().getPath().equals("/coe")) { - futureCallback.failed(new ConnectTimeoutException(httpHost.toString())); - } else if (request.getURI().getPath().equals("/ioe")) { - futureCallback.failed(new IOException(httpHost.toString())); - } else { - int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); - StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); - futureCallback.completed(new BasicHttpResponse(statusLine)); - } - } - }); - return null; - } - }); + CloseableHttpAsyncClient httpClient = RestClientSingleHostTests.mockHttpClient(exec); int numNodes = RandomNumbers.randomIntBetween(getRandom(), 2, 5); nodes = new ArrayList<>(numNodes); for (int i = 0; i < numNodes; i++) { @@ -115,7 +65,7 @@ public void run() { } nodes = Collections.unmodifiableList(nodes); failureListener = new HostsTrackingFailureListener(); - return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector, false); + return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false); } /** @@ -126,14 +76,15 @@ public void shutdownExec() { exec.shutdown(); } - public void testRoundRobinOkStatusCodes() throws IOException { + public void testRoundRobinOkStatusCodes() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5); for (int i = 0; i < numIters; i++) { Set hostsSet = hostsSet(); for (int j = 0; j < nodes.size(); j++) { int statusCode = randomOkStatusCode(getRandom()); - Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode)); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), "/" + statusCode)); assertEquals(statusCode, response.getStatusLine().getStatusCode()); assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost())); } @@ -142,7 +93,7 @@ public void testRoundRobinOkStatusCodes() throws IOException { failureListener.assertNotCalled(); } - public void testRoundRobinNoRetryErrors() throws IOException { + public void testRoundRobinNoRetryErrors() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5); for (int i = 0; i < numIters; i++) { @@ -151,7 +102,8 @@ public void testRoundRobinNoRetryErrors() throws IOException { String method = randomHttpMethod(getRandom()); int statusCode = randomErrorNoRetryStatusCode(getRandom()); try { - Response response = restClient.performRequest(new Request(method, "/" + statusCode)); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(method, "/" + statusCode)); if (method.equals("HEAD") && statusCode == 404) { //no exception gets thrown although we got a 404 assertEquals(404, response.getStatusLine().getStatusCode()); @@ -175,18 +127,13 @@ public void testRoundRobinNoRetryErrors() throws IOException { failureListener.assertNotCalled(); } - public void testRoundRobinRetryErrors() throws IOException { + public void testRoundRobinRetryErrors() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); String retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (ResponseException) e.getCause(); Set hostsSet = hostsSet(); //first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each failureListener.assertCalled(nodes); @@ -206,11 +153,6 @@ public void testRoundRobinRetryErrors() throws IOException { } while(e != null); assertEquals("every host should have been used but some weren't: " + hostsSet, 0, hostsSet.size()); } catch (IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); Set hostsSet = hostsSet(); //first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each failureListener.assertCalled(nodes); @@ -236,7 +178,8 @@ public void testRoundRobinRetryErrors() throws IOException { for (int j = 0; j < nodes.size(); j++) { retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { Response response = e.getResponse(); @@ -247,11 +190,6 @@ public void testRoundRobinRetryErrors() throws IOException { failureListener.assertCalled(response.getHost()); assertEquals(0, e.getSuppressed().length); } catch (IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); HttpHost httpHost = HttpHost.create(e.getMessage()); assertTrue("host [" + httpHost + "] not found, most likely used multiple times", hostsSet.remove(httpHost)); //after the first request, all hosts are blacklisted, a single one gets resurrected each time @@ -268,7 +206,8 @@ public void testRoundRobinRetryErrors() throws IOException { int statusCode = randomErrorNoRetryStatusCode(getRandom()); Response response; try { - response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode)); + response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), "/" + statusCode)); } catch (ResponseException e) { response = e.getResponse(); } @@ -285,7 +224,8 @@ public void testRoundRobinRetryErrors() throws IOException { for (int y = 0; y < i + 1; y++) { retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { Response response = e.getResponse(); @@ -293,11 +233,6 @@ public void testRoundRobinRetryErrors() throws IOException { assertThat(response.getHost(), equalTo(selectedHost)); failureListener.assertCalled(selectedHost); } catch(IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); HttpHost httpHost = HttpHost.create(e.getMessage()); assertThat(httpHost, equalTo(selectedHost)); failureListener.assertCalled(selectedHost); @@ -307,7 +242,7 @@ public void testRoundRobinRetryErrors() throws IOException { } } - public void testNodeSelector() throws IOException { + public void testNodeSelector() throws Exception { NodeSelector firstPositionOnly = new NodeSelector() { @Override public void select(Iterable restClientNodes) { @@ -330,12 +265,12 @@ public void select(Iterable restClientNodes) { * NodeSelector overrides the round robin behavior. */ Request request = new Request("GET", "/200"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(nodes.get(0).getHost(), response.getHost()); } } - public void testSetNodes() throws IOException { + public void testSetNodes() throws Exception { RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS); List newNodes = new ArrayList<>(nodes.size()); for (int i = 0; i < nodes.size(); i++) { @@ -350,7 +285,7 @@ public void testSetNodes() throws IOException { * NodeSelector overrides the round robin behavior. */ Request request = new Request("GET", "/200"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(newNodes.get(0).getHost(), response.getHost()); } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java index fb58f18d42af0..e3fd3c311378b 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java @@ -206,7 +206,7 @@ public void onFailure(Exception exception) { * to set/add headers to the {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever headers it received. */ - public void testHeaders() throws IOException { + public void testHeaders() throws Exception { for (String method : getHttpMethods()) { final Set standardHeaders = new HashSet<>(Arrays.asList("Connection", "Host", "User-agent", "Date")); if (method.equals("HEAD") == false) { @@ -222,7 +222,7 @@ public void testHeaders() throws IOException { request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); } catch (ResponseException e) { esResponse = e.getResponse(); } @@ -246,7 +246,7 @@ public void testHeaders() throws IOException { * out of the box by {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever body it received. */ - public void testDeleteWithBody() throws IOException { + public void testDeleteWithBody() throws Exception { bodyTest("DELETE"); } @@ -255,57 +255,57 @@ public void testDeleteWithBody() throws IOException { * out of the box by {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever body it received. */ - public void testGetWithBody() throws IOException { + public void testGetWithBody() throws Exception { bodyTest("GET"); } - public void testEncodeParams() throws IOException { + public void testEncodeParams() throws Exception { { Request request = new Request("PUT", "/200"); request.addParameter("routing", "this/is/the/routing"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=this%2Fis%2Fthe%2Frouting", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "this|is|the|routing"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=this%7Cis%7Cthe%7Crouting", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "routing#1"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=routing%231", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "中文"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=%E4%B8%AD%E6%96%87", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo+bar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo+bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%2Bbar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo/bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%2Fbar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo^bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%5Ebar", response.getRequestLine().getUri()); } } @@ -313,7 +313,7 @@ public void testEncodeParams() throws IOException { /** * Verify that credentials are sent on the first request with preemptive auth enabled (default when provided with credentials). */ - public void testPreemptiveAuthEnabled() throws IOException { + public void testPreemptiveAuthEnabled() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, true)) { @@ -328,7 +328,7 @@ public void testPreemptiveAuthEnabled() throws IOException { /** * Verify that credentials are not sent on the first request with preemptive auth disabled. */ - public void testPreemptiveAuthDisabled() throws IOException { + public void testPreemptiveAuthDisabled() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, false)) { @@ -343,7 +343,7 @@ public void testPreemptiveAuthDisabled() throws IOException { /** * Verify that credentials continue to be sent even if a 401 (Unauthorized) response is received */ - public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws IOException { + public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, true)) { @@ -362,14 +362,14 @@ public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws IOException public void testUrlWithoutLeadingSlash() throws Exception { if (pathPrefix.length() == 0) { try { - restClient.performRequest(new Request("GET", "200")); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); fail("request should have failed"); } catch (ResponseException e) { assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); } } else { { - Response response = restClient.performRequest(new Request("GET", "200")); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); //a trailing slash gets automatically added if a pathPrefix is configured assertEquals(200, response.getStatusLine().getStatusCode()); } @@ -378,7 +378,7 @@ public void testUrlWithoutLeadingSlash() throws Exception { try (RestClient restClient = RestClient.builder( new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort())) .setPathPrefix(pathPrefix.substring(1)).build()) { - Response response = restClient.performRequest(new Request("GET", "200")); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); //a trailing slash gets automatically added if a pathPrefix is configured assertEquals(200, response.getStatusLine().getStatusCode()); } @@ -386,16 +386,16 @@ public void testUrlWithoutLeadingSlash() throws Exception { } } - private Response bodyTest(final String method) throws IOException { + private Response bodyTest(final String method) throws Exception { return bodyTest(restClient, method); } - private Response bodyTest(final RestClient restClient, final String method) throws IOException { + private Response bodyTest(final RestClient restClient, final String method) throws Exception { int statusCode = randomStatusCode(getRandom()); return bodyTest(restClient, method, statusCode, new Header[0]); } - private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws IOException { + private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws Exception { String requestBody = "{ \"field\": \"value\" }"; Request request = new Request(method, "/" + statusCode); request.setJsonEntity(requestBody); @@ -406,7 +406,7 @@ private Response bodyTest(RestClient restClient, String method, int statusCode, request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); } catch(ResponseException e) { esResponse = e.getResponse(); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index aaef5404f2802..625a4612f330a 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.ConnectionClosedException; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; @@ -42,7 +43,6 @@ import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.message.BasicHttpResponse; import org.apache.http.message.BasicStatusLine; @@ -55,24 +55,30 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.net.ssl.SSLHandshakeException; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.SocketTimeoutException; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; import static org.elasticsearch.client.RestClientTestUtil.getAllErrorStatusCodes; import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods; import static org.elasticsearch.client.RestClientTestUtil.getOkStatusCodes; import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode; -import static org.elasticsearch.client.SyncResponseListenerTests.assertExceptionStackContainsCallingMethod; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -105,58 +111,84 @@ public class RestClientSingleHostTests extends RestClientTestCase { private boolean strictDeprecationMode; @Before - @SuppressWarnings("unchecked") public void createRestClient() { - httpClient = mock(CloseableHttpAsyncClient.class); + httpClient = mockHttpClient(exec); + defaultHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header-default"); + node = new Node(new HttpHost("localhost", 9200)); + failureListener = new HostsTrackingFailureListener(); + strictDeprecationMode = randomBoolean(); + restClient = new RestClient(this.httpClient, defaultHeaders, + singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode); + } + + @SuppressWarnings("unchecked") + static CloseableHttpAsyncClient mockHttpClient(final ExecutorService exec) { + CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class); when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), - any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { + any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { + @Override + public Future answer(InvocationOnMock invocationOnMock) throws Throwable { + final HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; + final FutureCallback futureCallback = + (FutureCallback) invocationOnMock.getArguments()[3]; + // Call the callback asynchronous to better simulate how async http client works + return exec.submit(new Callable() { @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { - HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; - HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; - assertThat(context.getAuthCache().get(node.getHost()), instanceOf(BasicScheme.class)); - final FutureCallback futureCallback = - (FutureCallback) invocationOnMock.getArguments()[3]; - HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); - //return the desired status code or exception depending on the path - if (request.getURI().getPath().equals("/soe")) { - futureCallback.failed(new SocketTimeoutException()); - } else if (request.getURI().getPath().equals("/coe")) { - futureCallback.failed(new ConnectTimeoutException()); - } else { - int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); - StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); - - final HttpResponse httpResponse = new BasicHttpResponse(statusLine); - //return the same body that was sent - if (request instanceof HttpEntityEnclosingRequest) { - HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); - if (entity != null) { - assertTrue("the entity is not repeatable, cannot set it to the response directly", - entity.isRepeatable()); - httpResponse.setEntity(entity); - } + public HttpResponse call() throws Exception { + if (futureCallback != null) { + try { + HttpResponse httpResponse = responseOrException(requestProducer); + futureCallback.completed(httpResponse); + } catch(Exception e) { + futureCallback.failed(e); } - //return the same headers that were sent - httpResponse.setHeaders(request.getAllHeaders()); - // Call the callback asynchronous to better simulate how async http client works - exec.execute(new Runnable() { - @Override - public void run() { - futureCallback.completed(httpResponse); - } - }); + return null; } - return null; + return responseOrException(requestProducer); } }); + } + }); + return httpClient; + } - defaultHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header-default"); - node = new Node(new HttpHost("localhost", 9200)); - failureListener = new HostsTrackingFailureListener(); - strictDeprecationMode = randomBoolean(); - restClient = new RestClient(httpClient, 10000, defaultHeaders, - singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode); + private static HttpResponse responseOrException(HttpAsyncRequestProducer requestProducer) throws Exception { + final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); + final HttpHost httpHost = requestProducer.getTarget(); + //return the desired status code or exception depending on the path + switch (request.getURI().getPath()) { + case "/soe": + throw new SocketTimeoutException(httpHost.toString()); + case "/coe": + throw new ConnectTimeoutException(httpHost.toString()); + case "/ioe": + throw new IOException(httpHost.toString()); + case "/closed": + throw new ConnectionClosedException(); + case "/handshake": + throw new SSLHandshakeException(""); + case "/uri": + throw new URISyntaxException("", ""); + case "/runtime": + throw new RuntimeException(); + default: + int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); + StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); + + final HttpResponse httpResponse = new BasicHttpResponse(statusLine); + //return the same body that was sent + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); + if (entity != null) { + assertTrue("the entity is not repeatable, cannot set it to the response directly", + entity.isRepeatable()); + httpResponse.setEntity(entity); + } + } + //return the same headers that were sent + httpResponse.setHeaders(request.getAllHeaders()); + return httpResponse; + } } /** @@ -195,10 +227,10 @@ public void testInternalHttpRequest() throws Exception { /** * End to end test for ok status codes */ - public void testOkStatusCodes() throws IOException { + public void testOkStatusCodes() throws Exception { for (String method : getHttpMethods()) { for (int okStatusCode : getOkStatusCodes()) { - Response response = restClient.performRequest(new Request(method, "/" + okStatusCode)); + Response response = performRequestSyncOrAsync(restClient, new Request(method, "/" + okStatusCode)); assertThat(response.getStatusLine().getStatusCode(), equalTo(okStatusCode)); } } @@ -208,7 +240,7 @@ public void testOkStatusCodes() throws IOException { /** * End to end test for error status codes: they should cause an exception to be thrown, apart from 404 with HEAD requests */ - public void testErrorStatusCodes() throws IOException { + public void testErrorStatusCodes() throws Exception { for (String method : getHttpMethods()) { Set expectedIgnores = new HashSet<>(); String ignoreParam = ""; @@ -256,21 +288,74 @@ public void testErrorStatusCodes() throws IOException { } } - public void testIOExceptions() { + public void testPerformRequestIOExceptions() throws Exception { for (String method : getHttpMethods()) { //IOExceptions should be let bubble up try { - restClient.performRequest(new Request(method, "/coe")); + restClient.performRequest(new Request(method, "/ioe")); fail("request should have failed"); } catch(IOException e) { - assertThat(e, instanceOf(ConnectTimeoutException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/coe")); + fail("request should have failed"); + } catch(ConnectTimeoutException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); try { restClient.performRequest(new Request(method, "/soe")); fail("request should have failed"); - } catch(IOException e) { - assertThat(e, instanceOf(SocketTimeoutException.class)); + } catch(SocketTimeoutException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/closed")); + fail("request should have failed"); + } catch(ConnectionClosedException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/handshake")); + fail("request should have failed"); + } catch(SSLHandshakeException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + } + } + + public void testPerformRequestRuntimeExceptions() throws Exception { + for (String method : getHttpMethods()) { + try { + restClient.performRequest(new Request(method, "/runtime")); + fail("request should have failed"); + } catch (RuntimeException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + } + } + + public void testPerformRequestExceptions() throws Exception { + for (String method : getHttpMethods()) { + try { + restClient.performRequest(new Request(method, "/uri")); + fail("request should have failed"); + } catch (RuntimeException e) { + assertThat(e.getCause(), instanceOf(URISyntaxException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); } @@ -280,7 +365,7 @@ public void testIOExceptions() { * End to end test for request and response body. Exercises the mock http client ability to send back * whatever body it has received. */ - public void testBody() throws IOException { + public void testBody() throws Exception { String body = "{ \"field\": \"value\" }"; StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON); for (String method : Arrays.asList("DELETE", "GET", "PATCH", "POST", "PUT")) { @@ -309,7 +394,7 @@ public void testBody() throws IOException { Request request = new Request(method, "/" + randomStatusCode(getRandom())); request.setEntity(entity); try { - restClient.performRequest(request); + performRequestSyncOrAsync(restClient, request); fail("request should have failed"); } catch(UnsupportedOperationException e) { assertThat(e.getMessage(), equalTo(method + " with body is not supported")); @@ -321,7 +406,7 @@ public void testBody() throws IOException { * End to end test for request and response headers. Exercises the mock http client ability to send back * whatever headers it has received. */ - public void testHeaders() throws IOException { + public void testHeaders() throws Exception { for (String method : getHttpMethods()) { final Header[] requestHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header"); final int statusCode = randomStatusCode(getRandom()); @@ -333,7 +418,7 @@ public void testHeaders() throws IOException { request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = performRequestSyncOrAsync(restClient, request); } catch(ResponseException e) { esResponse = e.getResponse(); } @@ -343,7 +428,7 @@ public void testHeaders() throws IOException { } } - public void testDeprecationWarnings() throws IOException { + public void testDeprecationWarnings() throws Exception { String chars = randomAsciiAlphanumOfLength(5); assertDeprecationWarnings(singletonList("poorly formatted " + chars), singletonList("poorly formatted " + chars)); assertDeprecationWarnings(singletonList(formatWarning(chars)), singletonList(chars)); @@ -397,7 +482,7 @@ public boolean warningsShouldFailRequest(List warnings) { protected abstract WarningsHandler warningsHandler(); } - private void assertDeprecationWarnings(List warningHeaderTexts, List warningBodyTexts) throws IOException { + private void assertDeprecationWarnings(List warningHeaderTexts, List warningBodyTexts) throws Exception { String method = randomFrom(getHttpMethods()); Request request = new Request(method, "/200"); RequestOptions.Builder options = request.getOptions().toBuilder(); @@ -420,7 +505,7 @@ private void assertDeprecationWarnings(List warningHeaderTexts, List warningHeaderTexts, List exceptionRef = new AtomicReference<>(); + final AtomicReference responseRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + restClient.performRequestAsync(request, new ResponseListener() { + @Override + public void onSuccess(Response response) { + responseRef.set(response); + latch.countDown(); + + } + + @Override + public void onFailure(Exception exception) { + exceptionRef.set(exception); + latch.countDown(); + } + }); + latch.await(); + if (exceptionRef.get() != null) { + throw exceptionRef.get(); + } + return responseRef.get(); + } + } + + /** + * Asserts that the provided {@linkplain Exception} contains the method + * that called this somewhere on its stack. This is + * normally the case for synchronous calls but {@link RestClient} performs + * synchronous calls by performing asynchronous calls and blocking the + * current thread until the call returns so it has to take special care + * to make sure that the caller shows up in the exception. We use this + * assertion to make sure that we don't break that "special care". + */ + private static void assertExceptionStackContainsCallingMethod(Throwable t) { + // 0 is getStackTrace + // 1 is this method + // 2 is the caller, what we want + StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2]; + for (StackTraceElement se : t.getStackTrace()) { + if (se.getClassName().equals(myMethod.getClassName()) + && se.getMethodName().equals(myMethod.getMethodName())) { + return; + } + } + StringWriter stack = new StringWriter(); + t.printStackTrace(new PrintWriter(stack)); + fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); + } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java index f3f0f0e58b98d..6b5f8bf907eea 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -57,7 +57,7 @@ public class RestClientTests extends RestClientTestCase { public void testCloseIsIdempotent() throws IOException { List nodes = singletonList(new Node(new HttpHost("localhost", 9200))); CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class); - RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null, false); + RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false); restClient.close(); verify(closeableHttpAsyncClient, times(1)).close(); restClient.close(); @@ -353,8 +353,7 @@ private static String assertSelectAllRejected( NodeTuple> nodeTuple, private static RestClient createRestClient() { List nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200))); - return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000), - new Header[] {}, nodes, null, null, null, false); + return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false); } public void testRoundRobin() throws IOException { diff --git a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java b/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java deleted file mode 100644 index 683b23a596a16..0000000000000 --- a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.http.ConnectionClosedException; -import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.RequestLine; -import org.apache.http.StatusLine; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.message.BasicHttpResponse; -import org.apache.http.message.BasicRequestLine; -import org.apache.http.message.BasicStatusLine; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.SocketTimeoutException; -import java.net.URISyntaxException; -import javax.net.ssl.SSLHandshakeException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; - -public class SyncResponseListenerTests extends RestClientTestCase { - /** - * Asserts that the provided {@linkplain Exception} contains the method - * that called this somewhere on its stack. This is - * normally the case for synchronous calls but {@link RestClient} performs - * synchronous calls by performing asynchronous calls and blocking the - * current thread until the call returns so it has to take special care - * to make sure that the caller shows up in the exception. We use this - * assertion to make sure that we don't break that "special care". - */ - static void assertExceptionStackContainsCallingMethod(Exception e) { - // 0 is getStackTrace - // 1 is this method - // 2 is the caller, what we want - StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2]; - for (StackTraceElement se : e.getStackTrace()) { - if (se.getClassName().equals(myMethod.getClassName()) - && se.getMethodName().equals(myMethod.getMethodName())) { - return; - } - } - StringWriter stack = new StringWriter(); - e.printStackTrace(new PrintWriter(stack)); - fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); - } - - public void testOnSuccessNullResponse() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - try { - syncResponseListener.onSuccess(null); - fail("onSuccess should have failed"); - } catch (NullPointerException e) { - assertEquals("response must not be null", e.getMessage()); - } - } - - public void testOnFailureNullException() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - try { - syncResponseListener.onFailure(null); - fail("onFailure should have failed"); - } catch (NullPointerException e) { - assertEquals("exception must not be null", e.getMessage()); - } - } - - public void testOnSuccess() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - Response mockResponse = mockResponse(); - syncResponseListener.onSuccess(mockResponse); - Response response = syncResponseListener.get(); - assertSame(response, mockResponse); - - try { - syncResponseListener.onSuccess(mockResponse); - fail("get should have failed"); - } catch (IllegalStateException e) { - assertEquals(e.getMessage(), "response is already set"); - } - response = syncResponseListener.get(); - assertSame(response, mockResponse); - } - - public void testOnFailure() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - RuntimeException firstException = new RuntimeException("first-test"); - syncResponseListener.onFailure(firstException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals(firstException.getMessage(), e.getMessage()); - assertSame(firstException, e.getCause()); - } - - RuntimeException secondException = new RuntimeException("second-test"); - try { - syncResponseListener.onFailure(secondException); - } catch(IllegalStateException e) { - assertEquals(e.getMessage(), "exception is already set"); - } - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals(firstException.getMessage(), e.getMessage()); - assertSame(firstException, e.getCause()); - } - - Response response = mockResponse(); - syncResponseListener.onSuccess(response); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IllegalStateException e) { - assertEquals("response and exception are unexpectedly set at the same time", e.getMessage()); - assertNotNull(e.getSuppressed()); - assertEquals(1, e.getSuppressed().length); - assertSame(firstException, e.getSuppressed()[0]); - } - } - - public void testRuntimeIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - RuntimeException runtimeException = new RuntimeException(); - syncResponseListener.onFailure(runtimeException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - // We preserve the original exception in the cause - assertSame(runtimeException, e.getCause()); - // We copy the message - assertEquals(runtimeException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - ConnectTimeoutException timeoutException = new ConnectTimeoutException(); - syncResponseListener.onFailure(timeoutException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(timeoutException, e.getCause()); - // We copy the message - assertEquals(timeoutException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - SocketTimeoutException timeoutException = new SocketTimeoutException(); - syncResponseListener.onFailure(timeoutException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(timeoutException, e.getCause()); - // We copy the message - assertEquals(timeoutException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testConnectionClosedExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - ConnectionClosedException closedException = new ConnectionClosedException(randomAsciiAlphanumOfLength(5)); - syncResponseListener.onFailure(closedException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (ConnectionClosedException e) { - // We preserve the original exception in the cause - assertSame(closedException, e.getCause()); - // We copy the message - assertEquals(closedException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testSSLHandshakeExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - SSLHandshakeException exception = new SSLHandshakeException(randomAsciiAlphanumOfLength(5)); - syncResponseListener.onFailure(exception); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (SSLHandshakeException e) { - // We preserve the original exception in the cause - assertSame(exception, e.getCause()); - // We copy the message - assertEquals(exception.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testIOExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - IOException ioException = new IOException(); - syncResponseListener.onFailure(ioException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(ioException, e.getCause()); - // We copy the message - assertEquals(ioException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); - //we just need any checked exception - URISyntaxException exception = new URISyntaxException("test", "test"); - syncResponseListener.onFailure(exception); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals("error while performing request", e.getMessage()); - // We preserve the original exception in the cause - assertSame(exception, e.getCause()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - private static Response mockResponse() { - ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1); - RequestLine requestLine = new BasicRequestLine("GET", "/", protocolVersion); - StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK"); - HttpResponse httpResponse = new BasicHttpResponse(statusLine); - return new Response(requestLine, new HttpHost("localhost", 9200), httpResponse); - } -} diff --git a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java index 7eae17d83cf2b..8653db4226fe1 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java +++ b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java @@ -111,13 +111,6 @@ public void usage() throws IOException, InterruptedException { builder.setDefaultHeaders(defaultHeaders); // <1> //end::rest-client-init-default-headers } - { - //tag::rest-client-init-max-retry-timeout - RestClientBuilder builder = RestClient.builder( - new HttpHost("localhost", 9200, "http")); - builder.setMaxRetryTimeoutMillis(10000); // <1> - //end::rest-client-init-max-retry-timeout - } { //tag::rest-client-init-node-selector RestClientBuilder builder = RestClient.builder( @@ -305,8 +298,7 @@ public RequestConfig.Builder customizeRequestConfig( .setConnectTimeout(5000) .setSocketTimeout(60000); } - }) - .setMaxRetryTimeoutMillis(60000); + }); //end::rest-client-config-timeouts } { diff --git a/docs/java-rest/low-level/configuration.asciidoc b/docs/java-rest/low-level/configuration.asciidoc index b7da2b5ebccff..e284b52c67a67 100644 --- a/docs/java-rest/low-level/configuration.asciidoc +++ b/docs/java-rest/low-level/configuration.asciidoc @@ -18,8 +18,7 @@ https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/htt as an argument and has the same return type. The request config builder can be modified and then returned. In the following example we increase the connect timeout (defaults to 1 second) and the socket timeout (defaults to 30 -seconds). Also we adjust the max retry timeout accordingly (defaults to 30 -seconds too). +seconds). ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- diff --git a/docs/java-rest/low-level/usage.asciidoc b/docs/java-rest/low-level/usage.asciidoc index 3747314b6ecd3..ee1555019dbe1 100644 --- a/docs/java-rest/low-level/usage.asciidoc +++ b/docs/java-rest/low-level/usage.asciidoc @@ -180,15 +180,6 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-defaul <1> Set the default headers that need to be sent with each request, to prevent having to specify them with each single request -["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- -include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-max-retry-timeout] --------------------------------------------------- -<1> Set the timeout that should be honoured in case multiple attempts are made -for the same request. The default value is 30 seconds, same as the default -socket timeout. In case the socket timeout is customized, the maximum retry -timeout should be adjusted accordingly - ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failure-listener] diff --git a/docs/reference/migration/migrate_7_0/restclient.asciidoc b/docs/reference/migration/migrate_7_0/restclient.asciidoc index 3c0237db6e7b0..39d19c345cd95 100644 --- a/docs/reference/migration/migrate_7_0/restclient.asciidoc +++ b/docs/reference/migration/migrate_7_0/restclient.asciidoc @@ -20,4 +20,13 @@ e.g. `client.index(indexRequest, new Header("name" "value"))` becomes The Cluster Health API used to default to `shards` level to ease migration from transport client that doesn't support the `level` parameter and always returns information including indices and shards details. The level default -value has been aligned with the Elasticsearch default level: `cluster`. \ No newline at end of file +value has been aligned with the Elasticsearch default level: `cluster`. + +=== Low-level REST client changes + +[float] +==== Support for `maxRetryTimeout` removed from RestClient + +`RestClient` and `RestClientBuilder` no longer support the `maxRetryTimeout` +setting. The setting was removed as its counting mechanism was not accurate +and caused issues while adding little value. \ No newline at end of file diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java index 5cf96419f55bc..0a9e5f8dfc181 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java @@ -152,7 +152,7 @@ public void testFailsOnUnknownNode() throws Exception { } catch (ResponseException e) { assertThat(e.getResponse().getStatusLine().getStatusCode(), is(400)); assertThat( - e.getCause().getMessage(), + e.getMessage(), Matchers.containsString("add voting config exclusions request for [invalid] matched no master-eligible nodes") ); } diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java index 720f152265a61..9822da98fde70 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java +++ b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java @@ -41,7 +41,6 @@ public static Iterable parameters() throws Exception { protected Settings restClientSettings() { // Give more time to repository-azure to complete the snapshot operations return Settings.builder().put(super.restClientSettings()) - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "60s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "60s") .build(); } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java index eb5517b7acb56..1c57be7abbaa1 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java @@ -59,7 +59,6 @@ protected final Settings restClientSettings() { // increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 7932328c8c2f6..e7c111aad1605 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -55,7 +55,6 @@ protected Settings restClientSettings() { // increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 56f8881a5f529..177cdaad941f4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -92,7 +92,6 @@ public abstract class ESRestTestCase extends ESTestCase { public static final String TRUSTSTORE_PATH = "truststore.path"; public static final String TRUSTSTORE_PASSWORD = "truststore.password"; - public static final String CLIENT_RETRY_TIMEOUT = "client.retry.timeout"; public static final String CLIENT_SOCKET_TIMEOUT = "client.socket.timeout"; public static final String CLIENT_PATH_PREFIX = "client.path.prefix"; @@ -750,11 +749,6 @@ protected static void configureClient(RestClientBuilder builder, Settings settin } builder.setDefaultHeaders(defaultHeaders); } - final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT); - if (requestTimeoutString != null) { - final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT); - builder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis())); - } final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); if (socketTimeoutString != null) { final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java index 24c1ab1c1cbf1..8f07b532769c3 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java @@ -45,7 +45,6 @@ import java.io.IOException; import java.io.InputStream; -import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -670,16 +669,16 @@ public void testNonexistentPolicy() throws Exception { }); } - public void testInvalidPolicyNames() throws UnsupportedEncodingException { + public void testInvalidPolicyNames() { ResponseException ex; policy = randomAlphaOfLengthBetween(0,10) + "," + randomAlphaOfLengthBetween(0,10); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); - assertThat(ex.getCause().getMessage(), containsString("invalid policy name")); + assertThat(ex.getMessage(), containsString("invalid policy name")); policy = randomAlphaOfLengthBetween(0,10) + "%20" + randomAlphaOfLengthBetween(0,10); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); - assertThat(ex.getCause().getMessage(), containsString("invalid policy name")); + assertThat(ex.getMessage(), containsString("invalid policy name")); policy = "_" + randomAlphaOfLengthBetween(1, 20); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); @@ -716,7 +715,7 @@ public void testDeletePolicyInUse() throws IOException { Request deleteRequest = new Request("DELETE", "_ilm/policy/" + originalPolicy); ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(deleteRequest)); - assertThat(ex.getCause().getMessage(), + assertThat(ex.getMessage(), Matchers.allOf( containsString("Cannot delete policy [" + originalPolicy + "]. It is in use by one or more indices: ["), containsString(managedIndex1), diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index 1e40b31a24c85..e4d96645b87b7 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -68,7 +68,6 @@ protected Settings restClientSettings() { // we increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java b/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java index ff5c24b15edac..0281f38933b03 100644 --- a/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java +++ b/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.Before; +import javax.security.auth.login.LoginContext; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; @@ -33,8 +34,6 @@ import java.util.List; import java.util.Map; -import javax.security.auth.login.LoginContext; - import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.contains; @@ -148,13 +147,7 @@ private RestClient buildRestClientForKerberos(final SpnegoHttpClientConfigCallba return restClientBuilder.build(); } - private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings) - throws IOException { - final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT); - if (requestTimeoutString != null) { - final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT); - restClientBuilder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis())); - } + private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings) { final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); if (socketTimeoutString != null) { final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 3c7a9cee45562..9374346449c95 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -68,7 +68,6 @@ protected Settings restClientSettings() { // we increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); }