Skip to content

Commit

Permalink
add clock
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Dec 5, 2019
1 parent 119a5b4 commit 6a27b99
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,8 @@ static KsqlRestApplication buildApplication(
new ClusterTerminator(ksqlEngine, serviceContext, managedTopics),
serverState,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS)),
Duration.ofMillis(restConfig.getLong(
KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)),
metricsPrefix
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ public class KsqlRestConfig extends RestConfig {
"Minimum time between consecutive health check evaluations. Health check queries before "
+ "the interval has elapsed will receive cached responses.";

static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS =
KSQL_CONFIG_PREFIX + "server.command.runner.healthcheck.ms";
static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS =
KSQL_CONFIG_PREFIX + "server.command.blocked.threshold.error.ms";

private static final String KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC =
private static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC =
"How long to wait for the command runner to process a command from the command topic "
+ "before reporting an error metric.";

Expand Down Expand Up @@ -131,11 +131,11 @@ public class KsqlRestConfig extends RestConfig {
KSQL_HEALTHCHECK_INTERVAL_MS_DOC
)
.define(
KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS,
KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS,
Type.LONG,
15000L,
Importance.LOW,
KSQL_COMMAND_RUNNER_HEALTH_CHECK_MS_DOC
KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
import java.io.Closeable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class CommandRunner implements Closeable {
private final CommandRunnerStatusMetric commandRunnerStatusMetric;
private final AtomicReference<Pair<QueuedCommand, Instant>> currentCommandRef;
private final Duration commandRunnerHealthTimeout;
private final Clock clock;

public enum CommandRunnerStatus {
RUNNING,
Expand All @@ -89,7 +91,8 @@ public CommandRunner(
serverState,
ksqlServiceId,
commandRunnerHealthTimeout,
metricsGroupPrefix
metricsGroupPrefix,
Clock.systemUTC()
);
}

Expand All @@ -103,7 +106,8 @@ public CommandRunner(
final ServerState serverState,
final String ksqlServiceId,
final Duration commandRunnerHealthTimeout,
final String metricsGroupPrefix
final String metricsGroupPrefix,
final Clock clock
) {
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
this.commandStore = Objects.requireNonNull(commandStore, "commandStore");
Expand All @@ -116,6 +120,7 @@ public CommandRunner(
this.currentCommandRef = new AtomicReference<>(null);
this.commandRunnerStatusMetric =
new CommandRunnerStatusMetric(ksqlServiceId, this, metricsGroupPrefix);
this.clock = clock;
}

/**
Expand Down Expand Up @@ -155,7 +160,7 @@ public void processPriorCommands() {
}
restoreCommands.forEach(
command -> {
currentCommandRef.set(new Pair<>(command, Instant.now()));
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
Expand Down Expand Up @@ -204,7 +209,7 @@ private void executeStatement(final QueuedCommand queuedCommand) {
}
};

currentCommandRef.set(new Pair<>(queuedCommand, Instant.now()));
currentCommandRef.set(new Pair<>(queuedCommand, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
Expand Down Expand Up @@ -241,12 +246,16 @@ CommandRunnerStatus checkCommandRunnerStatus() {
if (currentCommand == null) {
return CommandRunnerStatus.RUNNING;
}

return Duration.between(currentCommand.right, Instant.now()).toMillis()
return Duration.between(currentCommand.right, clock.instant()).toMillis()
< commandRunnerHealthTimeout.toMillis()
? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR;
}

Pair<QueuedCommand, Instant> getCurrentCommand() {
return currentCommandRef.get();
}

private class Runner implements Runnable {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,37 @@
package io.confluent.ksql.rest.server.computation;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNotNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
Expand All @@ -54,7 +59,7 @@

@RunWith(MockitoJUnitRunner.class)
public class CommandRunnerTest {
private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 2000;
private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000;

@Mock
private InteractiveStatementExecutor statementExecutor;
Expand All @@ -67,6 +72,8 @@ public class CommandRunnerTest {
@Mock
private KsqlEngine ksqlEngine;
@Mock
private Clock clock;
@Mock
private Command command;
@Mock
private Command clusterTerminate;
Expand Down Expand Up @@ -104,7 +111,8 @@ public void setup() {
serverState,
"ksql-service-id",
Duration.ofMillis(COMMAND_RUNNER_HEALTH_TIMEOUT),
""
"",
clock
);
}

Expand Down Expand Up @@ -183,71 +191,41 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() {
}

@Test
public void shouldReportRunningIfNotStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException {
try {
checkCommandRunnerStatus(
COMMAND_RUNNER_HEALTH_TIMEOUT - 500,
COMMAND_RUNNER_HEALTH_TIMEOUT - 1000,
CommandRunner.CommandRunnerStatus.RUNNING
);
} catch (Exception e) {
// fail test if an exception happens
assertThat(true, equalTo(false));
}
}

@Test
public void shouldReportErrorIfStuckProcessingCommand() throws BrokenBarrierException, InterruptedException, ExecutionException {
try {
checkCommandRunnerStatus(
COMMAND_RUNNER_HEALTH_TIMEOUT + 1000,
COMMAND_RUNNER_HEALTH_TIMEOUT + 500,
CommandRunner.CommandRunnerStatus.ERROR
);
} catch (Exception e) {
// fail test if an exception happens
assertThat(true, equalTo(false));
}
}

private void checkCommandRunnerStatus(
long commandProcessingTimeMs,
long timeToCheckMetricMs,
CommandRunner.CommandRunnerStatus expectedStatus
) throws BrokenBarrierException, InterruptedException, ExecutionException {
public void shouldTransitionFromRunningToError() throws InterruptedException {
// Given:
givenQueuedCommands(queuedCommand1);
doAnswer((Answer) invocation -> {
Thread.sleep(commandProcessingTimeMs);

final Instant current = Instant.now();
final CountDownLatch handleStatementLatch = new CountDownLatch(1);
final CountDownLatch commandSetLatch = new CountDownLatch(1);
when(clock.instant()).thenReturn(current)
.thenReturn(current.plusMillis(500))
.thenReturn(current.plusMillis(1500))
.thenReturn(current.plusMillis(2500));
doAnswer(invocation -> {
commandSetLatch.countDown();
handleStatementLatch.await();
return null;
}).when(statementExecutor).handleStatement(queuedCommand1);

// When:
final CyclicBarrier gate = new CyclicBarrier(3);
AtomicReference<Exception> expectedException = new AtomicReference<>(null);
(new Thread(() -> {
final Thread commandRunnerThread = (new Thread(() -> {
try {
gate.await();
commandRunner.fetchAndRunCommands();
} catch (Exception e) {
expectedException.set(e);
}
})).start();

CompletableFuture<CommandRunner.CommandRunnerStatus> statusFuture = new CompletableFuture<>();
(new Thread(() -> {
try {
gate.await();
Thread.sleep(timeToCheckMetricMs);
statusFuture.complete(commandRunner.checkCommandRunnerStatus());
} catch (Exception e) {
expectedException.set(e);
}
})).start();
}));

// Then:
gate.await();
assertThat(statusFuture.get(), equalTo(expectedStatus));
commandRunnerThread.start();
commandSetLatch.await();
assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.RUNNING));
assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.ERROR));
handleStatementLatch.countDown();
commandRunnerThread.join();
assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.RUNNING));
assertThat(expectedException.get(), equalTo(null));
}

Expand Down

0 comments on commit 6a27b99

Please sign in to comment.