Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "ConcurrencyAcquireDuration" metric for netty-nio-client #2903

Merged
merged 5 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Netty NIO HTTP Client",
"contributor": "",
"type": "feature",
"description": "Add \"ConcurrencyAcquireDuration\" metric for netty-nio-client"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.http;

import java.time.Duration;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.metrics.MetricCategory;
import software.amazon.awssdk.metrics.MetricLevel;
Expand Down Expand Up @@ -104,6 +105,23 @@ public final class HttpMetric {
public static final SdkMetric<Integer> HTTP_STATUS_CODE =
metric("HttpStatusCode", Integer.class, MetricLevel.TRACE);

/**
* The time taken to acquire a channel from the connection pool.
*
* <p>For HTTP/1 operations, a channel is equivalent to a TCP connection. For HTTP/2 operations, a channel is equivalent to
* an HTTP/2 stream channel. For both protocols, the time to acquire a new concurrency permit may include the following:
* <ol>
* <li>Awaiting a concurrency permit, as restricted by the client's max concurrency configuration.</li>
* <li>The time to establish a new connection, depending on whether an existing connection is available in the pool or
* not.</li>
* <li>The time taken to perform a TLS handshake/negotiation, if TLS is enabled.</li>
* </ol>
*
* <p>Note: This metric is currently only supported in 'netty-nio-client'.
Bennett-Lynch marked this conversation as resolved.
Show resolved Hide resolved
*/
public static final SdkMetric<Duration> CONCURRENCY_ACQUIRE_DURATION =
metric("ConcurrencyAcquireDuration", Duration.class, MetricLevel.INFO);

private HttpMetric() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.HttpMetric.CONCURRENCY_ACQUIRE_DURATION;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTION_ID_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.CLOSED_CHANNEL_MESSAGE;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -90,11 +92,20 @@ public NettyRequestExecutor(RequestContext context) {
public CompletableFuture<Void> execute() {
Promise<Channel> channelFuture = context.eventLoopGroup().next().newPromise();
executeFuture = createExecutionFuture(channelFuture);
context.channelPool().acquire(channelFuture);
acquireChannel(channelFuture);
channelFuture.addListener((GenericFutureListener) this::makeRequestListener);
return executeFuture;
}

private void acquireChannel(Promise<Channel> channelFuture) {
NettyRequestMetrics.ifMetricsAreEnabled(context.metricCollector(), metrics -> {
measureTimeTaken(channelFuture, duration -> {
metrics.reportMetric(CONCURRENCY_ACQUIRE_DURATION, duration);
});
});
context.channelPool().acquire(channelFuture);
}

/**
* Convenience method to create the execution future and set up the cancellation logic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.concurrent.Future;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.Http2Metric;
import software.amazon.awssdk.metrics.MetricCollector;
Expand All @@ -39,6 +43,12 @@ public static boolean metricsAreEnabled(MetricCollector metricCollector) {
return metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
}

public static void ifMetricsAreEnabled(MetricCollector metrics, Consumer<MetricCollector> metricsConsumer) {
if (metricsAreEnabled(metrics)) {
metricsConsumer.accept(metrics);
}
}

/**
* Publish stream metrics for the provided stream channel to the provided collector. This should only be invoked after
* the stream has been initialized. If the stream is not initialized when this is invoked, an exception will be thrown.
Expand Down Expand Up @@ -73,4 +83,15 @@ private static void writeHttp2RequestMetrics(MetricCollector metricCollector,
metricCollector.reportMetric(Http2Metric.REMOTE_STREAM_WINDOW_SIZE_IN_BYTES,
http2Connection.remote().flowController().windowSize(stream));
}

/**
* Measure the time taken for a {@link Future} to complete. Does NOT differentiate between success/failure.
*/
public static void measureTimeTaken(Future<?> future, Consumer<Duration> onDone) {
Instant start = Instant.now();
future.addListener(f -> {
Bennett-Lynch marked this conversation as resolved.
Show resolved Hide resolved
Duration elapsed = Duration.between(start, Instant.now());
onDone.accept(elapsed);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void maxClientStreamsLowerThanServerMaxStreamsReportClientMaxStreams() {
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.PENDING_CONCURRENCY_ACQUIRES).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY)).containsExactly(0);
assertThat(metrics.metricValues(HttpMetric.CONCURRENCY_ACQUIRE_DURATION).get(0)).isPositive();
// The stream window doesn't get initialized with the connection
// initial setting and the update appears to be asynchronous so
// this may be the default window size just based on when the
Expand Down Expand Up @@ -113,6 +114,7 @@ public void maxClientStreamsHigherThanServerMaxStreamsReportServerMaxStreams() {
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.PENDING_CONCURRENCY_ACQUIRES).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isIn(0, 2, 3);
assertThat(metrics.metricValues(HttpMetric.CONCURRENCY_ACQUIRE_DURATION).get(0)).isPositive();
// The stream window doesn't get initialized with the connection
// initial setting and the update appears to be asynchronous so
// this may be the default window size just based on when the
Expand Down