diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index 7f0972b7265..ca41d8769ff 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -210,11 +210,13 @@ private void handleStatementWithTerminatedQueries( statement, command, commandId, commandStatusFuture, mode, offset); } catch (final KsqlException exception) { log.error("Failed to handle: " + command, exception); + final CommandStatus errorStatus = new CommandStatus( CommandStatus.Status.ERROR, ExceptionUtil.stackTraceToString(exception) ); - putFinalStatus(commandId, commandStatusFuture, errorStatus); + putStatus(commandId, commandStatusFuture, errorStatus); + throw exception; } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index c729d386259..6f710c9d11a 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; @@ -50,7 +51,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.InOrder; import org.mockito.Mock; @@ -61,6 +64,9 @@ public class CommandRunnerTest { private static long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000; + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + @Mock private InteractiveStatementExecutor statementExecutor; @Mock @@ -105,7 +111,7 @@ public void setup() { commandRunner = new CommandRunner( statementExecutor, commandStore, - 1, + 3, clusterTerminator, executor, serverState, @@ -175,6 +181,37 @@ public void shouldPullAndRunStatements() { inOrder.verify(statementExecutor).handleStatement(queuedCommand3); } + + @Test + public void shouldRetryOnException() { + // Given: + givenQueuedCommands(queuedCommand1, queuedCommand2); + doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doNothing().when(statementExecutor).handleStatement(queuedCommand2); + + // When: + commandRunner.fetchAndRunCommands(); + + // Then: + final InOrder inOrder = inOrder(statementExecutor); + inOrder.verify(statementExecutor, times(1)).handleStatement(queuedCommand1); + inOrder.verify(statementExecutor, times(3)).handleStatement(queuedCommand2); + } + + @Test + public void shouldThrowExceptionIfOverMaxRetries() { + // Given: + givenQueuedCommands(queuedCommand1, queuedCommand2); + doThrow(new RuntimeException()).when(statementExecutor).handleStatement(queuedCommand2); + + // Expect: + expectedException.expect(RuntimeException.class); + + // When: + commandRunner.fetchAndRunCommands(); + } + @Test public void shouldEarlyOutIfNewCommandsContainsTerminate() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java index 8341eef1be6..78e19491a35 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandTest.java @@ -59,24 +59,6 @@ public void shouldDeserializeWithoutKsqlConfigCorrectly() throws IOException { assertThat(command.getOriginalProperties(), equalTo(Collections.emptyMap())); } - @Test - public void shouldDeserializeWithoutUseOffsetAsQueryIDCorrectly() throws IOException { - final String commandStr = "{" + - "\"statement\": \"test statement;\", " + - "\"streamsProperties\": {\"foo\": \"bar\"}, " + - "\"originalProperties\": {\"biz\": \"baz\"} " + - "}"; - final ObjectMapper mapper = JsonMapper.INSTANCE.mapper; - final Command command = mapper.readValue(commandStr, Command.class); - assertThat(command.getStatement(), equalTo("test statement;")); - final Map expecteOverwriteProperties - = Collections.singletonMap("foo", "bar"); - assertThat(command.getOverwriteProperties(), equalTo(expecteOverwriteProperties)); - final Map expectedOriginalProperties - = Collections.singletonMap("biz", "baz"); - assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties)); - } - private void grep(final String string, final String regex) { assertThat(string.matches(regex), is(true)); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index c69857254ec..dc2837aaa22 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -67,6 +67,8 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlServerException; +import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.Collections; @@ -97,6 +99,11 @@ public class InteractiveStatementExecutorTest { private static final Map PRE_VERSION_5_NULL_ORIGINAL_PROPS = null; + private static final String CREATE_STREAM_FOO_STATMENT = "CREATE STREAM foo (" + + "biz bigint," + + " baz varchar) " + + "WITH (kafka_topic = 'foo', " + + "value_format = 'json');"; private KsqlEngine ksqlEngine; private InteractiveStatementExecutor statementExecutor; @@ -289,11 +296,7 @@ public void shouldBuildQueriesWithPersistedConfig() { public void shouldCompleteFutureOnSuccess() { // Given: final Command command = new Command( - "CREATE STREAM foo (" - + "biz bigint," - + " baz varchar) " - + "WITH (kafka_topic = 'foo', " - + "value_format = 'json');", + CREATE_STREAM_FOO_STATMENT, emptyMap(), ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); final CommandId commandId = new CommandId(CommandId.Type.STREAM, @@ -318,16 +321,12 @@ public void shouldCompleteFutureOnSuccess() { } @Test - public void shouldCompleteFutureOnFailure() { + public void shouldThrowExceptionIfCommandFails() { // Given: shouldCompleteFutureOnSuccess(); final Command command = new Command( - "CREATE STREAM foo (" - + "biz bigint," - + " baz varchar) " - + "WITH (kafka_topic = 'foo', " - + "value_format = 'json');", + CREATE_STREAM_FOO_STATMENT, emptyMap(), ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); final CommandId commandId = new CommandId(CommandId.Type.STREAM, @@ -336,19 +335,22 @@ public void shouldCompleteFutureOnFailure() { final CommandStatusFuture status = mock(CommandStatusFuture.class); // When: - handleStatement(command, commandId, Optional.of(status), 0L); - - // Then: + try { + handleStatement(command, commandId, Optional.of(status), 0L); + } catch (KsqlStatementException e) { + // Then: + assertEquals("Cannot add stream 'FOO': A stream with the same name already exists\n" + + "Statement: " + CREATE_STREAM_FOO_STATMENT, + e.getMessage()); + } InOrder inOrder = Mockito.inOrder(status); ArgumentCaptor argCommandStatus = ArgumentCaptor.forClass(CommandStatus.class); - ArgumentCaptor argFinalCommandStatus = ArgumentCaptor.forClass(CommandStatus.class); - inOrder.verify(status, times(2)).setStatus(argCommandStatus.capture()); - inOrder.verify(status, times(1)).setFinalStatus(argFinalCommandStatus.capture()); + inOrder.verify(status, times(3)).setStatus(argCommandStatus.capture()); List commandStatusList = argCommandStatus.getAllValues(); assertEquals(CommandStatus.Status.PARSING, commandStatusList.get(0).getStatus()); assertEquals(CommandStatus.Status.EXECUTING, commandStatusList.get(1).getStatus()); - assertEquals(CommandStatus.Status.ERROR, argFinalCommandStatus.getValue().getStatus()); + assertEquals(CommandStatus.Status.ERROR, commandStatusList.get(2).getStatus()); } @Test @@ -367,6 +369,8 @@ public void shouldHandlePriorStatements() { CommandId.Action.CREATE); // When: + expectedException.expect(KsqlStatementException.class); + for (int i = 0; i < priorCommands.size(); i++) { final Pair pair = priorCommands.get(i); statementExecutor.handleRestore( @@ -418,6 +422,7 @@ public void shouldEnforceReferentialIntegrity() { createStreamsAndStartTwoPersistentQueries(); // Now try to drop streams/tables to test referential integrity + expectedException.expect(KsqlStatementException.class); tryDropThatViolatesReferentialIntegrity(); // Terminate the queries using the stream/table @@ -599,7 +604,9 @@ public void shouldRestoreLegacyRunScriptCommand() { new CommandId(CommandId.Type.STREAM, "RunScript", CommandId.Action.EXECUTE), new Command( runScriptStatement, - Collections.singletonMap("ksql.run.script.statements", queryStatement), emptyMap()), + Collections.singletonMap("ksql.run.script.statements", queryStatement), + emptyMap() + ), Optional.empty(), 0L ) diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index d885c6587ae..cee9143c7c0 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -624,50 +624,6 @@ public void shouldNotDeleteTopicsOnRecoveryEvenIfLegacyDropCommandAlreadyInComma assertThat(topicClient.listTopicNames(), hasItem("B")); } - @Test - public void shouldRecoverLogWithRepeatedTerminates() { - server1.submitCommands( - "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", - "CREATE STREAM B AS SELECT * FROM A;" - ); - server2.executeCommands(); - server1.submitCommands( - "TERMINATE CSAS_B_1;", - "INSERT INTO B SELECT * FROM A;", - "TERMINATE InsertQuery_3;" - ); - server2.submitCommands("TERMINATE CSAS_B_1;"); - shouldRecover(commands); - } - - @Test - public void shouldRecoverLogWithDropWithRacingInsert() { - server1.submitCommands( - "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", - "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_1;" - ); - server2.executeCommands(); - server1.submitCommands("INSERT INTO B SELECT * FROM A;"); - server2.submitCommands("DROP STREAM B;"); - shouldRecover(commands); - } - - @Test - public void shouldRecoverLogWithTerminateAfterDrop() { - topicClient.preconditionTopicExists("B"); - - server1.submitCommands( - "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", - "CREATE STREAM B (COLUMN STRING) WITH (KAFKA_TOPIC='B', VALUE_FORMAT='JSON');" - ); - server2.executeCommands(); - server1.submitCommands("INSERT INTO B SELECT * FROM A;"); - server2.submitCommands("DROP STREAM B;"); - server1.submitCommands("TERMINATE InsertQuery_2;"); - shouldRecover(commands); - } - @Test public void shouldRecoverQueryIDsByOffset() { commands.addAll(