Skip to content

Commit

Permalink
Fix Bug in Ignoring Response Body (#31952)
Browse files Browse the repository at this point in the history
Fix Bug in Ignoring Response Body
  • Loading branch information
alzimmermsft authored Nov 4, 2022
1 parent 577e3af commit 00e2e72
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.jdk.httpclient.implementation.BodyIgnoringSubscriber;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
Expand All @@ -39,7 +35,6 @@ class JdkHttpClient implements HttpClient {
private static final ClientLogger LOGGER = new ClientLogger(JdkHttpClient.class);
private static final String AZURE_EAGERLY_READ_RESPONSE = "azure-eagerly-read-response";
private static final String AZURE_IGNORE_RESPONSE_BODY = "azure-ignore-response-body";
private static final byte[] IGNORED_BODY = new byte[0];

private final java.net.http.HttpClient jdkHttpClient;

Expand Down Expand Up @@ -67,33 +62,23 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
boolean eagerlyReadResponse = (boolean) context.getData(AZURE_EAGERLY_READ_RESPONSE).orElse(false);
boolean ignoreResponseBody = (boolean) context.getData(AZURE_IGNORE_RESPONSE_BODY).orElse(false);

return Mono.fromCallable(() -> toJdkHttpRequest(request, context))
.flatMap(jdkRequest -> Mono.fromCompletionStage(jdkHttpClient.sendAsync(jdkRequest, ofPublisher()))
.flatMap(jdKResponse -> {
// Ignoring the response body takes precedent over eagerly reading the response body.
// Both should never be true at the same time but this is acts as a safeguard.
if (ignoreResponseBody) {
HttpHeaders headers = fromJdkHttpHeaders(jdKResponse.headers());
int statusCode = jdKResponse.statusCode();

return JdkFlowAdapter.flowPublisherToFlux(jdKResponse.body())
.ignoreElements()
.then(Mono.fromSupplier(() ->
new JdkHttpResponseSync(request, statusCode, headers, IGNORED_BODY)));
}

if (eagerlyReadResponse) {
HttpHeaders headers = fromJdkHttpHeaders(jdKResponse.headers());
int statusCode = jdKResponse.statusCode();

return FluxUtil.collectBytesFromNetworkResponse(JdkFlowAdapter
.flowPublisherToFlux(jdKResponse.body())
.flatMapSequential(Flux::fromIterable), headers)
.map(bytes -> new JdkHttpResponseSync(request, statusCode, headers, bytes));
}

return Mono.just(new JdkHttpResponseAsync(request, jdKResponse));
}));
Mono<java.net.http.HttpRequest> jdkRequestMono = Mono.fromCallable(() -> toJdkHttpRequest(request, context));

if (eagerlyReadResponse || ignoreResponseBody) {
return jdkRequestMono
.flatMap(jdkRequest -> Mono.fromCompletionStage(jdkHttpClient.sendAsync(jdkRequest, ofByteArray())))
.map(jdkResponse -> {
// For now, eagerlyReadResponse and ignoreResponseBody works the same.
HttpHeaders headers = fromJdkHttpHeaders(jdkResponse.headers());
int statusCode = jdkResponse.statusCode();

return new JdkHttpResponseSync(request, statusCode, headers, jdkResponse.body());
});
} else {
return jdkRequestMono
.flatMap(jdkRequest -> Mono.fromCompletionStage(jdkHttpClient.sendAsync(jdkRequest, ofPublisher())))
.map(jdkResponse -> new JdkHttpResponseAsync(request, jdkResponse));
}
}

@Override
Expand All @@ -103,14 +88,15 @@ public HttpResponse sendSync(HttpRequest request, Context context) {

java.net.http.HttpRequest jdkRequest = toJdkHttpRequest(request, context);
try {
// Ignoring the response body takes precedent over eagerly reading the response body.
// Both should never be true at the same time but this is acts as a safeguard.
if (ignoreResponseBody) {
java.net.http.HttpResponse<Void> jdKResponse = jdkHttpClient.send(jdkRequest,
responseInfo -> new BodyIgnoringSubscriber(LOGGER));
return new JdkHttpResponseSync(request, jdKResponse.statusCode(),
fromJdkHttpHeaders(jdKResponse.headers()), IGNORED_BODY);
} else if (eagerlyReadResponse) {
// For now, eagerlyReadResponse and ignoreResponseBody works the same.
// if (ignoreResponseBody) {
// java.net.http.HttpResponse<Void> jdKResponse = jdkHttpClient.send(jdkRequest,
// responseInfo -> new BodyIgnoringSubscriber(LOGGER));
// return new JdkHttpResponseSync(request, jdKResponse.statusCode(),
// fromJdkHttpHeaders(jdKResponse.headers()), IGNORED_BODY);
// }

if (eagerlyReadResponse || ignoreResponseBody) {
java.net.http.HttpResponse<byte[]> jdKResponse = jdkHttpClient.send(jdkRequest, ofByteArray());
return new JdkHttpResponseSync(request, jdKResponse.statusCode(),
fromJdkHttpHeaders(jdKResponse.headers()), jdKResponse.body());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.azure.core.util.Contexts;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
Expand All @@ -52,7 +51,6 @@
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

import static com.azure.core.http.netty.implementation.Utility.closeConnection;
Expand Down Expand Up @@ -259,28 +257,27 @@ private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse
HttpRequest restRequest, boolean disableBufferCopy, boolean eagerlyReadResponse, boolean ignoreResponseBody,
boolean headersEagerlyConverted) {
return (reactorNettyResponse, reactorNettyConnection) -> {
// Ignoring the response body takes precedent over eagerly reading the response body.
// Both should never be true at the same time but this is acts as a safeguard.
if (ignoreResponseBody) {
AtomicBoolean firstNext = new AtomicBoolean(true);
return reactorNettyConnection.inbound().receive()
.doOnNext(ignored -> {
if (!firstNext.compareAndSet(true, false)) {
LOGGER.log(LogLevel.WARNING, () -> "Received HTTP response body when one wasn't expected. "
+ "Response body will be ignored as directed.");
}
})
.ignoreElements()
.doFinally(ignored -> closeConnection(reactorNettyConnection))
.then(Mono.fromSupplier(() -> new NettyAsyncHttpBufferedResponse(reactorNettyResponse, restRequest,
EMPTY_BYTES, headersEagerlyConverted)));
}
// For now, eagerlyReadResponse and ignoreResponseBody works the same.
// if (ignoreResponseBody) {
// AtomicBoolean firstNext = new AtomicBoolean(true);
// return reactorNettyConnection.inbound().receive()
// .doOnNext(ignored -> {
// if (!firstNext.compareAndSet(true, false)) {
// LOGGER.log(LogLevel.WARNING, () -> "Received HTTP response body when one wasn't expected. "
// + "Response body will be ignored as directed.");
// }
// })
// .ignoreElements()
// .doFinally(ignored -> closeConnection(reactorNettyConnection))
// .then(Mono.fromSupplier(() -> new NettyAsyncHttpBufferedResponse(reactorNettyResponse, restRequest,
// EMPTY_BYTES, headersEagerlyConverted)));
// }

/*
* If the response is being eagerly read into memory the flag for buffer copying can be ignored as the
* response MUST be deeply copied to ensure it can safely be used downstream.
*/
if (eagerlyReadResponse) {
if (eagerlyReadResponse || ignoreResponseBody) {
// Set up the body flux and dispose the connection once it has been received.
return reactorNettyConnection.inbound().receive().aggregate().asByteArray()
.doFinally(ignored -> closeConnection(reactorNettyConnection))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.azure.core.util.Contexts;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -205,26 +204,25 @@ private static long getRequestContentLength(BinaryDataContent content, HttpHeade

private static HttpResponse toHttpResponse(HttpRequest request, okhttp3.Response response,
boolean eagerlyReadResponse, boolean ignoreResponseBody, boolean eagerlyConvertHeaders) throws IOException {
// Ignoring the response body takes precedent over eagerly reading the response body.
// Both should never be true at the same time but this is acts as a safeguard.
if (ignoreResponseBody) {
ResponseBody body = response.body();
if (body != null) {
if (body.contentLength() > 0) {
LOGGER.log(LogLevel.WARNING, () -> "Received HTTP response body when one wasn't expected. "
+ "Response body will be ignored as directed.");
}
body.close();
}

return new OkHttpAsyncBufferedResponse(response, request, EMPTY_BODY, eagerlyConvertHeaders);
}
// For now, eagerlyReadResponse and ignoreResponseBody works the same.
// if (ignoreResponseBody) {
// ResponseBody body = response.body();
// if (body != null) {
// if (body.contentLength() > 0) {
// LOGGER.log(LogLevel.WARNING, () -> "Received HTTP response body when one wasn't expected. "
// + "Response body will be ignored as directed.");
// }
// body.close();
// }
//
// return new OkHttpAsyncBufferedResponse(response, request, EMPTY_BODY, eagerlyConvertHeaders);
// }

/*
* Use a buffered response when we are eagerly reading the response from the network and the body isn't
* empty.
*/
if (eagerlyReadResponse) {
if (eagerlyReadResponse || ignoreResponseBody) {
try (ResponseBody body = response.body()) {
byte[] bytes = (body != null) ? body.bytes() : EMPTY_BODY;
return new OkHttpAsyncBufferedResponse(response, request, bytes, eagerlyConvertHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public Mono<HttpResponse> send(HttpRequest request) {
response = new MockHttpResponse(request, statusCode);
} else if (requestPathLower.startsWith("/voideagerreadoom")) {
response = new MockHttpResponse(request, 200);
} else if (requestPathLower.startsWith("/voiderrorreturned")) {
response = new MockHttpResponse(request, 400,
"void exception body thrown".getBytes(StandardCharsets.UTF_8));
}
} else if ("echo.org".equalsIgnoreCase(requestHost)) {
return FluxUtil.collectBytesInByteBufferStream(request.getBody())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2066,10 +2066,6 @@ interface Service28 {
@Head("voideagerreadoom")
@ExpectedResponses({200})
Mono<ResponseBase<Void, Void>> headMonoResponseBaseVoid();

@Head("voideagerreadoom")
@ExpectedResponses({200})
Flux<Void> headFluxVoid();
}

@ParameterizedTest
Expand All @@ -2084,10 +2080,62 @@ private static Stream<Consumer<Service28>> voidDoesNotEagerlyReadResponseSupplie
Service28::headVoid,
Service28::headResponseVoid,
Service28::headResponseBaseVoid,
Service28::headMonoVoid,
Service28::headMonoResponseVoid,
Service28::headMonoResponseBaseVoid,
Service28::headFluxVoid
service28 -> service28.headMonoVoid().block(),
service28 -> service28.headMonoResponseVoid().block(),
service28 -> service28.headMonoResponseBaseVoid().block()
);
}

@Host("http://localhost")
@ServiceInterface(name = "Service29")
interface Service29 {
@Put("voiderrorreturned")
@ExpectedResponses({200})
void headvoid();

@Put("voiderrorreturned")
@ExpectedResponses({200})
Void headVoid();

@Put("voiderrorreturned")
@ExpectedResponses({200})
Response<Void> headResponseVoid();

@Put("voiderrorreturned")
@ExpectedResponses({200})
ResponseBase<Void, Void> headResponseBaseVoid();

@Put("voiderrorreturned")
@ExpectedResponses({200})
Mono<Void> headMonoVoid();

@Put("voiderrorreturned")
@ExpectedResponses({200})
Mono<Response<Void>> headMonoResponseVoid();

@Put("voiderrorreturned")
@ExpectedResponses({200})
Mono<ResponseBase<Void, Void>> headMonoResponseBaseVoid();
}

@ParameterizedTest
@MethodSource("voidErrorReturnsErrorBodySupplier")
public void voidErrorReturnsErrorBody(Consumer<Service29> executable) {
HttpResponseException exception = assertThrows(HttpResponseException.class,
() -> executable.accept(createService(Service29.class)));

assertTrue(exception.getMessage().contains("void exception body thrown"));
}

private static Stream<Consumer<Service29>> voidErrorReturnsErrorBodySupplier() {
return Stream.of(
Service29::headvoid,
Service29::headVoid,
Service29::headResponseVoid,
Service29::headResponseBaseVoid,
service29 -> service29.headMonoVoid().block(),
service29 -> service29.headMonoResponseVoid().block(),
service29 -> service29.headMonoResponseBaseVoid().block()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public static WireMockServer getRestProxyTestsServer() {
server.stubFor(head(urlPathMatching("/voideagerreadoom")).willReturn(aResponse()
.withHeader("Content-Length", "10737418240")));

server.stubFor(put("/voiderrorreturned").willReturn(aResponse()
.withStatus(400)
.withBody("void exception body thrown")));

return server;
}

Expand Down

0 comments on commit 00e2e72

Please sign in to comment.