Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make pull query metrics apply only to pull and not also push #6944

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.config.SessionConfig;
Expand Down Expand Up @@ -58,7 +59,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;

public class QueryEndpoint {

Expand Down Expand Up @@ -99,17 +99,17 @@ public QueryPublisher createQueryPublisher(
final JsonObject properties,
final Context context,
final WorkerExecutor workerExecutor,
final ServiceContext serviceContext) {
final long startTimeNanos = Time.SYSTEM.nanoseconds();
final ServiceContext serviceContext,
final MetricsCallbackHolder metricsCallbackHolder) {
// Must be run on worker as all this stuff is slow
VertxUtils.checkIsWorker();

final ConfiguredStatement<Query> statement = createStatement(sql, properties.getMap());

if (statement.getStatement().isPullQuery()) {
return createPullQueryPublisher(
context, serviceContext, statement, pullQueryMetrics,
startTimeNanos, workerExecutor);
context, serviceContext, statement, pullQueryMetrics, workerExecutor,
metricsCallbackHolder);
} else {
return createPushQueryPublisher(context, serviceContext, statement, workerExecutor);
}
Expand Down Expand Up @@ -146,9 +146,17 @@ private QueryPublisher createPullQueryPublisher(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final long startTimeNanos,
final WorkerExecutor workerExecutor
final WorkerExecutor workerExecutor,
final MetricsCallbackHolder metricsCallbackHolder
) {
// First thing, set the metrics callback so that it gets called, even if we hit an error
metricsCallbackHolder.setCallback((requestBytes, responseBytes, startTimeNanos) -> {
pullQueryMetrics.ifPresent(metrics -> {
metrics.recordRequestSize(requestBytes);
metrics.recordResponseSize(responseBytes);
metrics.recordLatency(startTimeNanos);
});
});

final RoutingOptions routingOptions = new PullQueryConfigRoutingOptions(
ksqlConfig,
Expand Down Expand Up @@ -177,9 +185,6 @@ private QueryPublisher createPullQueryPublisher(

result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
if (throwable == null) {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
}
});

final BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

/**
 * Interface for reporting metrics to a resource. A resource may choose to break things down
 * arbitrarily, e.g. /query is used for both push and pull queries so we let the resource
 * determine how to report the metrics.
 */
@FunctionalInterface
public interface MetricsCallback {

  /**
   * Called to report metrics when the request is complete, whether it ended in error or success.
   *
   * @param requestBytes The number of bytes read from the request
   * @param responseBytes The number of bytes written to the response
   * @param startTimeNanos The start time of the request in nanos
   */
  void reportMetricsOnCompletion(long requestBytes, long responseBytes, long startTimeNanos);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

import java.util.concurrent.atomic.AtomicReference;

/**
 * This class gives a resource the opportunity to register a particular callback based upon
 * arbitrary criteria. Once the response is complete, the callback is invoked.
 *
 * <p>The callback is held in an {@link AtomicReference} so that it can be set on one thread
 * (e.g. a worker executing the query) and read on another when the response completes.
 */
public class MetricsCallbackHolder {

  // Final reference holder; contains null until setCallback is invoked.
  private final AtomicReference<MetricsCallback> callbackRef = new AtomicReference<>();

  public MetricsCallbackHolder() {
  }

  /**
   * Registers the callback to invoke on completion, replacing any previously set callback.
   *
   * @param callback the callback to invoke when the response is complete
   */
  public void setCallback(final MetricsCallback callback) {
    callbackRef.set(callback);
  }

  /**
   * Invokes the registered callback with the request/response sizes and the request start time.
   * A no-op if no callback has been registered.
   *
   * @param requestBytes the number of bytes read from the request
   * @param responseBytes the number of bytes written to the response
   * @param startTimeNanos the start time of the request in nanos
   */
  void reportMetrics(final long requestBytes, final long responseBytes, final long startTimeNanos) {
    final MetricsCallback callback = callbackRef.get();
    if (callback != null) {
      callback.reportMetricsOnCompletion(requestBytes, responseBytes, startTimeNanos);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.auth.DefaultApiSecurityContext;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand Down Expand Up @@ -55,7 +54,7 @@ static <T> void handleOldApiRequest(
final Server server,
final RoutingContext routingContext,
final Class<T> requestClass,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final Optional<MetricsCallbackHolder> metricsCallbackHolder,
final BiFunction<T, ApiSecurityContext, CompletableFuture<EndpointResponse>> requestor) {
final long startTimeNanos = Time.SYSTEM.nanoseconds();
final T requestObject;
Expand All @@ -69,28 +68,25 @@ static <T> void handleOldApiRequest(
} else {
requestObject = null;
}
pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordRequestSize(
routingContext.request().bytesRead()));
final CompletableFuture<EndpointResponse> completableFuture = requestor
.apply(requestObject, DefaultApiSecurityContext.create(routingContext));
completableFuture.thenAccept(endpointResponse -> {
handleOldApiResponse(
server, routingContext, endpointResponse, pullQueryMetrics, startTimeNanos);
server, routingContext, endpointResponse, metricsCallbackHolder, startTimeNanos);
}).exceptionally(t -> {
if (t instanceof CompletionException) {
t = t.getCause();
}
handleOldApiResponse(
server, routingContext, mapException(t), pullQueryMetrics, startTimeNanos);
server, routingContext, mapException(t), metricsCallbackHolder, startTimeNanos);
return null;
});
}

static void handleOldApiResponse(
final Server server, final RoutingContext routingContext,
final EndpointResponse endpointResponse,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final Optional<MetricsCallbackHolder> metricsCallbackHolder,
final long startTimeNanos
) {
final HttpServerResponse response = routingContext.response();
Expand All @@ -111,7 +107,8 @@ static void handleOldApiResponse(
return;
}
response.putHeader(TRANSFER_ENCODING, CHUNKED_ENCODING);
streamEndpointResponse(server, routingContext, streamingOutput);
streamEndpointResponse(server, routingContext, streamingOutput, metricsCallbackHolder,
startTimeNanos);
} else {
if (endpointResponse.getEntity() == null) {
response.end();
Expand All @@ -124,18 +121,15 @@ static void handleOldApiResponse(
}
response.end(responseBody);
}
reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos);
}
pullQueryMetrics
.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics.recordResponseSize(
routingContext.response().bytesWritten()));
pullQueryMetrics.ifPresent(pullQueryExecutorMetrics -> pullQueryExecutorMetrics
.recordLatency(startTimeNanos));

}

