From e5bd35d79371f1e80577cd15cfba03a1a38484ca Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Mon, 2 Dec 2019 09:54:50 -0800 Subject: [PATCH 1/3] feat: add metric for commandRunner status --- .../ksql/rest/server/KsqlRestApplication.java | 3 +- .../server/computation/CommandRunner.java | 59 +++++++++++--- .../CommandRunnerStatusMetric.java | 78 +++++++++++++++++++ .../server/computation/CommandRunnerTest.java | 6 +- .../rest/server/computation/RecoveryTest.java | 3 +- 5 files changed, 136 insertions(+), 13 deletions(-) create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 658821aa422d..9318eead1ba2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -519,7 +519,8 @@ static KsqlRestApplication buildApplication( commandStore, maxStatementRetries, new ClusterTerminator(ksqlEngine, serviceContext, managedTopics), - serverState + serverState, + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 046a3e994d78..6e6df4f79d60 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -21,10 +21,12 @@ import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.TerminateCluster; +import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.RetryUtil; import java.io.Closeable; import java.time.Duration; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -32,6 +34,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,12 +48,17 @@ */ public class CommandRunner implements Closeable { + private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app"; + private static final String METRIC_GROUP_POST_FIX = "-command-runner-status"; + private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX; + private static final Logger log = LoggerFactory.getLogger(CommandRunner.class); private static final int STATEMENT_RETRY_MS = 100; private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000; private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(MAX_STATEMENT_RETRY_MS); private static final int SHUTDOWN_TIMEOUT_MS = 3 * MAX_STATEMENT_RETRY_MS; + private static final Duration COMMAND_RUNNER_HEALTH_TIMEOUT = Duration.ofMillis(15000); private final InteractiveStatementExecutor statementExecutor; private final CommandQueue commandStore; @@ -58,13 +67,21 @@ public class CommandRunner implements Closeable { private final int maxRetries; private final ClusterTerminator clusterTerminator; private final ServerState serverState; + private final CommandRunnerStatusMetric commandRunnerStatusMetric; + private final AtomicReference> currentCommandRef; + + protected enum CommandRunnerStatus { + RUNNING, + ERROR + } public CommandRunner( final InteractiveStatementExecutor statementExecutor, final CommandQueue commandStore, final int maxRetries, final ClusterTerminator clusterTerminator, - final ServerState serverState + final ServerState serverState, + final String ksqlServiceId ) { this( statementExecutor, @@ -72,7 +89,8 @@ public CommandRunner( maxRetries, clusterTerminator, Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")), - serverState + serverState, + ksqlServiceId ); } @@ -83,7 +101,8 @@ public CommandRunner( final int maxRetries, final ClusterTerminator clusterTerminator, final ExecutorService executor, - final ServerState serverState + final ServerState serverState, + final String ksqlServiceId ) { this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); this.commandStore = Objects.requireNonNull(commandStore, "commandStore"); @@ -91,6 +110,8 @@ public CommandRunner( this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator"); this.executor = Objects.requireNonNull(executor, "executor"); this.serverState = Objects.requireNonNull(serverState, "serverState"); + this.currentCommandRef = new AtomicReference<>(null); + this.commandRunnerStatusMetric = new CommandRunnerStatusMetric(ksqlServiceId, this); } /** @@ -114,6 +135,7 @@ public void close() { } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } + commandRunnerStatusMetric.close(); commandStore.close(); } @@ -128,13 +150,17 @@ public void processPriorCommands() { return; } restoreCommands.forEach( - command -> RetryUtil.retryWithBackoff( - maxRetries, - STATEMENT_RETRY_MS, - MAX_STATEMENT_RETRY_MS, - () -> statementExecutor.handleRestore(command), - WakeupException.class - ) + command -> { + currentCommandRef.set(new Pair<>(command, Instant.now())); + RetryUtil.retryWithBackoff( + maxRetries, + STATEMENT_RETRY_MS, + MAX_STATEMENT_RETRY_MS, + () -> statementExecutor.handleRestore(command), + WakeupException.class + ); + currentCommandRef.set(null); + } ); final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine(); ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start); @@ -174,6 +200,7 @@ private void executeStatement(final QueuedCommand queuedCommand) { } }; + currentCommandRef.set(new Pair<>(queuedCommand, Instant.now())); RetryUtil.retryWithBackoff( maxRetries, STATEMENT_RETRY_MS, @@ -181,6 +208,7 @@ private void executeStatement(final QueuedCommand queuedCommand) { task, WakeupException.class ); + currentCommandRef.set(null); } private static Optional findTerminateCommand( @@ -204,6 +232,17 @@ private void terminateCluster(final Command command) { log.info("The KSQL server was terminated."); } + protected CommandRunnerStatus checkCommandRunnerStatus() { + final Pair currentCommand = currentCommandRef.get(); + if (currentCommand == null) { + return CommandRunnerStatus.RUNNING; + } + + return Duration.between(currentCommand.right, Instant.now()).toMillis() + < COMMAND_RUNNER_HEALTH_TIMEOUT.toMillis() + ? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR; + } + private class Runner implements Runnable { @Override diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java new file mode 100644 index 000000000000..3359c4c01756 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java @@ -0,0 +1,78 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.metrics.MetricCollectors; +import io.confluent.ksql.util.KsqlConstants; + +import java.io.Closeable; +import java.util.Collections; +import java.util.Objects; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; + +/** + * Emits a JMX metric that indicates the health of the CommandRunner thread. + */ +public class CommandRunnerStatusMetric implements Closeable { + + private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app"; + private static final String METRIC_GROUP_POST_FIX = "-command-runner"; + private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX; + + private final Metrics metrics; + private final MetricName metricName; + + CommandRunnerStatusMetric( + final String ksqlServiceId, + final CommandRunner commandRunner + ) { + this( + MetricCollectors.getMetrics(), + commandRunner, + ksqlServiceId + ); + } + + @VisibleForTesting + CommandRunnerStatusMetric( + final Metrics metrics, + final CommandRunner commandRunner, + final String ksqlServiceId + ) { + this.metrics = Objects.requireNonNull(metrics, "metrics"); + this.metricName = metrics.metricName( + "status", + KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId + metricGroupName, + "The status of the commandRunner thread as it processes the command topic.", + Collections.emptyMap() + ); + + this.metrics.addMetric(metricName, (Gauge) + (config, now) -> commandRunner.checkCommandRunnerStatus().name()); + } + + /** + * Close the metric + */ + @Override + public void close() { + metrics.removeMetric(metricName); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 5aa9515cf176..51086a2fc8dc 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -29,6 +29,7 @@ import static org.mockito.hamcrest.MockitoHamcrest.argThat; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.TerminateCluster; @@ -72,6 +73,7 @@ public class CommandRunnerTest { @Before public void setup() { + MetricCollectors.initialize(); when(statementExecutor.getKsqlEngine()).thenReturn(ksqlEngine); when(command.getStatement()).thenReturn("something that is not terminate"); @@ -90,7 +92,9 @@ public void setup() { 1, clusterTerminator, executor, - serverState); + serverState, + "ksql-service-id" + ); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index e202dccd3e13..1d8d98917883 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -207,7 +207,8 @@ private class KsqlServer { fakeCommandQueue, 1, mock(ClusterTerminator.class), - serverState + serverState, + "ksql-service-id" ); this.ksqlResource = new KsqlResource( From 119a5b49c0cf4608fe487655b64429a3d6adab46 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Wed, 4 Dec 2019 11:22:06 -0800 Subject: [PATCH 2/3] separate metric into separate class and added tests --- .../ksql/rest/server/KsqlRestApplication.java | 4 +- .../ksql/rest/server/KsqlRestConfig.java | 14 +++ .../server/computation/CommandRunner.java | 34 +++--- .../CommandRunnerStatusMetric.java | 12 +- .../CommandRunnerStatusMetricTest.java | 108 ++++++++++++++++++ .../server/computation/CommandRunnerTest.java | 82 ++++++++++++- .../rest/server/computation/RecoveryTest.java | 6 +- 7 files changed, 238 insertions(+), 22 deletions(-) create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 9318eead1ba2..7ff247c22683 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -520,7 +520,9 @@ static KsqlRestApplication buildApplication( maxStatementRetries, new ClusterTerminator(ksqlEngine, serviceContext, managedTopics), serverState, - ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), + Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS)), + metricsPrefix ); final KsqlResource ksqlResource = new KsqlResource( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index e98aad32d6e9..fd4bf3cc0fb7 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -77,6 +77,13 @@ public class KsqlRestConfig extends RestConfig { "Minimum time between consecutive health check evaluations. Health check queries before " + "the interval has elapsed will receive cached responses."; + static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS = + KSQL_CONFIG_PREFIX + "server.command.runner.healthcheck.ms"; + + private static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC = + "How long to wait for the command runner to process a command from the command topic " + + "before reporting an error metric."; + private static final ConfigDef CONFIG_DEF; static { @@ -122,6 +129,13 @@ public class KsqlRestConfig extends RestConfig { 5000L, Importance.LOW, KSQL_HEALTHCHECK_INTERVAL_MS_DOC + ) + .define( + KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS, + Type.LONG, + 15000L, + Importance.LOW, + KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 6e6df4f79d60..50157158a434 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -35,7 +35,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +46,12 @@ * Also responsible for taking care of any exceptions that occur in the process. */ public class CommandRunner implements Closeable { - - private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app"; - private static final String METRIC_GROUP_POST_FIX = "-command-runner-status"; - private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX; - private static final Logger log = LoggerFactory.getLogger(CommandRunner.class); private static final int STATEMENT_RETRY_MS = 100; private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000; private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(MAX_STATEMENT_RETRY_MS); private static final int SHUTDOWN_TIMEOUT_MS = 3 * MAX_STATEMENT_RETRY_MS; - private static final Duration COMMAND_RUNNER_HEALTH_TIMEOUT = Duration.ofMillis(15000); private final InteractiveStatementExecutor statementExecutor; private final CommandQueue commandStore; @@ -67,10 +60,12 @@ public class CommandRunner implements Closeable { private final int maxRetries; private final ClusterTerminator clusterTerminator; private final ServerState serverState; + private final CommandRunnerStatusMetric commandRunnerStatusMetric; private final AtomicReference> currentCommandRef; + private final Duration commandRunnerHealthTimeout; - protected enum CommandRunnerStatus { + public enum CommandRunnerStatus { RUNNING, ERROR } @@ -81,7 +76,9 @@ public CommandRunner( final int maxRetries, final ClusterTerminator clusterTerminator, final ServerState serverState, - final String ksqlServiceId + final String ksqlServiceId, + final Duration commandRunnerHealthTimeout, + final String metricsGroupPrefix ) { this( statementExecutor, @@ -90,7 +87,9 @@ public CommandRunner( clusterTerminator, Executors.newSingleThreadExecutor(r -> new Thread(r, "CommandRunner")), serverState, - ksqlServiceId + ksqlServiceId, + commandRunnerHealthTimeout, + metricsGroupPrefix ); } @@ -102,7 +101,9 @@ public CommandRunner( final ClusterTerminator clusterTerminator, final ExecutorService executor, final ServerState serverState, - final String ksqlServiceId + final String ksqlServiceId, + final Duration commandRunnerHealthTimeout, + final String metricsGroupPrefix ) { this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); this.commandStore = Objects.requireNonNull(commandStore, "commandStore"); @@ -110,8 +111,11 @@ public CommandRunner( this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator"); this.executor = Objects.requireNonNull(executor, "executor"); this.serverState = Objects.requireNonNull(serverState, "serverState"); + this.commandRunnerHealthTimeout = + Objects.requireNonNull(commandRunnerHealthTimeout, "commandRunnerHealthTimeout"); this.currentCommandRef = new AtomicReference<>(null); - this.commandRunnerStatusMetric = new CommandRunnerStatusMetric(ksqlServiceId, this); + this.commandRunnerStatusMetric = + new CommandRunnerStatusMetric(ksqlServiceId, this, metricsGroupPrefix); } /** @@ -232,17 +236,17 @@ private void terminateCluster(final Command command) { log.info("The KSQL server was terminated."); } - protected CommandRunnerStatus checkCommandRunnerStatus() { + CommandRunnerStatus checkCommandRunnerStatus() { final Pair currentCommand = currentCommandRef.get(); if (currentCommand == null) { return CommandRunnerStatus.RUNNING; } return Duration.between(currentCommand.right, Instant.now()).toMillis() - < COMMAND_RUNNER_HEALTH_TIMEOUT.toMillis() + < commandRunnerHealthTimeout.toMillis() ? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR; } - + private class Runner implements Runnable { @Override diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java index 3359c4c01756..ebb9ee968ea2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java @@ -34,19 +34,21 @@ public class CommandRunnerStatusMetric implements Closeable { private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-rest-app"; private static final String METRIC_GROUP_POST_FIX = "-command-runner"; - private static final String metricGroupName = DEFAULT_METRIC_GROUP_PREFIX + METRIC_GROUP_POST_FIX; private final Metrics metrics; private final MetricName metricName; + private final String metricGroupName; CommandRunnerStatusMetric( final String ksqlServiceId, - final CommandRunner commandRunner + final CommandRunner commandRunner, + final String metricGroupPrefix ) { this( MetricCollectors.getMetrics(), commandRunner, - ksqlServiceId + ksqlServiceId, + metricGroupPrefix.isEmpty() ? DEFAULT_METRIC_GROUP_PREFIX : metricGroupPrefix ); } @@ -54,9 +56,11 @@ public class CommandRunnerStatusMetric implements Closeable { CommandRunnerStatusMetric( final Metrics metrics, final CommandRunner commandRunner, - final String ksqlServiceId + final String ksqlServiceId, + final String metricsGroupPrefix ) { this.metrics = Objects.requireNonNull(metrics, "metrics"); + this.metricGroupName = metricsGroupPrefix + METRIC_GROUP_POST_FIX; this.metricName = metrics.metricName( "status", KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId + metricGroupName, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java new file mode 100644 index 000000000000..0e56d3da3362 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetricTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.computation; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; + +@RunWith(MockitoJUnitRunner.class) +public class CommandRunnerStatusMetricTest { + + private static final MetricName METRIC_NAME = + new MetricName("bob", "g1", "d1", ImmutableMap.of()); + private static final String KSQL_SERVICE_ID = "kcql-1-"; + + @Mock + private Metrics metrics; + @Mock + private CommandRunner commandRunner; + @Captor + private ArgumentCaptor> gaugeCaptor; + + private CommandRunnerStatusMetric commandRunnerStatusMetric; + + @Before + public void setUp() { + when(metrics.metricName(any(), any(), any(), anyMap())).thenReturn(METRIC_NAME); + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING); + + commandRunnerStatusMetric = new CommandRunnerStatusMetric(metrics, commandRunner, KSQL_SERVICE_ID, "rest"); + } + + @Test + public void shouldAddMetricOnCreation() { + // When: + // Listener created in setup + + // Then: + verify(metrics).metricName("status", "_confluent-ksql-kcql-1-rest-command-runner", + "The status of the commandRunner thread as it processes the command topic.", + Collections.emptyMap()); + + verify(metrics).addMetric(eq(METRIC_NAME), isA(Gauge.class)); + } + + @Test + public void shouldInitiallyBeRunningState() { + // When: + // CommandRunnerStatusMetric created in setup + + // Then: + assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.RUNNING.name())); + } + + @Test + public void shouldUpdateToErrorState() { + // When: + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.ERROR); + + // Then: + assertThat(currentGaugeValue(), is(CommandRunner.CommandRunnerStatus.ERROR.name())); + } + + @Test + public void shouldRemoveMetricOnClose() { + // When: + commandRunnerStatusMetric.close(); + + // Then: + verify(metrics).removeMetric(METRIC_NAME); + } + + private String currentGaugeValue() { + verify(metrics).addMetric(any(), gaugeCaptor.capture()); + return gaugeCaptor.getValue().value(null, 0L); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index 51086a2fc8dc..e3f112b4d02b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -15,7 +15,9 @@ package io.confluent.ksql.rest.server.computation; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; @@ -35,7 +37,13 @@ import io.confluent.ksql.rest.util.TerminateCluster; import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,6 +54,7 @@ @RunWith(MockitoJUnitRunner.class) public class CommandRunnerTest { + private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 2000; @Mock private InteractiveStatementExecutor statementExecutor; @@ -93,7 +102,9 @@ public void setup() { clusterTerminator, executor, serverState, - "ksql-service-id" + "ksql-service-id", + Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT), + "" ); } @@ -171,6 +182,75 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() { verify(statementExecutor, never()).handleRestore(queuedCommand3); } + @Test + public void shouldReportRunningIfNotStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException { + try { + checkCommandRunnerStatus( + COMMAND_RUNNER_HEALTH_TIMEOUT - 500, + COMMAND_RUNNER_HEALTH_TIMEOUT - 1000, + CommandRunner.CommandRunnerStatus.RUNNING + ); + } catch (Exception e) { + // fail test if an exception happens + assertThat(true, equalTo(false)); + } + } + + @Test + public void shouldReportErrorIfStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException { + try { + checkCommandRunnerStatus( + COMMAND_RUNNER_HEALTH_TIMEOUT + 1000, + COMMAND_RUNNER_HEALTH_TIMEOUT + 500, + CommandRunner.CommandRunnerStatus.ERROR + ); + } catch (Exception e) { + // fail test if an exception happens + assertThat(true, equalTo(false)); + } + } + + private void checkCommandRunnerStatus( + long commandProcessingTimeMs, + long timeToCheckMetricMs, + CommandRunner.CommandRunnerStatus expectedStatus + ) throws BrokenBarrierException, InterruptedException, ExecutionException { + // Given: + givenQueuedCommands(queuedCommand1); + doAnswer((Answer) invocation -> { + Thread.sleep(commandProcessingTimeMs); + return null; + }).when(statementExecutor).handleStatement(queuedCommand1); + + // When: + final CyclicBarrier gate = new CyclicBarrier(3); + AtomicReference expectedException = new AtomicReference<>(null); + (new Thread(() -> { + try { + gate.await(); + commandRunner.fetchAndRunCommands(); + } catch (Exception e) { + expectedException.set(e); + } + })).start(); + + CompletableFuture statusFuture = new CompletableFuture<>(); + (new Thread(() -> { + try { + gate.await(); + Thread.sleep(timeToCheckMetricMs); + statusFuture.complete(commandRunner.checkCommandRunnerStatus()); + } catch (Exception e) { + expectedException.set(e); + } + })).start(); + + // Then: + gate.await(); + assertThat(statusFuture.get(), equalTo(expectedStatus)); + assertThat(expectedException.get(), equalTo(null)); + } + @Test public void shouldEarlyOutOnShutdown() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 1d8d98917883..d885c6587ae4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -34,6 +34,7 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.MetaStoreImpl; import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; @@ -202,13 +203,16 @@ private class KsqlServer { queryIdGenerator ); + MetricCollectors.initialize(); this.commandRunner = new CommandRunner( statementExecutor, fakeCommandQueue, 1, mock(ClusterTerminator.class), serverState, - "ksql-service-id" + "ksql-service-id", + Duration.ofMillis(2000), + "" ); this.ksqlResource = new KsqlResource( From 6a27b995fa8338b6a3d528044647b12cbc107c77 Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Thu, 5 Dec 2019 12:11:56 -0800 Subject: [PATCH 3/3] add clock --- .../ksql/rest/server/KsqlRestApplication.java | 3 +- .../ksql/rest/server/KsqlRestConfig.java | 10 +- .../server/computation/CommandRunner.java | 21 +++-- .../server/computation/CommandRunnerTest.java | 92 +++++++------------ 4 files changed, 57 insertions(+), 69 deletions(-) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 7ff247c22683..40b894b08d29 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -521,7 +521,8 @@ static KsqlRestApplication buildApplication( new ClusterTerminator(ksqlEngine, serviceContext, managedTopics), serverState, ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), - Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS)), + Duration.ofMillis(restConfig.getLong( + KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)), metricsPrefix ); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index fd4bf3cc0fb7..a2e0e97ea7e2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -77,10 +77,10 @@ public class KsqlRestConfig extends RestConfig { "Minimum time between consecutive health check evaluations. Health check queries before " + "the interval has elapsed will receive cached responses."; - static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS = - KSQL_CONFIG_PREFIX + "server.command.runner.healthcheck.ms"; + static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS = + KSQL_CONFIG_PREFIX + "server.command.blocked.threshold.error.ms"; - private static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC = + private static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC = "How long to wait for the command runner to process a command from the command topic " + "before reporting an error metric."; @@ -131,11 +131,11 @@ public class KsqlRestConfig extends RestConfig { KSQL_HEALTHCHECK_INTERVAL_MS_DOC ) .define( - KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS, + KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS, Type.LONG, 15000L, Importance.LOW, - KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC + KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 50157158a434..5adb297d554a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -25,6 +25,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.RetryUtil; import java.io.Closeable; +import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Collections; @@ -64,6 +65,7 @@ public class CommandRunner implements Closeable { private final CommandRunnerStatusMetric commandRunnerStatusMetric; private final AtomicReference> currentCommandRef; private final Duration commandRunnerHealthTimeout; + private final Clock clock; public enum CommandRunnerStatus { RUNNING, @@ -89,7 +91,8 @@ public CommandRunner( serverState, ksqlServiceId, commandRunnerHealthTimeout, - metricsGroupPrefix + metricsGroupPrefix, + Clock.systemUTC() ); } @@ -103,7 +106,8 @@ public CommandRunner( final ServerState serverState, final String ksqlServiceId, final Duration commandRunnerHealthTimeout, - final String metricsGroupPrefix + final String metricsGroupPrefix, + final Clock clock ) { this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor"); this.commandStore = Objects.requireNonNull(commandStore, "commandStore"); @@ -116,6 +120,7 @@ public CommandRunner( this.currentCommandRef = new AtomicReference<>(null); this.commandRunnerStatusMetric = new CommandRunnerStatusMetric(ksqlServiceId, this, metricsGroupPrefix); + this.clock = clock; } /** @@ -155,7 +160,7 @@ public void processPriorCommands() { } restoreCommands.forEach( command -> { - currentCommandRef.set(new Pair<>(command, Instant.now())); + currentCommandRef.set(new Pair<>(command, clock.instant())); RetryUtil.retryWithBackoff( maxRetries, STATEMENT_RETRY_MS, @@ -204,7 +209,7 @@ private void executeStatement(final QueuedCommand queuedCommand) { } }; - currentCommandRef.set(new Pair<>(queuedCommand, Instant.now())); + currentCommandRef.set(new Pair<>(queuedCommand, clock.instant())); RetryUtil.retryWithBackoff( maxRetries, STATEMENT_RETRY_MS, @@ -241,12 +246,16 @@ CommandRunnerStatus checkCommandRunnerStatus() { if (currentCommand == null) { return CommandRunnerStatus.RUNNING; } - - return Duration.between(currentCommand.right, Instant.now()).toMillis() + + return Duration.between(currentCommand.right, clock.instant()).toMillis() < commandRunnerHealthTimeout.toMillis() ? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR; } + Pair getCurrentCommand() { + return currentCommandRef.get(); + } + private class Runner implements Runnable { @Override diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index e3f112b4d02b..c729d386259f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -16,12 +16,15 @@ package io.confluent.ksql.rest.server.computation; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNotNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -29,19 +32,21 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; import io.confluent.ksql.rest.util.TerminateCluster; + +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; @@ -54,7 +59,7 @@ @RunWith(MockitoJUnitRunner.class) public class CommandRunnerTest { - private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 2000; + private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000; @Mock private InteractiveStatementExecutor statementExecutor; @@ -67,6 +72,8 @@ public class CommandRunnerTest { @Mock private KsqlEngine ksqlEngine; @Mock + private Clock clock; + @Mock private Command command; @Mock private Command clusterTerminate; @@ -104,7 +111,8 @@ public void setup() { serverState, "ksql-service-id", Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT), - "" + "", + clock ); } @@ -183,71 +191,41 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() { } @Test - public void shouldReportRunningIfNotStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException { - try { - checkCommandRunnerStatus( - COMMAND_RUNNER_HEALTH_TIMEOUT - 500, - COMMAND_RUNNER_HEALTH_TIMEOUT - 1000, - CommandRunner.CommandRunnerStatus.RUNNING - ); - } catch (Exception e) { - // fail test if an exception happens - assertThat(true, equalTo(false)); - } - } - - @Test - public void shouldReportErrorIfStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException { - try { - checkCommandRunnerStatus( - COMMAND_RUNNER_HEALTH_TIMEOUT + 1000, - COMMAND_RUNNER_HEALTH_TIMEOUT + 500, - CommandRunner.CommandRunnerStatus.ERROR - ); - } catch (Exception e) { - // fail test if an exception happens - assertThat(true, equalTo(false)); - } - } - - private void checkCommandRunnerStatus( - long commandProcessingTimeMs, - long timeToCheckMetricMs, - CommandRunner.CommandRunnerStatus expectedStatus - ) throws BrokenBarrierException, InterruptedException, ExecutionException { + public void shouldTransitionFromRunningToError() throws InterruptedException { // Given: givenQueuedCommands(queuedCommand1); - doAnswer((Answer) invocation -> { - Thread.sleep(commandProcessingTimeMs); + + final Instant current = Instant.now(); + final CountDownLatch handleStatementLatch = new CountDownLatch(1); + final CountDownLatch commandSetLatch = new CountDownLatch(1); + when(clock.instant()).thenReturn(current) + .thenReturn(current.plusMillis(500)) + .thenReturn(current.plusMillis(1500)) + .thenReturn(current.plusMillis(2500)); + doAnswer(invocation -> { + commandSetLatch.countDown(); + handleStatementLatch.await(); return null; }).when(statementExecutor).handleStatement(queuedCommand1); // When: - final CyclicBarrier gate = new CyclicBarrier(3); AtomicReference expectedException = new AtomicReference<>(null); - (new Thread(() -> { + final Thread commandRunnerThread = (new Thread(() -> { try { - gate.await(); commandRunner.fetchAndRunCommands(); } catch (Exception e) { expectedException.set(e); } - })).start(); - - CompletableFuture statusFuture = new CompletableFuture<>(); - (new Thread(() -> { - try { - gate.await(); - Thread.sleep(timeToCheckMetricMs); - statusFuture.complete(commandRunner.checkCommandRunnerStatus()); - } catch (Exception e) { - expectedException.set(e); - } - })).start(); + })); // Then: - gate.await(); - assertThat(statusFuture.get(), equalTo(expectedStatus)); + commandRunnerThread.start(); + commandSetLatch.await(); + assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.RUNNING)); + assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.ERROR)); + handleStatementLatch.countDown(); + commandRunnerThread.join(); + assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.RUNNING)); assertThat(expectedException.get(), equalTo(null)); }