Skip to content

Commit

Permalink
feat: never skip command in CommandRunner thread (#3962)
Browse files — browse the repository at this point in the history
* refactor: throw exception from InteractiveStatementExecutor so commands are retried

* feat: never skip commands from command topic
  • Loading branch information
stevenpyzhang authored Dec 6, 2019
1 parent 5ff94b6 commit 89e059b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,13 @@ private void handleStatementWithTerminatedQueries(
statement, command, commandId, commandStatusFuture, mode, offset);
} catch (final KsqlException exception) {
log.error("Failed to handle: " + command, exception);

final CommandStatus errorStatus = new CommandStatus(
CommandStatus.Status.ERROR,
ExceptionUtil.stackTraceToString(exception)
);
putFinalStatus(commandId, commandStatusFuture, errorStatus);
putStatus(commandId, commandStatusFuture, errorStatus);
throw exception;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
Expand All @@ -50,7 +51,9 @@
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
Expand All @@ -61,6 +64,9 @@
public class CommandRunnerTest {
private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000;

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Mock
private InteractiveStatementExecutor statementExecutor;
@Mock
Expand Down Expand Up @@ -105,7 +111,7 @@ public void setup() {
commandRunner = new CommandRunner(
statementExecutor,
commandStore,
1,
3,
clusterTerminator,
executor,
serverState,
Expand Down Expand Up @@ -175,6 +181,37 @@ public void shouldPullAndRunStatements() {
inOrder.verify(statementExecutor).handleStatement(queuedCommand3);
}


/**
 * Verifies that the CommandRunner retries a failing command instead of skipping it:
 * the executor throws for the first two attempts on queuedCommand2 and succeeds on
 * the third, which should fit within the retry limit (presumably the 3 passed to the
 * CommandRunner constructor in setup() — TODO confirm).
 */
@Test
public void shouldRetryOnException() {
  // Given: two queued commands; the second fails twice, then succeeds.
  givenQueuedCommands(queuedCommand1, queuedCommand2);
  doThrow(new RuntimeException())
      .doThrow(new RuntimeException())
      .doNothing().when(statementExecutor).handleStatement(queuedCommand2);

  // When:
  commandRunner.fetchAndRunCommands();

  // Then: command1 handled once; command2 attempted three times (two failures + one success).
  final InOrder inOrder = inOrder(statementExecutor);
  inOrder.verify(statementExecutor, times(1)).handleStatement(queuedCommand1);
  inOrder.verify(statementExecutor, times(3)).handleStatement(queuedCommand2);
}

/**
 * Verifies that when a command keeps failing past the retry limit, the exception
 * propagates out of fetchAndRunCommands() rather than the command being skipped.
 */
@Test
public void shouldThrowExceptionIfOverMaxRetries() {
  // Given: queuedCommand2 fails on every attempt, exhausting all retries.
  givenQueuedCommands(queuedCommand1, queuedCommand2);
  doThrow(new RuntimeException()).when(statementExecutor).handleStatement(queuedCommand2);

  // Expect: the RuntimeException to escape once retries are exhausted.
  expectedException.expect(RuntimeException.class);

  // When:
  commandRunner.fetchAndRunCommands();
}

@Test
public void shouldEarlyOutIfNewCommandsContainsTerminate() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,6 @@ public void shouldDeserializeWithoutKsqlConfigCorrectly() throws IOException {
assertThat(command.getOriginalProperties(), equalTo(Collections.emptyMap()));
}

/**
 * Verifies that a serialized Command lacking the "useOffsetAsQueryID" field still
 * deserializes correctly: statement, overwrite (streams) properties, and original
 * properties are all populated from the JSON.
 *
 * @throws IOException if the JSON cannot be parsed (fails the test)
 */
@Test
public void shouldDeserializeWithoutUseOffsetAsQueryIDCorrectly() throws IOException {
  final String commandStr = "{" +
      "\"statement\": \"test statement;\", " +
      "\"streamsProperties\": {\"foo\": \"bar\"}, " +
      "\"originalProperties\": {\"biz\": \"baz\"} " +
      "}";
  final ObjectMapper mapper = JsonMapper.INSTANCE.mapper;
  final Command command = mapper.readValue(commandStr, Command.class);
  assertThat(command.getStatement(), equalTo("test statement;"));
  // "streamsProperties" in the JSON maps to the overwrite properties on Command.
  // (Fixed local-variable typo: was "expecteOverwriteProperties".)
  final Map<String, Object> expectedOverwriteProperties
      = Collections.singletonMap("foo", "bar");
  assertThat(command.getOverwriteProperties(), equalTo(expectedOverwriteProperties));
  final Map<String, Object> expectedOriginalProperties
      = Collections.singletonMap("biz", "baz");
  assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties));
}

/**
 * Asserts that the entire {@code string} matches the given {@code regex}.
 */
private void grep(final String string, final String regex) {
  final boolean matched = string.matches(regex);
  assertThat(matched, is(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.Collections;
Expand Down Expand Up @@ -97,6 +99,11 @@
public class InteractiveStatementExecutorTest {

private static final Map<String, String> PRE_VERSION_5_NULL_ORIGINAL_PROPS = null;
private static final String CREATE_STREAM_FOO_STATMENT = "CREATE STREAM foo ("
+ "biz bigint,"
+ " baz varchar) "
+ "WITH (kafka_topic = 'foo', "
+ "value_format = 'json');";

private KsqlEngine ksqlEngine;
private InteractiveStatementExecutor statementExecutor;
Expand Down Expand Up @@ -289,11 +296,7 @@ public void shouldBuildQueriesWithPersistedConfig() {
public void shouldCompleteFutureOnSuccess() {
// Given:
final Command command = new Command(
"CREATE STREAM foo ("
+ "biz bigint,"
+ " baz varchar) "
+ "WITH (kafka_topic = 'foo', "
+ "value_format = 'json');",
CREATE_STREAM_FOO_STATMENT,
emptyMap(),
ksqlConfig.getAllConfigPropsWithSecretsObfuscated());
final CommandId commandId = new CommandId(CommandId.Type.STREAM,
Expand All @@ -318,16 +321,12 @@ public void shouldCompleteFutureOnSuccess() {
}

@Test
public void shouldCompleteFutureOnFailure() {
public void shouldThrowExceptionIfCommandFails() {
// Given:
shouldCompleteFutureOnSuccess();

final Command command = new Command(
"CREATE STREAM foo ("
+ "biz bigint,"
+ " baz varchar) "
+ "WITH (kafka_topic = 'foo', "
+ "value_format = 'json');",
CREATE_STREAM_FOO_STATMENT,
emptyMap(),
ksqlConfig.getAllConfigPropsWithSecretsObfuscated());
final CommandId commandId = new CommandId(CommandId.Type.STREAM,
Expand All @@ -336,19 +335,22 @@ public void shouldCompleteFutureOnFailure() {
final CommandStatusFuture status = mock(CommandStatusFuture.class);

// When:
handleStatement(command, commandId, Optional.of(status), 0L);

// Then:
try {
handleStatement(command, commandId, Optional.of(status), 0L);
} catch (KsqlStatementException e) {
// Then:
assertEquals("Cannot add stream 'FOO': A stream with the same name already exists\n" +
"Statement: " + CREATE_STREAM_FOO_STATMENT,
e.getMessage());
}
InOrder inOrder = Mockito.inOrder(status);
ArgumentCaptor<CommandStatus> argCommandStatus = ArgumentCaptor.forClass(CommandStatus.class);
ArgumentCaptor<CommandStatus> argFinalCommandStatus = ArgumentCaptor.forClass(CommandStatus.class);
inOrder.verify(status, times(2)).setStatus(argCommandStatus.capture());
inOrder.verify(status, times(1)).setFinalStatus(argFinalCommandStatus.capture());
inOrder.verify(status, times(3)).setStatus(argCommandStatus.capture());

List<CommandStatus> commandStatusList = argCommandStatus.getAllValues();
assertEquals(CommandStatus.Status.PARSING, commandStatusList.get(0).getStatus());
assertEquals(CommandStatus.Status.EXECUTING, commandStatusList.get(1).getStatus());
assertEquals(CommandStatus.Status.ERROR, argFinalCommandStatus.getValue().getStatus());
assertEquals(CommandStatus.Status.ERROR, commandStatusList.get(2).getStatus());
}

@Test
Expand All @@ -367,6 +369,8 @@ public void shouldHandlePriorStatements() {
CommandId.Action.CREATE);

// When:
expectedException.expect(KsqlStatementException.class);

for (int i = 0; i < priorCommands.size(); i++) {
final Pair<CommandId, Command> pair = priorCommands.get(i);
statementExecutor.handleRestore(
Expand Down Expand Up @@ -418,6 +422,7 @@ public void shouldEnforceReferentialIntegrity() {
createStreamsAndStartTwoPersistentQueries();

// Now try to drop streams/tables to test referential integrity
expectedException.expect(KsqlStatementException.class);
tryDropThatViolatesReferentialIntegrity();

// Terminate the queries using the stream/table
Expand Down Expand Up @@ -599,7 +604,9 @@ public void shouldRestoreLegacyRunScriptCommand() {
new CommandId(CommandId.Type.STREAM, "RunScript", CommandId.Action.EXECUTE),
new Command(
runScriptStatement,
Collections.singletonMap("ksql.run.script.statements", queryStatement), emptyMap()),
Collections.singletonMap("ksql.run.script.statements", queryStatement),
emptyMap()
),
Optional.empty(),
0L
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,50 +624,6 @@ public void shouldNotDeleteTopicsOnRecoveryEvenIfLegacyDropCommandAlreadyInComma
assertThat(topicClient.listTopicNames(), hasItem("B"));
}

/**
 * Replays a command log in which the same query (CSAS_B_1) is terminated by two
 * different servers, then checks that recovery reproduces the engine state.
 */
@Test
public void shouldRecoverLogWithRepeatedTerminates() {
  server1.submitCommands(
      "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
      "CREATE STREAM B AS SELECT * FROM A;"
  );
  // server2 catches up before server1 terminates and re-populates B ...
  server2.executeCommands();
  server1.submitCommands(
      "TERMINATE CSAS_B_1;",
      "INSERT INTO B SELECT * FROM A;",
      "TERMINATE InsertQuery_3;"
  );
  // ... so server2's TERMINATE duplicates one already in the command log.
  server2.submitCommands("TERMINATE CSAS_B_1;");
  shouldRecover(commands);
}

/**
 * Replays a command log where one server DROPs stream B while another server has a
 * racing INSERT INTO B, then checks that recovery reproduces the engine state.
 */
@Test
public void shouldRecoverLogWithDropWithRacingInsert() {
  server1.submitCommands(
      "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
      "CREATE STREAM B AS SELECT * FROM A;",
      "TERMINATE CSAS_B_1;"
  );
  server2.executeCommands();
  // server2 issues DROP without having seen server1's INSERT into B.
  server1.submitCommands("INSERT INTO B SELECT * FROM A;");
  server2.submitCommands("DROP STREAM B;");
  shouldRecover(commands);
}

/**
 * Replays a command log where a TERMINATE for an insert query arrives after the
 * target stream was dropped by another server; recovery must still succeed.
 */
@Test
public void shouldRecoverLogWithTerminateAfterDrop() {
  // Stream B is declared directly over topic 'B' (not AS SELECT), so the topic
  // must exist up front.
  topicClient.preconditionTopicExists("B");

  server1.submitCommands(
      "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
      "CREATE STREAM B (COLUMN STRING) WITH (KAFKA_TOPIC='B', VALUE_FORMAT='JSON');"
  );
  server2.executeCommands();
  server1.submitCommands("INSERT INTO B SELECT * FROM A;");
  // B is dropped before server1's TERMINATE of the insert query is submitted.
  server2.submitCommands("DROP STREAM B;");
  server1.submitCommands("TERMINATE InsertQuery_2;");
  shouldRecover(commands);
}

@Test
public void shouldRecoverQueryIDsByOffset() {
commands.addAll(
Expand Down

0 comments on commit 89e059b

Please sign in to comment.