diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index cf3828cd70ed..a675dba7019e 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -78,6 +78,9 @@ class DefaultWebClient implements WebClient { private final ExchangeFunction exchangeFunction; + @Nullable + private final ExchangeFilterFunction filterFunctions; + private final UriBuilderFactory uriBuilderFactory; @Nullable @@ -93,19 +96,21 @@ class DefaultWebClient implements WebClient { private final ObservationRegistry observationRegistry; + @Nullable private final ClientRequestObservationConvention observationConvention; private final DefaultWebClientBuilder builder; - DefaultWebClient(ExchangeFunction exchangeFunction, UriBuilderFactory uriBuilderFactory, + DefaultWebClient(ExchangeFunction exchangeFunction, @Nullable ExchangeFilterFunction filterFunctions, UriBuilderFactory uriBuilderFactory, @Nullable HttpHeaders defaultHeaders, @Nullable MultiValueMap defaultCookies, @Nullable Consumer> defaultRequest, @Nullable Map, Function>> statusHandlerMap, - ObservationRegistry observationRegistry, ClientRequestObservationConvention observationConvention, + ObservationRegistry observationRegistry, @Nullable ClientRequestObservationConvention observationConvention, DefaultWebClientBuilder builder) { this.exchangeFunction = exchangeFunction; + this.filterFunctions = filterFunctions; this.uriBuilderFactory = uriBuilderFactory; this.defaultHeaders = defaultHeaders; this.defaultCookies = defaultCookies; @@ -438,16 +443,21 @@ public Mono exchange() { observation .parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null)) .start(); + ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext); + if (filterFunctions != null) { + filterFunction = filterFunctions.andThen(filterFunction); + } ClientRequest request = requestBuilder.build(); observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null)); observationContext.setRequest(request); - Mono responseMono = exchangeFunction.exchange(request) + Mono responseMono = filterFunction.apply(exchangeFunction) + .exchange(request) .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]") .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR); if (this.contextModifier != null) { responseMono = responseMono.contextWrite(this.contextModifier); } - return responseMono.doOnNext(observationContext::setResponse) + return responseMono .doOnError(observationContext::setError) .doOnCancel(() -> { observationContext.setAborted(true); @@ -718,4 +728,19 @@ public Mono apply(ClientResponse response) { } } + private static class ObservationFilterFunction implements ExchangeFilterFunction { + + private final ClientRequestObservationContext observationContext; + + public ObservationFilterFunction(ClientRequestObservationContext observationContext) { + this.observationContext = observationContext; + } + + @Override + public Mono filter(ClientRequest request, ExchangeFunction next) { + return next.exchange(request) + .doOnNext(this.observationContext::setResponse); + } + } + } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java index a5493f2def10..4321e05abf5c 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java @@ -314,16 +314,17 @@ public WebClient build() { ExchangeFunctions.create(connectorToUse, initExchangeStrategies()) : this.exchangeFunction); - ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream() + ExchangeFilterFunction filterFunctions = (this.filters != null ? this.filters.stream() .reduce(ExchangeFilterFunction::andThen) - .map(filter -> filter.apply(exchange)) - .orElse(exchange) : exchange); + .orElse(null) : null); HttpHeaders defaultHeaders = copyDefaultHeaders(); MultiValueMap defaultCookies = copyDefaultCookies(); - return new DefaultWebClient(filteredExchange, initUriBuilderFactory(), + return new DefaultWebClient(exchange, + filterFunctions, + initUriBuilderFactory(), defaultHeaders, defaultCookies, this.defaultRequest, diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java index 3131cbda3e31..a94a5671b26d 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java @@ -133,6 +133,19 @@ public Mono filter(ClientRequest request, ExchangeFunction chain verifyAndGetRequest(); } + @Test + void recordsObservationWithResponseDetailsWhenFilterFunctionErrors() { + ExchangeFilterFunction errorFunction = (req, next) -> next.exchange(req).then(Mono.error(new IllegalStateException())); + WebClient client = this.builder.filter(errorFunction).build(); + Mono responseMono = client.get().uri("/path").retrieve().bodyToMono(Void.class); + StepVerifier.create(responseMono) + .expectError(IllegalStateException.class) + .verify(Duration.ofSeconds(5)); + assertThatHttpObservation() + .hasLowCardinalityKeyValue("exception", "IllegalStateException") + .hasLowCardinalityKeyValue("status", "200"); + } + private TestObservationRegistryAssert.TestObservationRegistryAssertReturningObservationContextAssert assertThatHttpObservation() { return TestObservationRegistryAssert.assertThat(this.observationRegistry) .hasObservationWithNameEqualTo("http.client.requests").that();