From 3a7f98a28817be7db1cefff18063db90d70523a5 Mon Sep 17 00:00:00 2001 From: rodesai Date: Fri, 4 Oct 2019 00:13:52 -0700 Subject: [PATCH] feat: expose execution plans from the ksql engine API This patch adds interfaces to build and execute plans in the KSQL engine API, and changes StatementExecutor to use these interfaces to run statements. It also exposes a method executeQuery that the query endpoint can use to build transient queries. --- .../confluent/ksql/KsqlExecutionContext.java | 22 +++++++ .../confluent/ksql/embedded/KsqlContext.java | 21 ++++--- .../confluent/ksql/engine/EngineExecutor.java | 13 +---- .../io/confluent/ksql/engine/KsqlEngine.java | 44 ++++++++++++-- .../engine/SandboxedExecutionContext.java | 52 ++++++++++++++++- .../ksql/planner/plan/ConfiguredKsqlPlan.java | 57 +++++++++++++++++++ .../ksql/embedded/KsqlContextTest.java | 15 ++--- .../confluent/ksql/engine/KsqlEngineTest.java | 10 ++-- .../ksql/engine/KsqlEngineTestUtil.java | 21 +++++++ .../physical/PhysicalPlanBuilderTest.java | 25 ++++++-- .../InteractiveStatementExecutor.java | 18 ++++-- .../server/execution/ExplainExecutor.java | 25 +++++--- .../streaming/PushQueryPublisher.java | 6 +- .../streaming/StreamedQueryResource.java | 14 +---- .../rest/server/KsqlRestApplicationTest.java | 4 +- .../rest/server/StandaloneExecutorTest.java | 16 ++++-- .../InteractiveStatementExecutorTest.java | 50 ++++++++++++---- .../server/resources/KsqlResourceTest.java | 18 +++++- .../streaming/StreamedQueryResourceTest.java | 46 +++++++-------- 19 files changed, 362 insertions(+), 115 deletions(-) create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ConfiguredKsqlPlan.java diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java index 68fd6dbf56b1..6e581dfa6c63 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java @@ -16,14 +16,18 @@ package io.confluent.ksql; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -88,6 +92,24 @@ public interface KsqlExecutionContext { */ PreparedStatement prepare(ParsedStatement stmt); + /** + * Executes a query using the supplied service context. + */ + TransientQueryMetadata executeQuery( + ServiceContext serviceContext, + ConfiguredStatement statement + ); + + /** + * Computes a plan for executing a DDL/DML statement using the supplied service context. + */ + KsqlPlan plan(ServiceContext serviceContext, ConfiguredStatement statement); + + /** + * Executes a KSQL plan using the supplied service context. + */ + ExecuteResult execute(ServiceContext serviceContext, ConfiguredKsqlPlan plan); + /** * Execute the supplied statement, updating the meta store and registering any query. * diff --git a/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java b/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java index ec4575698821..065c26a21b9a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java @@ -28,6 +28,7 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.UnsetProperty; @@ -197,10 +198,11 @@ private static ExecuteResult execute( final CustomExecutor executor = CustomExecutors.EXECUTOR_MAP.getOrDefault( configured.getStatement().getClass(), - (serviceContext, s, props) -> executionContext.execute(serviceContext, s)); + (passedExecutionContext, s, props) -> passedExecutionContext.execute( + passedExecutionContext.getServiceContext(), s)); return executor.apply( - executionContext.getServiceContext(), + executionContext, configured, mutableSessionPropertyOverrides ); @@ -209,7 +211,7 @@ private static ExecuteResult execute( @FunctionalInterface private interface CustomExecutor { ExecuteResult apply( - ServiceContext serviceContext, + KsqlExecutionContext executionContext, ConfiguredStatement statement, Map mutableSessionPropertyOverrides ); @@ -218,13 +220,17 @@ ExecuteResult apply( @SuppressWarnings("unchecked") private enum CustomExecutors { - SET_PROPERTY(SetProperty.class, (serviceContext, stmt, props) -> { + SET_PROPERTY(SetProperty.class, (executionContext, stmt, props) -> { PropertyOverrider.set((ConfiguredStatement) stmt, props); return ExecuteResult.of("Successfully executed " + stmt.getStatement()); }), - UNSET_PROPERTY(UnsetProperty.class, (serviceContext, stmt, props) -> { + UNSET_PROPERTY(UnsetProperty.class, (executionContext, stmt, props) -> { PropertyOverrider.unset((ConfiguredStatement) stmt, props); return ExecuteResult.of("Successfully executed " + stmt.getStatement()); + }), + QUERY(Query.class, (executionContext, stmt, props) -> { + return ExecuteResult.of( + executionContext.executeQuery(executionContext.getServiceContext(), stmt.cast())); }) ; @@ -256,11 +262,12 @@ private CustomExecutor getExecutor() { } public ExecuteResult execute( - final ServiceContext serviceContext, + final KsqlExecutionContext executionContext, final ConfiguredStatement statement, final Map mutableSessionPropertyOverrides ) { - return executor.apply(serviceContext, statement, mutableSessionPropertyOverrides); + return executor.apply( + executionContext, statement, mutableSessionPropertyOverrides); } } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index d259d3c2ff2e..f58e47d36cdd 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -98,15 +98,8 @@ static EngineExecutor create( return new EngineExecutor(engineContext, serviceContext, ksqlConfig, overriddenProperties); } - ExecuteResult execute(final ConfiguredStatement statement) { - if (statement.getStatement() instanceof Query) { - return ExecuteResult.of(executeQuery(statement.cast())); - } - return execute(plan(statement)); - } - @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty - private ExecuteResult execute(final KsqlPlan plan) { + ExecuteResult execute(final KsqlPlan plan) { final Optional ddlResult = plan.getDdlCommand() .map(ddl -> executeDdl(ddl, plan.getStatementText())); @@ -119,7 +112,7 @@ private ExecuteResult execute(final KsqlPlan plan) { } @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty - private TransientQueryMetadata executeQuery(final ConfiguredStatement statement) { + TransientQueryMetadata executeQuery(final ConfiguredStatement statement) { final ExecutorPlans plans = planQuery(statement, statement.getStatement(), Optional.empty()); final OutputNode outputNode = plans.logicalPlan.getNode().get(); final QueryExecutor executor = engineContext.createQueryExecutor( @@ -139,7 +132,7 @@ private TransientQueryMetadata executeQuery(final ConfiguredStatement sta } @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty - private KsqlPlan plan(final ConfiguredStatement statement) { + KsqlPlan plan(final ConfiguredStatement statement) { try { throwOnNonExecutableStatement(statement); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index d88725b6ba3d..fa7827a06484 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -31,6 +31,7 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.QueryContainer; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.schema.registry.SchemaRegistryUtil; @@ -38,6 +39,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.io.Closeable; import java.util.List; import java.util.Objects; @@ -160,19 +162,49 @@ public PreparedStatement prepare(final ParsedStatement stmt) { } @Override - public ExecuteResult execute( + public KsqlPlan plan( final ServiceContext serviceContext, - final ConfiguredStatement statement - ) { - final ExecuteResult result = EngineExecutor + final ConfiguredStatement statement) { + return EngineExecutor .create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides()) - .execute(statement); + .plan(statement); + } + @Override + public ExecuteResult execute(final ServiceContext serviceContext, final ConfiguredKsqlPlan plan) { + final ExecuteResult result = EngineExecutor + .create(primaryContext, serviceContext, plan.getConfig(), plan.getOverrides()) + .execute(plan.getPlan()); result.getQuery().ifPresent(this::registerQuery); - return result; } + @Override + public ExecuteResult execute( + final ServiceContext serviceContext, + final ConfiguredStatement statement + ) { + return execute( + serviceContext, + ConfiguredKsqlPlan.of( + plan(serviceContext, statement), + statement.getOverrides(), + statement.getConfig() + ) + ); + } + + @Override + public TransientQueryMetadata executeQuery( + final ServiceContext serviceContext, + final ConfiguredStatement statement) { + final TransientQueryMetadata query = EngineExecutor + .create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides()) + .executeQuery(statement); + registerQuery(query); + return query; + } + @Override public void close() { allLiveQueries.forEach(QueryMetadata::close); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java index 8bb8700fe007..bd80d1f84675 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/SandboxedExecutionContext.java @@ -20,11 +20,14 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.Sandbox; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.List; import java.util.Optional; @@ -79,14 +82,57 @@ public PreparedStatement prepare(final ParsedStatement stmt) { return engineContext.prepare(stmt); } + @Override + public KsqlPlan plan( + final ServiceContext serviceContext, + final ConfiguredStatement statement + ) { + return EngineExecutor.create( + engineContext, + serviceContext, + statement.getConfig(), + statement.getOverrides() + ).plan(statement); + } + + @Override + public ExecuteResult execute( + final ServiceContext serviceContext, + final ConfiguredKsqlPlan ksqlPlan + ) { + return EngineExecutor.create( + engineContext, + serviceContext, + ksqlPlan.getConfig(), + ksqlPlan.getOverrides() + ).execute(ksqlPlan.getPlan()); + } + @Override public ExecuteResult execute( final ServiceContext serviceContext, final ConfiguredStatement statement ) { - final EngineExecutor executor = EngineExecutor - .create(engineContext, serviceContext, statement.getConfig(), statement.getOverrides()); + return execute( + serviceContext, + ConfiguredKsqlPlan.of( + plan(serviceContext, statement), + statement.getOverrides(), + statement.getConfig() + ) + ); + } - return executor.execute(statement); + @Override + public TransientQueryMetadata executeQuery( + final ServiceContext serviceContext, + final ConfiguredStatement statement + ) { + return EngineExecutor.create( + engineContext, + serviceContext, + statement.getConfig(), + statement.getOverrides() + ).executeQuery(statement); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ConfiguredKsqlPlan.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ConfiguredKsqlPlan.java new file mode 100644 index 000000000000..29fadfdce7ef --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ConfiguredKsqlPlan.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.planner.plan; + +import io.confluent.ksql.engine.KsqlPlan; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; +import java.util.Objects; + +public final class ConfiguredKsqlPlan { + private final KsqlPlan plan; + private final Map overrides; + private final KsqlConfig config; + + public static ConfiguredKsqlPlan of( + final KsqlPlan plan, + final Map overrides, + final KsqlConfig config + ) { + return new ConfiguredKsqlPlan(plan, overrides, config); + } + + private ConfiguredKsqlPlan( + final KsqlPlan plan, + final Map overrides, + final KsqlConfig config + ) { + this.plan = Objects.requireNonNull(plan, "plan"); + this.overrides = Objects.requireNonNull(overrides, "overrides"); + this.config = Objects.requireNonNull(config, "config"); + } + + public KsqlPlan getPlan() { + return plan; + } + + public Map getOverrides() { + return overrides; + } + + public KsqlConfig getConfig() { + return config; + } +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java index 46d9eafaf8e1..c315b830fe7e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java @@ -123,7 +123,8 @@ public void setUp() { when(ksqlEngine.prepare(PARSED_STMT_0)).thenReturn((PreparedStatement) PREPARED_STMT_0); when(ksqlEngine.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); - when(ksqlEngine.execute(any(), any())).thenReturn(ExecuteResult.of("success")); + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of("success")); when(ksqlEngine.createSandbox(any())).thenReturn(sandbox); @@ -204,7 +205,7 @@ public void shouldThrowIfParseFails() { @Test public void shouldThrowIfSandboxExecuteThrows() { // Given: - when(sandbox.execute(any(), any())) + when(sandbox.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new KsqlException("Bad tings happen")); // Expect @@ -218,7 +219,7 @@ public void shouldThrowIfSandboxExecuteThrows() { @Test public void shouldThrowIfExecuteThrows() { // Given: - when(ksqlEngine.execute(any(), any())) + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new KsqlException("Bad tings happen")); // Expect @@ -232,7 +233,7 @@ public void shouldThrowIfExecuteThrows() { @Test public void shouldNotExecuteAnyStatementsIfTryExecuteThrows() { // Given: - when(sandbox.execute(any(), any())) + when(sandbox.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new KsqlException("Bad tings happen")); // When: @@ -243,13 +244,13 @@ public void shouldNotExecuteAnyStatementsIfTryExecuteThrows() { } // Then: - verify(ksqlEngine, never()).execute(any(), any()); + verify(ksqlEngine, never()).execute(any(), any(ConfiguredStatement.class)); } @Test public void shouldStartPersistentQueries() { // Given: - when(ksqlEngine.execute(any(), any())) + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of(persistentQuery)); // When: @@ -262,7 +263,7 @@ public void shouldStartPersistentQueries() { @Test public void shouldNotBlowUpOnSqlThatDoesNotResultInPersistentQueries() { // Given: - when(ksqlEngine.execute(any(), any())) + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of(transientQuery)); // When: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 184189cc4e2f..edbdd864f3b0 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -659,12 +659,12 @@ public void shouldNotDeleteSchemaNorTopicForTable() throws Exception { @Test public void shouldCleanUpInternalTopicsOnClose() { // Given: - final QueryMetadata query = KsqlEngineTestUtil.execute( + final QueryMetadata query = KsqlEngineTestUtil.executeQuery( serviceContext, ksqlEngine, "select * from test1 EMIT CHANGES;", KSQL_CONFIG, Collections.emptyMap() - ).get(0); + ); query.start(); @@ -720,12 +720,12 @@ public void shouldRemoveTransientQueryFromEngineWhenClosed() { // Given: final int startingLiveQueries = ksqlEngine.numberOfLiveQueries(); - final QueryMetadata query = KsqlEngineTestUtil.execute( + final QueryMetadata query = KsqlEngineTestUtil.executeQuery( serviceContext, ksqlEngine, "select * from test1 EMIT CHANGES;", KSQL_CONFIG, Collections.emptyMap() - ).get(0); + ); // When: query.close(); @@ -1308,4 +1308,4 @@ private void givenSqlAlreadyExecuted(final String sql) { sandbox = ksqlEngine.createSandbox(serviceContext); } -} \ No newline at end of file +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java index dc0e7e82b595..402204547ea9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java @@ -23,6 +23,7 @@ import io.confluent.ksql.metastore.MutableMetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector; @@ -32,6 +33,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collections; import java.util.List; import java.util.Map; @@ -84,6 +86,25 @@ public static List execute( return execute(serviceContext, engine, sql, ksqlConfig, overriddenProperties, Optional.empty()); } + public static TransientQueryMetadata executeQuery( + final ServiceContext serviceContext, + final KsqlEngine engine, + final String sql, + final KsqlConfig ksqlConfig, + final Map overriddenProperties) { + final ParsedStatement stmt = engine.parse(sql).get(0); + final PreparedStatement prepared = engine.prepare(stmt); + final ConfiguredStatement configured = ConfiguredStatement.of( + prepared, overriddenProperties, ksqlConfig).cast(); + try { + return engine.executeQuery(serviceContext, configured); + } catch (final KsqlStatementException e) { + // use the original statement text in the exception so that tests + // can easily check that the failed statement is the input statement + throw new KsqlStatementException(e.getRawMessage(), stmt.getStatementText(), e.getCause()); + } + } + /** * @param srClient if supplied, then schemas can be inferred from the schema registry. */ diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index f6d7ab797f3f..143100c85318 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -126,6 +126,12 @@ private QueryMetadata buildQuery(final String query) { return execute(CREATE_STREAM_TEST1 + query).get(0); } + private QueryMetadata buildTransientQuery(final String query) { + givenKafkaTopicsExist("test1"); + execute(CREATE_STREAM_TEST1); + return executeQuery(query); + } + @Test public void shouldHaveKStreamDataSource() { final PersistentQueryMetadata metadata = (PersistentQueryMetadata) buildQuery( @@ -135,14 +141,14 @@ public void shouldHaveKStreamDataSource() { @Test public void shouldMakeBareQuery() { - final QueryMetadata queryMetadata = buildQuery(simpleSelectFilter); + final QueryMetadata queryMetadata = buildTransientQuery(simpleSelectFilter); assertThat(queryMetadata, instanceOf(TransientQueryMetadata.class)); } @Test public void shouldBuildTransientQueryWithCorrectSchema() { // When: - final QueryMetadata queryMetadata = buildQuery(simpleSelectFilter); + final QueryMetadata queryMetadata = buildTransientQuery(simpleSelectFilter); // Then: assertThat(queryMetadata.getLogicalSchema(), is(LogicalSchema.builder() @@ -181,15 +187,17 @@ public void shouldMakePersistentQuery() { @Test public void shouldCreateExecutionPlan() { - final String queryString = + final String ddl = "CREATE STREAM TEST1 (" + "COL0 BIGINT, COL1 VARCHAR, COL2 STRING, COL3 DOUBLE," + " COL4 ARRAY, COL5 MAP)" - + " WITH (KAFKA_TOPIC = 'test1', VALUE_FORMAT = 'JSON');" - + "SELECT col0, sum(col3), count(col3) FROM test1" + + " WITH (KAFKA_TOPIC = 'test1', VALUE_FORMAT = 'JSON');"; + final String queryString = + "SELECT col0, sum(col3), count(col3) FROM test1" + " WHERE col0 > 100 GROUP BY col0 EMIT CHANGES;"; givenKafkaTopicsExist("test1"); - final QueryMetadata metadata = execute(queryString).get(0); + execute(ddl); + final QueryMetadata metadata = executeQuery(queryString); final String planText = metadata.getExecutionPlan(); final String[] lines = planText.split("\n"); assertThat(lines[0], startsWith( @@ -491,6 +499,11 @@ private List execute(final String sql) { Collections.emptyMap()); } + private TransientQueryMetadata executeQuery(final String sql) { + return KsqlEngineTestUtil.executeQuery( + serviceContext, ksqlEngine, sql, ksqlConfig, Collections.emptyMap()); + } + private void givenKafkaTopicsExist(final String... names) { Arrays.stream(names).forEach(name -> kafkaTopicClient.createTopic(name, 1, (short) 1, Collections.emptyMap()) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index ad9220dd388d..a3fa31e69f9e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.exception.ExceptionUtil; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.name.SourceName; @@ -29,6 +30,7 @@ import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.TerminateQuery; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.rest.entity.CommandId; @@ -268,8 +270,11 @@ private String executeDdlStatement(final PreparedStatement statement, final C final ConfiguredStatement configured = ConfiguredStatement.of(statement, command.getOverwriteProperties(), mergedConfig); + final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured); return ksqlEngine - .execute(serviceContext, configured) + .execute( + serviceContext, + ConfiguredKsqlPlan.of(plan, command.getOverwriteProperties(), mergedConfig)) .getCommandResult() .get(); } @@ -340,9 +345,14 @@ private PersistentQueryMetadata startQuery( queryIdGenerator.activateNewGenerator(offset); } - final QueryMetadata queryMetadata = ksqlEngine.execute(serviceContext, configured) - .getQuery() - .orElseThrow(() -> new IllegalStateException("Statement did not return a query")); + final KsqlPlan plan = ksqlEngine.plan(serviceContext, configured); + final QueryMetadata queryMetadata = + ksqlEngine + .execute( + serviceContext, + ConfiguredKsqlPlan.of(plan, command.getOverwriteProperties(), mergedConfig)) + .getQuery() + .orElseThrow(() -> new IllegalStateException("Statement did not return a query")); if (!(queryMetadata instanceof PersistentQueryMetadata)) { throw new KsqlException(String.format( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java index be464e06d1e6..c4a11734e768 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java @@ -99,13 +99,24 @@ private static QueryDescription explainStatement( explain.getStatementText().substring("EXPLAIN ".length()), statement); - final QueryMetadata metadata = executionContext.createSandbox(serviceContext) - .execute( - serviceContext, - ConfiguredStatement.of(preparedStatement, explain.getOverrides(), explain.getConfig())) - .getQuery() - .orElseThrow(() -> - new IllegalStateException("The provided statement did not run a ksql query")); + final QueryMetadata metadata; + final KsqlExecutionContext sandbox = executionContext.createSandbox(serviceContext); + if (preparedStatement.getStatement() instanceof Query) { + metadata = sandbox.executeQuery( + serviceContext, + ConfiguredStatement.of( + preparedStatement, explain.getOverrides(), explain.getConfig()).cast() + ); + } else { + metadata = sandbox + .execute( + serviceContext, + ConfiguredStatement + .of(preparedStatement, explain.getOverrides(), explain.getConfig())) + .getQuery() + .orElseThrow(() -> + new IllegalStateException("The provided statement did not run a ksql query")); + } return QueryDescriptionFactory.forQueryMetadata(metadata); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java index e6f6ae597c97..170801806a8d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.java @@ -60,11 +60,7 @@ class PushQueryPublisher implements Flow.Publisher> { @SuppressWarnings("OptionalGetWithoutIsPresent") @Override public synchronized void subscribe(final Flow.Subscriber> subscriber) { - final TransientQueryMetadata queryMetadata = - (TransientQueryMetadata) ksqlEngine.execute(serviceContext, query) - .getQuery() - .get(); - + final TransientQueryMetadata queryMetadata = ksqlEngine.executeQuery(serviceContext, query); final PushQuerySubscription subscription = new PushQuerySubscription(subscriber, queryMetadata); log.info("Running query {}", queryMetadata.getQueryApplicationId()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index a0d78826dd1f..8d13a4b1b6b6 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -42,7 +42,6 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; -import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.time.Duration; @@ -269,19 +268,10 @@ private Response handlePushQuery( final ConfiguredStatement configured = ConfiguredStatement.of(statement, streamsProperties, ksqlConfig); - final QueryMetadata query = ksqlEngine.execute(serviceContext, configured) - .getQuery() - .get(); - - if (!(query instanceof TransientQueryMetadata)) { - throw new IllegalStateException(String.format( - "Unexpected metadata type: expected TransientQueryMetadata, found %s instead", - query.getClass() - )); - } + final TransientQueryMetadata query = ksqlEngine.executeQuery(serviceContext, configured); final QueryStreamWriter queryStreamWriter = new QueryStreamWriter( - (TransientQueryMetadata) query, + query, disconnectCheckInterval.toMillis(), objectMapper); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 12f69549948e..c0997610c962 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -56,6 +56,7 @@ import io.confluent.ksql.security.KsqlSecurityExtension; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.version.metrics.VersionCheckerAgent; @@ -284,7 +285,8 @@ public void shouldOnlyCreateLogStreamIfCommandQueueEmpty() { @Test public void shouldNotCreateLogStreamIfValidationFails() { // Given: - when(sandBox.execute(any(), any())).thenThrow(new KsqlException("error")); + when(sandBox.execute(any(), any(ConfiguredStatement.class))) + .thenThrow(new KsqlException("error")); // When: app.startKsql(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index 6106f8eba71f..b46efc49e1ec 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -258,7 +258,8 @@ public void before() throws Exception { when(ksqlEngine.prepare(PARSED_STMT_0)).thenReturn((PreparedStatement) PREPARED_STMT_0); when(ksqlEngine.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); - when(ksqlEngine.execute(any(), any())).thenReturn(ExecuteResult.of(persistentQuery)); + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of(persistentQuery)); when(ksqlEngine.createSandbox(any())).thenReturn(sandBox); @@ -266,7 +267,8 @@ public void before() throws Exception { when(sandBox.prepare(PARSED_STMT_1)).thenReturn((PreparedStatement) PREPARED_STMT_1); when(sandBox.getServiceContext()).thenReturn(sandBoxServiceContext); - when(sandBox.execute(any(), any())).thenReturn(ExecuteResult.of("success")); + when(sandBox.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of("success")); when(sandBox.execute(sandBoxServiceContext, CSAS_CFG_WITH_TOPIC)) .thenReturn(ExecuteResult.of(persistentQuery)); @@ -595,7 +597,7 @@ public void shouldThrowIfExecutingPersistentQueryDoesNotReturnQuery() { // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any(), any())) + when(sandBox.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of("well, this is unexpected.")); expectedException.expect(KsqlException.class); @@ -610,7 +612,7 @@ public void shouldThrowIfExecutingPersistentQueryReturnsNonPersistentMetaData() // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any(), any())) + when(sandBox.execute(any(), any(ConfiguredStatement.class))) .thenReturn(ExecuteResult.of(nonPersistentQueryMd)); expectedException.expect(KsqlException.class); @@ -632,7 +634,8 @@ public void shouldThrowIfParseThrows() { @Test(expected = RuntimeException.class) public void shouldThrowIfExecuteThrows() { // Given: - when(ksqlEngine.execute(any(), any())).thenThrow(new RuntimeException("Boom!")); + when(ksqlEngine.execute(any(), any(ConfiguredStatement.class))) + .thenThrow(new RuntimeException("Boom!")); // When: standaloneExecutor.startAsync(); @@ -672,7 +675,8 @@ public void shouldStartQueries() { public void shouldNotStartValidationPhaseQueries() { // Given: givenFileContainsAPersistentQuery(); - when(sandBox.execute(any(), any())).thenReturn(ExecuteResult.of(sandBoxQuery)); + when(sandBox.execute(any(), any(ConfiguredStatement.class))) + .thenReturn(ExecuteResult.of(sandBoxQuery)); // When: standaloneExecutor.startAsync(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index f3e6924d02ca..da3aee586aa8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -38,6 +38,7 @@ import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.KsqlEngineTestUtil; +import io.confluent.ksql.engine.KsqlPlan; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.internal.KsqlEngineMetrics; @@ -54,6 +55,7 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.rest.entity.CommandId; @@ -83,6 +85,9 @@ import org.easymock.IArgumentMatcher; import org.easymock.Mock; import org.hamcrest.CoreMatchers; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.hamcrest.integration.EasyMock2Adapter; import org.junit.After; import org.junit.Assert; @@ -92,6 +97,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.RuleChain; +import org.mockito.Mockito; public class InteractiveStatementExecutorTest extends EasyMockSupport { @@ -274,7 +280,9 @@ public void shouldBuildQueriesWithPersistedConfig() { expect(mockParser.parseSingleStatement(statementText)).andReturn(csasStatement); expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); - expect(mockEngine.execute(eq(serviceContext), eq(configuredCsas))) + final KsqlPlan plan = Mockito.mock(KsqlPlan.class); + expect(mockEngine.plan(eq(serviceContext), eq(configuredCsas))).andReturn(plan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(plan))) .andReturn(ExecuteResult.of(mockQueryMetadata)); mockQueryMetadata.start(); expectLastCall(); @@ -474,6 +482,7 @@ private PersistentQueryMetadata mockReplayCSAS( final String name, final QueryId queryId) { final CreateStreamAsSelect mockCSAS = mockCSAS(name); + final KsqlPlan mockPlan = Mockito.mock(KsqlPlan.class); final PersistentQueryMetadata mockQuery = mock(PersistentQueryMetadata.class); expect(mockQuery.getQueryId()).andStubReturn(queryId); final PreparedStatement csas = PreparedStatement.of("CSAS", mockCSAS); @@ -481,7 +490,9 @@ private PersistentQueryMetadata mockReplayCSAS( .andReturn(csas); expect(mockMetaStore.getSource(SourceName.of(name))).andStubReturn(null); expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); - expect(mockEngine.execute(eq(serviceContext), eqConfigured(csas))) + expect(mockEngine.plan(eq(serviceContext), eqConfigured(csas))) + .andReturn(mockPlan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(mockPlan))) .andReturn(ExecuteResult.of(mockQuery)); return mockQuery; } @@ -547,12 +558,10 @@ public void shouldCascade4Dot1DropStreamCommand() { mockQueryMetadata.close(); expectLastCall(); - expect(mockEngine - .execute( - eq(serviceContext), - eqConfigured(PreparedStatement.of("DROP", mockDropStream))) - ) - .andReturn(ExecuteResult.of("SUCCESS")); + final KsqlPlan plan = Mockito.mock(KsqlPlan.class); + expect(mockEngine.plan(eq(serviceContext), eqConfigured(PreparedStatement.of("DROP", mockDropStream)))) + .andReturn(plan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(plan))).andReturn(ExecuteResult.of("SUCCESS")); replayAll(); // When: @@ -569,6 +578,25 @@ public void shouldCascade4Dot1DropStreamCommand() { verify(mockParser, mockEngine, mockMetaStore); } + public static Matcher configuredPlan(final Matcher plan) { + return new TypeSafeMatcher() { + @Override + protected boolean matchesSafely(final ConfiguredKsqlPlan item) { + return plan.matches(item.getPlan()); + } + + @Override + public void describeTo(final Description description) { + plan.describeTo(description); + } + }; + } + + private static ConfiguredKsqlPlan eqConfigured(final KsqlPlan plan) { + EasyMock2Adapter.adapt(configuredPlan(equalTo(plan))); + return null; + } + private static ConfiguredStatement eqConfigured( final PreparedStatement preparedStatement ) { @@ -586,7 +614,9 @@ public void shouldNotCascadeDropStreamCommand() { final DropStream mockDropStream = mockDropStream("foo"); final PreparedStatement statement = PreparedStatement.of(drop, mockDropStream); - expect(mockEngine.execute(eq(serviceContext), eqConfigured(statement))) + final KsqlPlan plan = Mockito.mock(KsqlPlan.class); + expect(mockEngine.plan(eq(serviceContext), eqConfigured(statement))).andReturn(plan); + expect(mockEngine.execute(eq(serviceContext), eqConfigured(plan))) .andReturn(ExecuteResult.of("SUCCESS")); replayAll(); @@ -921,4 +951,4 @@ private static Command givenCommand(final String statementStr, final KsqlConfig return new Command( statementStr, true, emptyMap(), ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); } -} \ No newline at end of file +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 13fb8b4f74e8..513f9c2457f4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -1211,7 +1211,7 @@ public void shouldExplainQueryStatement() { ksqlString, QueryDescriptionEntity.class); // Then: - validateQueryDescription(ksqlQueryString, emptyMap(), query); + validateTransientQueryDescription(ksqlQueryString, emptyMap(), query); } @Test @@ -1288,7 +1288,7 @@ public void shouldReturn5xxOnStatementSystemError() { final String ksqlString = "CREATE STREAM test_explain AS SELECT * FROM test_stream;"; givenMockEngine(); - when(sandbox.execute(any(), any())) + when(sandbox.execute(any(), any(ConfiguredStatement.class))) .thenThrow(new RuntimeException("internal error")); // When: @@ -2037,6 +2037,20 @@ private List makeMultipleRequest( .collect(Collectors.toList()); } + private void validateTransientQueryDescription( + final String ksqlQueryString, + final Map overriddenProperties, + final KsqlEntity entity) { + final TransientQueryMetadata queryMetadata = KsqlEngineTestUtil.executeQuery( + serviceContext, + ksqlEngine, + ksqlQueryString, + ksqlConfig, + overriddenProperties + ); + validateQueryDescription(queryMetadata, overriddenProperties, entity); + } + @SuppressWarnings("SameParameterValue") private void validateQueryDescription( final String ksqlQueryString, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 283c7793aae7..f87cc46a18c7 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -38,7 +38,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; -import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.json.JsonMapper; @@ -136,13 +135,16 @@ public class StreamedQueryResourceTest { @Mock private KsqlAuthorizationValidator authorizationValidator; private StreamedQueryResource testResource; - private PreparedStatement statement; + private PreparedStatement invalid; + private PreparedStatement query; + private PreparedStatement print; @Before public void setup() { when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient); - statement = PreparedStatement.of(PUSH_QUERY_STRING, mock(Statement.class)); - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(statement); + query = PreparedStatement.of(PUSH_QUERY_STRING, mock(Query.class)); + invalid = PreparedStatement.of("sql", mock(Statement.class)); + when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)).thenReturn(invalid); final Query pullQuery = mock(Query.class); when(pullQuery.isPullQuery()).thenReturn(true); @@ -358,11 +360,10 @@ public void shouldStreamRowsCorrectly() throws Throwable { final KafkaStreams mockKafkaStreams = mock(KafkaStreams.class); - final Map requestStreamsProperties = Collections.emptyMap(); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(query); - statement = PreparedStatement.of("query", mock(Query.class)); - when(mockStatementParser.parseSingleStatement(queryString)) - .thenReturn(statement); + final Map requestStreamsProperties = Collections.emptyMap(); final TransientQueryMetadata transientQueryMetadata = new TransientQueryMetadata( @@ -379,9 +380,9 @@ public void shouldStreamRowsCorrectly() throws Throwable { Collections.emptyMap(), queryCloseCallback); - when(mockKsqlEngine.execute(serviceContext, - ConfiguredStatement.of(statement, requestStreamsProperties, VALID_CONFIG))) - .thenReturn(ExecuteResult.of(transientQueryMetadata)); + when(mockKsqlEngine.executeQuery(serviceContext, + ConfiguredStatement.of(query, requestStreamsProperties, VALID_CONFIG))) + .thenReturn(transientQueryMetadata); final Response response = testResource.streamQuery( @@ -535,10 +536,8 @@ public void shouldUpdateTheLastRequestTime() { @Test public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() { // Given: - statement = PreparedStatement.of("query", mock(Query.class)); - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) - .thenReturn(statement); - + when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) + .thenReturn(query); doThrow( new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))) .when(authorizationValidator).checkAuthorization(any(), any(), any()); @@ -561,9 +560,8 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() @Test public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { // Given: - statement = PreparedStatement.of("query", mock(Query.class)); - when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) - .thenReturn(statement); + when(mockStatementParser.parseSingleStatement(PUSH_QUERY_STRING)) + .thenReturn(query); doThrow(new KsqlException( "", new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME)))) @@ -589,9 +587,9 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc @Test public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() { // Given: - statement = PreparedStatement.of("print", mock(PrintTopic.class)); - when(mockStatementParser.parseSingleStatement(PRINT_TOPIC)) - .thenReturn(statement); + print = PreparedStatement.of("print", mock(PrintTopic.class)); + when(mockStatementParser.parseSingleStatement(PRINT_TOPIC)) + .thenReturn(print); doThrow( new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(TOPIC_NAME))) @@ -615,9 +613,9 @@ public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { // Given: final PrintTopic cmd = mock(PrintTopic.class); when(cmd.getTopic()).thenReturn("TEST_TOPIC"); - statement = PreparedStatement.of("print", cmd); - when(mockStatementParser.parseSingleStatement(any())) - .thenReturn(statement); + print = PreparedStatement.of("print", cmd); + when(mockStatementParser.parseSingleStatement(any())) + .thenReturn(print); when(mockKafkaTopicClient.isTopicExists(any())).thenReturn(false); when(mockKafkaTopicClient.listTopicNames()).thenReturn(ImmutableSet.of(