private static void streamEndpointResponse(final Server server,
final RoutingContext routingContext,
final StreamingOutput streamingOutput) {
final StreamingOutput streamingOutput,
final Optional<MetricsCallbackHolder> metricsCallbackHolder,
final long startTimeNanos) {
final WorkerExecutor workerExecutor = server.getWorkerExecutor();
final VertxCompletableFuture<Void> vcf = new VertxCompletableFuture<>();
workerExecutor.executeBlocking(promise -> {
Expand All @@ -162,6 +156,21 @@ private static void streamEndpointResponse(final Server server,
}
}
}, vcf);
vcf.handle((v, throwable) -> {
reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos);
return null;
});
}

/**
 * Reports request/response sizes and request latency through the resource-registered
 * callback holder, if one was provided.
 *
 * @param routingContext supplies the bytes read from the request and written to the response
 * @param metricsCallbackHolder holder whose registered callback (if any) receives the metrics
 * @param startTimeNanos the start time of the request in nanos
 */
private static void reportMetrics(
final RoutingContext routingContext,
final Optional<MetricsCallbackHolder> metricsCallbackHolder,
final long startTimeNanos
) {
// Sizes are read lazily inside ifPresent so nothing is touched when no holder was supplied.
metricsCallbackHolder.ifPresent(mc -> mc.reportMetrics(
routingContext.request().bytesRead(),
routingContext.response().bytesWritten(),
startTimeNanos));
}

