Skip to content

Commit

Permalink
Throw a TransportException when an error response cannot be parsed (#579
Browse files Browse the repository at this point in the history
) (#581)

Co-authored-by: Sylvain Wallez <[email protected]>
  • Loading branch information
github-actions[bot] and swallez authored May 23, 2023
1 parent 96716b8 commit 02fa978
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,32 @@

public class TransportException extends IOException {

private final int statusCode;
private final String endpointId;

public TransportException(String message, String endpointId) {
this(message, endpointId, null);
public TransportException(int statusCode, String message, String endpointId) {
this(statusCode, message, endpointId, null);
}

public TransportException(String message, String endpointId, Throwable cause) {
super(endpointId == null ? message : "[" + endpointId + "] " + message, cause);
public TransportException(int statusCode, String message, String endpointId, Throwable cause) {
super("status: " + statusCode + ", " + (endpointId == null ? message : "[" + endpointId + "] " + message), cause);
this.statusCode = statusCode;
this.endpointId = endpointId;
}

/**
* Status code returned by the http resquest
*/
public int statusCode() {
return statusCode;
}

/**
* Identifier of the API endpoint that caused the exception, if known.
*/
@Nullable
String getEndpointId() {
public String endpointId() {
return endpointId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import co.elastic.clients.util.ApiTypeHelper;
import co.elastic.clients.util.BinaryData;
import co.elastic.clients.util.MissingRequiredPropertyException;
import jakarta.json.JsonException;
import jakarta.json.stream.JsonGenerator;
import jakarta.json.stream.JsonParser;
import org.apache.http.HttpEntity;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class RestClientTransport implements ElasticsearchTransport {

/**
* The {@code Future} implementation returned by async requests.
* It wraps the RestClient's cancellable and progagates cancellation.
* It wraps the RestClient's cancellable and propagates cancellation.
*/
private static class RequestFuture<T> extends CompletableFuture<T> {
private volatile Cancellable cancellable;
Expand Down Expand Up @@ -310,6 +311,7 @@ private <ResponseT, ErrorT> ResponseT getHighLevelResponse(
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null) {
throw new TransportException(
statusCode,
"Request failed with status code '" + statusCode + "'",
endpoint.id(), new ResponseException(clientResp)
);
Expand All @@ -318,6 +320,7 @@ private <ResponseT, ErrorT> ResponseT getHighLevelResponse(
HttpEntity entity = clientResp.getEntity();
if (entity == null) {
throw new TransportException(
statusCode,
"Expecting a response body, but none was sent",
endpoint.id(), new ResponseException(clientResp)
);
Expand All @@ -333,14 +336,17 @@ private <ResponseT, ErrorT> ResponseT getHighLevelResponse(
// TODO: have the endpoint provide the exception constructor
throw new ElasticsearchException(endpoint.id(), (ErrorResponse) error);
}
} catch(MissingRequiredPropertyException errorEx) {
} catch(JsonException | MissingRequiredPropertyException errorEx) {
// Could not decode exception, try the response type
try {
ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint);
return response;
} catch(Exception respEx) {
// No better luck: throw the original error decoding exception
throw new TransportException("Failed to decode error response", endpoint.id(), new ResponseException(clientResp));
throw new TransportException(statusCode,
"Failed to decode error response, check exception cause for additional details", endpoint.id(),
new ResponseException(clientResp)
);
}
}
} else {
Expand Down Expand Up @@ -368,6 +374,7 @@ private <ResponseT> ResponseT decodeResponse(
// Expecting a body
if (entity == null) {
throw new TransportException(
statusCode,
"Expecting a response body, but none was sent",
endpoint.id(), new ResponseException(clientResp)
);
Expand Down Expand Up @@ -395,7 +402,7 @@ private <ResponseT> ResponseT decodeResponse(
return response;

} else {
throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'", endpoint.id());
throw new TransportException(statusCode, "Unhandled endpoint type: '" + endpoint.getClass().getName() + "'", endpoint.id());
}
}

Expand All @@ -411,6 +418,7 @@ private void checkProductHeader(Response clientResp, Endpoint<?, ?, ?> endpoint)
return;
}
throw new TransportException(
clientResp.getStatusLine().getStatusCode(),
"Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch "
+ "instance, and that any networking filters are preserving that header.",
endpoint.id(),
Expand All @@ -419,7 +427,9 @@ private void checkProductHeader(Response clientResp, Endpoint<?, ?, ?> endpoint)
}

if (!"Elasticsearch".equals(header)) {
throw new TransportException("Invalid value '" + header + "' for 'X-Elastic-Product' header.",
throw new TransportException(
clientResp.getStatusLine().getStatusCode(),
"Invalid value '" + header + "' for 'X-Elastic-Product' header.",
endpoint.id(),
new ResponseException(clientResp)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.transport;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpHost;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class TransportTest extends Assertions {

@Test
public void testXMLResponse() throws Exception {
HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);

httpServer.createContext("/_cat/indices", exchange -> {
exchange.sendResponseHeaders(401, 0);
OutputStream out = exchange.getResponseBody();
out.write(
"<?xml version=\"1.0\"?><error>Error</error>".getBytes(StandardCharsets.UTF_8)
);
out.close();
});

httpServer.start();
InetSocketAddress address = httpServer.getAddress();

RestClient restClient = RestClient
.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
.build();

ElasticsearchClient esClient = new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));

TransportException ex = Assertions.assertThrows(
TransportException.class,
() -> esClient.cat().indices()
);

httpServer.stop(0);

assertEquals(401, ex.statusCode());
assertEquals("es/cat.indices", ex.endpointId());

// Cause is transport-dependent
ResponseException restException = (ResponseException) ex.getCause();
assertEquals(401, restException.getResponse().getStatusLine().getStatusCode());
}
}

0 comments on commit 02fa978

Please sign in to comment.