Skip to content

Commit

Permalink
Verify fault tolerance retry logic with rest client reactive
Browse files Browse the repository at this point in the history
  • Loading branch information
gtroitsk committed Sep 5, 2024
1 parent 8321699 commit 6eae594
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.ts.http.restclient.reactive.fault.tolerance;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;

/**
* Stores request headers from incoming server requests
* and exposes an endpoint that returns these headers
* It is used for testing purposes to verify if headers are correctly propagated.
*/
@Path("/fault/headers")
@ApplicationScoped
public class HeaderResource {

private final List<Map<String, String>> headerList = new ArrayList<>();

@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getStoredHeaders() {
return Response.ok(headerList).build();
}

public void storeHeaders(MultivaluedMap<String, String> headers) {
headerList.add(headers.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get(0))));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.ts.http.restclient.reactive.fault.tolerance;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.client.ClientRequestContext;
import jakarta.ws.rs.client.ClientRequestFilter;
import jakarta.ws.rs.ext.Provider;

/**
* Injects `REQUEST_ID` into the headers of every outgoing REST client call.
* It is used in combination with the `RequestIdManager` class to ensure
* request context propagation between client and server calls.
*/
@Provider
@ApplicationScoped
public class RequestIdClientRequestFilter implements ClientRequestFilter {

private final RequestIdManager requestIdManager;

public RequestIdClientRequestFilter(RequestIdManager requestIdManager) {
this.requestIdManager = requestIdManager;
}

@Override
public void filter(ClientRequestContext requestContext) {
int requestId = requestIdManager.currentRequestId();
requestContext.getHeaders().putSingle("REQUEST_ID", requestId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.ts.http.restclient.reactive.fault.tolerance;

import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import jakarta.ws.rs.ConstrainedTo;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.RuntimeType;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
import jakarta.ws.rs.container.PreMatching;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.ext.Provider;

/**
* This server-side filter is used to extract the `REQUEST_ID` header
* from incoming requests and pass it to the `RequestIdManagerImpl` to manage request context.
* It also stores the headers via the `HeaderResource` for test verification.
*/
@Provider
@Produces
@PreMatching
@ConstrainedTo(RuntimeType.SERVER)
public class RequestIdContainerRequestFilter implements ContainerRequestFilter {

@Inject
RequestIdManagerImpl requestIdManagerImpl;

@Inject
HeaderResource headerResource;

@Override
@ActivateRequestContext
public void filter(ContainerRequestContext requestContext) {
MultivaluedMap<String, String> headers = requestContext.getHeaders();
if (headers.containsKey("REQUEST_ID")) {
int requestId = Integer.valueOf(headers.getFirst("REQUEST_ID"));
requestIdManagerImpl.overrideRequestId(requestId);
}

// Store the headers in the HeaderResource for later retrieval in tests
headerResource.storeHeaders(headers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.quarkus.ts.http.restclient.reactive.fault.tolerance;

public interface RequestIdManager {

int currentRequestId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.ts.http.restclient.reactive.fault.tolerance;

import jakarta.enterprise.context.RequestScoped;

/**
* Manages the `REQUEST_ID` for each request, allowing it
* to be overridden when the `REQUEST_ID` is passed via headers from client requests.
* Handles a unique id per request.
*/
@RequestScoped
public class RequestIdManagerImpl implements RequestIdManager {

private int requestID;

public int currentRequestId() {
return requestID;
}

public void overrideRequestId(int inboundRequestId) {
this.requestID = inboundRequestId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.ts.http.restclient.reactive.fault.tolerance;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.ProcessingException;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

/**
* Interface defines a REST client making asynchronous calls
* to the `/client/async` endpoint. It uses the `@Retry` annotation
* to retry in case of a `ProcessingException`. It is called by the
* server resource `ServerRetryResource`.
*/
@ApplicationScoped
@RegisterRestClient(configKey = "client.endpoint")
@Path("/client")
public interface RetryClient {

@GET
@Produces(MediaType.APPLICATION_JSON)
@Retry(retryOn = ProcessingException.class)
@Path("async")
CompletionStage<Response> getItemsAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.ts.http.restclient.reactive.fault.tolerance;

import java.util.concurrent.ExecutionException;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.eclipse.microprofile.rest.client.inject.RestClient;

/**
* Server resource that calling `RetryClient` to trigger
* an asynchronous client request. It is used to simulate and test
* retry logic when the client fails to complete a request.
*/
@ApplicationScoped
@Path("/server")
public class ServerRetryResource {

private final RetryClient retryClient;

@Inject
public ServerRetryResource(@RestClient RetryClient retryClient) {
this.retryClient = retryClient;
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("async")
public Response getItemsAsync() throws ExecutionException, InterruptedException {
return retryClient.getItemsAsync()
.toCompletableFuture()
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ quarkus.rest-client.encoder-mode-rfc1738.url=${vertx-server-url}
quarkus.rest-client.encoder-mode-rfc1738.multipart-post-encoder-mode=RFC1738
quarkus.rest-client.encoder-mode-rfc3986.url=${vertx-server-url}
quarkus.rest-client.encoder-mode-rfc3986.multipart-post-encoder-mode=RFC3986

# Set the Client endpoint to a non-existing domain to trigger a fault
client.endpoint/mp-rest/url=http://unknown-domain:8080
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.quarkus.ts.http.restclient.reactive;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.core.Options.ChunkedEncodingPolicy.NEVER;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.Fault;

import io.quarkus.test.bootstrap.RestService;
import io.quarkus.test.scenarios.QuarkusScenario;
import io.quarkus.test.services.QuarkusApplication;
import io.restassured.common.mapper.TypeRef;
import io.restassured.http.ContentType;

/**
* Verifies SmallRye fault-tolerance retry logic and header propagation between the client and server.
*/
@QuarkusScenario
public class ReactiveRestClientRetryIT {

private static final WireMockServer mockServer;
private final int headerId = 121;

static {
mockServer = new WireMockServer(WireMockConfiguration.options()
.dynamicPort()
.useChunkedTransferEncoding(NEVER));
mockServer.stubFor(WireMock.get(WireMock.urlEqualTo("/client/async"))
.willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER)));

mockServer.start();
}

@QuarkusApplication
static RestService app = new RestService()
.withProperty("client.endpoint/mp-rest/url", mockServer::baseUrl);

@Test
@Tag("QUARKUS-4477")
// Commit that fixes the issue
// https://github.com/quarkusio/quarkus/pull/39988/commits/b9cc3c2dc65a6f61641c83a940e13c116ce6cd0c
void shouldPerformRetryOfFailingBlockingClientCall() {
app.given().header("REQUEST_ID", headerId)
.get("/server/async")
.then()
.statusCode(500);

// Check number of server events, one failing call plus 3 retries by default
Assertions.assertEquals(4, mockServer.getServeEvents().getServeEvents().stream().count());

List<Map<String, String>> headers = app.given()
.get("/fault/headers")
.then()
.statusCode(200)
.contentType(ContentType.JSON)
.extract()
.body()
.as(new TypeRef<>() {
});

// Check if REQUEST_ID header was propagated and stored
Assertions.assertTrue(headers.stream().anyMatch(header -> header.containsKey("REQUEST_ID")
&& headerId == (Integer.parseInt(header.get("REQUEST_ID")))));
}

@AfterAll
static void afterAll() {
mockServer.stop();
}
}

0 comments on commit 6eae594

Please sign in to comment.