diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java b/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java index 512dec937217..2b99fd97c6ad 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java @@ -30,7 +30,7 @@ public class SpecificQueryIdGenerator implements QueryIdGenerator { public SpecificQueryIdGenerator() { this.nextId = 0L; - this.alreadyUsed = true; + this.alreadyUsed = false; } public void setNextId(final long nextId) { @@ -50,6 +50,6 @@ public String getNext() { @Override public QueryIdGenerator createSandbox() { - return new SequentialQueryIdGenerator(nextId + 1); + return new SequentialQueryIdGenerator(nextId); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java index b5115b27e29f..92aa0a3e0c35 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/query/id/SpecificQueryIdGeneratorTest.java @@ -40,7 +40,10 @@ public void shouldGenerateIdBasedOnSetNextId() { assertThat(generator.getNext(), is("5")); } - + @Test + public void shouldReturnZeroIdForFirstQuery() { + assertThat(generator.getNext(), is("0")); + } @Test(expected = KsqlServerException.class) public void shouldThrowWhenGetNextBeforeSet() { @@ -54,6 +57,6 @@ public void shouldReturnSequentialGeneratorFromLastId() { generator.setNextId(3L); final QueryIdGenerator copy = generator.createSandbox(); assertThat(copy, instanceOf(SequentialQueryIdGenerator.class)); - assertThat(copy.getNext(), is("4")); + assertThat(copy.getNext(), is("3")); } } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index 89ca6eb41a94..7e7761f0b0db 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -191,7 +191,7 @@ private void handleStatementWithTerminatedQueries( ) { try { if (command.getPlan().isPresent()) { - executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode); + executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode, offset); return; } final String statementString = command.getStatement(); @@ -224,7 +224,8 @@ private void executePlan( final CommandId commandId, final Optional commandStatusFuture, final KsqlPlan plan, - final Mode mode + final Mode mode, + final long offset ) { final KsqlConfig mergedConfig = buildMergedConfig(command); final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of( @@ -238,8 +239,11 @@ private void executePlan( new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement") ); final ExecuteResult result = ksqlEngine.execute(serviceContext, configured); - if (mode == Mode.EXECUTE) { - result.getQuery().ifPresent(QueryMetadata::start); + if (result.getQuery().isPresent()) { + queryIdGenerator.setNextId(offset + 1); + if (mode == Mode.EXECUTE) { + result.getQuery().get().start(); + } } final String successMessage = getSuccessMessage(result); final CommandStatus successStatus = @@ -317,8 +321,6 @@ private PersistentQueryMetadata startQuery( final ConfiguredStatement configured = ConfiguredStatement.of( statement, command.getOverwriteProperties(), mergedConfig); - queryIdGenerator.setNextId(offset); - final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured); final QueryMetadata queryMetadata = ksqlEngine @@ -328,6 +330,8 @@ private PersistentQueryMetadata startQuery( .getQuery() .orElseThrow(() -> new IllegalStateException("Statement did not return a query")); + queryIdGenerator.setNextId(offset + 1); + if (!(queryMetadata instanceof PersistentQueryMetadata)) { throw new KsqlException(String.format( "Unexpected query metadata type: %s; was expecting %s", diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index d3ebec489c5b..44bd43a9933e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -345,6 +345,18 @@ public void shouldExecutePlannedCommand() { verify(mockEngine).execute(serviceContext, ConfiguredKsqlPlan.of(plan, emptyMap(), ksqlConfig)); } + @Test + public void shouldSetNextQueryIdToNextOffsetWhenExecutingPlannedCommand() { + // Given: + givenMockPlannedQuery(); + + // When: + handleStatement(statementExecutorWithMocks, plannedCommand, COMMAND_ID, Optional.empty(), 2L); + + // Then: + verify(mockQueryIdGenerator).setNextId(3L); + } + @Test public void shouldUpdateStatusOnCompletedPlannedCommand() { // Given: @@ -570,6 +582,25 @@ public void shouldEnforceReferentialIntegrity() { CoreMatchers.equalTo(CommandStatus.Status.SUCCESS)); } + @Test + public void shouldSetNextQueryIdToNextOffsetWhenExecutingRestoreCommand() { + // Given: + mockReplayCSAS(new QueryId("csas-query-id")); + + // When: + statementExecutorWithMocks.handleRestore( + new QueuedCommand( + new CommandId(Type.STREAM, "foo", Action.CREATE), + new Command("CSAS", emptyMap(), emptyMap(), Optional.empty()), + Optional.empty(), + 2L + ) + ); + + // Then: + verify(mockQueryIdGenerator).setNextId(3L); + } + @Test public void shouldSkipStartWhenReplayingLog() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 9876553979c7..82f90254d469 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.metrics.MetricCollectors; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.query.id.SpecificQueryIdGenerator; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.CommandId; @@ -105,7 +106,7 @@ public void tearDown() { serviceContext.close(); } - private KsqlEngine createKsqlEngine() { + private KsqlEngine createKsqlEngine(final QueryIdGenerator queryIdGenerator) { final KsqlEngineMetrics engineMetrics = mock(KsqlEngineMetrics.class); return KsqlEngineTestUtil.createKsqlEngine( serviceContext, @@ -190,7 +191,8 @@ private class KsqlServer { final ServerState serverState; KsqlServer(final List commandLog) { - this.ksqlEngine = createKsqlEngine(); + final SpecificQueryIdGenerator queryIdGenerator = new SpecificQueryIdGenerator(); + this.ksqlEngine = createKsqlEngine(queryIdGenerator); this.fakeCommandQueue = new FakeCommandQueue(commandLog, transactionalProducer); serverState = new ServerState(); serverState.setReady(); @@ -561,7 +563,7 @@ public void shouldRecoverRecreates() { server1.submitCommands( "CREATE STREAM A (C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT C1 FROM A;", - "TERMINATE CSAS_B_1;", + "TERMINATE CSAS_B_0;", "DROP STREAM B;", "CREATE STREAM B AS SELECT C2 FROM A;" ); @@ -573,7 +575,7 @@ public void shouldRecoverTerminates() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_1;" + "TERMINATE CSAS_B_0;" ); shouldRecover(commands); } @@ -583,7 +585,7 @@ public void shouldRecoverDrop() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_1;", + "TERMINATE CSAS_B_0;", "DROP STREAM B;" ); shouldRecover(commands); @@ -595,7 +597,7 @@ public void shouldNotDeleteTopicsOnRecovery() { server1.submitCommands( "CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');", "CREATE STREAM B AS SELECT * FROM A;", - "TERMINATE CSAS_B_1;", + "TERMINATE CSAS_B_0;", "DROP STREAM B DELETE TOPIC;" ); @@ -657,7 +659,7 @@ public void shouldRecoverQueryIDs() { final Set queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries()) .keySet(); - assertThat(queryIdNames, contains(new QueryId("CSAS_C_7"))); + assertThat(queryIdNames, contains(new QueryId("CSAS_C_0"))); } }