diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java index cd8953158899..99aab9c52714 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.RateLimiter; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.config.SessionConfig; @@ -58,7 +59,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.apache.kafka.common.utils.Time; public class QueryEndpoint { @@ -99,8 +99,8 @@ public QueryPublisher createQueryPublisher( final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ServiceContext serviceContext) { - final long startTimeNanos = Time.SYSTEM.nanoseconds(); + final ServiceContext serviceContext, + final MetricsCallbackHolder metricsCallbackHolder) { // Must be run on worker as all this stuff is slow VertxUtils.checkIsWorker(); @@ -108,8 +108,8 @@ public QueryPublisher createQueryPublisher( if (statement.getStatement().isPullQuery()) { return createPullQueryPublisher( - context, serviceContext, statement, pullQueryMetrics, - startTimeNanos, workerExecutor); + context, serviceContext, statement, pullQueryMetrics, workerExecutor, + metricsCallbackHolder); } else { return createPushQueryPublisher(context, serviceContext, statement, workerExecutor); } @@ -146,9 +146,17 @@ private QueryPublisher createPullQueryPublisher( final ServiceContext serviceContext, final ConfiguredStatement statement, final Optional pullQueryMetrics, - final long startTimeNanos, - final WorkerExecutor workerExecutor + final WorkerExecutor workerExecutor, + final MetricsCallbackHolder metricsCallbackHolder ) { + // First thing, set the metrics callback so that it gets called, even if we hit an error + metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> { + pullQueryMetrics.ifPresent(metrics -> { + metrics.recordRequestSize(requestBytes); + metrics.recordResponseSize(responseBytes); + metrics.recordLatency(startTimeNanos); + }); + }); final RoutingOptions routingOptions = new PullQueryConfigRoutingOptions( ksqlConfig, @@ -177,9 +185,6 @@ private QueryPublisher createPullQueryPublisher( result.onCompletionOrException((v, throwable) -> { decrementer.decrementAtMostOnce(); - if (throwable == null) { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - } }); final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java new file mode 100644 index 000000000000..128b88ad1cd3 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallback.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + +/** + * Interface for reporting metrics to a resource. A resource may choose to break things down + * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource + * determine how to report the metrics. + */ +public interface MetricsCallback { + + /** + * Called to report metrics when the request is complete, error or success + * @param requestBytes The request bytes + * @param responseBytes The response bytes + * @param startTimeNanos The start time of the request in nanos + */ + void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos); +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java new file mode 100644 index 000000000000..0a2453f06a2d --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/MetricsCallbackHolder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.server; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class give a resource the opportunity to register a set of particular callbacks based upon + * arbitrary criteria. Once the response is complete, the callback is invoked. + */ +public class MetricsCallbackHolder { + + private AtomicReference callbackRef = new AtomicReference<>(null); + + public MetricsCallbackHolder() { + } + + public void setCallback(final MetricsCallback callback) { + this.callbackRef.set(callback); + } + + void reportMetrics(final long requestBytes, final long responseBytes, final long startTimeNanos) { + final MetricsCallback callback = callbackRef.get(); + if (callback != null) { + callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos); + } + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java index 94b963cb0cdf..b4722828802e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/OldApiUtils.java @@ -21,7 +21,6 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.auth.DefaultApiSecurityContext; -import io.confluent.ksql.internal.PullQueryExecutorMetrics; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; @@ -55,7 +54,7 @@ static void handleOldApiRequest( final Server server, final RoutingContext routingContext, final Class requestClass, - final Optional pullQueryMetrics, + final Optional metricsCallbackHolder, final BiFunction> requestor) { final long startTimeNanos = Time.SYSTEM.nanoseconds(); final T requestObject; @@ -69,20 +68,17 @@ static void handleOldApiRequest( } else { requestObject = null; } - pullQueryMetrics - .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize( - routingContext.request().bytesRead())); final CompletableFuture completableFuture = requestor .apply(requestObject, DefaultApiSecurityContext.create(routingContext)); completableFuture.thenAccept(endpointResponse -> { handleOldApiResponse( - server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos); + server, routingContext, endpointResponse, metricsCallbackHolder, startTimeNanos); }).exceptionally(t -> { if (t instanceof CompletionException) { t = t.getCause(); } handleOldApiResponse( - server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos); + server, routingContext, mapException(t), metricsCallbackHolder, startTimeNanos); return null; }); } @@ -90,7 +86,7 @@ static void handleOldApiRequest( static void handleOldApiResponse( final Server server, final RoutingContext routingContext, final EndpointResponse endpointResponse, - final Optional pullQueryMetrics, + final Optional metricsCallbackHolder, final long startTimeNanos ) { final HttpServerResponse response = routingContext.response(); @@ -111,7 +107,8 @@ static void handleOldApiResponse( return; } response.putHeader(TRANSFER_ENCODING, CHUNKED_ENCODING); - streamEndpointResponse(server, routingContext, streamingOutput); + streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbackHolder, + startTimeNanos); } else { if (endpointResponse.getEntity() == null) { response.end(); @@ -124,18 +121,15 @@ static void handleOldApiResponse( } response.end(responseBody); } + reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); } - pullQueryMetrics - .ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize( - routingContext.response().bytesWritten())); - pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics - .recordLatency(startTimeNanos)); - } private static void streamEndpointResponse(final Server server, final RoutingContext routingContext, - final StreamingOutput streamingOutput) { + final StreamingOutput streamingOutput, + final Optional metricsCallbackHolder, + final long startTimeNanos) { final WorkerExecutor workerExecutor = server.getWorkerExecutor(); final VertxCompletableFuture vcf = new VertxCompletableFuture<>(); workerExecutor.executeBlocking(promise -> { @@ -162,6 +156,21 @@ private static void streamEndpointResponse(final Server server, } } }, vcf); + vcf.handle((v, throwable) -> { + reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); + return null; + }); + } + + private static void reportMetrics( + final RoutingContext routingContext, + final Optional metricsCallbackHolder, + final long startTimeNanos + ) { + metricsCallbackHolder.ifPresent(mc -> mc.reportMetrics( + routingContext.request().bytesRead(), + routingContext.response().bytesWritten(), + startTimeNanos)); } public static EndpointResponse mapException(final Throwable exception) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java index cff4921f15de..5b4bcd1c2d48 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java @@ -26,6 +26,7 @@ import io.vertx.ext.web.RoutingContext; import java.util.Objects; import java.util.Optional; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +77,11 @@ public void handle(final RoutingContext routingContext) { return; } + final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder(); + final long startTimeNanos = Time.SYSTEM.nanoseconds(); endpoints.createQueryPublisher(queryStreamArgs.get().sql, queryStreamArgs.get().properties, - context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext)) + context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext), + metricsCallbackHolder) .thenAccept(queryPublisher -> { final QueryResponseMetadata metadata; @@ -88,7 +92,13 @@ public void handle(final RoutingContext routingContext) { queryPublisher.getColumnTypes()); // When response is complete, publisher should be closed - routingContext.response().endHandler(v -> queryPublisher.close()); + routingContext.response().endHandler(v -> { + queryPublisher.close(); + metricsCallbackHolder.reportMetrics( + routingContext.request().bytesRead(), + routingContext.response().bytesWritten(), + startTimeNanos); + }); } else { final PushQueryHolder query = connectionQueryManager .createApiQuery(queryPublisher, routingContext.request()); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 0495ebef7bb9..d3778311d3f1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -237,14 +237,17 @@ private void handleQueryRequest(final RoutingContext routingContext) { final CompletableFuture connectionClosedFuture = new CompletableFuture<>(); routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null)); - handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics, + final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder(); + handleOldApiRequest(server, routingContext, KsqlRequest.class, + Optional.of(metricsCallbackHolder), (request, apiSecurityContext) -> endpoints .executeQueryRequest( request, server.getWorkerExecutor(), connectionClosedFuture, DefaultApiSecurityContext.create(routingContext), isInternalRequest(routingContext), - getContentType(routingContext) + getContentType(routingContext), + metricsCallbackHolder ) ); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java index 7af027a1dfed..b61c5314482a 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/spi/Endpoints.java @@ -18,6 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.rest.EndpointResponse; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; import io.confluent.ksql.rest.entity.HeartbeatMessage; @@ -51,7 +52,8 @@ public interface Endpoints { * @return A CompletableFuture representing the future result of the operation */ CompletableFuture createQueryPublisher(String sql, JsonObject properties, - Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext); + Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext, + MetricsCallbackHolder metricsCallbackHolder); /** * Create a subscriber which will receive a stream of inserts from the API server and process @@ -83,7 +85,8 @@ CompletableFuture executeQueryRequest( KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType); + KsqlMediaType mediaType, + MetricsCallbackHolder metricsCallbackHolder); CompletableFuture executeInfo(ApiSecurityContext apiSecurityContext); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java index d1b0bca2fc40..70d6c7bf7391 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerEndpoints.java @@ -24,6 +24,7 @@ import io.confluent.ksql.api.impl.QueryEndpoint; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.engine.KsqlEngine; @@ -142,7 +143,8 @@ public CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { final KsqlSecurityContext ksqlSecurityContext = ksqlSecurityContextProvider .provide(apiSecurityContext); return executeOnWorker(() -> { @@ -155,7 +157,8 @@ public CompletableFuture createQueryPublisher(final String sql, properties, context, workerExecutor, - ksqlSecurityContext.getServiceContext()); + ksqlSecurityContext.getServiceContext(), + metricsCallbackHolder); } finally { ksqlSecurityContext.getServiceContext().close(); } @@ -194,7 +197,8 @@ public CompletableFuture executeQueryRequest( final CompletableFuture connectionClosedFuture, final ApiSecurityContext apiSecurityContext, final Optional isInternalRequest, - final KsqlMediaType mediaType + final KsqlMediaType mediaType, + final MetricsCallbackHolder metricsCallbackHolder ) { return executeOldApiEndpointOnWorker(apiSecurityContext, ksqlSecurityContext -> streamedQueryResource.streamQuery( @@ -202,7 +206,8 @@ public CompletableFuture executeQueryRequest( request, connectionClosedFuture, isInternalRequest, - mediaType + mediaType, + metricsCallbackHolder ), workerExecutor); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index ad214c97c314..b86e061330f2 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -108,15 +108,9 @@ public synchronized void subscribe(final Subscriber> sub true ); - result.onCompletionOrException((v, t) -> decrementer.decrementAtMostOnce()); - result.onCompletion(v -> { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - }); result.onCompletionOrException((v, throwable) -> { decrementer.decrementAtMostOnce(); - if (throwable == null) { - pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); - } + pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos)); }); final PullQuerySubscription subscription = new PullQuerySubscription( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index d304bc000a85..46905072be52 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; import io.confluent.ksql.analyzer.PullQueryValidator; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.PullQueryExecutionUtil; @@ -193,7 +194,8 @@ public EndpointResponse streamQuery( final KsqlRequest request, final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, - final KsqlMediaType mediaType + final KsqlMediaType mediaType, + final MetricsCallbackHolder metricsCallbackHolder ) { throwIfNotConfigured(); activenessRegistrar.updateLastRequestTime(); @@ -204,7 +206,7 @@ public EndpointResponse streamQuery( commandQueue, request, commandQueueCatchupTimeout); return handleStatement(securityContext, request, statement, connectionClosedFuture, - isInternalRequest, mediaType); + isInternalRequest, mediaType, metricsCallbackHolder); } private void throwIfNotConfigured() { @@ -233,7 +235,8 @@ private EndpointResponse handleStatement( final PreparedStatement statement, final CompletableFuture connectionClosedFuture, final Optional isInternalRequest, - final KsqlMediaType mediaType + final KsqlMediaType mediaType, + final MetricsCallbackHolder metricsCallbackHolder ) { try { authorizationValidator.ifPresent(validator -> @@ -255,8 +258,8 @@ private EndpointResponse handleStatement( configProperties, request.getRequestProperties(), isInternalRequest, - pullQueryMetrics, - connectionClosedFuture + connectionClosedFuture, + metricsCallbackHolder ); } @@ -295,9 +298,18 @@ private EndpointResponse handlePullQuery( final Map configOverrides, final Map requestProperties, final Optional isInternalRequest, - final Optional pullQueryMetrics, - final CompletableFuture connectionClosedFuture + final CompletableFuture connectionClosedFuture, + final MetricsCallbackHolder metricsCallbackHolder ) { + // First thing, set the metrics callback so that it gets called, even if we hit an error + metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> { + pullQueryMetrics.ifPresent(metrics -> { + metrics.recordRequestSize(requestBytes); + metrics.recordResponseSize(responseBytes); + metrics.recordLatency(startTimeNanos); + }); + }); + final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, configOverrides)); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java index d14319b52cf0..5e2656c057c6 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java @@ -18,6 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.api.utils.RowGenerator; @@ -65,7 +66,8 @@ public class TestEndpoints implements Endpoints { @Override public synchronized CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { CompletableFuture completableFuture = new CompletableFuture<>(); if (createQueryPublisherException != null) { createQueryPublisherException.fillInStackTrace(); @@ -143,7 +145,7 @@ public CompletableFuture executeTerminate(final ClusterTermina public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java index 24199178f2a4..be7108c2f8d5 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/InsertsStreamRunner.java @@ -18,6 +18,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BaseSubscriber; @@ -162,7 +163,8 @@ public CompletableFuture createQueryPublisher(final String sql, final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { return null; } @@ -192,7 +194,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java index 7ed289233527..a944914b3f7d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/PullQueryRunner.java @@ -25,6 +25,7 @@ import io.confluent.ksql.api.auth.ApiSecurityContext; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; import io.confluent.ksql.reactive.BufferedPublisher; @@ -120,7 +121,8 @@ public synchronized CompletableFuture createQueryPublisher(final final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { PullQueryPublisher publisher = new PullQueryPublisher(context, DEFAULT_ROWS); publishers.add(publisher); return CompletableFuture.completedFuture(publisher); @@ -152,7 +154,7 @@ public CompletableFuture executeTerminate( public CompletableFuture executeQueryRequest(KsqlRequest request, WorkerExecutor workerExecutor, CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java index 99910daecdfc..a3d36e8b280b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/perf/QueryStreamRunner.java @@ -24,6 +24,7 @@ import io.confluent.ksql.api.impl.BlockingQueryPublisher; import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.QueryHandle; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.api.spi.QueryPublisher; @@ -99,7 +100,8 @@ public synchronized CompletableFuture createQueryPublisher(final final JsonObject properties, final Context context, final WorkerExecutor workerExecutor, - final ApiSecurityContext apiSecurityContext) { + final ApiSecurityContext apiSecurityContext, + final MetricsCallbackHolder metricsCallbackHolder) { QueryStreamPublisher publisher = new QueryStreamPublisher(context, server.getWorkerExecutor()); publisher.setQueryHandle(new TestQueryHandle(), false); @@ -138,7 +140,8 @@ public CompletableFuture executeQueryRequest( CompletableFuture connectionClosedFuture, ApiSecurityContext apiSecurityContext, Optional isInternalRequest, - KsqlMediaType mediaType) { + KsqlMediaType mediaType, + final MetricsCallbackHolder metricsCallbackHolder) { return null; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 566535ea5246..62653b4deb3a 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -49,6 +49,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.api.server.MetricsCallbackHolder; import io.confluent.ksql.api.server.StreamingOutput; import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.engine.KsqlEngine; @@ -272,7 +273,8 @@ ERROR_CODE_BAD_STATEMENT, errorMsg, PULL_QUERY_STRING, new KsqlEntityList())) new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); // Then: @@ -342,7 +344,8 @@ public void shouldRateLimit() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); // Then: @@ -352,9 +355,6 @@ public void shouldRateLimit() { @Test public void shouldReachConcurrentLimit() { // Given: - when(mockKsqlEngine.executePullQuery(any(), any(), any(), any(), any(), any(), anyBoolean())) - .thenReturn(pullQueryResult); - when(pullQueryResult.getPullQueryQueue()).thenReturn(pullQueryQueue); when(rateLimiter.tryAcquire()).thenReturn(true); when(concurrencyLimiter.increment()).thenThrow(new KsqlException("concurrencyLimiter Error!")); @@ -365,7 +365,8 @@ public void shouldReachConcurrentLimit() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT); + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder()); // Then: assertThat(response.getStatus(), is(500)); @@ -411,7 +412,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ) ); @@ -434,7 +436,8 @@ public void shouldReturn400OnBadStatement() { new KsqlRequest("query", Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ) ); @@ -452,7 +455,8 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); // Then: @@ -467,7 +471,8 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); // Then: @@ -489,7 +494,8 @@ public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumb new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), 3L), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ) ); @@ -513,7 +519,8 @@ public void shouldNotCreateExternalClientsForPullQuery() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); // Then: @@ -538,7 +545,8 @@ public void shouldReturnForbiddenKafkaAccessForPullQueryAuthorizationDenied() { new KsqlRequest(PULL_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -594,7 +602,8 @@ public void shouldThrowOnDenyListedStreamProperty() { ), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); // Then: @@ -676,7 +685,8 @@ public void shouldStreamRowsCorrectly() throws Throwable { new KsqlRequest(queryString, requestStreamsProperties, Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); final PipedOutputStream responseOutputStream = new EOFPipedOutputStream(); final PipedInputStream responseInputStream = new PipedInputStream(responseOutputStream, 1); @@ -820,7 +830,8 @@ public void shouldUpdateTheLastRequestTime() { new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); // Then: @@ -842,7 +853,8 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() new KsqlRequest(PUSH_QUERY_STRING, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); final KsqlErrorMessage responseEntity = (KsqlErrorMessage) response.getEntity(); @@ -868,7 +880,8 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ); assertEquals(response.getStatus(), AUTHORIZATION_ERROR_RESPONSE.getStatus()); @@ -899,7 +912,8 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { new KsqlRequest(PRINT_TOPIC, Collections.emptyMap(), Collections.emptyMap(), null), new CompletableFuture<>(), Optional.empty(), - KsqlMediaType.LATEST_FORMAT + KsqlMediaType.LATEST_FORMAT, + new MetricsCallbackHolder() ) );