Skip to content

Commit

Permalink
Add "ConcurrencyAcquireDuration" metric for netty-nio-client (#2903)
Browse files Browse the repository at this point in the history
* Add "ConcurrencyAcquireDuration" metric for netty-nio-client

The time taken to acquire a new channel from a channel pool can be both
non-trivial and highly variable, depending upon whether a new connection
needs to be established, and depending upon the overhead of new
connection establishment (including TLS handshakes). Due to the high
variability, having an explicit metric can be helpful to give a better
picture of latency sources in impacted requests.
  • Loading branch information
Bennett-Lynch authored Dec 10, 2021
1 parent b1dbeff commit 51d067e
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-NettyNIOHTTPClient-b9f6080.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Netty NIO HTTP Client",
"contributor": "",
"type": "feature",
"description": "Add \"ConcurrencyAcquireDuration\" metric for netty-nio-client"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.http;

import java.time.Duration;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.metrics.MetricCategory;
import software.amazon.awssdk.metrics.MetricLevel;
Expand Down Expand Up @@ -104,6 +105,23 @@ public final class HttpMetric {
public static final SdkMetric<Integer> HTTP_STATUS_CODE =
metric("HttpStatusCode", Integer.class, MetricLevel.TRACE);

/**
* The time taken to acquire a channel from the connection pool.
*
* <p>For HTTP/1 operations, a channel is equivalent to a TCP connection. For HTTP/2 operations, a channel is equivalent to
* an HTTP/2 stream channel. For both protocols, the time to acquire a new concurrency permit may include the following:
* <ol>
* <li>Awaiting a concurrency permit, as restricted by the client's max concurrency configuration.</li>
* <li>The time to establish a new connection, depending on whether an existing connection is available in the pool or
* not.</li>
* <li>The time taken to perform a TLS handshake/negotiation, if TLS is enabled.</li>
* </ol>
*
* <p>Note: This metric is currently only supported in 'netty-nio-client'.
*/
public static final SdkMetric<Duration> CONCURRENCY_ACQUIRE_DURATION =
metric("ConcurrencyAcquireDuration", Duration.class, MetricLevel.INFO);

private HttpMetric() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.HttpMetric.CONCURRENCY_ACQUIRE_DURATION;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTION_ID_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.CLOSED_CHANNEL_MESSAGE;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -90,11 +92,20 @@ public NettyRequestExecutor(RequestContext context) {
public CompletableFuture<Void> execute() {
Promise<Channel> channelFuture = context.eventLoopGroup().next().newPromise();
executeFuture = createExecutionFuture(channelFuture);
context.channelPool().acquire(channelFuture);
acquireChannel(channelFuture);
channelFuture.addListener((GenericFutureListener) this::makeRequestListener);
return executeFuture;
}

private void acquireChannel(Promise<Channel> channelFuture) {
NettyRequestMetrics.ifMetricsAreEnabled(context.metricCollector(), metrics -> {
measureTimeTaken(channelFuture, duration -> {
metrics.reportMetric(CONCURRENCY_ACQUIRE_DURATION, duration);
});
});
context.channelPool().acquire(channelFuture);
}

/**
* Convenience method to create the execution future and set up the cancellation logic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.concurrent.Future;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.Http2Metric;
import software.amazon.awssdk.metrics.MetricCollector;
Expand All @@ -39,6 +43,12 @@ public static boolean metricsAreEnabled(MetricCollector metricCollector) {
return metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
}

public static void ifMetricsAreEnabled(MetricCollector metrics, Consumer<MetricCollector> metricsConsumer) {
if (metricsAreEnabled(metrics)) {
metricsConsumer.accept(metrics);
}
}

/**
* Publish stream metrics for the provided stream channel to the provided collector. This should only be invoked after
* the stream has been initialized. If the stream is not initialized when this is invoked, an exception will be thrown.
Expand Down Expand Up @@ -73,4 +83,15 @@ private static void writeHttp2RequestMetrics(MetricCollector metricCollector,
metricCollector.reportMetric(Http2Metric.REMOTE_STREAM_WINDOW_SIZE_IN_BYTES,
http2Connection.remote().flowController().windowSize(stream));
}

/**
* Measure the time taken for a {@link Future} to complete. Does NOT differentiate between success/failure.
*/
public static void measureTimeTaken(Future<?> future, Consumer<Duration> onDone) {
Instant start = Instant.now();
future.addListener(f -> {
Duration elapsed = Duration.between(start, Instant.now());
onDone.accept(elapsed);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void maxClientStreamsLowerThanServerMaxStreamsReportClientMaxStreams() {
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.PENDING_CONCURRENCY_ACQUIRES).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY)).containsExactly(0);
assertThat(metrics.metricValues(HttpMetric.CONCURRENCY_ACQUIRE_DURATION).get(0)).isPositive();
// The stream window doesn't get initialized with the connection
// initial setting and the update appears to be asynchronous so
// this may be the default window size just based on when the
Expand Down Expand Up @@ -113,6 +114,7 @@ public void maxClientStreamsHigherThanServerMaxStreamsReportServerMaxStreams() {
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.PENDING_CONCURRENCY_ACQUIRES).get(0)).isBetween(0, 1);
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isIn(0, 2, 3);
assertThat(metrics.metricValues(HttpMetric.CONCURRENCY_ACQUIRE_DURATION).get(0)).isPositive();
// The stream window doesn't get initialized with the connection
// initial setting and the update appears to be asynchronous so
// this may be the default window size just based on when the
Expand Down

0 comments on commit 51d067e

Please sign in to comment.