Skip to content

Commit

Permalink
Implement sendSync in OkHttpClient (#29601)
Browse files Browse the repository at this point in the history
* simplify request body creation.

* sync okhttp client.

* test buffered responses.

* chlog.

* body's closeable.
  • Loading branch information
kasobol-msft authored Jun 23, 2022
1 parent edcf468 commit d69610b
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 57 deletions.
1 change: 1 addition & 0 deletions sdk/core/azure-core-http-okhttp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
### Bugs Fixed

### Other Changes
- Added synchronous implementation of `HttpClient.sendSync`.

## 1.10.1 (2022-06-03)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,28 @@
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/**
* HttpClient implementation for OkHttp.
*/
class OkHttpAsyncHttpClient implements HttpClient {

private static final Mono<RequestBody> EMPTY_REQUEST_BODY_MONO = Mono.just(RequestBody.create(new byte[0]));
private static final ClientLogger LOGGER = new ClientLogger(OkHttpAsyncHttpClient.class);
private static final RequestBody EMPTY_REQUEST_BODY = RequestBody.create(new byte[0]);

final OkHttpClient httpClient;

Expand Down Expand Up @@ -74,26 +78,41 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
// 3. If Flux<ByteBuffer> asynchronous then subscribe does not block caller thread
// but block on the thread backing flux. This ignore any subscribeOn applied to send(r)
//
toOkHttpRequest(request, progressReporter).subscribe(okHttpRequest -> {
try {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
sink.onCancel(call::cancel);
} catch (Exception ex) {
sink.error(ex);
}
}, sink::error);
Mono.fromCallable(() -> toOkHttpRequest(request, progressReporter))
.subscribe(okHttpRequest -> {
try {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
sink.onCancel(call::cancel);
} catch (Exception ex) {
sink.error(ex);
}
}, sink::error);
}));
}

@Override
public HttpResponse sendSync(HttpRequest request, Context context) {
boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);
ProgressReporter progressReporter = Contexts.with(context).getHttpRequestProgressReporter();

Request okHttpRequest = toOkHttpRequest(request, progressReporter);
try {
Response okHttpResponse = httpClient.newCall(okHttpRequest).execute();
return toHttpResponse(request, okHttpResponse, eagerlyReadResponse);
} catch (IOException e) {
throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
}
}

/**
* Converts the given azure-core request to okhttp request.
*
* @param request the azure-core request
* @param progressReporter the {@link ProgressReporter}. Can be null.
* @return the Mono emitting okhttp request
* @return the okhttp request
*/
private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request, ProgressReporter progressReporter) {
private okhttp3.Request toOkHttpRequest(HttpRequest request, ProgressReporter progressReporter) {
Request.Builder requestBuilder = new Request.Builder()
.url(request.getUrl());

Expand All @@ -106,19 +125,17 @@ private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request, ProgressRepor
}

if (request.getHttpMethod() == HttpMethod.GET) {
return Mono.just(requestBuilder.get().build());
return requestBuilder.get().build();
} else if (request.getHttpMethod() == HttpMethod.HEAD) {
return Mono.just(requestBuilder.head().build());
return requestBuilder.head().build();
}

return toOkHttpRequestBody(request.getBodyAsBinaryData(), request.getHeaders())
.map(okhttpRequestBody -> {
if (progressReporter != null) {
okhttpRequestBody = new OkHttpProgressReportingRequestBody(okhttpRequestBody, progressReporter);
}
return requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build();
});
RequestBody okHttpRequestBody = toOkHttpRequestBody(request.getBodyAsBinaryData(), request.getHeaders());
if (progressReporter != null) {
okHttpRequestBody = new OkHttpProgressReportingRequestBody(okHttpRequestBody, progressReporter);
}
return requestBuilder.method(request.getHttpMethod().toString(), okHttpRequestBody)
.build();
}

/**
Expand All @@ -128,34 +145,33 @@ private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request, ProgressRepor
* @param headers the headers associated with the original request
* @return the Mono emitting okhttp request
*/
private Mono<RequestBody> toOkHttpRequestBody(BinaryData bodyContent, HttpHeaders headers) {
String contentType = headers.getValue("Content-Type");
MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);

private RequestBody toOkHttpRequestBody(BinaryData bodyContent, HttpHeaders headers) {
if (bodyContent == null) {
return EMPTY_REQUEST_BODY_MONO;
return EMPTY_REQUEST_BODY;
}

String contentType = headers.getValue("Content-Type");
MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);

BinaryDataContent content = BinaryDataHelper.getContent(bodyContent);

if (content instanceof ByteArrayContent) {
return Mono.just(RequestBody.create(content.toBytes(), mediaType));
} else if (content instanceof StringContent
if (content instanceof ByteArrayContent
|| content instanceof StringContent
|| content instanceof SerializableContent) {
return Mono.fromCallable(() -> RequestBody.create(content.toBytes(), mediaType));
return RequestBody.create(content.toBytes(), mediaType);
} else {
long effectiveContentLength = getRequestContentLength(content, headers);
if (content instanceof InputStreamContent) {
// The OkHttpInputStreamRequestBody doesn't read bytes until it's triggered by OkHttp dispatcher.
return Mono.just(new OkHttpInputStreamRequestBody(
(InputStreamContent) content, effectiveContentLength, mediaType));
return new OkHttpInputStreamRequestBody(
(InputStreamContent) content, effectiveContentLength, mediaType);
} else if (content instanceof FileContent) {
// The OkHttpFileRequestBody doesn't read bytes until it's triggered by OkHttp dispatcher.
return Mono.just(new OkHttpFileRequestBody((FileContent) content, effectiveContentLength, mediaType));
return new OkHttpFileRequestBody((FileContent) content, effectiveContentLength, mediaType);
} else {
// The OkHttpFluxRequestBody doesn't read bytes until it's triggered by OkHttp dispatcher.
return Mono.just(new OkHttpFluxRequestBody(
content, effectiveContentLength, mediaType, httpClient.callTimeoutMillis()));
return new OkHttpFluxRequestBody(
content, effectiveContentLength, mediaType, httpClient.callTimeoutMillis());
}
}
}
Expand All @@ -174,6 +190,27 @@ private static long getRequestContentLength(BinaryDataContent content, HttpHeade
return contentLength;
}

