diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/HeaderResource.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/HeaderResource.java new file mode 100644 index 000000000..20baaa6df --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/HeaderResource.java @@ -0,0 +1,37 @@ +package io.quarkus.ts.http.restclient.reactive.fault.tolerance; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; + +/** + * Stores request headers from incoming server requests + * and exposes an endpoint that returns these headers + * It is used for testing purposes to verify if headers are correctly propagated. + */ +@Path("/fault/headers") +@ApplicationScoped +public class HeaderResource { + + private final List> headerList = new ArrayList<>(); + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getStoredHeaders() { + return Response.ok(headerList).build(); + } + + public void storeHeaders(MultivaluedMap headers) { + headerList.add(headers.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get(0)))); + } +} diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdClientRequestFilter.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdClientRequestFilter.java new file mode 100644 index 000000000..dc36c5e24 --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdClientRequestFilter.java @@ -0,0 +1,28 @@ +package io.quarkus.ts.http.restclient.reactive.fault.tolerance; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.client.ClientRequestContext; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.ext.Provider; + +/** + * Injects `REQUEST_ID` into the headers of every outgoing REST client call. + * It is used in combination with the `RequestIdManager` class to ensure + * request context propagation between client and server calls. + */ +@Provider +@ApplicationScoped +public class RequestIdClientRequestFilter implements ClientRequestFilter { + + private final RequestIdManager requestIdManager; + + public RequestIdClientRequestFilter(RequestIdManager requestIdManager) { + this.requestIdManager = requestIdManager; + } + + @Override + public void filter(ClientRequestContext requestContext) { + int requestId = requestIdManager.currentRequestId(); + requestContext.getHeaders().putSingle("REQUEST_ID", requestId); + } +} diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdContainerRequestFilter.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdContainerRequestFilter.java new file mode 100644 index 000000000..e68d5df48 --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdContainerRequestFilter.java @@ -0,0 +1,43 @@ +package io.quarkus.ts.http.restclient.reactive.fault.tolerance; + +import jakarta.enterprise.context.control.ActivateRequestContext; +import jakarta.inject.Inject; +import jakarta.ws.rs.ConstrainedTo; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.RuntimeType; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerRequestFilter; +import jakarta.ws.rs.container.PreMatching; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.ext.Provider; + +/** + * This server-side filter is used to extract the `REQUEST_ID` header + * from incoming requests and pass it to the `RequestIdManagerImpl` to manage request context. + * It also stores the headers via the `HeaderResource` for test verification. + */ +@Provider +@Produces +@PreMatching +@ConstrainedTo(RuntimeType.SERVER) +public class RequestIdContainerRequestFilter implements ContainerRequestFilter { + + @Inject + RequestIdManagerImpl requestIdManagerImpl; + + @Inject + HeaderResource headerResource; + + @Override + @ActivateRequestContext + public void filter(ContainerRequestContext requestContext) { + MultivaluedMap headers = requestContext.getHeaders(); + if (headers.containsKey("REQUEST_ID")) { + int requestId = Integer.valueOf(headers.getFirst("REQUEST_ID")); + requestIdManagerImpl.overrideRequestId(requestId); + } + + // Store the headers in the HeaderResource for later retrieval in tests + headerResource.storeHeaders(headers); + } +} diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManager.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManager.java new file mode 100644 index 000000000..f734b55a3 --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManager.java @@ -0,0 +1,6 @@ +package io.quarkus.ts.http.restclient.reactive.fault.tolerance; + +public interface RequestIdManager { + + int currentRequestId(); +} diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManagerImpl.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManagerImpl.java new file mode 100644 index 000000000..2a95fecfb --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManagerImpl.java @@ -0,0 +1,22 @@ +package io.quarkus.ts.http.restclient.reactive.fault.tolerance; + +import jakarta.enterprise.context.RequestScoped; + +/** + * Manages the `REQUEST_ID` for each request, allowing it + * to be overridden when the `REQUEST_ID` is passed via headers from client requests. + * Handles a unique id per request. + */ +@RequestScoped +public class RequestIdManagerImpl implements RequestIdManager { + + private int requestID; + + public int currentRequestId() { + return requestID; + } + + public void overrideRequestId(int inboundRequestId) { + this.requestID = inboundRequestId; + } +} diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RetryClient.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RetryClient.java new file mode 100644 index 000000000..9b3d829de --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RetryClient.java @@ -0,0 +1,32 @@ +package io.quarkus.ts.http.restclient.reactive.fault.tolerance; + +import java.util.concurrent.CompletionStage; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.ProcessingException; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import org.eclipse.microprofile.faulttolerance.Retry; +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +/** + * Interface defines a REST client making asynchronous calls + * to the `/client/async` endpoint. It uses the `@Retry` annotation + * to retry in case of a `ProcessingException`. It is called by the + * server resource `ServerRetryResource`. + */ +@ApplicationScoped +@RegisterRestClient(configKey = "client.endpoint") +@Path("/client") +public interface RetryClient { + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Retry(retryOn = ProcessingException.class) + @Path("async") + CompletionStage getItemsAsync(); +} diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/ServerRetryResource.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/ServerRetryResource.java new file mode 100644 index 000000000..94ea58b00 --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/ServerRetryResource.java @@ -0,0 +1,39 @@ +package io.quarkus.ts.http.restclient.reactive.fault.tolerance; + +import java.util.concurrent.ExecutionException; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import org.eclipse.microprofile.rest.client.inject.RestClient; + +/** + * Server resource that calling `RetryClient` to trigger + * an asynchronous client request. It is used to simulate and test + * retry logic when the client fails to complete a request. + */ +@ApplicationScoped +@Path("/server") +public class ServerRetryResource { + + private final RetryClient retryClient; + + @Inject + public ServerRetryResource(@RestClient RetryClient retryClient) { + this.retryClient = retryClient; + } + + @GET + @Produces(MediaType.TEXT_PLAIN) + @Path("async") + public Response getItemsAsync() throws ExecutionException, InterruptedException { + return retryClient.getItemsAsync() + .toCompletableFuture() + .get(); + } +} diff --git a/http/rest-client-reactive/src/main/resources/application.properties b/http/rest-client-reactive/src/main/resources/application.properties index 5e25ddc66..eebf6c47e 100644 --- a/http/rest-client-reactive/src/main/resources/application.properties +++ b/http/rest-client-reactive/src/main/resources/application.properties @@ -8,3 +8,6 @@ quarkus.rest-client.encoder-mode-rfc1738.url=${vertx-server-url} quarkus.rest-client.encoder-mode-rfc1738.multipart-post-encoder-mode=RFC1738 quarkus.rest-client.encoder-mode-rfc3986.url=${vertx-server-url} quarkus.rest-client.encoder-mode-rfc3986.multipart-post-encoder-mode=RFC3986 + +# Set the Client endpoint to a non-existing domain to trigger a fault +client.endpoint/mp-rest/url=http://unknown-domain:8080 diff --git a/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientRetryIT.java b/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientRetryIT.java new file mode 100644 index 000000000..055aab193 --- /dev/null +++ b/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientRetryIT.java @@ -0,0 +1,80 @@ +package io.quarkus.ts.http.restclient.reactive; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.core.Options.ChunkedEncodingPolicy.NEVER; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.http.Fault; + +import io.quarkus.test.bootstrap.RestService; +import io.quarkus.test.scenarios.QuarkusScenario; +import io.quarkus.test.services.QuarkusApplication; +import io.restassured.common.mapper.TypeRef; +import io.restassured.http.ContentType; + +/** + * Verifies SmallRye fault-tolerance retry logic and header propagation between the client and server. + */ +@QuarkusScenario +public class ReactiveRestClientRetryIT { + + private static final WireMockServer mockServer; + private final int headerId = 121; + + static { + mockServer = new WireMockServer(WireMockConfiguration.options() + .dynamicPort() + .useChunkedTransferEncoding(NEVER)); + mockServer.stubFor(WireMock.get(WireMock.urlEqualTo("/client/async")) + .willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER))); + + mockServer.start(); + } + + @QuarkusApplication + static RestService app = new RestService() + .withProperty("client.endpoint/mp-rest/url", mockServer::baseUrl); + + @Test + @Tag("QUARKUS-4477") + // Commit that fixes the issue + // https://github.com/quarkusio/quarkus/pull/39988/commits/b9cc3c2dc65a6f61641c83a940e13c116ce6cd0c + void shouldPerformRetryOfFailingBlockingClientCall() { + app.given().header("REQUEST_ID", headerId) + .get("/server/async") + .then() + .statusCode(500); + + // Check number of server events, one failing call plus 3 retries by default + Assertions.assertEquals(4, mockServer.getServeEvents().getServeEvents().stream().count()); + + List> headers = app.given() + .get("/fault/headers") + .then() + .statusCode(200) + .contentType(ContentType.JSON) + .extract() + .body() + .as(new TypeRef<>() { + }); + + // Check if REQUEST_ID header was propagated and stored + Assertions.assertTrue(headers.stream().anyMatch(header -> header.containsKey("REQUEST_ID") + && headerId == (Integer.parseInt(header.get("REQUEST_ID"))))); + } + + @AfterAll + static void afterAll() { + mockServer.stop(); + } +}