Skip to content

Commit

Permalink
[grid] enable the httpclient to perform async requests #14403 (#14409)
Browse files Browse the repository at this point in the history
Co-authored-by: Viet Nguyen Duc <[email protected]>
  • Loading branch information
joerg1985 and VietND96 authored Oct 29, 2024
1 parent f391cd0 commit e7c09a2
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 27 deletions.
5 changes: 5 additions & 0 deletions java/src/org/openqa/selenium/remote/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.URL;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.openqa.selenium.internal.Require;
Expand All @@ -32,6 +33,10 @@ public interface HttpClient extends Closeable, HttpHandler {

WebSocket openSocket(HttpRequest request, WebSocket.Listener listener);

default CompletableFuture<HttpResponse> executeAsync(HttpRequest req) {
return CompletableFuture.supplyAsync(() -> execute(req));
}

default void close() {}

interface Factory {
Expand Down
91 changes: 64 additions & 27 deletions java/src/org/openqa/selenium/remote/http/jdk/JdkHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand Down Expand Up @@ -369,9 +370,64 @@ private URI getWebSocketUri(HttpRequest request) throws URISyntaxException {
return uri;
}

@Override
public CompletableFuture<HttpResponse> executeAsync(HttpRequest request) {
// the facade for this http request
CompletableFuture<HttpResponse> cf = new CompletableFuture<>();

// the actual http request
Future<?> future =
executorService.submit(
() -> {
try {
HttpResponse response = handler.execute(request);

cf.complete(response);
} catch (Throwable t) {
cf.completeExceptionally(t);
}
});

// try to interrupt the http request in case of a timeout, to avoid
// https://bugs.openjdk.org/browse/JDK-8258397
cf.exceptionally(
(throwable) -> {
if (throwable instanceof java.util.concurrent.TimeoutException) {
// interrupts the thread
future.cancel(true);
}

// nobody will read this result
return null;
});

// will complete exceptionally with a java.util.concurrent.TimeoutException
return cf.orTimeout(readTimeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
return handler.execute(req);
try {
// executeAsync does define a timeout, no need to use a timeout here
return executeAsync(req).get();
} catch (CancellationException e) {
throw new WebDriverException(e.getMessage(), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new WebDriverException(e.getMessage(), e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();

if (cause instanceof java.util.concurrent.TimeoutException) {
throw new TimeoutException(cause);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
}

throw new WebDriverException((cause != null) ? cause : e);
}
}

private HttpResponse execute0(HttpRequest req) throws UncheckedIOException {
Expand All @@ -390,34 +446,13 @@ private HttpResponse execute0(HttpRequest req) throws UncheckedIOException {
// - avoid a downgrade of POST requests, see the javadoc of j.n.h.HttpClient.Redirect
// - not run into https://bugs.openjdk.org/browse/JDK-8304701
for (int i = 0; i < 100; i++) {
java.net.http.HttpRequest request = messages.createRequest(req, method, rawUri);
java.net.http.HttpResponse<byte[]> response;

// use sendAsync to not run into https://bugs.openjdk.org/browse/JDK-8258397
CompletableFuture<java.net.http.HttpResponse<byte[]>> future =
client.sendAsync(request, byteHandler);

try {
response = future.get(readTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (CancellationException e) {
throw new WebDriverException(e.getMessage(), e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();

if (cause instanceof HttpTimeoutException) {
throw new TimeoutException(cause);
} else if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}

throw new WebDriverException((cause != null) ? cause : e);
} catch (java.util.concurrent.TimeoutException e) {
future.cancel(true);
throw new TimeoutException(e);
if (Thread.interrupted()) {
throw new InterruptedException("http request has been interrupted");
}

java.net.http.HttpRequest request = messages.createRequest(req, method, rawUri);
java.net.http.HttpResponse<byte[]> response = client.send(request, byteHandler);

switch (response.statusCode()) {
case 303:
method = HttpMethod.GET;
Expand Down Expand Up @@ -454,6 +489,8 @@ private HttpResponse execute0(HttpRequest req) throws UncheckedIOException {
}

throw new ProtocolException("Too many redirects: 101");
} catch (HttpTimeoutException e) {
throw new TimeoutException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -233,6 +234,35 @@ public void shouldAllowConfigurationFromSystemProperties() {
}
}

@Test
public void shouldStopRequestAfterTimeout() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();

delegate =
req -> {
counter.incrementAndGet();
try {
Thread.sleep(1600);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
HttpResponse response = new HttpResponse();
response.setStatus(302);
response.addHeader("Location", "/");
return response;
};
ClientConfig clientConfig = ClientConfig.defaultConfig().readTimeout(Duration.ofMillis(800));

try (HttpClient client =
createFactory().createClient(clientConfig.baseUri(URI.create(server.whereIs("/"))))) {
HttpRequest request = new HttpRequest(GET, "/delayed");
assertThatExceptionOfType(TimeoutException.class).isThrownBy(() -> client.execute(request));
Thread.sleep(4200);

assertThat(counter.get()).isEqualTo(1);
}
}

private HttpResponse getResponseWithHeaders(final Multimap<String, String> headers) {
return executeWithinServer(
new HttpRequest(GET, "/foo"),
Expand Down

0 comments on commit e7c09a2

Please sign in to comment.