Skip to content

Commit

Permalink
fix: expands the HTTP interceptor API to include a call back for fail…
Browse files Browse the repository at this point in the history
…ed connection attempts (6144)

Add test which demonstrates what I expect to work
---
Introduce afterConnectionFailure to complement  `after` & `afterFailure`.

Fixes #6143
---
Add a bit more explanatory Javadoc to Interceptor
---
Extract private method to ensure changes to DefaultMockServer usage are consistent.
---
Add test which covers future being completed exceptionally with a CompletionException.

This happens in the Jetty implementation but  gets wrapped in an IOException by the OkHttp impl but this should be enough for the coverage checker.
---
Move afterConnectionFailure callback to `retryWithExponentialBackoff` so its invoked when an established websocket connection fails
---
restart the mock server after a connection failure is detected to speed up tests.

Rather than waiting ~20s to give up retrying.
---
Add changelog entry
---
Clarify javadoc
---
@see -> @link
---
Add unit test to ensure we unwrap CompletionExceptions.

Really just making the coverage checker happy.
---
Drop exception handling test from AbstractInterceptorTest as its now better covered elsewhere.

(cherry picked from commit 8147542)
Signed-off-by: Marc Nuri <[email protected]>
  • Loading branch information
SamBarker authored and manusa committed Aug 9, 2024
1 parent 710a8a2 commit 1f1da91
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Fix #6066: Added support for missing `v1.APIVersions` in KubernetesClient
* Fix #6110: VolumeSource (and other file mode fields) in Octal are correctly interpreted
* Fix #6137: `ConfigBuilder.withAutoConfigure` is not working
* Fix #6143: Expands the HTTP interceptor API to include a call back for failed connection attempts
* Fix #6197: JettyHttp client error handling improvements.
* Fix #6215: Suppressing rejected execution exception for port forwarder
* Fix #6212: Improved reliability of file upload to Pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* A collection of callback methods invoked through the various stages of the HTTP request lifecycle.
* Each invocation of {@link Interceptor#before(BasicBuilder, HttpRequest, RequestTags)} will be matched with a call to one of
* {@link Interceptor#afterConnectionFailure(HttpRequest, Throwable)} or
* {@link Interceptor#after(HttpRequest, HttpResponse, AsyncBody.Consumer)}.
* Callbacks that lead to a request being sent allow for that request to be customised.
*/
public interface Interceptor {

interface RequestTags {
Expand Down Expand Up @@ -63,7 +70,10 @@ default AsyncBody.Consumer<List<ByteBuffer>> consumer(AsyncBody.Consumer<List<By
}

/**
* Called after a websocket failure or by default from a normal request
* Called after a websocket failure or by default from a normal request.
* <p>
* Failure is determined by HTTP status code and will be invoked in addition to {@link Interceptor#after(HttpRequest,
* HttpResponse, AsyncBody.Consumer)}
*
* @param builder used to modify the request
* @param response the failed response
Expand All @@ -75,7 +85,10 @@ default CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpRespon

/**
* Called after a non-websocket failure
*
* <p>
* Failure is determined by HTTP status code and will be invoked in addition to {@link Interceptor#after(HttpRequest,
* HttpResponse, AsyncBody.Consumer)}
*
* @param builder used to modify the request
* @param response the failed response
* @return true if the builder should be used to execute a new request
Expand All @@ -84,4 +97,15 @@ default CompletableFuture<Boolean> afterFailure(HttpRequest.Builder builder, Htt
return afterFailure((BasicBuilder) builder, response, tags);
}

/**
* Called after a connection attempt fails.
* <p>
* This method will be invoked on each failed connection attempt.
*
* @param request the HTTP request.
* @param failure the Java exception that caused the failure.
*/
default void afterConnectionFailure(HttpRequest request, Throwable failure) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,31 @@ private <V> CompletableFuture<V> retryWithExponentialBackoff(
}
}
} else {
if (throwable instanceof CompletionException) {
throwable = throwable.getCause();
}
if (throwable instanceof IOException) {
final Throwable actualCause = unwrapCompletionException(throwable);
builder.interceptors.forEach((s, interceptor) -> interceptor.afterConnectionFailure(request, actualCause));
if (actualCause instanceof IOException) {
// TODO: may not be specific enough - incorrect ssl settings for example will get caught here
LOG.debug(
String.format("HTTP operation on url: %s should be retried after %d millis because of IOException",
uri, retryInterval),
throwable);
actualCause);
return true;
}
}
return false;
});
}

static Throwable unwrapCompletionException(Throwable throwable) {
final Throwable actualCause;
if (throwable instanceof CompletionException) {
actualCause = throwable.getCause();
} else {
actualCause = throwable;
}
return actualCause;
}