public static EndpointResponse mapException(final Throwable exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.vertx.ext.web.RoutingContext;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,8 +77,11 @@ public void handle(final RoutingContext routingContext) {
return;
}

final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder();
final long startTimeNanos = Time.SYSTEM.nanoseconds();
endpoints.createQueryPublisher(queryStreamArgs.get().sql, queryStreamArgs.get().properties,
context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext))
context, server.getWorkerExecutor(), DefaultApiSecurityContext.create(routingContext),
metricsCallbackHolder)
.thenAccept(queryPublisher -> {

final QueryResponseMetadata metadata;
Expand All @@ -88,7 +92,13 @@ public void handle(final RoutingContext routingContext) {
queryPublisher.getColumnTypes());

// When response is complete, publisher should be closed
routingContext.response().endHandler(v -> queryPublisher.close());
routingContext.response().endHandler(v -> {
queryPublisher.close();
metricsCallbackHolder.reportMetrics(
routingContext.request().bytesRead(),
routingContext.response().bytesWritten(),
startTimeNanos);
});
} else {
final PushQueryHolder query = connectionQueryManager
.createApiQuery(queryPublisher, routingContext.request());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,17 @@ private void handleQueryRequest(final RoutingContext routingContext) {

final CompletableFuture<Void> connectionClosedFuture = new CompletableFuture<>();
routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null));
handleOldApiRequest(server, routingContext, KsqlRequest.class, pullQueryMetrics,
final MetricsCallbackHolder metricsCallbackHolder = new MetricsCallbackHolder();
handleOldApiRequest(server, routingContext, KsqlRequest.class,
Optional.of(metricsCallbackHolder),
(request, apiSecurityContext) ->
endpoints
.executeQueryRequest(
request, server.getWorkerExecutor(), connectionClosedFuture,
DefaultApiSecurityContext.create(routingContext),
isInternalRequest(routingContext),
getContentType(routingContext)
getContentType(routingContext),
metricsCallbackHolder
)

);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.HeartbeatMessage;
Expand Down Expand Up @@ -51,7 +52,8 @@ public interface Endpoints {
* @return A CompletableFuture representing the future result of the operation
*/
CompletableFuture<QueryPublisher> createQueryPublisher(String sql, JsonObject properties,
Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext);
Context context, WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext,
MetricsCallbackHolder metricsCallbackHolder);

/**
* Create a subscriber which will receive a stream of inserts from the API server and process
Expand Down Expand Up @@ -83,7 +85,8 @@ CompletableFuture<EndpointResponse> executeQueryRequest(
KsqlRequest request, WorkerExecutor workerExecutor,
CompletableFuture<Void> connectionClosedFuture, ApiSecurityContext apiSecurityContext,
Optional<Boolean> isInternalRequest,
KsqlMediaType mediaType);
KsqlMediaType mediaType,
MetricsCallbackHolder metricsCallbackHolder);

CompletableFuture<EndpointResponse> executeInfo(ApiSecurityContext apiSecurityContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.api.impl.QueryEndpoint;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -142,7 +143,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
final JsonObject properties,
final Context context,
final WorkerExecutor workerExecutor,
final ApiSecurityContext apiSecurityContext) {
final ApiSecurityContext apiSecurityContext,
final MetricsCallbackHolder metricsCallbackHolder) {
final KsqlSecurityContext ksqlSecurityContext = ksqlSecurityContextProvider
.provide(apiSecurityContext);
return executeOnWorker(() -> {
Expand All @@ -155,7 +157,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
properties,
context,
workerExecutor,
ksqlSecurityContext.getServiceContext());
ksqlSecurityContext.getServiceContext(),
metricsCallbackHolder);
} finally {
ksqlSecurityContext.getServiceContext().close();
}
Expand Down Expand Up @@ -194,15 +197,17 @@ public CompletableFuture<EndpointResponse> executeQueryRequest(
final CompletableFuture<Void> connectionClosedFuture,
final ApiSecurityContext apiSecurityContext,
final Optional<Boolean> isInternalRequest,
final KsqlMediaType mediaType
final KsqlMediaType mediaType,
final MetricsCallbackHolder metricsCallbackHolder
) {
return executeOldApiEndpointOnWorker(apiSecurityContext,
ksqlSecurityContext -> streamedQueryResource.streamQuery(
ksqlSecurityContext,
request,
connectionClosedFuture,
isInternalRequest,
mediaType
mediaType,
metricsCallbackHolder
), workerExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,9 @@ public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> sub
true
);

result.onCompletionOrException((v, t) -> decrementer.decrementAtMostOnce());
result.onCompletion(v -> {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
});
result.onCompletionOrException((v, throwable) -> {
decrementer.decrementAtMostOnce();
if (throwable == null) {
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
}
pullQueryMetrics.ifPresent(p -> p.recordLatency(startTimeNanos));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to change the behavior so that even if the result contains an error, we still record latency. Is that intentional?

});

final PullQuerySubscription subscription = new PullQuerySubscription(
Expand Down
Loading