private static HttpResponse toHttpResponse(
HttpRequest request, okhttp3.Response response, boolean eagerlyReadResponse) throws IOException {
/*
* Use a buffered response when we are eagerly reading the response from the network and the body isn't
* empty.
*/
if (eagerlyReadResponse) {
try (ResponseBody body = response.body()) {
if (Objects.nonNull(body)) {
byte[] bytes = body.bytes();
return new OkHttpAsyncBufferedResponse(response, request, bytes);
} else {
// Body is null, use the non-buffering response.
return new OkHttpAsyncResponse(response, request);
}
}
} else {
return new OkHttpAsyncResponse(response, request);
}
}

private static class OkHttpCallback implements okhttp3.Callback {
private final MonoSink<HttpResponse> sink;
private final HttpRequest request;
Expand All @@ -200,27 +237,12 @@ public void onFailure(okhttp3.Call call, IOException e) {
@SuppressWarnings("NullableProblems")
@Override
public void onResponse(okhttp3.Call call, okhttp3.Response response) {
/*
* Use a buffered response when we are eagerly reading the response from the network and the body isn't
* empty.
*/
if (eagerlyReadResponse) {
ResponseBody body = response.body();
if (Objects.nonNull(body)) {
try {
byte[] bytes = body.bytes();
body.close();
sink.success(new OkHttpAsyncBufferedResponse(response, request, bytes));
} catch (IOException ex) {
// Reading the body bytes may cause an IOException, if it happens propagate it.
sink.error(ex);
}
} else {
// Body is null, use the non-buffering response.
sink.success(new OkHttpAsyncResponse(response, request));
}
} else {
sink.success(new OkHttpAsyncResponse(response, request));
try {
HttpResponse httpResponse = toHttpResponse(request, response, eagerlyReadResponse);
sink.success(httpResponse);
} catch (IOException ex) {
// Reading the body bytes may cause an IOException, if it happens propagate it.
sink.error(ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.http.vertx.implementation;

import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -39,4 +40,9 @@ public Mono<byte[]> getBodyAsByteArray() {
return Mono.just(this.body.getBytes());
});
}

@Override
public HttpResponse buffer() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

/**
* Generic test suite for {@link HttpClient HttpClients}.
Expand Down Expand Up @@ -242,6 +243,64 @@ public void bomWithDifferentHeader() {
assertEquals(expected, actual);
}

/**
* Tests that client returns buffered response if requested via azure-eagerly-read-response Context flag.
*/
@SyncAsyncTest
public void shouldBufferResponse() {
HttpRequest request = new HttpRequest(
HttpMethod.PUT,
getRequestUrl(ECHO_RESPONSE),
new HttpHeaders(),
BinaryData.fromString("test body"));

Context context = Context.NONE.addData("azure-eagerly-read-response", true);

HttpResponse response = SyncAsyncExtension.execute(
() -> createHttpClient().sendSync(request, context),
() -> createHttpClient().send(request, context)
);

// Buffering buffered response is identity transformation.
HttpResponse bufferedResponse = response.buffer();
assertSame(response, bufferedResponse);
}

/**
* Tests that buffered response is indeed buffered, i.e. content can be accessed many times.
*/
@SyncAsyncTest
public void bufferedResponseCanBeReadMultipleTimes() {
BinaryData requestBody = BinaryData.fromString("test body");
HttpRequest request = new HttpRequest(
HttpMethod.PUT,
getRequestUrl(ECHO_RESPONSE),
new HttpHeaders(),
requestBody);

Context context = Context.NONE.addData("azure-eagerly-read-response", true);

HttpResponse response = SyncAsyncExtension.execute(
() -> createHttpClient().sendSync(request, context),
() -> createHttpClient().send(request, context)
);

// Read response twice using all accessors.
assertEquals(requestBody.toString(), response.getBodyAsString().block());
assertEquals(requestBody.toString(), response.getBodyAsString().block());

assertArrayEquals(requestBody.toBytes(), response.getBodyAsByteArray().block());
assertArrayEquals(requestBody.toBytes(), response.getBodyAsByteArray().block());

assertArrayEquals(requestBody.toBytes(), response.getBodyAsInputStream()
.map(s -> BinaryData.fromStream(s).toBytes()).block());
assertArrayEquals(requestBody.toBytes(), response.getBodyAsInputStream()
.map(s -> BinaryData.fromStream(s).toBytes()).block());

assertArrayEquals(requestBody.toBytes(), BinaryData.fromFlux(response.getBody()).map(BinaryData::toBytes).block());
assertArrayEquals(requestBody.toBytes(), BinaryData.fromFlux(response.getBody()).map(BinaryData::toBytes).block());
}

/**
* Tests that send random bytes in various forms to an endpoint that echoes bytes back to sender.
* @param requestBody The BinaryData that contains random bytes.
Expand Down

0 comments on commit d69610b

Please sign in to comment.