Skip to content

Commit

Permalink
Progress reporter promoted to core and used in Http Client. (Azure#29495
Browse files Browse the repository at this point in the history
)

* progress reporter.

* contexts

* comment.

* this works.

* progress handlers for http clients.

* vertx.

* fix build.

* changelogs.

* poke ci

* align naming with existing versions.

* functional interface.

* getContext.

* this might come handy.

* some PR feedback.

* remove progress handler after request./

* some feedback.

* add sendSync test.

* wip.

* contexts.

* make it final.

* changelog.

* rename this.

* sample update.

* Update sdk/core/azure-core/src/main/java/com/azure/core/util/ProgressReporter.java

Co-authored-by: Srikanta <[email protected]>

* pr feedback.

* samples in chlog.

* hide keys for now.

Co-authored-by: Srikanta <[email protected]>
  • Loading branch information
2 people authored and kwonus-msft committed Jun 24, 2022
1 parent 4a444eb commit f478529
Show file tree
Hide file tree
Showing 24 changed files with 881 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -59,8 +61,9 @@ public Mono<HttpResponse> send(HttpRequest request) {
@Override
public Mono<HttpResponse> send(HttpRequest request, Context context) {
boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);
ProgressReporter progressReporter = Contexts.with(context).getHttpRequestProgressReporter();

return toJdkHttpRequest(request)
return toJdkHttpRequest(request, progressReporter)
.flatMap(jdkRequest -> Mono.fromCompletionStage(jdkHttpClient.sendAsync(jdkRequest, ofPublisher()))
.flatMap(innerResponse -> {
if (eagerlyReadResponse) {
Expand All @@ -83,7 +86,7 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
* @param request the azure-core request
* @return the Mono emitting HttpRequest
*/
private Mono<java.net.http.HttpRequest> toJdkHttpRequest(HttpRequest request) {
private Mono<java.net.http.HttpRequest> toJdkHttpRequest(HttpRequest request, ProgressReporter progressReporter) {
return Mono.fromCallable(() -> {
final java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder();
try {
Expand Down Expand Up @@ -112,7 +115,14 @@ private Mono<java.net.http.HttpRequest> toJdkHttpRequest(HttpRequest request) {
return builder.method("HEAD", noBody()).build();
default:
final String contentLength = request.getHeaders().getValue("content-length");
final BodyPublisher bodyPublisher = toBodyPublisher(request.getBody(), contentLength);
Flux<ByteBuffer> body = request.getBody();
if (progressReporter != null) {
body = body.map(buffer -> {
progressReporter.reportProgress(buffer.remaining());
return buffer;
});
}
final BodyPublisher bodyPublisher = toBodyPublisher(body, contentLength);
return builder.method(request.getHttpMethod().toString(), bodyPublisher).build();
}
});
Expand Down
10 changes: 10 additions & 0 deletions sdk/core/azure-core-http-netty/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

### Features Added

- Added ability to track progress by passing `ProgressReporter` in the `Context`. For example:
```java
HttpClient httpClient = new NettyAsyncHttpClientBuilder().build();
ProgressReporter progressReporter = ProgressReporter.withProgressListener(progress -> System.out.println(progress));
Context context = Contexts.empty().setHttpRequestProgressReporter(progressReporter).getContext();
HttpRequest request = new HttpRequest(
HttpMethod.PUT, new URL("http://example.com"), new HttpHeaders(), BinaryData.fromString("sample body"))
httpClient.send(request, context).subscribe();
```

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.core.http.netty.implementation.NettyAsyncHttpResponse;
import com.azure.core.http.netty.implementation.NettyToAzureCoreHttpHeadersWrapper;
import com.azure.core.http.netty.implementation.ReadTimeoutHandler;
import com.azure.core.http.netty.implementation.RequestProgressReportingHandler;
import com.azure.core.http.netty.implementation.ResponseTimeoutHandler;
import com.azure.core.http.netty.implementation.WriteTimeoutHandler;
import com.azure.core.implementation.util.BinaryDataContent;
Expand All @@ -23,7 +24,9 @@
import com.azure.core.implementation.util.StringContent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -114,8 +117,8 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
.orElse(this.responseTimeout);

return nettyClient
.doOnRequest((r, connection) -> addWriteTimeoutHandler(connection, writeTimeout))
.doAfterRequest((r, connection) -> addResponseTimeoutHandler(connection, effectiveResponseTimeout))
.doOnRequest((r, connection) -> addRequestHandlers(connection, context))
.doAfterRequest((r, connection) -> doAfterRequest(connection, effectiveResponseTimeout))
.doOnResponse((response, connection) -> addReadTimeoutHandler(connection, readTimeout))
.doAfterResponseSuccess((response, connection) -> removeReadTimeoutHandler(connection))
.request(HttpMethod.valueOf(request.getHttpMethod().toString()))
Expand Down Expand Up @@ -282,19 +285,30 @@ private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse
}

/*
* Adds write timeout handler once the request is ready to begin sending.
* Adds request handlers:
* - write timeout handler once the request is ready to begin sending.
* - progress handler if progress tracking has been requested.
*/
private static void addWriteTimeoutHandler(Connection connection, long timeoutMillis) {
connection.addHandlerLast(WriteTimeoutHandler.HANDLER_NAME, new WriteTimeoutHandler(timeoutMillis));
private void addRequestHandlers(Connection connection, Context context) {
connection.addHandlerLast(WriteTimeoutHandler.HANDLER_NAME, new WriteTimeoutHandler(writeTimeout));
ProgressReporter progressReporter = Contexts.with(context).getHttpRequestProgressReporter();
connection.removeHandler(RequestProgressReportingHandler.HANDLER_NAME);
if (progressReporter != null) {
connection.addHandlerLast(
RequestProgressReportingHandler.HANDLER_NAME, new RequestProgressReportingHandler(progressReporter));
}
}

/*
* Remove write timeout handler from the connection as the request has finished sending, then add response timeout
* After request has been sent:
* - Remove Progress Handler
* - Remove write timeout handler from the connection as the request has finished sending, then add response timeout
* handler.
*/
private static void addResponseTimeoutHandler(Connection connection, long timeoutMillis) {
private static void doAfterRequest(Connection connection, long responseTimeoutMillis) {
connection.removeHandler(RequestProgressReportingHandler.HANDLER_NAME);
connection.removeHandler(WriteTimeoutHandler.HANDLER_NAME)
.addHandlerLast(ResponseTimeoutHandler.HANDLER_NAME, new ResponseTimeoutHandler(timeoutMillis));
.addHandlerLast(ResponseTimeoutHandler.HANDLER_NAME, new ResponseTimeoutHandler(responseTimeoutMillis));
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.netty.implementation;

import com.azure.core.util.ProgressReporter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;

import java.util.Objects;

public final class RequestProgressReportingHandler extends ChannelOutboundHandlerAdapter {
/**
* Name of the handler when it is added into a ChannelPipeline.
*/
public static final String HANDLER_NAME = "azureRequestProgressHandler";

private final ProgressReporter progressReporter;

public RequestProgressReportingHandler(ProgressReporter progressReporter) {
this.progressReporter = Objects.requireNonNull(progressReporter, "'progressReporter' must not be null");
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {

if (msg instanceof ByteBuf) {
progressReporter.reportProgress(((ByteBuf) msg).readableBytes());
} else if (msg instanceof ByteBufHolder) {
progressReporter.reportProgress(((ByteBufHolder) msg).content().readableBytes());
} else if (msg instanceof FileRegion) {
progressReporter.reportProgress(((FileRegion) msg).count());
}

ctx.write(msg, promise.unvoid());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.netty;

import com.azure.core.http.HttpClient;
import com.azure.core.test.HttpClientTestsWireMockServer;
import com.azure.core.test.http.HttpClientTests;
import com.github.tomakehurst.wiremock.WireMockServer;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import javax.net.ssl.SSLException;

/**
* Reactor Netty {@link HttpClientTests} with https.
* Some request logic branches out if it's https like file uploads.
*/
public class NettyAsyncHttpClientHttpClientWithHttpsTests extends HttpClientTests {
private static WireMockServer server;

private static final HttpClient HTTP_CLIENT_INSTANCE;

static {
try {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();

reactor.netty.http.client.HttpClient nettyHttpClient =
reactor.netty.http.client.HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));

HTTP_CLIENT_INSTANCE = new NettyAsyncHttpClientBuilder(nettyHttpClient).build();
} catch (SSLException e) {
throw new RuntimeException(e);
}
}

@BeforeAll
public static void getWireMockServer() {
server = HttpClientTestsWireMockServer.getHttpClientTestsServer();
server.start();
}

@AfterAll
public static void shutdownWireMockServer() {
if (server != null) {
server.shutdown();
}
}

@Override
protected int getWireMockPort() {
return server.httpsPort();
}

@Override
protected boolean isSecure() {
return true;
}

@Override
protected HttpClient createHttpClient() {
return HTTP_CLIENT_INSTANCE;
}
}
10 changes: 10 additions & 0 deletions sdk/core/azure-core-http-okhttp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

### Features Added

- Added ability to track progress by passing `ProgressReporter` in the `Context`. For example:
```java
HttpClient httpClient = new OkHttpAsyncHttpClientBuilder().build();
ProgressReporter progressReporter = ProgressReporter.withProgressListener(progress -> System.out.println(progress));
Context context = Contexts.empty().setHttpRequestProgressReporter(progressReporter).getContext();
HttpRequest request = new HttpRequest(
HttpMethod.PUT, new URL("http://example.com"), new HttpHeaders(), BinaryData.fromString("sample body"))
httpClient.send(request, context).subscribe();
```

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.core.http.okhttp.implementation.OkHttpFileRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpFluxRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpInputStreamRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpProgressReportingRequestBody;
import com.azure.core.implementation.util.BinaryDataContent;
import com.azure.core.implementation.util.BinaryDataHelper;
import com.azure.core.implementation.util.ByteArrayContent;
Expand All @@ -23,6 +24,8 @@
import com.azure.core.implementation.util.StringContent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.ProgressReporter;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -56,6 +59,7 @@ public Mono<HttpResponse> send(HttpRequest request) {
@Override
public Mono<HttpResponse> send(HttpRequest request, Context context) {
boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);
ProgressReporter progressReporter = Contexts.with(context).getHttpRequestProgressReporter();

return Mono.create(sink -> sink.onRequest(value -> {
// Using MonoSink::onRequest for back pressure support.
Expand All @@ -70,7 +74,7 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
// 3. If Flux<ByteBuffer> asynchronous then subscribe does not block caller thread
// but block on the thread backing flux. This ignore any subscribeOn applied to send(r)
//
toOkHttpRequest(request).subscribe(okHttpRequest -> {
toOkHttpRequest(request, progressReporter).subscribe(okHttpRequest -> {
try {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
Expand All @@ -86,9 +90,10 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
* Converts the given azure-core request to okhttp request.
*
* @param request the azure-core request
* @param progressReporter the {@link ProgressReporter}. Can be null.
* @return the Mono emitting okhttp request
*/
private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request, ProgressReporter progressReporter) {
Request.Builder requestBuilder = new Request.Builder()
.url(request.getUrl());

Expand All @@ -107,8 +112,13 @@ private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
}

return toOkHttpRequestBody(request.getBodyAsBinaryData(), request.getHeaders())
.map(okhttpRequestBody -> requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build());
.map(okhttpRequestBody -> {
if (progressReporter != null) {
okhttpRequestBody = new OkHttpProgressReportingRequestBody(okhttpRequestBody, progressReporter);
}
return requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build();
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.okhttp.implementation;

import com.azure.core.util.ProgressReporter;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.Buffer;
import okio.BufferedSink;
import okio.ForwardingSink;
import okio.Okio;
import okio.Sink;

import java.io.IOException;
import java.util.Objects;

/**
* An {@link okhttp3.RequestBody} subtype that adds progress
* reporting functionality on top of other {@link okhttp3.RequestBody}.
*/
public final class OkHttpProgressReportingRequestBody extends RequestBody {
private final RequestBody delegate;
private final ProgressReporter progressReporter;

public OkHttpProgressReportingRequestBody(RequestBody delegate, ProgressReporter progressReporter) {
this.delegate = Objects.requireNonNull(delegate, "'delegate' must not be null");
this.progressReporter = Objects.requireNonNull(progressReporter, "'progressReporter' must not be null");
}

@Override
public MediaType contentType() {
return delegate.contentType();
}

@Override
public long contentLength() throws IOException {
return delegate.contentLength();
}

@Override
public boolean isOneShot() {
return delegate.isOneShot();
}

@Override
public boolean isDuplex() {
return delegate.isDuplex();
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
BufferedSink bufferedSink;

CountingSink countingSink = new CountingSink(sink, progressReporter);
bufferedSink = Okio.buffer(countingSink);

delegate.writeTo(bufferedSink);

bufferedSink.flush();
}

private static final class CountingSink extends ForwardingSink {
private final ProgressReporter progressReporter;

CountingSink(Sink delegate, ProgressReporter progressReporter) {
super(delegate);
this.progressReporter = progressReporter;
}

@Override
public void write(Buffer source, long byteCount) throws IOException {
super.write(source, byteCount);
progressReporter.reportProgress(byteCount);
}
}
}
Loading

0 comments on commit f478529

Please sign in to comment.