Skip to content

Commit

Permalink
Ensure that client responses are observed when filters fail
Browse files Browse the repository at this point in the history
Prior to this commit, an error thrown by a `ExchangeFilterFunction`
configured on a `WebClient` instance would be recorded as such by the
client observation, but the response details would be missing from the
observation.
All filter functions and the exchange function (performing the HTTP
call) would be merged into a single `ExchangeFunction`; this instance
was instrumented and osberved. As a result, the instrumentation would
only get the error signal returned by the filter function and would not
see the HTTP response even if it was received. This means that the
recorded observation would not have the relevant information for the
HTTP status.

This commit ensures that between the configured `ExchangeFilterFunction`
and the `ExchangeFunction`, an instrumentation `ExchangeFilterFunction`
is inserted. This allows to set the client response to the observation
context, even if a later error signal is thrown by a filter function.

Note that with this change, an error signal sent by a filter function
will be still recorded in the observation.

See gh-30059
  • Loading branch information
bclozel committed Mar 30, 2023
1 parent 8fca258 commit d451d6a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class DefaultWebClient implements WebClient {

private final ExchangeFunction exchangeFunction;

@Nullable
private final ExchangeFilterFunction filterFunctions;

private final UriBuilderFactory uriBuilderFactory;

@Nullable
Expand All @@ -93,19 +96,21 @@ class DefaultWebClient implements WebClient {

private final ObservationRegistry observationRegistry;

@Nullable
private final ClientRequestObservationConvention observationConvention;

private final DefaultWebClientBuilder builder;


DefaultWebClient(ExchangeFunction exchangeFunction, UriBuilderFactory uriBuilderFactory,
DefaultWebClient(ExchangeFunction exchangeFunction, @Nullable ExchangeFilterFunction filterFunctions, UriBuilderFactory uriBuilderFactory,
@Nullable HttpHeaders defaultHeaders, @Nullable MultiValueMap<String, String> defaultCookies,
@Nullable Consumer<RequestHeadersSpec<?>> defaultRequest,
@Nullable Map<Predicate<HttpStatusCode>, Function<ClientResponse, Mono<? extends Throwable>>> statusHandlerMap,
ObservationRegistry observationRegistry, ClientRequestObservationConvention observationConvention,
ObservationRegistry observationRegistry, @Nullable ClientRequestObservationConvention observationConvention,
DefaultWebClientBuilder builder) {

this.exchangeFunction = exchangeFunction;
this.filterFunctions = filterFunctions;
this.uriBuilderFactory = uriBuilderFactory;
this.defaultHeaders = defaultHeaders;
this.defaultCookies = defaultCookies;
Expand Down Expand Up @@ -438,16 +443,21 @@ public Mono<ClientResponse> exchange() {
observation
.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null))
.start();
ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext);
if (filterFunctions != null) {
filterFunction = filterFunctions.andThen(filterFunction);
}
ClientRequest request = requestBuilder.build();
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
observationContext.setRequest(request);
Mono<ClientResponse> responseMono = exchangeFunction.exchange(request)
Mono<ClientResponse> responseMono = filterFunction.apply(exchangeFunction)
.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
if (this.contextModifier != null) {
responseMono = responseMono.contextWrite(this.contextModifier);
}
return responseMono.doOnNext(observationContext::setResponse)
return responseMono
.doOnError(observationContext::setError)
.doOnCancel(() -> {
observationContext.setAborted(true);
Expand Down Expand Up @@ -718,4 +728,19 @@ public Mono<? extends Throwable> apply(ClientResponse response) {
}
}

private static class ObservationFilterFunction implements ExchangeFilterFunction {

private final ClientRequestObservationContext observationContext;

public ObservationFilterFunction(ClientRequestObservationContext observationContext) {
this.observationContext = observationContext;
}

@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
return next.exchange(request)
.doOnNext(this.observationContext::setResponse);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,17 @@ public WebClient build() {
ExchangeFunctions.create(connectorToUse, initExchangeStrategies()) :
this.exchangeFunction);

ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream()
ExchangeFilterFunction filterFunctions = (this.filters != null ? this.filters.stream()
.reduce(ExchangeFilterFunction::andThen)
.map(filter -> filter.apply(exchange))
.orElse(exchange) : exchange);
.orElse(null) : null);

HttpHeaders defaultHeaders = copyDefaultHeaders();

MultiValueMap<String, String> defaultCookies = copyDefaultCookies();

return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),
return new DefaultWebClient(exchange,
filterFunctions,
initUriBuilderFactory(),
defaultHeaders,
defaultCookies,
this.defaultRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction chain
verifyAndGetRequest();
}

@Test
void recordsObservationWithResponseDetailsWhenFilterFunctionErrors() {
ExchangeFilterFunction errorFunction = (req, next) -> next.exchange(req).then(Mono.error(new IllegalStateException()));
WebClient client = this.builder.filter(errorFunction).build();
Mono<Void> responseMono = client.get().uri("/path").retrieve().bodyToMono(Void.class);
StepVerifier.create(responseMono)
.expectError(IllegalStateException.class)
.verify(Duration.ofSeconds(5));
assertThatHttpObservation()
.hasLowCardinalityKeyValue("exception", "IllegalStateException")
.hasLowCardinalityKeyValue("status", "200");
}

private TestObservationRegistryAssert.TestObservationRegistryAssertReturningObservationContextAssert assertThatHttpObservation() {
return TestObservationRegistryAssert.assertThat(this.observationRegistry)
.hasObservationWithNameEqualTo("http.client.requests").that();
Expand Down

0 comments on commit d451d6a

Please sign in to comment.