Skip to content

Commit

Permalink
fix: add logging during restore (#4270)
Browse files Browse the repository at this point in the history
Fixes: #4269
  • Loading branch information
big-andy-coates authored Jan 13, 2020
1 parent 8326151 commit 4e32da6
Showing 1 changed file with 51 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
Expand All @@ -42,12 +41,13 @@

/**
* Handles the logic of reading distributed commands, including pre-existing commands that were
* issued before being initialized, and then delegating their execution to a
* {@link InteractiveStatementExecutor}.
* Also responsible for taking care of any exceptions that occur in the process.
* issued before being initialized, and then delegating their execution to a {@link
* InteractiveStatementExecutor}. Also responsible for taking care of any exceptions that occur in
* the process.
*/
public class CommandRunner implements Closeable {
private static final Logger log = LoggerFactory.getLogger(CommandRunner.class);

private static final Logger LOG = LoggerFactory.getLogger(CommandRunner.class);

private static final int STATEMENT_RETRY_MS = 100;
private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000;
Expand Down Expand Up @@ -152,27 +152,46 @@ public void close() {
* Read and execute all commands on the command topic, starting at the earliest offset.
*/
public void processPriorCommands() {
final List<QueuedCommand> restoreCommands = commandStore.getRestoreCommands();
final Optional<QueuedCommand> terminateCmd = findTerminateCommand(restoreCommands);
if (terminateCmd.isPresent()) {
terminateCluster(terminateCmd.get().getCommand());
return;
try {
final List<QueuedCommand> restoreCommands = commandStore.getRestoreCommands();

LOG.info("Restoring previous state from {} commands.", restoreCommands.size());

final Optional<QueuedCommand> terminateCmd = findTerminateCommand(restoreCommands);
if (terminateCmd.isPresent()) {
LOG.info("Cluster previously terminated: terminating.");
terminateCluster(terminateCmd.get().getCommand());
return;
}

restoreCommands.forEach(
command -> {
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
);
currentCommandRef.set(null);
}
);

final List<PersistentQueryMetadata> queries = statementExecutor
.getKsqlEngine()
.getPersistentQueries();

LOG.info("Restarting {} queries.", queries.size());

queries.forEach(PersistentQueryMetadata::start);

LOG.info("Restore complete");

} catch (final Exception e) {
LOG.error("Error during restore", e);
throw e;
}
restoreCommands.forEach(
command -> {
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
);
currentCommandRef.set(null);
}
);
final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine();
ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start);
}

void fetchAndRunCommands() {
Expand All @@ -187,7 +206,7 @@ void fetchAndRunCommands() {
return;
}

log.trace("Found {} new writes to command topic", commands.size());
LOG.debug("Found {} new writes to command topic", commands.size());
for (final QueuedCommand command : commands) {
if (closed) {
return;
Expand All @@ -198,14 +217,14 @@ void fetchAndRunCommands() {
}

private void executeStatement(final QueuedCommand queuedCommand) {
log.info("Executing statement: " + queuedCommand.getCommand().getStatement());
LOG.info("Executing statement: " + queuedCommand.getCommand().getStatement());

final Runnable task = () -> {
if (closed) {
log.info("Execution aborted as system is closing down");
LOG.info("Execution aborted as system is closing down");
} else {
statementExecutor.handleStatement(queuedCommand);
log.info("Executed statement: " + queuedCommand.getCommand().getStatement());
LOG.info("Executed statement: " + queuedCommand.getCommand().getStatement());
}
};

Expand All @@ -232,13 +251,13 @@ private static Optional<QueuedCommand> findTerminateCommand(
@SuppressWarnings("unchecked")
private void terminateCluster(final Command command) {
serverState.setTerminating();
log.info("Terminating the KSQL server.");
LOG.info("Terminating the KSQL server.");
this.close();
final List<String> deleteTopicList = (List<String>) command.getOverwriteProperties()
.getOrDefault(ClusterTerminateRequest.DELETE_TOPIC_LIST_PROP, Collections.emptyList());

clusterTerminator.terminateCluster(deleteTopicList);
log.info("The KSQL server was terminated.");
LOG.info("The KSQL server was terminated.");
}

CommandRunnerStatus checkCommandRunnerStatus() {
Expand All @@ -252,17 +271,13 @@ CommandRunnerStatus checkCommandRunnerStatus() {
? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR;
}

Pair<QueuedCommand, Instant> getCurrentCommand() {
return currentCommandRef.get();
}

private class Runner implements Runnable {

@Override
public void run() {
try {
while (!closed) {
log.debug("Polling for new writes to command topic");
LOG.trace("Polling for new writes to command topic");
fetchAndRunCommands();
}
} catch (final WakeupException wue) {
Expand Down

0 comments on commit 4e32da6

Please sign in to comment.