From ba3f39adc4dffaa72ef37f4335116f58b8e9fb30 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Wed, 3 Feb 2021 13:04:57 -0800 Subject: [PATCH 1/3] fix: Make pull query metrics apply only to pull and not also push --- .../ksql/api/server/OldApiUtils.java | 71 ++++++++++++++----- .../ksql/api/server/ServerVerticle.java | 7 +- .../io/confluent/ksql/api/spi/Endpoints.java | 4 +- .../ksql/rest/server/KsqlServerEndpoints.java | 7 +- .../streaming/StreamedQueryResource.java | 29 ++++++-- .../io/confluent/ksql/api/TestEndpoints.java | 3 +- .../ksql/api/perf/InsertsStreamRunner.java | 3 +- .../ksql/api/perf/PullQueryRunner.java | 3 +- .../ksql/api/perf/QueryStreamRunner.java | 4 +- .../streaming/StreamedQueryResourceTest.java | 52 +++++++++----- 10 files changed, 132 insertions(+), 51 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 94b963cb0cdf..71273bf05721 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -21,7 +21,6 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.auth.DefaultApiSecurityContext; -import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; @@ -39,6 +38,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import org.apache.kafka.common.utils.Time; @@ -55,7 +55,7 @@ static void handleOldApiRequest( final Server server, final RoutingContext routingContext, final Class requestClass, - final Optional pullQueryMetrics, + final Optional metricsCallbacks, final BiFunction> requestor) { final long startTimeNanos = Time.SYSTEM.nanoseconds(); final T requestObject; @@ -69,20 +69,17 @@ static void handleOldApiRequest( } else { requestObject = null; } - pullQueryMetrics - .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( - routingContext.request().bytesRead())); final CompletableFuture completableFuture = requestor .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { handleOldApiResponse( - server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos); + server, routingContext, endpointResponse, metricsCallbacks, startTimeNanos); }).exceptionally(t -> { if (t instanceof CompletionException) { t = t.getCause(); } handleOldApiResponse( - server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos); + server, routingContext, mapException(t), metricsCallbacks, startTimeNanos); return null; }); } @@ -90,7 +87,7 @@ static void handleOldApiRequest( static void handleOldApiResponse( final Server server, final RoutingContext routingContext, final EndpointResponse endpointResponse, - final Optional pullQueryMetrics, + final Optional metricsCallbacks, final long startTimeNanos ) { final HttpServerResponse response = routingContext.response(); @@ -111,7 +108,8 @@ static void handleOldApiResponse( return; } response.putHeader(TRANSFER_ENCODING, CHUNKED_ENCODING); - streamEndpointResponse(server, routingContext, streamingOutput); + streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbacks, + startTimeNanos); } else { if (endpointResponse.getEntity() == null) { response.end(); @@ -124,18 +122,15 @@ static void handleOldApiResponse( } response.end(responseBody); } + reportMetrics(routingContext, metricsCallbacks, startTimeNanos); } - pullQueryMetrics - .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize( - routingContext.response().bytesWritten())); - pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics - .recordLatency(startTimeNanos)); - } private static void streamEndpointResponse(final Server server, final RoutingContext routingContext, - final StreamingOutput streamingOutput) { + final StreamingOutput streamingOutput, + final Optional metricsCallbacks, + final long startTimeNanos) { final WorkerExecutor workerExecutor = server.getWorkerExecutor(); final VertxCompletableFuture vcf = new VertxCompletableFuture<>(); workerExecutor.executeBlocking(promise -> { @@ -162,6 +157,21 @@ private static void streamEndpointResponse(final Server server, } } }, vcf); + vcf.handle((v, throwable) -> { + reportMetrics(routingContext, metricsCallbacks, startTimeNanos); + return null; + }); + } + + private static void reportMetrics( + final RoutingContext routingContext, + final Optional metricsCallbacks, + final long startTimeNanos + ) { + metricsCallbacks.ifPresent(mc -> mc.reportMetrics( + routingContext.request().bytesRead(), + routingContext.response().bytesWritten(), + startTimeNanos)); } public static EndpointResponse mapException(final Throwable exception) { @@ -176,4 +186,33 @@ public static EndpointResponse mapException(final Throwable exception) { .build(); } + /** + * Interface for reporting metrics to a resource. A resource may choose to break things down + * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource + * determine how to report the metrics. + */ + public interface MetricsCallback { + + void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos); + } + + public static class EndpointMetricsCallbacks { + + private AtomicReference callbackRef = new AtomicReference<>(null); + + public EndpointMetricsCallbacks() { + } + + public void setCallback(final MetricsCallback callback) { + this.callbackRef.set(callback); + } + + void reportMetrics(long requestBytes, long responseBytes, long startTimeNanos) { + final MetricsCallback callback = callbackRef.get(); + if (callback != null) { + callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos); + } + } + } + } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 0495ebef7bb9..896621b8ba86 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -21,6 +21,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.auth.DefaultApiSecurityContext; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; @@ -237,14 +238,16 @@ private void handleQueryRequest(final RoutingContext routingContext) { final CompletableFuture connectionClosedFuture = new CompletableFuture<>(); routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null)); - handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics, + EndpointMetricsCallbacks metricsCallbacks = new EndpointMetricsCallbacks(); + handleOldApiRequest(server, routingContext, KsqlRequest.class, Optional.of(metricsCallbacks), (request, apiSecurityContext) -> endpoints .executeQueryRequest( request, server.getWorkerExecutor(), connectionClosedFuture, DefaultApiSecurityContext.create(routingContext), isInternalRequest(routingContext), - getContentType(routingContext) + getContentType(routingContext), + metricsCallbacks ) ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java index 7af027a1dfed..8650d969fd8b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java @@ -18,6 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.entity.HeartbeatMessage; @@ -83,7 +84,8 @@ CompletableFuture executeQueryRequest( KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType); + KsqlMediaType mediaType, + EndpointMetricsCallbacks metricsCallbacks); CompletableFuture executeInfo(ApiSecurityContext apiSecurityContext); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index d1b0bca2fc40..a57e20e7c117 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -24,6 +24,7 @@ import io.confluent.ksql.api.impl.QueryEndpoint; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.engine.KsqlEngine; @@ -194,7 +195,8 @@ public CompletableFuture executeQueryRequest( final CompletableFuture connectionClosedFuture, final ApiSecurityContext apiSecurityContext, final Optional isInternalRequest, - final KsqlMediaType mediaType + final KsqlMediaType mediaType, + final EndpointMetricsCallbacks metricsCallbacks ) { return executeOldApiEndpointOnWorker(apiSecurityContext, ksqlSecurityContext -> streamedQueryResource.streamQuery( @@ -202,7 +204,8 @@ public CompletableFuture executeQueryRequest( request, connectionClosedFuture, isInternalRequest, - mediaType + mediaType, + metricsCallbacks ), workerExecutor); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index d304bc000a85..9f48ad3c9746 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; import io.confluent.ksql.analyzer.PullQueryValidator; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; @@ -193,7 +194,8 @@ public EndpointResponse streamQuery( final KsqlRequest request, final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, - final KsqlMediaType mediaType + final KsqlMediaType mediaType, + final EndpointMetricsCallbacks metricsCallbacks ) { throwIfNotConfigured(); activenessRegistrar.updateLastRequestTime(); @@ -204,7 +206,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, mediaType); + isInternalRequest, mediaType, metricsCallbacks); } private void throwIfNotConfigured() { @@ -233,7 +235,8 @@ private EndpointResponse handleStatement( final PreparedStatement statement, final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, - final KsqlMediaType mediaType + final KsqlMediaType mediaType, + final EndpointMetricsCallbacks metricsCallbacks ) { try { authorizationValidator.ifPresent(validator -> @@ -255,8 +258,8 @@ private EndpointResponse handleStatement( configProperties, request.getRequestProperties(), isInternalRequest, - pullQueryMetrics, - connectionClosedFuture + connectionClosedFuture, + metricsCallbacks ); } @@ -295,9 +298,11 @@ private EndpointResponse handlePullQuery( final Map configOverrides, final Map requestProperties, final Optional isInternalRequest, - final Optional pullQueryMetrics, - final CompletableFuture connectionClosedFuture + final CompletableFuture connectionClosedFuture, + final EndpointMetricsCallbacks metricsCallbacks ) { + setupMetricsCallbackForPullQuery(metricsCallbacks); + final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); @@ -368,6 +373,16 @@ private EndpointResponse handlePullQuery( } } + private void setupMetricsCallbackForPullQuery(final EndpointMetricsCallbacks metricsCallbacks) { + metricsCallbacks.setCallback((requestBytes, responseBytes, startTimeNanos) -> { + pullQueryMetrics.ifPresent(metrics -> { + metrics.recordRequestSize(requestBytes); + metrics.recordResponseSize(responseBytes); + metrics.recordLatency(startTimeNanos); + }); + }); + } + private EndpointResponse handlePushQuery( final ServiceContext serviceContext, final PreparedStatement statement, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java index d14319b52cf0..6c6e93471043 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java @@ -18,6 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.api.utils.RowGenerator; @@ -143,7 +144,7 @@ public CompletableFuture executeTerminate(final ClusterTermina public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java index 24199178f2a4..ab644bb436d5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java @@ -18,6 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BaseSubscriber; @@ -192,7 +193,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java index 7ed289233527..3be6ae5e99d5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java @@ -25,6 +25,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BufferedPublisher; @@ -152,7 +153,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java index 99910daecdfc..0e03541f92b0 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java @@ -24,6 +24,7 @@ import io.confluent.ksql.api.impl.BlockingQueryPublisher; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; @@ -138,7 +139,8 @@ public CompletableFuture executeQueryRequest( CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, + final EndpointMetricsCallbacks metricsCallbacks) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 566535ea5246..81828ff7bcda 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -49,6 +49,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.server.StreamingOutput; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -272,7 +273,8 @@ ERROR_CODE_BAD_STATEMENT, errorMsg, PULL_QUERY_STRING, new KsqlEntityList())) new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); // Then: @@ -342,7 +344,8 @@ public void shouldRateLimit() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); // Then: @@ -352,9 +355,6 @@ public void shouldRateLimit() { @Test public void shouldReachConcurrentLimit() { // Given: - when(mockKsqlEngine.executePullQuery(any(), any(), any(), any(), any(), any(), anyBoolean())) - .thenReturn(pullQueryResult); - when(pullQueryResult.getPullQueryQueue()).thenReturn(pullQueryQueue); when(rateLimiter.tryAcquire()).thenReturn(true); when(concurrencyLimiter.increment()).thenThrow(new KsqlException("concurrencyLimiter Error!")); @@ -365,7 +365,8 @@ public void shouldReachConcurrentLimit() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT); + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks()); // Then: assertThat(response.getStatus(), is(500)); @@ -411,7 +412,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ) ); @@ -434,7 +436,8 @@ public void shouldReturn400OnBadStatement() { new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ) ); @@ -452,7 +455,8 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); // Then: @@ -467,7 +471,8 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); // Then: @@ -489,7 +494,8 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ) ); @@ -513,7 +519,8 @@ public void shouldNotCreateExternalClientsForPullQuery() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); // Then: @@ -538,7 +545,8 @@ public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -594,7 +602,8 @@ public void shouldThrowOnDenyListedStreamProperty() { ), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); // Then: @@ -676,7 +685,8 @@ public void shouldStreamRowsCorrectly() throws Throwable { new KsqlRequest(queryString, requestStreamsProperties, Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); @@ -820,7 +830,8 @@ public void shouldUpdateTheLastRequestTime() { new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); // Then: @@ -842,7 +853,8 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -868,7 +880,8 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ); assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); @@ -899,7 +912,8 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new EndpointMetricsCallbacks() ) ); From b9b5f4dd3dec0ac81970304861f0f6ea78ce0dce Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Wed, 10 Mar 2021 14:29:34 -0800 Subject: [PATCH 2/3] Feedback, plus Also use on /stream-query --- .../ksql/api/impl/QueryEndpoint.java | 25 ++++++---- .../ksql/api/server/MetricsCallback.java | 32 ++++++++++++ .../api/server/MetricsCallbackHolder.java | 41 +++++++++++++++ .../ksql/api/server/OldApiUtils.java | 50 ++++--------------- .../ksql/api/server/QueryStreamHandler.java | 12 ++++- .../ksql/api/server/ServerVerticle.java | 8 +-- .../io/confluent/ksql/api/spi/Endpoints.java | 7 +-- .../ksql/rest/server/KsqlServerEndpoints.java | 12 +++-- .../streaming/PullQueryPublisher.java | 8 +-- .../streaming/StreamedQueryResource.java | 31 ++++++------ .../io/confluent/ksql/api/TestEndpoints.java | 7 +-- .../ksql/api/perf/InsertsStreamRunner.java | 7 +-- .../ksql/api/perf/PullQueryRunner.java | 7 +-- .../ksql/api/perf/QueryStreamRunner.java | 7 +-- .../streaming/StreamedQueryResourceTest.java | 34 ++++++------- 15 files changed, 172 insertions(+), 116 deletions(-) create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java create mode 100644 ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index cd8953158899..99aab9c52714 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.RateLimiter; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.config.SessionConfig; @@ -58,7 +59,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.apache.kafka.common.utils.Time; public class QueryEndpoint { @@ -99,8 +99,8 @@ public QueryPublisher createQueryPublisher( final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ServiceContext serviceContext) { - final long startTimeNanos = Time.SYSTEM.nanoseconds(); + final ServiceContext serviceContext, + final MetricsCallbackHolder metricsCallbackHolder) { // Must be run on worker as all this stuff is slow VertxUtils.checkIsWorker(); @@ -108,8 +108,8 @@ public QueryPublisher createQueryPublisher( if (statement.getStatement().isPullQuery()) { return createPullQueryPublisher( - context, serviceContext, statement, pullQueryMetrics, - startTimeNanos, workerExecutor); + context, serviceContext, statement, pullQueryMetrics, workerExecutor, + metricsCallbackHolder); } else { return createPushQueryPublisher(context, serviceContext, statement, workerExecutor); } @@ -146,9 +146,17 @@ private QueryPublisher createPullQueryPublisher( final ServiceContext serviceContext, final ConfiguredStatement statement, final Optional pullQueryMetrics, - final long startTimeNanos, - final WorkerExecutor workerExecutor + final WorkerExecutor workerExecutor, + final MetricsCallbackHolder metricsCallbackHolder ) { + // First thing, set the metrics callback so that it gets called, even if we hit an error + metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> { + pullQueryMetrics.ifPresent(metrics -> { + metrics.recordRequestSize(requestBytes); + metrics.recordResponseSize(responseBytes); + metrics.recordLatency(startTimeNanos); + }); + }); final RoutingOptions routingOptions = new PullQueryConfigRoutingOptions( ksqlConfig, @@ -177,9 +185,6 @@ private QueryPublisher createPullQueryPublisher( result.onCompletionOrException((v, throwable) -> { decrementer.decrementAtMostOnce(); - if (throwable == null) { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - } }); final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java new file mode 100644 index 000000000000..128b88ad1cd3 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + +/** + * Interface for reporting metrics to a resource. A resource may choose to break things down + * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource + * determine how to report the metrics. + */ +public interface MetricsCallback { + + /** + * Called to report metrics when the request is complete, error or success + * @param requestBytes The request bytes + * @param responseBytes The response bytes + * @param startTimeNanos The start time of the request in nanos + */ + void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos); +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java new file mode 100644 index 000000000000..0a2453f06a2d --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class give a resource the opportunity to register a set of particular callbacks based upon + * arbitrary criteria. Once the response is complete, the callback is invoked. + */ +public class MetricsCallbackHolder { + + private AtomicReference callbackRef = new AtomicReference<>(null); + + public MetricsCallbackHolder() { + } + + public void setCallback(final MetricsCallback callback) { + this.callbackRef.set(callback); + } + + void reportMetrics(final long requestBytes, final long responseBytes, final long startTimeNanos) { + final MetricsCallback callback = callbackRef.get(); + if (callback != null) { + callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos); + } + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 71273bf05721..b4722828802e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -38,7 +38,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import org.apache.kafka.common.utils.Time; @@ -55,7 +54,7 @@ static void handleOldApiRequest( final Server server, final RoutingContext routingContext, final Class requestClass, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final BiFunction> requestor) { final long startTimeNanos = Time.SYSTEM.nanoseconds(); final T requestObject; @@ -73,13 +72,13 @@ static void handleOldApiRequest( .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { handleOldApiResponse( - server, routingContext, endpointResponse, metricsCallbacks, startTimeNanos); + server, routingContext, endpointResponse, metricsCallbackHolder, startTimeNanos); }).exceptionally(t -> { if (t instanceof CompletionException) { t = t.getCause(); } handleOldApiResponse( - server, routingContext, mapException(t), metricsCallbacks, startTimeNanos); + server, routingContext, mapException(t), metricsCallbackHolder, startTimeNanos); return null; }); } @@ -87,7 +86,7 @@ static void handleOldApiRequest( static void handleOldApiResponse( final Server server, final RoutingContext routingContext, final EndpointResponse endpointResponse, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final long startTimeNanos ) { final HttpServerResponse response = routingContext.response(); @@ -108,7 +107,7 @@ static void handleOldApiResponse( return; } response.putHeader(TRANSFER_ENCODING, CHUNKED_ENCODING); - streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbacks, + streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbackHolder, startTimeNanos); } else { if (endpointResponse.getEntity() == null) { @@ -122,14 +121,14 @@ static void handleOldApiResponse( } response.end(responseBody); } - reportMetrics(routingContext, metricsCallbacks, startTimeNanos); + reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); } } private static void streamEndpointResponse(final Server server, final RoutingContext routingContext, final StreamingOutput streamingOutput, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final long startTimeNanos) { final WorkerExecutor workerExecutor = server.getWorkerExecutor(); final VertxCompletableFuture vcf = new VertxCompletableFuture<>(); @@ -158,17 +157,17 @@ private static void streamEndpointResponse(final Server server, } }, vcf); vcf.handle((v, throwable) -> { - reportMetrics(routingContext, metricsCallbacks, startTimeNanos); + reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); return null; }); } private static void reportMetrics( final RoutingContext routingContext, - final Optional metricsCallbacks, + final Optional metricsCallbackHolder, final long startTimeNanos ) { - metricsCallbacks.ifPresent(mc -> mc.reportMetrics( + metricsCallbackHolder.ifPresent(mc -> mc.reportMetrics( routingContext.request().bytesRead(), routingContext.response().bytesWritten(), startTimeNanos)); @@ -186,33 +185,4 @@ public static EndpointResponse mapException(final Throwable exception) { .build(); } - /** - * Interface for reporting metrics to a resource. A resource may choose to break things down - * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource - * determine how to report the metrics. - */ - public interface MetricsCallback { - - void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos); - } - - public static class EndpointMetricsCallbacks { - - private AtomicReference callbackRef = new AtomicReference<>(null); - - public EndpointMetricsCallbacks() { - } - - public void setCallback(final MetricsCallback callback) { - this.callbackRef.set(callback); - } - - void reportMetrics(long requestBytes, long responseBytes, long startTimeNanos) { - final MetricsCallback callback = callbackRef.get(); - if (callback != null) { - callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos); - } - } - } - } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java index cff4921f15de..6def4c7bafcd 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java @@ -26,6 +26,7 @@ import io.vertx.ext.web.RoutingContext; import java.util.Objects; import java.util.Optional; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +77,11 @@ public void handle(final RoutingContext routingContext) { return; } + final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder(); + final long startTimeNanos = Time.SYSTEM.nanoseconds(); endpoints.createQueryPublisher(queryStreamArgs.get().sql, queryStreamArgs.get().properties, - context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext)) + context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext), + metricsCallbackHolder) .thenAccept(queryPublisher -> { final QueryResponseMetadata metadata; @@ -101,6 +105,12 @@ public void handle(final RoutingContext routingContext) { // When response is complete, publisher should be closed and query unregistered routingContext.response().endHandler(v -> query.close()); } + routingContext.response().endHandler(v -> { + metricsCallbackHolder.reportMetrics( + routingContext.request().bytesRead(), + routingContext.response().bytesWritten(), + startTimeNanos); + }); queryStreamResponseWriter.writeMetadata(metadata); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 896621b8ba86..d3778311d3f1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -21,7 +21,6 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.auth.DefaultApiSecurityContext; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; @@ -238,8 +237,9 @@ private void handleQueryRequest(final RoutingContext routingContext) { final CompletableFuture connectionClosedFuture = new CompletableFuture<>(); routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null)); - EndpointMetricsCallbacks metricsCallbacks = new EndpointMetricsCallbacks(); - handleOldApiRequest(server, routingContext, KsqlRequest.class, Optional.of(metricsCallbacks), + final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder(); + handleOldApiRequest(server, routingContext, KsqlRequest.class, + Optional.of(metricsCallbackHolder), (request, apiSecurityContext) -> endpoints .executeQueryRequest( @@ -247,7 +247,7 @@ private void handleQueryRequest(final RoutingContext routingContext) { DefaultApiSecurityContext.create(routingContext), isInternalRequest(routingContext), getContentType(routingContext), - metricsCallbacks + metricsCallbackHolder ) ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java index 8650d969fd8b..b61c5314482a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java @@ -18,7 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.entity.HeartbeatMessage; @@ -52,7 +52,8 @@ public interface Endpoints { * @return A CompletableFuture representing the future result of the operation */ CompletableFuture createQueryPublisher(String sql, JsonObject properties, - Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext); + Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext, + MetricsCallbackHolder metricsCallbackHolder); /** * Create a subscriber which will receive a stream of inserts from the API server and process @@ -85,7 +86,7 @@ CompletableFuture executeQueryRequest( CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, KsqlMediaType mediaType, - EndpointMetricsCallbacks metricsCallbacks); + MetricsCallbackHolder metricsCallbackHolder); CompletableFuture executeInfo(ApiSecurityContext apiSecurityContext); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index a57e20e7c117..70d6c7bf7391 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -24,7 +24,7 @@ import io.confluent.ksql.api.impl.QueryEndpoint; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.engine.KsqlEngine; @@ -143,7 +143,8 @@ public CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { final KsqlSecurityContext ksqlSecurityContext = ksqlSecurityContextProvider .provide(apiSecurityContext); return executeOnWorker(() -> { @@ -156,7 +157,8 @@ public CompletableFuture createQueryPublisher(final String sql, properties, context, workerExecutor, - ksqlSecurityContext.getServiceContext()); + ksqlSecurityContext.getServiceContext(), + metricsCallbackHolder); } finally { ksqlSecurityContext.getServiceContext().close(); } @@ -196,7 +198,7 @@ public CompletableFuture executeQueryRequest( final ApiSecurityContext apiSecurityContext, final Optional isInternalRequest, final KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { return executeOldApiEndpointOnWorker(apiSecurityContext, ksqlSecurityContext -> streamedQueryResource.streamQuery( @@ -205,7 +207,7 @@ public CompletableFuture executeQueryRequest( connectionClosedFuture, isInternalRequest, mediaType, - metricsCallbacks + metricsCallbackHolder ), workerExecutor); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index ad214c97c314..b86e061330f2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -108,15 +108,9 @@ public synchronized void subscribe(final Subscriber> sub true ); - result.onCompletionOrException((v, t) -> decrementer.decrementAtMostOnce()); - result.onCompletion(v -> { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - }); result.onCompletionOrException((v, throwable) -> { decrementer.decrementAtMostOnce(); - if (throwable == null) { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - } + pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); }); final PullQuerySubscription subscription = new PullQuerySubscription( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 9f48ad3c9746..46905072be52 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -21,7 +21,7 @@ import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; import io.confluent.ksql.analyzer.PullQueryValidator; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; @@ -195,7 +195,7 @@ public EndpointResponse streamQuery( final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, final KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { throwIfNotConfigured(); activenessRegistrar.updateLastRequestTime(); @@ -206,7 +206,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, mediaType, metricsCallbacks); + isInternalRequest, mediaType, metricsCallbackHolder); } private void throwIfNotConfigured() { @@ -236,7 +236,7 @@ private EndpointResponse handleStatement( final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, final KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { try { authorizationValidator.ifPresent(validator -> @@ -259,7 +259,7 @@ private EndpointResponse handleStatement( request.getRequestProperties(), isInternalRequest, connectionClosedFuture, - metricsCallbacks + metricsCallbackHolder ); } @@ -299,9 +299,16 @@ private EndpointResponse handlePullQuery( final Map requestProperties, final Optional isInternalRequest, final CompletableFuture connectionClosedFuture, - final EndpointMetricsCallbacks metricsCallbacks + final MetricsCallbackHolder metricsCallbackHolder ) { - setupMetricsCallbackForPullQuery(metricsCallbacks); + // First thing, set the metrics callback so that it gets called, even if we hit an error + metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> { + pullQueryMetrics.ifPresent(metrics -> { + metrics.recordRequestSize(requestBytes); + metrics.recordResponseSize(responseBytes); + metrics.recordLatency(startTimeNanos); + }); + }); final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); @@ -373,16 +380,6 @@ private EndpointResponse handlePullQuery( } } - private void setupMetricsCallbackForPullQuery(final EndpointMetricsCallbacks metricsCallbacks) { - metricsCallbacks.setCallback((requestBytes, responseBytes, startTimeNanos) -> { - pullQueryMetrics.ifPresent(metrics -> { - metrics.recordRequestSize(requestBytes); - metrics.recordResponseSize(responseBytes); - metrics.recordLatency(startTimeNanos); - }); - }); - } - private EndpointResponse handlePushQuery( final ServiceContext serviceContext, final PreparedStatement statement, diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java index 6c6e93471043..5e2656c057c6 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java @@ -18,7 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.api.utils.RowGenerator; @@ -66,7 +66,8 @@ public class TestEndpoints implements Endpoints { @Override public synchronized CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { CompletableFuture completableFuture = new CompletableFuture<>(); if (createQueryPublisherException != null) { createQueryPublisherException.fillInStackTrace(); @@ -144,7 +145,7 @@ public CompletableFuture executeTerminate(final ClusterTermina public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java index ab644bb436d5..be7108c2f8d5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java @@ -18,7 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BaseSubscriber; @@ -163,7 +163,8 @@ public CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { return null; } @@ -193,7 +194,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java index 3be6ae5e99d5..a944914b3f7d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java @@ -25,7 +25,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BufferedPublisher; @@ -121,7 +121,8 @@ public synchronized CompletableFuture createQueryPublisher(final final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { PullQueryPublisher publisher = new PullQueryPublisher(context, DEFAULT_ROWS); publishers.add(publisher); return CompletableFuture.completedFuture(publisher); @@ -153,7 +154,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType, final EndpointMetricsCallbacks metricsCallbacks) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java index 0e03541f92b0..a3d36e8b280b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java @@ -24,7 +24,7 @@ import io.confluent.ksql.api.impl.BlockingQueryPublisher; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; @@ -100,7 +100,8 @@ public synchronized CompletableFuture createQueryPublisher(final final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { QueryStreamPublisher publisher = new QueryStreamPublisher(context, server.getWorkerExecutor()); publisher.setQueryHandle(new TestQueryHandle(), false); @@ -140,7 +141,7 @@ public CompletableFuture executeQueryRequest( ApiSecurityContext apiSecurityContext, Optional isInternalRequest, KsqlMediaType mediaType, - final EndpointMetricsCallbacks metricsCallbacks) { + final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 81828ff7bcda..62653b4deb3a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -49,7 +49,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; -import io.confluent.ksql.api.server.OldApiUtils.EndpointMetricsCallbacks; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.StreamingOutput; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -274,7 +274,7 @@ ERROR_CODE_BAD_STATEMENT, errorMsg, PULL_QUERY_STRING, new KsqlEntityList())) new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -345,7 +345,7 @@ public void shouldRateLimit() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -366,7 +366,7 @@ public void shouldReachConcurrentLimit() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks()); + new MetricsCallbackHolder()); // Then: assertThat(response.getStatus(), is(500)); @@ -413,7 +413,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) ); @@ -437,7 +437,7 @@ public void shouldReturn400OnBadStatement() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) ); @@ -456,7 +456,7 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -472,7 +472,7 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -495,7 +495,7 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) ); @@ -520,7 +520,7 @@ public void shouldNotCreateExternalClientsForPullQuery() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -546,7 +546,7 @@ public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -603,7 +603,7 @@ public void shouldThrowOnDenyListedStreamProperty() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -686,7 +686,7 @@ public void shouldStreamRowsCorrectly() throws Throwable { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); @@ -831,7 +831,7 @@ public void shouldUpdateTheLastRequestTime() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); // Then: @@ -854,7 +854,7 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -881,7 +881,7 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ); assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); @@ -913,7 +913,7 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { new CompletableFuture<>(), Optional.empty(), KsqlMediaType.LATEST_FORMAT, - new EndpointMetricsCallbacks() + new MetricsCallbackHolder() ) ); From d939ca6ca0f02b009088d86a9bd7c8f0053c9c7c Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Wed, 10 Mar 2021 16:22:08 -0800 Subject: [PATCH 3/3] Fixes metrics call location --- .../ksql/api/server/QueryStreamHandler.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java index 6def4c7bafcd..5b4bcd1c2d48 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java @@ -92,7 +92,13 @@ public void handle(final RoutingContext routingContext) { queryPublisher.getColumnTypes()); // When response is complete, publisher should be closed - routingContext.response().endHandler(v -> queryPublisher.close()); + routingContext.response().endHandler(v -> { + queryPublisher.close(); + metricsCallbackHolder.reportMetrics( + routingContext.request().bytesRead(), + routingContext.response().bytesWritten(), + startTimeNanos); + }); } else { final PushQueryHolder query = connectionQueryManager .createApiQuery(queryPublisher, routingContext.request()); @@ -105,12 +111,6 @@ public void handle(final RoutingContext routingContext) { // When response is complete, publisher should be closed and query unregistered routingContext.response().endHandler(v -> query.close()); } - routingContext.response().endHandler(v -> { - metricsCallbackHolder.reportMetrics( - routingContext.request().bytesRead(), - routingContext.response().bytesWritten(), - startTimeNanos); - }); queryStreamResponseWriter.writeMetadata(metadata);