Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress reporter promoted to core and used in Http Client. #29495

Merged
merged 29 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a6c25be
progress reporter.
kasobol-msft Jun 15, 2022
ce1a37c
contexts
kasobol-msft Jun 15, 2022
528a324
comment.
kasobol-msft Jun 15, 2022
38a4292
this works.
kasobol-msft Jun 15, 2022
cb2275b
progress handlers for http clients.
kasobol-msft Jun 15, 2022
7f40ad0
vertx.
kasobol-msft Jun 15, 2022
e828a3b
fix build.
kasobol-msft Jun 15, 2022
2e84598
changelogs.
kasobol-msft Jun 15, 2022
1940661
poke ci
kasobol-msft Jun 15, 2022
3f585e3
align naming with existing versions.
kasobol-msft Jun 16, 2022
36f4d3c
functional interface.
kasobol-msft Jun 16, 2022
a323065
getContext.
kasobol-msft Jun 16, 2022
559444b
this might come handy.
kasobol-msft Jun 16, 2022
b2308ea
some PR feedback.
kasobol-msft Jun 16, 2022
6eae4f5
remove progress handler after request./
kasobol-msft Jun 16, 2022
054a1b8
Merge remote-tracking branch 'upstream/main' into progress-reporter
kasobol-msft Jun 16, 2022
5f779a8
some feedback.
kasobol-msft Jun 16, 2022
551bef7
Merge branch 'main' into progress-reporter
kasobol-msft Jun 17, 2022
7affa8f
add sendSync test.
kasobol-msft Jun 17, 2022
c0b305f
wip.
kasobol-msft Jun 21, 2022
f5b0647
contexts.
kasobol-msft Jun 21, 2022
64e33ac
make it final.
kasobol-msft Jun 21, 2022
3c5e054
changelog.
kasobol-msft Jun 21, 2022
c9ec937
rename this.
kasobol-msft Jun 21, 2022
a37ef28
sample update.
kasobol-msft Jun 21, 2022
bc331bc
Update sdk/core/azure-core/src/main/java/com/azure/core/util/Progress…
kasobol-msft Jun 21, 2022
bfdd5b1
pr feedback.
kasobol-msft Jun 21, 2022
3bc97cd
samples in chlog.
kasobol-msft Jun 21, 2022
67837d1
hide keys for now.
kasobol-msft Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -59,8 +61,9 @@ public Mono<HttpResponse> send(HttpRequest request) {
@Override
public Mono<HttpResponse> send(HttpRequest request, Context context) {
boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);
ProgressReporter progressReporter = Contexts.with(context).getProgressReporter();

return toJdkHttpRequest(request)
return toJdkHttpRequest(request, progressReporter)
.flatMap(jdkRequest -> Mono.fromCompletionStage(jdkHttpClient.sendAsync(jdkRequest, ofPublisher()))
.flatMap(innerResponse -> {
if (eagerlyReadResponse) {
Expand All @@ -83,7 +86,7 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
* @param request the azure-core request
* @return the Mono emitting HttpRequest
*/
private Mono<java.net.http.HttpRequest> toJdkHttpRequest(HttpRequest request) {
private Mono<java.net.http.HttpRequest> toJdkHttpRequest(HttpRequest request, ProgressReporter progressReporter) {
return Mono.fromCallable(() -> {
final java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder();
try {
Expand Down Expand Up @@ -112,7 +115,14 @@ private Mono<java.net.http.HttpRequest> toJdkHttpRequest(HttpRequest request) {
return builder.method("HEAD", noBody()).build();
default:
final String contentLength = request.getHeaders().getValue("content-length");
final BodyPublisher bodyPublisher = toBodyPublisher(request.getBody(), contentLength);
Flux<ByteBuffer> body = request.getBody();
if (progressReporter != null) {
body = body.map(buffer -> {
progressReporter.reportProgress(buffer.remaining());
return buffer;
});
}
final BodyPublisher bodyPublisher = toBodyPublisher(body, contentLength);
return builder.method(request.getHttpMethod().toString(), bodyPublisher).build();
}
});
Expand Down
4 changes: 4 additions & 0 deletions sdk/core/azure-core-http-netty/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Features Added

- Added ability to track progress by passing `ProgressReporter` in the `Context`.
I.e., `Contexts.with(context).setProgressReporter(progressReporter)`
before calling `HttpClient.send(HttpRequest, Context)` API.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  1. s/ProgressReporter/ProgressListener
  2. Just to be clear - the user would need to getContext() and pass that in, rather than expect to mutate the existing Context passed in to the with method with the additional progressReporter. I sat here for a second thinking "do we have two different usage patterns here?" before concluding that the naive (and incorrect, but only with knowledge about how Context works) interpretation of the code sample above would be to do the following:
Context ctx = Context.NONE;
Contexts.with(ctx).setProgressListener(progressListener);
httpClient.send(request, ctx);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.core.http.netty.implementation.NettyAsyncHttpResponse;
import com.azure.core.http.netty.implementation.NettyToAzureCoreHttpHeadersWrapper;
import com.azure.core.http.netty.implementation.ReadTimeoutHandler;
import com.azure.core.http.netty.implementation.RequestProgressReportingHandler;
import com.azure.core.http.netty.implementation.ResponseTimeoutHandler;
import com.azure.core.http.netty.implementation.WriteTimeoutHandler;
import com.azure.core.implementation.util.BinaryDataContent;
Expand All @@ -23,7 +24,9 @@
import com.azure.core.implementation.util.StringContent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.logging.ClientLogger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -114,8 +117,8 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
.orElse(this.responseTimeout);

return nettyClient
.doOnRequest((r, connection) -> addWriteTimeoutHandler(connection, writeTimeout))
.doAfterRequest((r, connection) -> addResponseTimeoutHandler(connection, effectiveResponseTimeout))
.doOnRequest((r, connection) -> addRequestHandlers(connection, context))
.doAfterRequest((r, connection) -> doAfterRequest(connection, effectiveResponseTimeout))
.doOnResponse((response, connection) -> addReadTimeoutHandler(connection, readTimeout))
.doAfterResponseSuccess((response, connection) -> removeReadTimeoutHandler(connection))
.request(HttpMethod.valueOf(request.getHttpMethod().toString()))
Expand Down Expand Up @@ -282,19 +285,30 @@ private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse
}

/*
* Adds write timeout handler once the request is ready to begin sending.
* Adds request handlers:
* - write timeout handler once the request is ready to begin sending.
* - progress handler if progress tracking has been requested.
*/
private static void addWriteTimeoutHandler(Connection connection, long timeoutMillis) {
connection.addHandlerLast(WriteTimeoutHandler.HANDLER_NAME, new WriteTimeoutHandler(timeoutMillis));
private void addRequestHandlers(Connection connection, Context context) {
connection.addHandlerLast(WriteTimeoutHandler.HANDLER_NAME, new WriteTimeoutHandler(writeTimeout));
ProgressReporter progressReporter = Contexts.with(context).getProgressReporter();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we clarify what determines if something that is user-settable via a Context becomes elevated to be an API on Contexts? For example, I see above code that gets the AZURE_RESPONSE_TIMEOUT and AZURE_EAGERLY_READ_RESPONSE values out of Context. At what stage do we do this elevation, and then, does progress reporter meet that criteria?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say that stuff that has to cross core-sdks boundary is a candidate where HLC logic has interest in orchestrating it.
Especially when there's N:1 relationship of interest, i.e. N SDKS need to use it with core (core is 1 here).
For example:

  • progress reporting is present in 3 storage services plus comms. We should not use magic key for this.
  • The AZURE_EAGERLY_READ_RESPONSE is a contract between RestProxy and HttpClients. Doesn't cross core boundary. Shouldn't be API
  • AZURE_RESPONSE_TIMEOUT (or some form of timeout tbd). We have sync APIs that take Duration timeout in storage, tables. This needs to (plus-minus math transformations in retry policy) travel across sdks-core boundary. This is a good candidate for API.

connection.removeHandler(RequestProgressReportingHandler.HANDLER_NAME);
alzimmermsft marked this conversation as resolved.
Show resolved Hide resolved
if (progressReporter != null) {
connection.addHandlerLast(
alzimmermsft marked this conversation as resolved.
Show resolved Hide resolved
RequestProgressReportingHandler.HANDLER_NAME, new RequestProgressReportingHandler(progressReporter));
}
}

/*
* Remove write timeout handler from the connection as the request has finished sending, then add response timeout
* After request has been sent:
* - Remove Progress Handler
* - Remove write timeout handler from the connection as the request has finished sending, then add response timeout
* handler.
*/
private static void addResponseTimeoutHandler(Connection connection, long timeoutMillis) {
private static void doAfterRequest(Connection connection, long responseTimeoutMillis) {
connection.removeHandler(RequestProgressReportingHandler.HANDLER_NAME);
connection.removeHandler(WriteTimeoutHandler.HANDLER_NAME)
.addHandlerLast(ResponseTimeoutHandler.HANDLER_NAME, new ResponseTimeoutHandler(timeoutMillis));
.addHandlerLast(ResponseTimeoutHandler.HANDLER_NAME, new ResponseTimeoutHandler(responseTimeoutMillis));
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.netty.implementation;

import com.azure.core.util.ProgressReporter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;

import java.util.Objects;

public final class RequestProgressReportingHandler extends ChannelOutboundHandlerAdapter {
/**
* Name of the handler when it is added into a ChannelPipeline.
*/
public static final String HANDLER_NAME = "azureRequestProgressHandler";

private final ProgressReporter progressReporter;

public RequestProgressReportingHandler(ProgressReporter progressReporter) {
this.progressReporter = Objects.requireNonNull(progressReporter, "'progressReporter' must not be null");
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {

if (msg instanceof ByteBuf) {
progressReporter.reportProgress(((ByteBuf) msg).readableBytes());
} else if (msg instanceof ByteBufHolder) {
progressReporter.reportProgress(((ByteBufHolder) msg).content().readableBytes());
} else if (msg instanceof FileRegion) {
progressReporter.reportProgress(((FileRegion) msg).count());
}

ctx.write(msg, promise.unvoid());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.netty;

import com.azure.core.http.HttpClient;
import com.azure.core.test.HttpClientTestsWireMockServer;
import com.azure.core.test.http.HttpClientTests;
import com.github.tomakehurst.wiremock.WireMockServer;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import javax.net.ssl.SSLException;

/**
* Reactor Netty {@link HttpClientTests} with https.
* Some request logic branches out if it's https like file uploads.
*/
public class NettyAsyncHttpClientHttpClientWithHttpsTests extends HttpClientTests {
private static WireMockServer server;

private static final HttpClient HTTP_CLIENT_INSTANCE;

static {
try {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();

reactor.netty.http.client.HttpClient nettyHttpClient =
reactor.netty.http.client.HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));

HTTP_CLIENT_INSTANCE = new NettyAsyncHttpClientBuilder(nettyHttpClient).build();
} catch (SSLException e) {
throw new RuntimeException(e);
}
}

@BeforeAll
public static void getWireMockServer() {
server = HttpClientTestsWireMockServer.getHttpClientTestsServer();
server.start();
}

@AfterAll
public static void shutdownWireMockServer() {
if (server != null) {
server.shutdown();
}
}

@Override
protected int getWireMockPort() {
return server.httpsPort();
}

@Override
protected boolean isSecure() {
return true;
}

@Override
protected HttpClient createHttpClient() {
return HTTP_CLIENT_INSTANCE;
}
}
4 changes: 4 additions & 0 deletions sdk/core/azure-core-http-okhttp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Features Added

- Added ability to track progress by passing `ProgressReporter` in the `Context`.
I.e., `Contexts.with(context).setProgressReporter(progressReporter)`
before calling `HttpClient.send(HttpRequest, Context)` API.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above


### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.core.http.okhttp.implementation.OkHttpFileRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpFluxRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpInputStreamRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpProgressReportingRequestBody;
import com.azure.core.implementation.util.BinaryDataContent;
import com.azure.core.implementation.util.BinaryDataHelper;
import com.azure.core.implementation.util.ByteArrayContent;
Expand All @@ -23,6 +24,8 @@
import com.azure.core.implementation.util.StringContent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.ProgressReporter;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -56,6 +59,7 @@ public Mono<HttpResponse> send(HttpRequest request) {
@Override
public Mono<HttpResponse> send(HttpRequest request, Context context) {
boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);
ProgressReporter progressReporter = Contexts.with(context).getProgressReporter();

return Mono.create(sink -> sink.onRequest(value -> {
// Using MonoSink::onRequest for back pressure support.
Expand All @@ -70,7 +74,7 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
// 3. If Flux<ByteBuffer> asynchronous then subscribe does not block caller thread
// but block on the thread backing flux. This ignore any subscribeOn applied to send(r)
//
toOkHttpRequest(request).subscribe(okHttpRequest -> {
toOkHttpRequest(request, progressReporter).subscribe(okHttpRequest -> {
try {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
Expand All @@ -86,9 +90,10 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
* Converts the given azure-core request to okhttp request.
*
* @param request the azure-core request
* @param progressReporter the {@link ProgressReporter}. Can be null.
* @return the Mono emitting okhttp request
*/
private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request, ProgressReporter progressReporter) {
Request.Builder requestBuilder = new Request.Builder()
.url(request.getUrl());

Expand All @@ -107,8 +112,13 @@ private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
}

return toOkHttpRequestBody(request.getBodyAsBinaryData(), request.getHeaders())
.map(okhttpRequestBody -> requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build());
.map(okhttpRequestBody -> {
if (progressReporter != null) {
okhttpRequestBody = new OkHttpProgressReportingRequestBody(okhttpRequestBody, progressReporter);
}
return requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build();
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.okhttp.implementation;

import com.azure.core.util.ProgressReporter;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.Buffer;
import okio.BufferedSink;
import okio.ForwardingSink;
import okio.Okio;
import okio.Sink;

import java.io.IOException;
import java.util.Objects;

/**
* An {@link okhttp3.RequestBody} subtype that adds progress
* reporting functionality on top of other {@link okhttp3.RequestBody}.
*/
public class OkHttpProgressReportingRequestBody extends RequestBody {
private final RequestBody delegate;
private final ProgressReporter progressReporter;

public OkHttpProgressReportingRequestBody(RequestBody delegate, ProgressReporter progressReporter) {
this.delegate = Objects.requireNonNull(delegate, "'delegate' must not be null");
this.progressReporter = Objects.requireNonNull(progressReporter, "'progressReporter' must not be null");
}

@Override
public MediaType contentType() {
return delegate.contentType();
}

@Override
public long contentLength() throws IOException {
return delegate.contentLength();
}

@Override
public boolean isOneShot() {
return delegate.isOneShot();
}

@Override
public boolean isDuplex() {
return delegate.isDuplex();
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
BufferedSink bufferedSink;

CountingSink countingSink = new CountingSink(sink, progressReporter);
bufferedSink = Okio.buffer(countingSink);

delegate.writeTo(bufferedSink);

bufferedSink.flush();
}

private static final class CountingSink extends ForwardingSink {
private final ProgressReporter progressReporter;

CountingSink(Sink delegate, ProgressReporter progressReporter) {
super(delegate);
this.progressReporter = progressReporter;
}

@Override
public void write(Buffer source, long byteCount) throws IOException {
super.write(source, byteCount);
progressReporter.reportProgress(byteCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public OkHttpStreamableRequestBody(T content, long effectiveContentLength, Media
this.mediaType = mediaType;
}

@Override
public boolean isOneShot() {
return true;
}

@Override
public final MediaType contentType() {
return mediaType;
Expand Down
Loading