static long retryAfterMillis(HttpResponse<?> httpResponse) {
String retryAfter = httpResponse.header(StandardHttpHeaders.RETRY_AFTER);
if (retryAfter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,26 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class AbstractInterceptorTest {

private static final Duration FUTURE_COMPLETION_TIME = Duration.of(10, ChronoUnit.SECONDS);
private static DefaultMockServer server;

@BeforeEach
void startServer() {
server = new DefaultMockServer(false);
server = newMockServer();
server.start();
}

Expand Down Expand Up @@ -170,6 +174,69 @@ public CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpRespons
}
}

@Test
@DisplayName("afterConnectionFailure, invoked when remote server offline")
public void afterConnectionFailureRemoteOffline() {
// Given
final int originalPort = server.getPort();
server.shutdown();
final CountDownLatch connectionFailureCallbackInvoked = new CountDownLatch(1);
final HttpClient.Builder builder = getHttpClientFactory().newBuilder()
.connectTimeout(1, TimeUnit.SECONDS)
.addOrReplaceInterceptor("test", new Interceptor() {
@Override
public void afterConnectionFailure(HttpRequest request, Throwable failure) {
connectionFailureCallbackInvoked.countDown();
server = newMockServer();
server.start(originalPort); // Need to restart on the original port as we can't alter the request during retry.
}
});
// When
try (HttpClient client = builder.build()) {
final CompletableFuture<HttpResponse<String>> response = client.sendAsync(client.newHttpRequestBuilder()
.timeout(1, TimeUnit.SECONDS)
.uri(server.url("/not-found")).build(), String.class);

// Then
assertThat(response).succeedsWithin(FUTURE_COMPLETION_TIME);
assertThat(connectionFailureCallbackInvoked).extracting(CountDownLatch::getCount).isEqualTo(0L);
}
}

@Test
@DisplayName("afterConnectionFailure, request is retried when remote server offline")
public void afterConnectionFailureRetry() {
// Given
final int originalPort = server.getPort();
server.shutdown();
final CountDownLatch afterInvoked = new CountDownLatch(1);
final HttpClient.Builder builder = getHttpClientFactory().newBuilder()
.connectTimeout(1, TimeUnit.SECONDS)
.addOrReplaceInterceptor("test", new Interceptor() {
@Override
public void afterConnectionFailure(HttpRequest request, Throwable failure) {
server = newMockServer();
server.start(originalPort); // Need to restart on the original port as we can't alter the request during retry.
server.expect().withPath("/intercepted-url").andReturn(200, "This works").once();
}

@Override
public void after(HttpRequest request, HttpResponse<?> response, Consumer<List<ByteBuffer>> consumer) {
afterInvoked.countDown();
}
});
// When
try (HttpClient client = builder.build()) {
final CompletableFuture<HttpResponse<String>> response = client.sendAsync(client.newHttpRequestBuilder()
.timeout(1, TimeUnit.SECONDS)
.uri(server.url("/intercepted-url")).build(), String.class);

// Then
assertThat(response).succeedsWithin(FUTURE_COMPLETION_TIME);
assertThat(afterInvoked).extracting(CountDownLatch::getCount).isEqualTo(0L);
}
}

@Test
@DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes")
public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception {
Expand Down Expand Up @@ -412,4 +479,7 @@ public void before(BasicBuilder builder, HttpRequest request, RequestTags tags)
.containsEntry("test-header", Collections.singletonList("Test-Value-Override"));
}

private static DefaultMockServer newMockServer() {
return new DefaultMockServer(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -50,6 +51,7 @@

class StandardHttpClientTest {

public static final String IO_ERROR_MESSAGE = "IO woopsie";
private TestStandardHttpClient client;

@BeforeEach
Expand Down Expand Up @@ -281,4 +283,26 @@ void testDerivedIsClosed() {
assertTrue(client.isClosed());
}

@Test
void shouldUnwrapCompletionException() {
// Given

// When
final Throwable throwable = StandardHttpClient
.unwrapCompletionException(new CompletionException(new IOException(IO_ERROR_MESSAGE)));

// Then
assertThat(throwable).isInstanceOf(IOException.class).hasMessage(IO_ERROR_MESSAGE);
}

@Test
void shouldNotUnwrapOtherExceptions() {
// Given

// When
final Throwable throwable = StandardHttpClient.unwrapCompletionException(new IOException(IO_ERROR_MESSAGE));

// Then
assertThat(throwable).isInstanceOf(IOException.class).hasMessage(IO_ERROR_MESSAGE);
}
}

0 comments on commit 1f1da91

Please sign in to comment.