Skip to content

Commit

Permalink
fix: change query id generation to work with planned commands (#4149)
Browse files Browse the repository at this point in the history
* fix: change query id generation to work with planned commands

This patch changes up how we generate query IDs to play nice with
planned commands. Before this change, statements would get the current
offset as their query id. However planned commands get their query IDs
before being enqueued, so they should really get the _next_ expected
offset as their ID. This patch changes up the id generation to work
this way. The next ID is set _after_ statemetns/plans are executed,
and is set to the next expected offset.

Co-Authored-By: Victoria Xia <[email protected]>
  • Loading branch information
rodesai and vcrfxia authored Dec 19, 2019
1 parent 6e558da commit 91c421a
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SpecificQueryIdGenerator implements QueryIdGenerator {

public SpecificQueryIdGenerator() {
this.nextId = 0L;
this.alreadyUsed = true;
this.alreadyUsed = false;
}

public void setNextId(final long nextId) {
Expand All @@ -50,6 +50,6 @@ public String getNext() {

@Override
public QueryIdGenerator createSandbox() {
return new SequentialQueryIdGenerator(nextId + 1);
return new SequentialQueryIdGenerator(nextId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public void shouldGenerateIdBasedOnSetNextId() {
assertThat(generator.getNext(), is("5"));
}


@Test
public void shouldReturnZeroIdForFirstQuery() {
assertThat(generator.getNext(), is("0"));
}

@Test(expected = KsqlServerException.class)
public void shouldThrowWhenGetNextBeforeSet() {
Expand All @@ -54,6 +57,6 @@ public void shouldReturnSequentialGeneratorFromLastId() {
generator.setNextId(3L);
final QueryIdGenerator copy = generator.createSandbox();
assertThat(copy, instanceOf(SequentialQueryIdGenerator.class));
assertThat(copy.getNext(), is("4"));
assertThat(copy.getNext(), is("3"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private void handleStatementWithTerminatedQueries(
) {
try {
if (command.getPlan().isPresent()) {
executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode);
executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode, offset);
return;
}
final String statementString = command.getStatement();
Expand Down Expand Up @@ -224,7 +224,8 @@ private void executePlan(
final CommandId commandId,
final Optional<CommandStatusFuture> commandStatusFuture,
final KsqlPlan plan,
final Mode mode
final Mode mode,
final long offset
) {
final KsqlConfig mergedConfig = buildMergedConfig(command);
final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of(
Expand All @@ -238,8 +239,11 @@ private void executePlan(
new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement")
);
final ExecuteResult result = ksqlEngine.execute(serviceContext, configured);
if (mode == Mode.EXECUTE) {
result.getQuery().ifPresent(QueryMetadata::start);
if (result.getQuery().isPresent()) {
queryIdGenerator.setNextId(offset + 1);
if (mode == Mode.EXECUTE) {
result.getQuery().get().start();
}
}
final String successMessage = getSuccessMessage(result);
final CommandStatus successStatus =
Expand Down Expand Up @@ -317,8 +321,6 @@ private PersistentQueryMetadata startQuery(
final ConfiguredStatement<?> configured = ConfiguredStatement.of(
statement, command.getOverwriteProperties(), mergedConfig);

queryIdGenerator.setNextId(offset);

final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured);
final QueryMetadata queryMetadata =
ksqlEngine
Expand All @@ -328,6 +330,8 @@ private PersistentQueryMetadata startQuery(
.getQuery()
.orElseThrow(() -> new IllegalStateException("Statement did not return a query"));

queryIdGenerator.setNextId(offset + 1);

if (!(queryMetadata instanceof PersistentQueryMetadata)) {
throw new KsqlException(String.format(
"Unexpected query metadata type: %s; was expecting %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,18 @@ public void shouldExecutePlannedCommand() {
verify(mockEngine).execute(serviceContext, ConfiguredKsqlPlan.of(plan, emptyMap(), ksqlConfig));
}

@Test
public void shouldSetNextQueryIdToNextOffsetWhenExecutingPlannedCommand() {
// Given:
givenMockPlannedQuery();

// When:
handleStatement(statementExecutorWithMocks, plannedCommand, COMMAND_ID, Optional.empty(), 2L);

// Then:
verify(mockQueryIdGenerator).setNextId(3L);
}

@Test
public void shouldUpdateStatusOnCompletedPlannedCommand() {
// Given:
Expand Down Expand Up @@ -570,6 +582,25 @@ public void shouldEnforceReferentialIntegrity() {
CoreMatchers.equalTo(CommandStatus.Status.SUCCESS));
}

@Test
public void shouldSetNextQueryIdToNextOffsetWhenExecutingRestoreCommand() {
// Given:
mockReplayCSAS(new QueryId("csas-query-id"));

// When:
statementExecutorWithMocks.handleRestore(
new QueuedCommand(
new CommandId(Type.STREAM, "foo", Action.CREATE),
new Command("CSAS", emptyMap(), emptyMap(), Optional.empty()),
Optional.empty(),
2L
)
);

// Then:
verify(mockQueryIdGenerator).setNextId(3L);
}

@Test
public void shouldSkipStartWhenReplayingLog() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
Expand Down Expand Up @@ -105,7 +106,7 @@ public void tearDown() {
serviceContext.close();
}

private KsqlEngine createKsqlEngine() {
private KsqlEngine createKsqlEngine(final QueryIdGenerator queryIdGenerator) {
final KsqlEngineMetrics engineMetrics = mock(KsqlEngineMetrics.class);
return KsqlEngineTestUtil.createKsqlEngine(
serviceContext,
Expand Down Expand Up @@ -190,7 +191,8 @@ private class KsqlServer {
final ServerState serverState;

KsqlServer(final List<QueuedCommand> commandLog) {
this.ksqlEngine = createKsqlEngine();
final SpecificQueryIdGenerator queryIdGenerator = new SpecificQueryIdGenerator();
this.ksqlEngine = createKsqlEngine(queryIdGenerator);
this.fakeCommandQueue = new FakeCommandQueue(commandLog, transactionalProducer);
serverState = new ServerState();
serverState.setReady();
Expand Down Expand Up @@ -561,7 +563,7 @@ public void shouldRecoverRecreates() {
server1.submitCommands(
"CREATE STREAM A (C1 STRING, C2 INT) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT C1 FROM A;",
"TERMINATE CSAS_B_1;",
"TERMINATE CSAS_B_0;",
"DROP STREAM B;",
"CREATE STREAM B AS SELECT C2 FROM A;"
);
Expand All @@ -573,7 +575,7 @@ public void shouldRecoverTerminates() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;"
"TERMINATE CSAS_B_0;"
);
shouldRecover(commands);
}
Expand All @@ -583,7 +585,7 @@ public void shouldRecoverDrop() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;",
"TERMINATE CSAS_B_0;",
"DROP STREAM B;"
);
shouldRecover(commands);
Expand All @@ -595,7 +597,7 @@ public void shouldNotDeleteTopicsOnRecovery() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM B AS SELECT * FROM A;",
"TERMINATE CSAS_B_1;",
"TERMINATE CSAS_B_0;",
"DROP STREAM B DELETE TOPIC;"
);

Expand Down Expand Up @@ -657,7 +659,7 @@ public void shouldRecoverQueryIDs() {
final Set<QueryId> queryIdNames = queriesById(server.ksqlEngine.getPersistentQueries())
.keySet();

assertThat(queryIdNames, contains(new QueryId("CSAS_C_7")));
assertThat(queryIdNames, contains(new QueryId("CSAS_C_0")));
}

}

0 comments on commit 91c421a

Please sign in to comment.