-
Notifications
You must be signed in to change notification settings - Fork 36
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Verify fault tolerance retry logic with rest client reactive
(cherry picked from commit 6eae594)
- Loading branch information
Showing
9 changed files
with
290 additions
and
0 deletions.
There are no files selected for viewing
37 changes: 37 additions & 0 deletions
37
.../src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/HeaderResource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package io.quarkus.ts.http.restclient.reactive.fault.tolerance; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.ws.rs.GET; | ||
import jakarta.ws.rs.Path; | ||
import jakarta.ws.rs.Produces; | ||
import jakarta.ws.rs.core.MediaType; | ||
import jakarta.ws.rs.core.MultivaluedMap; | ||
import jakarta.ws.rs.core.Response; | ||
|
||
/** | ||
* Stores request headers from incoming server requests | ||
* and exposes an endpoint that returns these headers | ||
* It is used for testing purposes to verify if headers are correctly propagated. | ||
*/ | ||
@Path("/fault/headers") | ||
@ApplicationScoped | ||
public class HeaderResource { | ||
|
||
private final List<Map<String, String>> headerList = new ArrayList<>(); | ||
|
||
@GET | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response getStoredHeaders() { | ||
return Response.ok(headerList).build(); | ||
} | ||
|
||
public void storeHeaders(MultivaluedMap<String, String> headers) { | ||
headerList.add(headers.entrySet().stream() | ||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get(0)))); | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
.../io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdClientRequestFilter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package io.quarkus.ts.http.restclient.reactive.fault.tolerance; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.ws.rs.client.ClientRequestContext; | ||
import jakarta.ws.rs.client.ClientRequestFilter; | ||
import jakarta.ws.rs.ext.Provider; | ||
|
||
/** | ||
* Injects `REQUEST_ID` into the headers of every outgoing REST client call. | ||
* It is used in combination with the `RequestIdManager` class to ensure | ||
* request context propagation between client and server calls. | ||
*/ | ||
@Provider | ||
@ApplicationScoped | ||
public class RequestIdClientRequestFilter implements ClientRequestFilter { | ||
|
||
private final RequestIdManager requestIdManager; | ||
|
||
public RequestIdClientRequestFilter(RequestIdManager requestIdManager) { | ||
this.requestIdManager = requestIdManager; | ||
} | ||
|
||
@Override | ||
public void filter(ClientRequestContext requestContext) { | ||
int requestId = requestIdManager.currentRequestId(); | ||
requestContext.getHeaders().putSingle("REQUEST_ID", requestId); | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
.../quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdContainerRequestFilter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package io.quarkus.ts.http.restclient.reactive.fault.tolerance; | ||
|
||
import jakarta.enterprise.context.control.ActivateRequestContext; | ||
import jakarta.inject.Inject; | ||
import jakarta.ws.rs.ConstrainedTo; | ||
import jakarta.ws.rs.Produces; | ||
import jakarta.ws.rs.RuntimeType; | ||
import jakarta.ws.rs.container.ContainerRequestContext; | ||
import jakarta.ws.rs.container.ContainerRequestFilter; | ||
import jakarta.ws.rs.container.PreMatching; | ||
import jakarta.ws.rs.core.MultivaluedMap; | ||
import jakarta.ws.rs.ext.Provider; | ||
|
||
/** | ||
* This server-side filter is used to extract the `REQUEST_ID` header | ||
* from incoming requests and pass it to the `RequestIdManagerImpl` to manage request context. | ||
* It also stores the headers via the `HeaderResource` for test verification. | ||
*/ | ||
@Provider | ||
@Produces | ||
@PreMatching | ||
@ConstrainedTo(RuntimeType.SERVER) | ||
public class RequestIdContainerRequestFilter implements ContainerRequestFilter { | ||
|
||
@Inject | ||
RequestIdManagerImpl requestIdManagerImpl; | ||
|
||
@Inject | ||
HeaderResource headerResource; | ||
|
||
@Override | ||
@ActivateRequestContext | ||
public void filter(ContainerRequestContext requestContext) { | ||
MultivaluedMap<String, String> headers = requestContext.getHeaders(); | ||
if (headers.containsKey("REQUEST_ID")) { | ||
int requestId = Integer.valueOf(headers.getFirst("REQUEST_ID")); | ||
requestIdManagerImpl.overrideRequestId(requestId); | ||
} | ||
|
||
// Store the headers in the HeaderResource for later retrieval in tests | ||
headerResource.storeHeaders(headers); | ||
} | ||
} |
6 changes: 6 additions & 0 deletions
6
...rc/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package io.quarkus.ts.http.restclient.reactive.fault.tolerance; | ||
|
||
public interface RequestIdManager { | ||
|
||
int currentRequestId(); | ||
} |
22 changes: 22 additions & 0 deletions
22
...ain/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RequestIdManagerImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package io.quarkus.ts.http.restclient.reactive.fault.tolerance; | ||
|
||
import jakarta.enterprise.context.RequestScoped; | ||
|
||
/** | ||
* Manages the `REQUEST_ID` for each request, allowing it | ||
* to be overridden when the `REQUEST_ID` is passed via headers from client requests. | ||
* Handles a unique id per request. | ||
*/ | ||
@RequestScoped | ||
public class RequestIdManagerImpl implements RequestIdManager { | ||
|
||
private int requestID; | ||
|
||
public int currentRequestId() { | ||
return requestID; | ||
} | ||
|
||
public void overrideRequestId(int inboundRequestId) { | ||
this.requestID = inboundRequestId; | ||
} | ||
} |
32 changes: 32 additions & 0 deletions
32
...ive/src/main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/RetryClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package io.quarkus.ts.http.restclient.reactive.fault.tolerance; | ||
|
||
import java.util.concurrent.CompletionStage; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.ws.rs.GET; | ||
import jakarta.ws.rs.Path; | ||
import jakarta.ws.rs.ProcessingException; | ||
import jakarta.ws.rs.Produces; | ||
import jakarta.ws.rs.core.MediaType; | ||
import jakarta.ws.rs.core.Response; | ||
|
||
import org.eclipse.microprofile.faulttolerance.Retry; | ||
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; | ||
|
||
/** | ||
* Interface defines a REST client making asynchronous calls | ||
* to the `/client/async` endpoint. It uses the `@Retry` annotation | ||
* to retry in case of a `ProcessingException`. It is called by the | ||
* server resource `ServerRetryResource`. | ||
*/ | ||
@ApplicationScoped | ||
@RegisterRestClient(configKey = "client.endpoint") | ||
@Path("/client") | ||
public interface RetryClient { | ||
|
||
@GET | ||
@Produces(MediaType.APPLICATION_JSON) | ||
@Retry(retryOn = ProcessingException.class) | ||
@Path("async") | ||
CompletionStage<Response> getItemsAsync(); | ||
} |
39 changes: 39 additions & 0 deletions
39
...main/java/io/quarkus/ts/http/restclient/reactive/fault/tolerance/ServerRetryResource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package io.quarkus.ts.http.restclient.reactive.fault.tolerance; | ||
|
||
import java.util.concurrent.ExecutionException; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
import jakarta.ws.rs.GET; | ||
import jakarta.ws.rs.Path; | ||
import jakarta.ws.rs.Produces; | ||
import jakarta.ws.rs.core.MediaType; | ||
import jakarta.ws.rs.core.Response; | ||
|
||
import org.eclipse.microprofile.rest.client.inject.RestClient; | ||
|
||
/** | ||
* Server resource that calling `RetryClient` to trigger | ||
* an asynchronous client request. It is used to simulate and test | ||
* retry logic when the client fails to complete a request. | ||
*/ | ||
@ApplicationScoped | ||
@Path("/server") | ||
public class ServerRetryResource { | ||
|
||
private final RetryClient retryClient; | ||
|
||
@Inject | ||
public ServerRetryResource(@RestClient RetryClient retryClient) { | ||
this.retryClient = retryClient; | ||
} | ||
|
||
@GET | ||
@Produces(MediaType.TEXT_PLAIN) | ||
@Path("async") | ||
public Response getItemsAsync() throws ExecutionException, InterruptedException { | ||
return retryClient.getItemsAsync() | ||
.toCompletableFuture() | ||
.get(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
...ctive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientRetryIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package io.quarkus.ts.http.restclient.reactive; | ||
|
||
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; | ||
import static com.github.tomakehurst.wiremock.core.Options.ChunkedEncodingPolicy.NEVER; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import org.junit.jupiter.api.AfterAll; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.Tag; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import com.github.tomakehurst.wiremock.WireMockServer; | ||
import com.github.tomakehurst.wiremock.client.WireMock; | ||
import com.github.tomakehurst.wiremock.core.WireMockConfiguration; | ||
import com.github.tomakehurst.wiremock.http.Fault; | ||
|
||
import io.quarkus.test.bootstrap.RestService; | ||
import io.quarkus.test.scenarios.QuarkusScenario; | ||
import io.quarkus.test.services.QuarkusApplication; | ||
import io.restassured.common.mapper.TypeRef; | ||
import io.restassured.http.ContentType; | ||
|
||
/** | ||
* Verifies SmallRye fault-tolerance retry logic and header propagation between the client and server. | ||
*/ | ||
@QuarkusScenario | ||
public class ReactiveRestClientRetryIT { | ||
|
||
private static final WireMockServer mockServer; | ||
private final int headerId = 121; | ||
|
||
static { | ||
mockServer = new WireMockServer(WireMockConfiguration.options() | ||
.dynamicPort() | ||
.useChunkedTransferEncoding(NEVER)); | ||
mockServer.stubFor(WireMock.get(WireMock.urlEqualTo("/client/async")) | ||
.willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER))); | ||
|
||
mockServer.start(); | ||
} | ||
|
||
@QuarkusApplication | ||
static RestService app = new RestService() | ||
.withProperty("client.endpoint/mp-rest/url", mockServer::baseUrl); | ||
|
||
@Test | ||
@Tag("QUARKUS-4477") | ||
// Commit that fixes the issue | ||
// https://github.com/quarkusio/quarkus/pull/39988/commits/b9cc3c2dc65a6f61641c83a940e13c116ce6cd0c | ||
void shouldPerformRetryOfFailingBlockingClientCall() { | ||
app.given().header("REQUEST_ID", headerId) | ||
.get("/server/async") | ||
.then() | ||
.statusCode(500); | ||
|
||
// Check number of server events, one failing call plus 3 retries by default | ||
Assertions.assertEquals(4, mockServer.getServeEvents().getServeEvents().stream().count()); | ||
|
||
List<Map<String, String>> headers = app.given() | ||
.get("/fault/headers") | ||
.then() | ||
.statusCode(200) | ||
.contentType(ContentType.JSON) | ||
.extract() | ||
.body() | ||
.as(new TypeRef<>() { | ||
}); | ||
|
||
// Check if REQUEST_ID header was propagated and stored | ||
Assertions.assertTrue(headers.stream().anyMatch(header -> header.containsKey("REQUEST_ID") | ||
&& headerId == (Integer.parseInt(header.get("REQUEST_ID"))))); | ||
} | ||
|
||
@AfterAll | ||
static void afterAll() { | ||
mockServer.stop(); | ||
} | ||
} |