diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java index 68fd6dbf56b1..6e581dfa6c63 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java @@ -16,14 +16,18 @@ package io.confluent.ksql; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -88,6 +92,24 @@ public interface KsqlExecutionContext { */ PreparedStatement prepare(ParsedStatement stmt); + /** + * Executes a query using the supplied service context. + */ + TransientQueryMetadata executeQuery( + ServiceContext serviceContext, + ConfiguredStatement statement + ); + + /** + * Computes a plan for executing a DDL/DML statement using the supplied service context. + */ + KsqlPlan plan(ServiceContext serviceContext, ConfiguredStatement statement); + + /** + * Executes a KSQL plan using the supplied service context. + */ + ExecuteResult execute(ServiceContext serviceContext, ConfiguredKsqlPlan plan); + /** * Execute the supplied statement, updating the meta store and registering any query. * diff --git a/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java b/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java index ec4575698821..065c26a21b9a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java @@ -28,6 +28,7 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.UnsetProperty; @@ -197,10 +198,11 @@ private static ExecuteResult execute( final CustomExecutor executor = CustomExecutors.EXECUTOR_MAP.getOrDefault( configured.getStatement().getClass(), - (serviceContext, s, props) -> executionContext.execute(serviceContext, s)); + (passedExecutionContext, s, props) -> passedExecutionContext.execute( + passedExecutionContext.getServiceContext(), s)); return executor.apply( - executionContext.getServiceContext(), + executionContext, configured, mutableSessionPropertyOverrides ); @@ -209,7 +211,7 @@ private static ExecuteResult execute( @FunctionalInterface private interface CustomExecutor { ExecuteResult apply( - ServiceContext serviceContext, + KsqlExecutionContext executionContext, ConfiguredStatement statement, Map mutableSessionPropertyOverrides ); @@ -218,13 +220,17 @@ ExecuteResult apply( @SuppressWarnings("unchecked") private enum CustomExecutors { - SET_PROPERTY(SetProperty.class, (serviceContext, stmt, props) -> { + SET_PROPERTY(SetProperty.class, (executionContext, stmt, props) -> { PropertyOverrider.set((ConfiguredStatement) stmt, props); return ExecuteResult.of("Successfully executed " + stmt.getStatement()); }), - UNSET_PROPERTY(UnsetProperty.class, (serviceContext, stmt, props) -> { + UNSET_PROPERTY(UnsetProperty.class, (executionContext, stmt, props) -> { PropertyOverrider.unset((ConfiguredStatement) stmt, props); return ExecuteResult.of("Successfully executed " + stmt.getStatement()); + }), + QUERY(Query.class, (executionContext, stmt, props) -> { + return ExecuteResult.of( + executionContext.executeQuery(executionContext.getServiceContext(), stmt.cast())); }) ; @@ -256,11 +262,12 @@ private CustomExecutor getExecutor() { } public ExecuteResult execute( - final ServiceContext serviceContext, + final KsqlExecutionContext executionContext, final ConfiguredStatement statement, final Map mutableSessionPropertyOverrides ) { - return executor.apply(serviceContext, statement, mutableSessionPropertyOverrides); + return executor.apply( + executionContext, statement, mutableSessionPropertyOverrides); } } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index d259d3c2ff2e..f58e47d36cdd 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -98,15 +98,8 @@ static EngineExecutor create( return new EngineExecutor(engineContext, serviceContext, ksqlConfig, overriddenProperties); } - ExecuteResult execute(final ConfiguredStatement statement) { - if (statement.getStatement() instanceof Query) { - return ExecuteResult.of(executeQuery(statement.cast())); - } - return execute(plan(statement)); - } - @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty - private ExecuteResult execute(final KsqlPlan plan) { + ExecuteResult execute(final KsqlPlan plan) { final Optional ddlResult = plan.getDdlCommand() .map(ddl -> executeDdl(ddl, plan.getStatementText())); @@ -119,7 +112,7 @@ private ExecuteResult execute(final KsqlPlan plan) { } @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty - private TransientQueryMetadata executeQuery(final ConfiguredStatement statement) { + TransientQueryMetadata executeQuery(final ConfiguredStatement statement) { final ExecutorPlans plans = planQuery(statement, statement.getStatement(), Optional.empty()); final OutputNode outputNode = plans.logicalPlan.getNode().get(); final QueryExecutor executor = engineContext.createQueryExecutor( @@ -139,7 +132,7 @@ private TransientQueryMetadata executeQuery(final ConfiguredStatement sta } @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty - private KsqlPlan plan(final ConfiguredStatement statement) { + KsqlPlan plan(final ConfiguredStatement statement) { try { throwOnNonExecutableStatement(statement); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index d88725b6ba3d..fa7827a06484 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -31,6 +31,7 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.QueryContainer; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.schema.registry.SchemaRegistryUtil; @@ -38,6 +39,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.io.Closeable; import java.util.List; import java.util.Objects; @@ -160,19 +162,49 @@ public PreparedStatement prepare(final ParsedStatement stmt) { } @Override - public ExecuteResult execute( + public KsqlPlan plan( final ServiceContext serviceContext, - final ConfiguredStatement statement - ) { - final ExecuteResult result = EngineExecutor + final ConfiguredStatement statement) { + return EngineExecutor .create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides()) - .execute(statement); + .plan(statement); + } + @Override + public ExecuteResult execute(final ServiceContext serviceContext, final ConfiguredKsqlPlan plan) { + final ExecuteResult result = EngineExecutor + .create(primaryContext, serviceContext, plan.getConfig(), plan.getOverrides()) + .execute(plan.getPlan()); result.getQuery().ifPresent(this::registerQuery); - return result; } + @Override + public ExecuteResult execute( + final ServiceContext serviceContext, + final ConfiguredStatement statement + ) { + return execute( + serviceContext, + ConfiguredKsqlPlan.of( + plan(serviceContext, statement), + statement.getOverrides(), + statement.getConfig() + ) + ); + } + + @Override + public TransientQueryMetadata executeQuery( + final ServiceContext serviceContext, + final ConfiguredStatement statement) { + final TransientQueryMetadata query = EngineExecutor + .create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides()) + .executeQuery(statement); + registerQuery(query); + return query; + } + @Override public void close() { allLiveQueries.forEach(QueryMetadata::close); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index 8bb8700fe007..bd80d1f84675 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -20,11 +20,14 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.Sandbox; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.List; import java.util.Optional; @@ -79,14 +82,57 @@ public PreparedStatement prepare(final ParsedStatement stmt) { return engineContext.prepare(stmt); } + @Override + public KsqlPlan plan( + final ServiceContext serviceContext, + final ConfiguredStatement statement + ) { + return EngineExecutor.create( + engineContext, + serviceContext, + statement.getConfig(), + statement.getOverrides() + ).plan(statement); + } + + @Override + public ExecuteResult execute( + final ServiceContext serviceContext, + final ConfiguredKsqlPlan ksqlPlan + ) { + return EngineExecutor.create( + engineContext, + serviceContext, + ksqlPlan.getConfig(), + ksqlPlan.getOverrides() + ).execute(ksqlPlan.getPlan()); + } + @Override public ExecuteResult execute( final ServiceContext serviceContext, final ConfiguredStatement statement ) { - final EngineExecutor executor = EngineExecutor - .create(engineContext, serviceContext, statement.getConfig(), statement.getOverrides()); + return execute( + serviceContext, + ConfiguredKsqlPlan.of( + plan(serviceContext, statement), + statement.getOverrides(), + statement.getConfig() + ) + ); + } - return executor.execute(statement); + @Override + public TransientQueryMetadata executeQuery( + final ServiceContext serviceContext, + final ConfiguredStatement statement + ) { + return EngineExecutor.create( + engineContext, + serviceContext, + statement.getConfig(), + statement.getOverrides() + ).executeQuery(statement); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ConfiguredKsqlPlan.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ConfiguredKsqlPlan.java new file mode 100644 index 000000000000..29fadfdce7ef --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ConfiguredKsqlPlan.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.planner.plan; + +import io.confluent.ksql.engine.KsqlPlan; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; +import java.util.Objects; + +public final class ConfiguredKsqlPlan { + private final KsqlPlan plan; + private final Map overrides; + private final KsqlConfig config; + + public static ConfiguredKsqlPlan of( + final KsqlPlan plan, + final Map overrides, + final KsqlConfig config + ) { + return new ConfiguredKsqlPlan(plan, overrides, config); + } + + private ConfiguredKsqlPlan( + final KsqlPlan plan, + final Map overrides, + final KsqlConfig config + ) { + this.plan = Objects.requireNonNull(plan, "plan"); + this.overrides = Objects.requireNonNull(overrides, "overrides"); + this.config = Objects.requireNonNull(config, "config"); + } + + public KsqlPlan getPlan() { + return plan; + } + + public Map getOverrides() { + return overrides; + } + + public KsqlConfig getConfig() { + return config; + } +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java index 46d9eafaf8e1..c315b830fe7e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java @@ -123,7 +123,8 @@ public void setUp() { when(ksqlEngine.prepare(PARSED_STMT_0)).thenReturn((PreparedStatement) PREPARED_STMT_0); when(ksqlEngine.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); - when(ksqlEngine.execute(any(), any())).thenReturn(ExecuteResult.of("success")); + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of("success")); when(ksqlEngine.createSandbox(any())).thenReturn(sandbox); @@ -204,7 +205,7 @@ public void shouldThrowIfParseFails() { @Test public void shouldThrowIfSandboxExecuteThrows() { // Given: - when(sandbox.execute(any(), any())) + when(sandbox.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new KsqlException("Bad tings happen")); // Expect @@ -218,7 +219,7 @@ public void shouldThrowIfSandboxExecuteThrows() { @Test public void shouldThrowIfExecuteThrows() { // Given: - when(ksqlEngine.execute(any(), any())) + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new KsqlException("Bad tings happen")); // Expect @@ -232,7 +233,7 @@ public void shouldThrowIfExecuteThrows() { @Test public void shouldNotExecuteAnyStatementsIfTryExecuteThrows() { // Given: - when(sandbox.execute(any(), any())) + when(sandbox.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new KsqlException("Bad tings happen")); // When: @@ -243,13 +244,13 @@ public void shouldNotExecuteAnyStatementsIfTryExecuteThrows() { } // Then: - verify(ksqlEngine, never()).execute(any(), any()); + verify(ksqlEngine, never()).execute(any(), any(ConfiguredStatement.class)); } @Test public void shouldStartPersistentQueries() { // Given: - when(ksqlEngine.execute(any(), any())) + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of(persistentQuery)); // When: @@ -262,7 +263,7 @@ public void shouldStartPersistentQueries() { @Test public void shouldNotBlowUpOnSqlThatDoesNotResultInPersistentQueries() { // Given: - when(ksqlEngine.execute(any(), any())) + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of(transientQuery)); // When: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 184189cc4e2f..edbdd864f3b0 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -659,12 +659,12 @@ public void shouldNotDeleteSchemaNorTopicForTable() throws Exception { @Test public void shouldCleanUpInternalTopicsOnClose() { // Given: - final QueryMetadata query = KsqlEngineTestUtil.execute( + final QueryMetadata query = KsqlEngineTestUtil.executeQuery( serviceContext, ksqlEngine, "select * from test1 EMIT CHANGES;", KSQL_CONFIG, Collections.emptyMap() - ).get(0); + ); query.start(); @@ -720,12 +720,12 @@ public void shouldRemoveTransientQueryFromEngineWhenClosed() { // Given: final int startingLiveQueries = ksqlEngine.numberOfLiveQueries(); - final QueryMetadata query = KsqlEngineTestUtil.execute( + final QueryMetadata query = KsqlEngineTestUtil.executeQuery( serviceContext, ksqlEngine, "select * from test1 EMIT CHANGES;", KSQL_CONFIG, Collections.emptyMap() - ).get(0); + ); // When: query.close(); @@ -1308,4 +1308,4 @@ private void givenSqlAlreadyExecuted(final String sql) { sandbox = ksqlEngine.createSandbox(serviceContext); } -} \ No newline at end of file +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java index dc0e7e82b595..402204547ea9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java @@ -23,6 +23,7 @@ import io.confluent.ksql.metastore.MutableMetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector; @@ -32,6 +33,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collections; import java.util.List; import java.util.Map; @@ -84,6 +86,25 @@ public static List execute( return execute(serviceContext, engine, sql, ksqlConfig, overriddenProperties, Optional.empty()); } + public static TransientQueryMetadata executeQuery( + final ServiceContext serviceContext, + final KsqlEngine engine, + final String sql, + final KsqlConfig ksqlConfig, + final Map overriddenProperties) { + final ParsedStatement stmt = engine.parse(sql).get(0); + final PreparedStatement prepared = engine.prepare(stmt); + final ConfiguredStatement configured = ConfiguredStatement.of( + prepared, overriddenProperties, ksqlConfig).cast(); + try { + return engine.executeQuery(serviceContext, configured); + } catch (final KsqlStatementException e) { + // use the original statement text in the exception so that tests + // can easily check that the failed statement is the input statement + throw new KsqlStatementException(e.getRawMessage(), stmt.getStatementText(), e.getCause()); + } + } + /** * @param srClient if supplied, then schemas can be inferred from the schema registry. */ diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index f6d7ab797f3f..143100c85318 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -126,6 +126,12 @@ private QueryMetadata buildQuery(final String query) { return execute(CREATE_STREAM_TEST1 + query).get(0); } + private QueryMetadata buildTransientQuery(final String query) { + givenKafkaTopicsExist("test1"); + execute(CREATE_STREAM_TEST1); + return executeQuery(query); + } + @Test public void shouldHaveKStreamDataSource() { final PersistentQueryMetadata metadata = (PersistentQueryMetadata) buildQuery( @@ -135,14 +141,14 @@ public void shouldHaveKStreamDataSource() { @Test public void shouldMakeBareQuery() { - final QueryMetadata queryMetadata = buildQuery(simpleSelectFilter); + final QueryMetadata queryMetadata = buildTransientQuery(simpleSelectFilter); assertThat(queryMetadata, instanceOf(TransientQueryMetadata.class)); } @Test public void shouldBuildTransientQueryWithCorrectSchema() { // When: - final QueryMetadata queryMetadata = buildQuery(simpleSelectFilter); + final QueryMetadata queryMetadata = buildTransientQuery(simpleSelectFilter); // Then: assertThat(queryMetadata.getLogicalSchema(), is(LogicalSchema.builder() @@ -181,15 +187,17 @@ public void shouldMakePersistentQuery() { @Test public void shouldCreateExecutionPlan() { - final String queryString = + final String ddl = "CREATE STREAM TEST1 (" + "COL0 BIGINT, COL1 VARCHAR, COL2 STRING, COL3 DOUBLE," + " COL4 ARRAY, COL5 MAP)" - + " WITH (KAFKA_TOPIC = 'test1', VALUE_FORMAT = 'JSON');" - + "SELECT col0, sum(col3), count(col3) FROM test1" + + " WITH (KAFKA_TOPIC = 'test1', VALUE_FORMAT = 'JSON');"; + final String queryString = + "SELECT col0, sum(col3), count(col3) FROM test1" + " WHERE col0 > 100 GROUP BY col0 EMIT CHANGES;"; givenKafkaTopicsExist("test1"); - final QueryMetadata metadata = execute(queryString).get(0); + execute(ddl); + final QueryMetadata metadata = executeQuery(queryString); final String planText = metadata.getExecutionPlan(); final String[] lines = planText.split("\n"); assertThat(lines[0], startsWith( @@ -491,6 +499,11 @@ private List execute(final String sql) { Collections.emptyMap()); } + private TransientQueryMetadata executeQuery(final String sql) { + return KsqlEngineTestUtil.executeQuery( + serviceContext, ksqlEngine, sql, ksqlConfig, Collections.emptyMap()); + } + private void givenKafkaTopicsExist(final String... names) { Arrays.stream(names).forEach(name -> kafkaTopicClient.createTopic(name, 1, (short) 1, Collections.emptyMap()) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index ad9220dd388d..a3fa31e69f9e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.exception.ExceptionUtil; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.name.SourceName; @@ -29,6 +30,7 @@ import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.TerminateQuery; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.rest.entity.CommandId; @@ -268,8 +270,11 @@ private String executeDdlStatement(final PreparedStatement statement, final C final ConfiguredStatement configured = ConfiguredStatement.of(statement, command.getOverwriteProperties(), mergedConfig); + final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured); return ksqlEngine - .execute(serviceContext, configured) + .execute( + serviceContext, + ConfiguredKsqlPlan.of(plan, command.getOverwriteProperties(), mergedConfig)) .getCommandResult() .get(); } @@ -340,9 +345,14 @@ private PersistentQueryMetadata startQuery( queryIdGenerator.activateNewGenerator(offset); } - final QueryMetadata queryMetadata = ksqlEngine.execute(serviceContext, configured) - .getQuery() - .orElseThrow(() -> new IllegalStateException("Statement did not return a query")); + final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured); + final QueryMetadata queryMetadata = + ksqlEngine + .execute( + serviceContext, + ConfiguredKsqlPlan.of(plan, command.getOverwriteProperties(), mergedConfig)) + .getQuery() + .orElseThrow(() -> new IllegalStateException("Statement did not return a query")); if (!(queryMetadata instanceof PersistentQueryMetadata)) { throw new KsqlException(String.format( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java index be464e06d1e6..c4a11734e768 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java @@ -99,13 +99,24 @@ private static QueryDescription explainStatement( explain.getStatementText().substring("EXPLAIN ".length()), statement); - final QueryMetadata metadata = executionContext.createSandbox(serviceContext) - .execute( - serviceContext, - ConfiguredStatement.of(preparedStatement, explain.getOverrides(), explain.getConfig())) - .getQuery() - .orElseThrow(() -> - new IllegalStateException("The provided statement did not run a ksql query")); + final QueryMetadata metadata; + final KsqlExecutionContext sandbox = executionContext.createSandbox(serviceContext); + if (preparedStatement.getStatement() instanceof Query) { + metadata = sandbox.executeQuery( + serviceContext, + ConfiguredStatement.of( + preparedStatement, explain.getOverrides(), explain.getConfig()).cast() + ); + } else { + metadata = sandbox + .execute( + serviceContext, + ConfiguredStatement + .of(preparedStatement, explain.getOverrides(), explain.getConfig())) + .getQuery() + .orElseThrow(() -> + new IllegalStateException("The provided statement did not run a ksql query")); + } return QueryDescriptionFactory.forQueryMetadata(metadata); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java index e6f6ae597c97..170801806a8d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java @@ -60,11 +60,7 @@ class PushQueryPublisher implements Flow.Publisher> { @SuppressWarnings("OptionalGetWithoutIsPresent") @Override public synchronized void subscribe(final Flow.Subscriber> subscriber) { - final TransientQueryMetadata queryMetadata = - (TransientQueryMetadata) ksqlEngine.execute(serviceContext, query) - .getQuery() - .get(); - + final TransientQueryMetadata queryMetadata = ksqlEngine.executeQuery(serviceContext, query); final PushQuerySubscription subscription = new PushQuerySubscription(subscriber, queryMetadata); log.info("Running query {}", queryMetadata.getQueryApplicationId()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index a0d78826dd1f..8d13a4b1b6b6 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -42,7 +42,6 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; -import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.time.Duration; @@ -269,19 +268,10 @@ private Response handlePushQuery( final ConfiguredStatement configured = ConfiguredStatement.of(statement, streamsProperties, ksqlConfig); - final QueryMetadata query = ksqlEngine.execute(serviceContext, configured) - .getQuery() - .get(); - - if (!(query instanceof TransientQueryMetadata)) { - throw new IllegalStateException(String.format( - "Unexpected metadata type: expected TransientQueryMetadata, found %s instead", - query.getClass() - )); - } + final TransientQueryMetadata query = ksqlEngine.executeQuery(serviceContext, configured); final QueryStreamWriter queryStreamWriter = new QueryStreamWriter( - (TransientQueryMetadata) query, + query, disconnectCheckInterval.toMillis(), objectMapper); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 12f69549948e..c0997610c962 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -56,6 +56,7 @@ import io.confluent.ksql.security.KsqlSecurityExtension; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.version.metrics.VersionCheckerAgent; @@ -284,7 +285,8 @@ public void shouldOnlyCreateLogStreamIfCommandQueueEmpty() { @Test public void shouldNotCreateLogStreamIfValidationFails() { // Given: - when(sandBox.execute(any(), any())).thenThrow(new KsqlException("error")); + when(sandBox.execute(any(), any(ConfiguredStatement.class))) + .thenThrow(new KsqlException("error")); // When: app.startKsql(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index 6106f8eba71f..b46efc49e1ec 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -258,7 +258,8 @@ public void before() throws Exception { when(ksqlEngine.prepare(PARSED_STMT_0)).thenReturn((PreparedStatement) PREPARED_STMT_0); when(ksqlEngine.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); - when(ksqlEngine.execute(any(), any())).thenReturn(ExecuteResult.of(persistentQuery)); + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of(persistentQuery)); when(ksqlEngine.createSandbox(any())).thenReturn(sandBox); @@ -266,7 +267,8 @@ public void before() throws Exception { when(sandBox.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); when(sandBox.getServiceContext()).thenReturn(sandBoxServiceContext); - when(sandBox.execute(any(), any())).thenReturn(ExecuteResult.of("success")); + when(sandBox.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of("success")); when(sandBox.execute(sandBoxServiceContext, CSAS_CFG_WITH_TOPIC)) .thenReturn(ExecuteResult.of(persistentQuery)); @@ -595,7 +597,7 @@ public void shouldThrowIfExecutingPersistentQueryDoesNotReturnQuery() { // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any(), any())) + when(sandBox.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of("well, this is unexpected.")); expectedException.expect(KsqlException.class); @@ -610,7 +612,7 @@ public void shouldThrowIfExecutingPersistentQueryReturnsNonPersistentMetaData() // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any(), any())) + when(sandBox.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of(nonPersistentQueryMd)); expectedException.expect(KsqlException.class); @@ -632,7 +634,8 @@ public void shouldThrowIfParseThrows() { @Test(expected = RuntimeException.class) public void shouldThrowIfExecuteThrows() { // Given: - when(ksqlEngine.execute(any(), any())).thenThrow(new RuntimeException("Boom!")); + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) + .thenThrow(new RuntimeException("Boom!")); // When: standaloneExecutor.startAsync(); @@ -672,7 +675,8 @@ public void shouldStartQueries() { public void shouldNotStartValidationPhaseQueries() { // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any(), any())).thenReturn(ExecuteResult.of(sandBoxQuery)); + when(sandBox.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of(sandBoxQuery)); // When: standaloneExecutor.startAsync(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index f3e6924d02ca..da3aee586aa8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -38,6 +38,7 @@ import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.KsqlEngineTestUtil; +import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.internal.KsqlEngineMetrics; @@ -54,6 +55,7 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.rest.entity.CommandId; @@ -83,6 +85,9 @@ import org.easymock.IArgumentMatcher; import org.easymock.Mock; import org.hamcrest.CoreMatchers; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.hamcrest.integration.EasyMock2Adapter; import org.junit.After; import org.junit.Assert; @@ -92,6 +97,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.RuleChain; +import org.mockito.Mockito; public class InteractiveStatementExecutorTest extends EasyMockSupport { @@ -274,7 +280,9 @@ public void shouldBuildQueriesWithPersistedConfig() { expect(mockParser.parseSingleStatement(statementText)).andReturn(csasStatement); expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); - expect(mockEngine.execute(eq(serviceContext), eq(configuredCsas))) + final KsqlPlan plan = Mockito.mock(KsqlPlan.class); + expect(mockEngine.plan(eq(serviceContext), eq(configuredCsas))).andReturn(plan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(plan))) .andReturn(ExecuteResult.of(mockQueryMetadata)); mockQueryMetadata.start(); expectLastCall(); @@ -474,6 +482,7 @@ private PersistentQueryMetadata mockReplayCSAS( final String name, final QueryId queryId) { final CreateStreamAsSelect mockCSAS = mockCSAS(name); + final KsqlPlan mockPlan = Mockito.mock(KsqlPlan.class); final PersistentQueryMetadata mockQuery = mock(PersistentQueryMetadata.class); expect(mockQuery.getQueryId()).andStubReturn(queryId); final PreparedStatement csas = PreparedStatement.of("CSAS", mockCSAS); @@ -481,7 +490,9 @@ private PersistentQueryMetadata mockReplayCSAS( .andReturn(csas); expect(mockMetaStore.getSource(SourceName.of(name))).andStubReturn(null); expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); - expect(mockEngine.execute(eq(serviceContext), eqConfigured(csas))) + expect(mockEngine.plan(eq(serviceContext), eqConfigured(csas))) + .andReturn(mockPlan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(mockPlan))) .andReturn(ExecuteResult.of(mockQuery)); return mockQuery; } @@ -547,12 +558,10 @@ public void shouldCascade4Dot1DropStreamCommand() { mockQueryMetadata.close(); expectLastCall(); - expect(mockEngine - .execute( - eq(serviceContext), - eqConfigured(PreparedStatement.of("DROP", mockDropStream))) - ) - .andReturn(ExecuteResult.of("SUCCESS")); + final KsqlPlan plan = Mockito.mock(KsqlPlan.class); + expect(mockEngine.plan(eq(serviceContext), eqConfigured(PreparedStatement.of("DROP", mockDropStream)))) + .andReturn(plan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(plan))).andReturn(ExecuteResult.of("SUCCESS")); replayAll(); // When: @@ -569,6 +578,25 @@ public void shouldCascade4Dot1DropStreamCommand() { verify(mockParser, mockEngine, mockMetaStore); } + public static Matcher configuredPlan(final Matcher plan) { + return new TypeSafeMatcher() { + @Override + protected boolean matchesSafely(final ConfiguredKsqlPlan item) { + return plan.matches(item.getPlan()); + } + + @Override + public void describeTo(final Description description) { + plan.describeTo(description); + } + }; + } + + private static ConfiguredKsqlPlan eqConfigured(final KsqlPlan plan) { + EasyMock2Adapter.adapt(configuredPlan(equalTo(plan))); + return null; + } + private static ConfiguredStatement eqConfigured( final PreparedStatement preparedStatement ) { @@ -586,7 +614,9 @@ public void shouldNotCascadeDropStreamCommand() { final DropStream mockDropStream = mockDropStream("foo"); final PreparedStatement statement = PreparedStatement.of(drop, mockDropStream); - expect(mockEngine.execute(eq(serviceContext), eqConfigured(statement))) + final KsqlPlan plan = Mockito.mock(KsqlPlan.class); + expect(mockEngine.plan(eq(serviceContext), eqConfigured(statement))).andReturn(plan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(plan))) .andReturn(ExecuteResult.of("SUCCESS")); replayAll(); @@ -921,4 +951,4 @@ private static Command givenCommand(final String statementStr, final KsqlConfig return new Command( statementStr, true, emptyMap(), ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); } -} \ No newline at end of file +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 13fb8b4f74e8..513f9c2457f4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -1211,7 +1211,7 @@ public void shouldExplainQueryStatement() { ksqlString, QueryDescriptionEntity.class); // Then: - validateQueryDescription(ksqlQueryString, emptyMap(), query); + validateTransientQueryDescription(ksqlQueryString, emptyMap(), query); } @Test @@ -1288,7 +1288,7 @@ public void shouldReturn5xxOnStatementSystemError() { final String ksqlString = "CREATE STREAM test_explain AS SELECT * FROM test_stream;"; givenMockEngine(); - when(sandbox.execute(any(), any())) + when(sandbox.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new RuntimeException("internal error")); // When: @@ -2037,6 +2037,20 @@ private List makeMultipleRequest( .collect(Collectors.toList()); } + private void validateTransientQueryDescription( + final String ksqlQueryString, + final Map overriddenProperties, + final KsqlEntity entity) { + final TransientQueryMetadata queryMetadata = KsqlEngineTestUtil.executeQuery( + serviceContext, + ksqlEngine, + ksqlQueryString, + ksqlConfig, + overriddenProperties + ); + validateQueryDescription(queryMetadata, overriddenProperties, entity); + } + @SuppressWarnings("SameParameterValue") private void validateQueryDescription( final String ksqlQueryString, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 283c7793aae7..f87cc46a18c7 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -38,7 +38,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; -import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.json.JsonMapper; @@ -136,13 +135,16 @@ public class StreamedQueryResourceTest { @Mock private KsqlAuthorizationValidator authorizationValidator; private StreamedQueryResource testResource; - private PreparedStatement statement; + private PreparedStatement invalid; + private PreparedStatement query; + private PreparedStatement print; @Before public void setup() { when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient); - statement = PreparedStatement.of(PUSH_QUERY_STRING, mock(Statement.class)); - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(statement); + query = PreparedStatement.of(PUSH_QUERY_STRING, mock(Query.class)); + invalid = PreparedStatement.of("sql", mock(Statement.class)); + when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(invalid); final Query pullQuery = mock(Query.class); when(pullQuery.isPullQuery()).thenReturn(true); @@ -358,11 +360,10 @@ public void shouldStreamRowsCorrectly() throws Throwable { final KafkaStreams mockKafkaStreams = mock(KafkaStreams.class); - final Map requestStreamsProperties = Collections.emptyMap(); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(query); - statement = PreparedStatement.of("query", mock(Query.class)); - when(mockStatementParser.parseSingleStatement(queryString)) - .thenReturn(statement); + final Map requestStreamsProperties = Collections.emptyMap(); final TransientQueryMetadata transientQueryMetadata = new TransientQueryMetadata( @@ -379,9 +380,9 @@ public void shouldStreamRowsCorrectly() throws Throwable { Collections.emptyMap(), queryCloseCallback); - when(mockKsqlEngine.execute(serviceContext, - ConfiguredStatement.of(statement, requestStreamsProperties, VALID_CONFIG))) - .thenReturn(ExecuteResult.of(transientQueryMetadata)); + when(mockKsqlEngine.executeQuery(serviceContext, + ConfiguredStatement.of(query, requestStreamsProperties, VALID_CONFIG))) + .thenReturn(transientQueryMetadata); final Response response = testResource.streamQuery( @@ -535,10 +536,8 @@ public void shouldUpdateTheLastRequestTime() { @Test public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() { // Given: - statement = PreparedStatement.of("query", mock(Query.class)); - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) - .thenReturn(statement); - + when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) + .thenReturn(query); doThrow( new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))) .when(authorizationValidator).checkAuthorization(any(), any(), any()); @@ -561,9 +560,8 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() @Test public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { // Given: - statement = PreparedStatement.of("query", mock(Query.class)); - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) - .thenReturn(statement); + when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) + .thenReturn(query); doThrow(new KsqlException( "", new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME)))) @@ -589,9 +587,9 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc @Test public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() { // Given: - statement = PreparedStatement.of("print", mock(PrintTopic.class)); - when(mockStatementParser.parseSingleStatement(PRINT_TOPIC)) - .thenReturn(statement); + print = PreparedStatement.of("print", mock(PrintTopic.class)); + when(mockStatementParser.parseSingleStatement(PRINT_TOPIC)) + .thenReturn(print); doThrow( new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))) @@ -615,9 +613,9 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { // Given: final PrintTopic cmd = mock(PrintTopic.class); when(cmd.getTopic()).thenReturn("TEST_TOPIC"); - statement = PreparedStatement.of("print", cmd); - when(mockStatementParser.parseSingleStatement(any())) - .thenReturn(statement); + print = PreparedStatement.of("print", cmd); + when(mockStatementParser.parseSingleStatement(any())) + .thenReturn(print); when(mockKafkaTopicClient.isTopicExists(any())).thenReturn(false); when(mockKafkaTopicClient.listTopicNames()).thenReturn(ImmutableSet.of(