Skip to content

Commit

Permalink
Expose id from ClientHttpResponse
Browse files Browse the repository at this point in the history
Closes gh-26656
  • Loading branch information
rstoyanchev committed Mar 10, 2021
1 parent 2b017fe commit 55aa8e9
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ResponseCookie;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;

/**
* Represents a client-side reactive HTTP response.
Expand All @@ -30,6 +31,15 @@
*/
public interface ClientHttpResponse extends ReactiveHttpInputMessage {

/**
* Return an id that represents the underlying connection, if available,
* or the request for the purpose of correlating log messages.
* @since 5.3.5
*/
default String getId() {
return ObjectUtils.getIdentityHexString(this);
}

/**
* Return the HTTP status code as an {@link HttpStatus} enum value.
* @return the HTTP status as an HttpStatus enum value (never {@code null})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,11 @@ public ClientHttpResponse getDelegate() {

// ClientHttpResponse delegation methods...

@Override
public String getId() {
return this.delegate.getId();
}

@Override
public HttpStatus getStatusCode() {
return this.delegate.getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,9 +37,11 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;

/**
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
Expand All @@ -51,6 +53,11 @@
*/
class ReactorClientHttpResponse implements ClientHttpResponse {

/** Reactor Netty 1.0.5+. */
static final boolean reactorNettyRequestChannelOperationsIdPresent = ClassUtils.isPresent(
"reactor.netty.ChannelOperationsId", ReactorClientHttpResponse.class.getClassLoader());


private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class);

private final HttpClientResponse response;
Expand All @@ -64,8 +71,6 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
private final AtomicInteger state = new AtomicInteger();

private final String logPrefix;


/**
* Constructor that matches the inputs from
Expand All @@ -78,7 +83,6 @@ public ReactorClientHttpResponse(HttpClientResponse response, Connection connect
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
this.inbound = connection.inbound();
this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : "");
}

/**
Expand All @@ -92,10 +96,21 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
this.inbound = inbound;
this.bufferFactory = new NettyDataBufferFactory(alloc);
this.logPrefix = "";
}


@Override
public String getId() {
String id = null;
if (reactorNettyRequestChannelOperationsIdPresent) {
id = ChannelOperationsIdHelper.getId(this.response);
}
if (id == null && this.response instanceof Connection) {
id = ((Connection) this.response).channel().id().asShortText();
}
return (id != null ? id : ObjectUtils.getIdentityHexString(this));
}

@Override
public Flux<DataBuffer> getBody() {
return this.inbound.receive()
Expand Down Expand Up @@ -167,7 +182,7 @@ private static String getSameSite(Cookie cookie) {
void releaseAfterCancel(HttpMethod method) {
if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) {
if (logger.isDebugEnabled()) {
logger.debug(this.logPrefix + "Releasing body, not yet subscribed.");
logger.debug("[" + getId() + "]" + "Releasing body, not yet subscribed.");
}
this.inbound.receive().doOnNext(byteBuf -> {}).subscribe(byteBuf -> {}, ex -> {});
}
Expand All @@ -186,4 +201,18 @@ public String toString() {
"status=" + getRawStatusCode() + '}';
}


private static class ChannelOperationsIdHelper {

@Nullable
public static String getId(HttpClientResponse response) {
if (response instanceof reactor.netty.ChannelOperationsId) {
return (logger.isDebugEnabled() ?
((reactor.netty.ChannelOperationsId) response).asLongText() :
((reactor.netty.ChannelOperationsId) response).asShortText());
}
return null;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,14 +98,14 @@ public Mono<ClientResponse> exchange(ClientRequest clientRequest) {
Assert.notNull(clientRequest, "ClientRequest must not be null");
HttpMethod httpMethod = clientRequest.method();
URI url = clientRequest.url();
String logPrefix = clientRequest.logPrefix();

return this.connector
.connect(httpMethod, url, httpRequest -> clientRequest.writeTo(httpRequest, this.strategies))
.doOnRequest(n -> logRequest(clientRequest))
.doOnCancel(() -> logger.debug(logPrefix + "Cancel signal (to close connection)"))
.doOnCancel(() -> logger.debug(clientRequest.logPrefix() + "Cancel signal (to close connection)"))
.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, t -> wrapException(t, clientRequest))
.map(httpResponse -> {
String logPrefix = getLogPrefix(clientRequest, httpResponse);
logResponse(httpResponse, logPrefix);
return new DefaultClientResponse(
httpResponse, this.strategies, logPrefix, httpMethod.name() + " " + url,
Expand All @@ -120,6 +120,10 @@ private void logRequest(ClientRequest request) {
);
}

private String getLogPrefix(ClientRequest request, ClientHttpResponse response) {
return request.logPrefix() + "[" + response.getId() + "] ";
}

private void logResponse(ClientHttpResponse response, String logPrefix) {
LogFormatUtils.traceDebug(logger, traceOn -> {
int code = response.getRawStatusCode();
Expand Down

0 comments on commit 55aa8e9

Please sign in to comment.