diff --git a/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java b/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java index 18901b7e6a03..1ffdabbb5039 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/embedded/KsqlContext.java @@ -46,13 +46,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,23 +123,25 @@ public List sql(final String sql) { return sql(sql, Collections.emptyMap()); } - public List sql(final String sql, final Map overriddenProperties) { + public List sql(final String sql, final Map overriddenProperties) { final List statements = ksqlEngine.parse(sql); final KsqlExecutionContext sandbox = ksqlEngine.createSandbox(ksqlEngine.getServiceContext()); + final Map validationOverrides = new HashMap<>(overriddenProperties); for (ParsedStatement stmt : statements) { execute( sandbox, stmt, ksqlConfig, - overriddenProperties, + validationOverrides, injectorFactory.apply(sandbox, sandbox.getServiceContext())); } final List queries = new ArrayList<>(); final Injector injector = injectorFactory.apply(ksqlEngine, serviceContext); + final Map executionOverrides = new HashMap<>(overriddenProperties); for (final ParsedStatement parsed : statements) { - execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties, injector) + execute(ksqlEngine, parsed, ksqlConfig, executionOverrides, injector) .getQuery() .ifPresent(queries::add); } @@ -181,32 +183,42 @@ private static ExecuteResult execute( final KsqlExecutionContext executionContext, final ParsedStatement stmt, final KsqlConfig ksqlConfig, - final Map overriddenProperties, - final Injector injector) { + final Map mutableSessionPropertyOverrides, + final Injector injector + ) { final PreparedStatement prepared = executionContext.prepare(stmt); - final ConfiguredStatement configured = - injector.inject(ConfiguredStatement.of(prepared, overriddenProperties, ksqlConfig)); + + final ConfiguredStatement configured = injector.inject(ConfiguredStatement.of( + prepared, + mutableSessionPropertyOverrides, + ksqlConfig + )); final CustomExecutor executor = CustomExecutors.EXECUTOR_MAP.getOrDefault( configured.getStatement().getClass(), - executionContext::execute); + (s, props) -> executionContext.execute(s)); - return executor.apply(configured); + return executor.apply(configured, mutableSessionPropertyOverrides); } @FunctionalInterface - private interface CustomExecutor extends Function, ExecuteResult> { } + private interface CustomExecutor { + ExecuteResult apply( + ConfiguredStatement statement, + Map mutableSessionPropertyOverrides + ); + } @SuppressWarnings("unchecked") private enum CustomExecutors { - SET_PROPERTY(SetProperty.class, stmt -> { - PropertyOverrider.set((ConfiguredStatement) stmt); + SET_PROPERTY(SetProperty.class, (stmt, props) -> { + PropertyOverrider.set((ConfiguredStatement) stmt, props); return ExecuteResult.of("Successfully executed " + stmt.getStatement()); }), - UNSET_PROPERTY(UnsetProperty.class, stmt -> { - PropertyOverrider.unset((ConfiguredStatement) stmt); + UNSET_PROPERTY(UnsetProperty.class, (stmt, props) -> { + PropertyOverrider.unset((ConfiguredStatement) stmt, props); return ExecuteResult.of("Successfully executed " + stmt.getStatement()); }) ; @@ -238,9 +250,11 @@ private CustomExecutor getExecutor() { return this::execute; } - public ExecuteResult execute(final ConfiguredStatement statement) { - return executor.apply(statement); + public ExecuteResult execute( + final ConfiguredStatement statement, + final Map mutableSessionPropertyOverrides + ) { + return executor.apply(statement, mutableSessionPropertyOverrides); } - } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index fc1a3dab62bb..ae293a1b1746 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -136,6 +136,7 @@ private InsertValuesExecutor( public void execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/properties/PropertyOverrider.java b/ksql-engine/src/main/java/io/confluent/ksql/properties/PropertyOverrider.java index b7de54735e94..cc8ac3789cc9 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/properties/PropertyOverrider.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/properties/PropertyOverrider.java @@ -21,22 +21,29 @@ import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlStatementException; +import java.util.Map; public final class PropertyOverrider { private PropertyOverrider() { } - public static void set(final ConfiguredStatement statement) { + public static void set( + final ConfiguredStatement statement, + final Map mutableProperties + ) { final SetProperty setProperty = statement.getStatement(); throwIfInvalidProperty(setProperty.getPropertyName(), statement.getStatementText()); throwIfInvalidPropertyValues(setProperty, statement); - statement.getOverrides().put(setProperty.getPropertyName(), setProperty.getPropertyValue()); + mutableProperties.put(setProperty.getPropertyName(), setProperty.getPropertyValue()); } - public static void unset(final ConfiguredStatement statement) { + public static void unset( + final ConfiguredStatement statement, + final Map mutableProperties + ) { final UnsetProperty unsetProperty = statement.getStatement(); throwIfInvalidProperty(unsetProperty.getPropertyName(), statement.getStatementText()); - statement.getOverrides().remove(unsetProperty.getPropertyName()); + mutableProperties.remove(unsetProperty.getPropertyName()); } @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_INFERRED") // clone has side-effects diff --git a/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java b/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java index 0716fa6b7f45..2edf280d668f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/statement/ConfiguredStatement.java @@ -15,6 +15,10 @@ package io.confluent.ksql.statement; +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.util.KsqlConfig; @@ -25,6 +29,7 @@ * A prepared statement paired with the configurations needed to fully * execute it. */ +@Immutable public final class ConfiguredStatement { private final PreparedStatement statement; @@ -33,7 +38,7 @@ public final class ConfiguredStatement { public static ConfiguredStatement of( final PreparedStatement statement, - final Map overrides, + final Map overrides, final KsqlConfig config ) { return new ConfiguredStatement<>(statement, overrides, config); @@ -41,12 +46,12 @@ public static ConfiguredStatement of( private ConfiguredStatement( final PreparedStatement statement, - final Map overrides, + final Map overrides, final KsqlConfig config ) { - this.statement = Objects.requireNonNull(statement, "statement"); - this.overrides = Objects.requireNonNull(overrides, "overrides"); - this.config = Objects.requireNonNull(config, "config"); + this.statement = requireNonNull(statement, "statement"); + this.overrides = ImmutableMap.copyOf(requireNonNull(overrides, "overrides")); + this.config = requireNonNull(config, "config"); } @SuppressWarnings("unchecked") diff --git a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java index 11d063ddb0b7..38d2345acc46 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/embedded/KsqlContextTest.java @@ -15,9 +15,6 @@ package io.confluent.ksql.embedded; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.not; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; @@ -48,7 +45,6 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -364,43 +360,81 @@ public void shouldInferTopicAfterInferringSchema() { ksqlContext.sql("Some SQL", SOME_PROPERTIES); // Then: - verify(ksqlEngine).execute(eq(STMT_1_WITH_TOPIC)); + verify(ksqlEngine).execute(STMT_1_WITH_TOPIC); } @SuppressWarnings("unchecked") @Test public void shouldSetProperty() { // Given: - final Map properties = new HashMap<>(); + when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0)); + + final PreparedStatement set = PreparedStatement.of( + "SET SOMETHING", + new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + ); + when(ksqlEngine.prepare(any())) - .thenReturn( - (PreparedStatement) PreparedStatement.of( - "SET SOMETHING", - new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))); + .thenReturn((PreparedStatement) set) + .thenReturn(PREPARED_STMT_0); // When: - ksqlContext.sql("SQL;", properties); + ksqlContext.sql("SQL;", ImmutableMap.of()); + + // Then: + verify(ksqlEngine).execute(ConfiguredStatement.of( + PREPARED_STMT_0, ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), + SOME_CONFIG + )); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() { + // Given: + when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0)); + + final PreparedStatement set = PreparedStatement.of( + "SET SOMETHING", + new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + ); + + when(ksqlEngine.prepare(any())) + .thenReturn((PreparedStatement) PREPARED_STMT_0) + .thenReturn(set); + + // When: + ksqlContext.sql("SQL;", ImmutableMap.of()); // Then: - assertThat(properties, hasEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")); + verify(ksqlEngine).execute(ConfiguredStatement.of( + PREPARED_STMT_0, ImmutableMap.of(), SOME_CONFIG + )); } @SuppressWarnings("unchecked") @Test public void shouldUnsetProperty() { // Given: - final Map properties = new HashMap<>(); - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0)); + + final Map properties = ImmutableMap + .of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final PreparedStatement unset = PreparedStatement.of( + "UNSET SOMETHING", + new UnsetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + when(ksqlEngine.prepare(any())) - .thenReturn( - (PreparedStatement) PreparedStatement.of( - "UNSET SOMETHING", - new UnsetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); + .thenReturn((PreparedStatement) unset) + .thenReturn(PREPARED_STMT_0); // When: ksqlContext.sql("SQL;", properties); // Then: - assertThat(properties, not(hasEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))); + verify(ksqlEngine).execute(ConfiguredStatement.of( + PREPARED_STMT_0, ImmutableMap.of(), SOME_CONFIG + )); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java index ed27f10c64b2..21fbfedfa796 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java @@ -189,7 +189,7 @@ public void shouldHandleFullRow() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -208,7 +208,7 @@ public void shouldInsertWrappedSingleField() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("new")); @@ -228,7 +228,7 @@ public void shouldInsertUnwrappedSingleField() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("new")); @@ -249,7 +249,7 @@ public void shouldFillInRowtime() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -270,7 +270,7 @@ public void shouldHandleRowTimeWithoutRowKey() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -290,7 +290,7 @@ public void shouldFillInRowKeyFromSpecifiedKey() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -311,7 +311,7 @@ public void shouldFillInFullRowWithNoSchema() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -330,7 +330,7 @@ public void shouldFillInMissingColumnsWithNulls() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -350,7 +350,7 @@ public void shouldFillInKeyFromRowKey() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -370,7 +370,7 @@ public void shouldHandleOutOfOrderSchema() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -389,7 +389,7 @@ public void shouldHandleAllSortsOfLiterals() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -416,7 +416,7 @@ public void shouldHandleNullKey() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -442,7 +442,7 @@ public void shouldAllowUpcast() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str")); @@ -472,7 +472,7 @@ public void shouldThrowOnProducerSendError() throws ExecutionException, Interrup expectedException.expectMessage("Failed to insert values into "); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); } @Test @@ -493,7 +493,7 @@ public void shouldThrowOnSerializingKeyError() { expectedException.expectCause(hasMessage(containsString("Could not serialize key"))); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); } @Test @@ -515,7 +515,7 @@ public void shouldThrowOnSerializingValueError() { expectedException.expectCause(hasMessage(containsString("Could not serialize row"))); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); } @Test @@ -539,7 +539,7 @@ public void shouldThrowOnTopicAuthorizationException() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); } @Test @@ -557,7 +557,7 @@ public void shouldThrowIfRowKeyAndKeyDoNotMatch() { expectedException.expectCause(hasMessage(containsString("Expected ROWKEY and COL0 to match"))); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); } @Test @@ -574,7 +574,7 @@ public void shouldThrowIfNotEnoughValuesSuppliedWithNoSchema() { expectedException.expectCause(hasMessage(containsString("Expected a value for each column"))); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); } @Test @@ -594,7 +594,7 @@ public void shouldFailOnDowncast() { expectedException.expectCause(hasMessage(containsString("Expected type INTEGER for field"))); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); } @Test @@ -611,7 +611,7 @@ public void shouldHandleSourcesWithNoKeyField() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct("key")); @@ -632,7 +632,7 @@ public void shouldHandleSourcesWithNoKeyFieldAndNoRowKeyProvided() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerializer).serialize(TOPIC_NAME, keyStruct(null)); @@ -652,7 +652,7 @@ public void shouldBuildCorrectSerde() { ); // When: - executor.execute(statement, engine, serviceContext); + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: verify(keySerdeFactory).create( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/statement/ConfiguredStatementTest.java b/ksql-engine/src/test/java/io/confluent/ksql/statement/ConfiguredStatementTest.java new file mode 100644 index 000000000000..09f56947de60 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/statement/ConfiguredStatementTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.statement; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.util.KsqlConfig; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ConfiguredStatementTest { + + private static final KsqlConfig CONFIG = new KsqlConfig(ImmutableMap.of()); + @Mock + private PreparedStatement prepared; + + @Test + public void shouldTakeDefensiveCopyOfProperties() { + // Given: + final Map props = new HashMap<>(); + props.put("this", "that"); + + final ConfiguredStatement statement = ConfiguredStatement + .of(prepared, props, CONFIG); + + // When: + props.put("other", "thing"); + + // Then: + assertThat(statement.getOverrides(), is(ImmutableMap.of("this", "that"))); + } + + @Test + public void shouldReturnImmutableProperties() { + // Given: + final ConfiguredStatement statement = ConfiguredStatement + .of(prepared, new HashMap<>(), CONFIG); + + // Then: + assertThat(statement.getOverrides(), is(instanceOf(ImmutableMap.class))); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index 21bbfe2d9e3b..71d4b69be611 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -187,8 +187,8 @@ public void shouldUseNameFromCreate() { @Test public void shouldGenerateNameWithCorrectPrefixFromOverrides() { // Given: - givenStatement("CREATE STREAM x AS SELECT * FROM SOURCE;"); overrides.put(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG, "prefix-"); + givenStatement("CREATE STREAM x AS SELECT * FROM SOURCE;"); config = new KsqlConfig(ImmutableMap.of( KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG, "nope" )); diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java index b49d29213ecc..5abe49423541 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java @@ -295,6 +295,7 @@ private static ExecuteResultAndSortedSources execute( if (prepared.getStatement() instanceof InsertValues) { StubInsertValuesExecutor.of(stubKafkaService).execute( (ConfiguredStatement) configured, + overriddenProperties, executionContext, executionContext.getServiceContext() ); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java index 691bcc03ddd1..561ada6f5b2e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutor.java @@ -15,6 +15,8 @@ package io.confluent.ksql.rest.server; +import static java.util.Objects.requireNonNull; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.confluent.ksql.KsqlExecutionContext; @@ -57,7 +59,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.function.BiConsumer; @@ -77,7 +78,6 @@ public class StandaloneExecutor implements Executable { private final String queriesFile; private final UdfLoader udfLoader; private final CountDownLatch shutdownLatch = new CountDownLatch(1); - private final Map configProperties = new HashMap<>(); private final boolean failOnNoQueries; private final VersionCheckerAgent versionChecker; private final BiFunction injectorFactory; @@ -93,15 +93,15 @@ public class StandaloneExecutor implements Executable { final VersionCheckerAgent versionChecker, final BiFunction injectorFactory ) { - this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); - this.processingLogConfig = Objects.requireNonNull(processingLogConfig, "processingLogConfig"); - this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); - this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); - this.queriesFile = Objects.requireNonNull(queriesFile, "queriesFile"); - this.udfLoader = Objects.requireNonNull(udfLoader, "udfLoader"); + this.serviceContext = requireNonNull(serviceContext, "serviceContext"); + this.processingLogConfig = requireNonNull(processingLogConfig, "processingLogConfig"); + this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); + this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine"); + this.queriesFile = requireNonNull(queriesFile, "queriesFile"); + this.udfLoader = requireNonNull(udfLoader, "udfLoader"); this.failOnNoQueries = failOnNoQueries; - this.versionChecker = Objects.requireNonNull(versionChecker, "versionChecker"); - this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory"); + this.versionChecker = requireNonNull(versionChecker, "versionChecker"); + this.injectorFactory = requireNonNull(injectorFactory, "injectorFactory"); } public void start() { @@ -169,7 +169,7 @@ private void processesQueryFile(final String queries) { final Injector injector = injectorFactory.apply(ksqlEngine, serviceContext); executeStatements( preparedStatements, - new StatementExecutor(ksqlEngine, injector, configProperties, ksqlConfig) + new StatementExecutor(ksqlEngine, injector, ksqlConfig) ); ksqlEngine.getPersistentQueries().forEach(QueryMetadata::start); @@ -183,7 +183,6 @@ private void validateStatements(final List statements) { final StatementExecutor sandboxExecutor = new StatementExecutor( sandboxEngine, injector, - new HashMap<>(configProperties), ksqlConfig ); @@ -263,20 +262,18 @@ private static final class StatementExecutor { private static final String SUPPORTED_STATEMENTS = generateSupportedMessage(); private final KsqlExecutionContext executionContext; - private final Map configProperties; + private final Map configOverrides = new HashMap<>(); private final KsqlConfig ksqlConfig; private final Injector injector; private StatementExecutor( final KsqlExecutionContext executionContext, final Injector injector, - final Map configProperties, final KsqlConfig ksqlConfig ) { - this.executionContext = Objects.requireNonNull(executionContext, "executionContext"); - this.configProperties = Objects.requireNonNull(configProperties, "configProperties"); - this.injector = Objects.requireNonNull(injector, "injector"); - this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); + this.executionContext = requireNonNull(executionContext, "executionContext"); + this.injector = requireNonNull(injector, "injector"); + this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); } /** @@ -306,7 +303,7 @@ private ConfiguredStatement prepare( ) { final PreparedStatement prepared = executionContext.prepare(statement); final ConfiguredStatement configured = ConfiguredStatement.of( - prepared, configProperties, ksqlConfig); + prepared, configOverrides, ksqlConfig); return injector.inject(configured); } @@ -328,11 +325,11 @@ private static void throwOnMissingSchema(final ConfiguredStatement statement) } private void handleSetProperty(final ConfiguredStatement statement) { - PropertyOverrider.set(statement); + PropertyOverrider.set(statement, configOverrides); } private void handleUnsetProperty(final ConfiguredStatement statement) { - PropertyOverrider.unset(statement); + PropertyOverrider.unset(statement, configOverrides); } private void handleExecutableDdl(final ConfiguredStatement statement) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index ea0eb062269f..61ba067b87c3 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -27,6 +27,7 @@ import io.confluent.ksql.statement.Injector; import io.confluent.ksql.util.KsqlServerException; import java.time.Duration; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.BiFunction; @@ -61,9 +62,9 @@ public DistributingExecutor( @Override public Optional execute( final ConfiguredStatement statement, + final Map mutableScopedProperties, final KsqlExecutionContext executionContext, - final ServiceContext serviceContext - ) { + final ServiceContext serviceContext) { final ConfiguredStatement injected = injectorFactory .apply(executionContext, serviceContext) .inject(statement); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java index 63206d6755bd..6c3806edb6fd 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ConnectExecutor.java @@ -26,6 +26,7 @@ import io.confluent.ksql.services.ConnectClient.ConnectResponse; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.Map; import java.util.Optional; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; @@ -35,6 +36,7 @@ private ConnectExecutor() { } public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index d4688895bf41..f68b6f0ad7fb 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -107,16 +107,18 @@ private StatementExecutor getExecutor() { public Optional execute( final ConfiguredStatement statement, + final Map mutableScopedProperties, final KsqlExecutionContext executionCtx, - final ServiceContext serviceCtx) { - return executor.execute(statement, executionCtx, serviceCtx); + final ServiceContext serviceCtx + ) { + return executor.execute(statement, mutableScopedProperties, executionCtx, serviceCtx); } private static StatementExecutor insertValuesExecutor() { final InsertValuesExecutor executor = new InsertValuesExecutor(); - return (statement, executionContext, serviceContext) -> { - executor.execute(statement, executionContext, serviceContext); + return (statement, sessionProperties, executionContext, serviceContext) -> { + executor.execute(statement, sessionProperties, executionContext, serviceContext); return Optional.empty(); }; } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java index a49ab1c186c8..aa412a3b0852 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java @@ -31,6 +31,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -60,6 +61,7 @@ public DescribeConnectorExecutor() { @SuppressWarnings("OptionalGetWithoutIsPresent") public Optional execute( final ConfiguredStatement configuredStatement, + final Map sessionProperties, final KsqlExecutionContext ksqlExecutionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutor.java index 62571b3006ef..60e78379e2a1 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutor.java @@ -31,6 +31,7 @@ import io.confluent.ksql.util.IdentifierUtil; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.kafka.connect.data.Schema; @@ -43,6 +44,7 @@ private DescribeFunctionExecutor() { } public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java index 26cbdbc7ce86..8b13ab156b51 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutor.java @@ -23,6 +23,7 @@ import io.confluent.ksql.services.ConnectClient.ConnectResponse; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.Map; import java.util.Optional; public final class DropConnectorExecutor { @@ -31,6 +32,7 @@ private DropConnectorExecutor() { } public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java index 8308fb8add6a..be464e06d1e6 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ExplainExecutor.java @@ -32,6 +32,7 @@ import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import java.util.Map; import java.util.Optional; /** @@ -44,6 +45,7 @@ private ExplainExecutor() { } public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java index c7a693684451..6eb2ca7cbe8d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutor.java @@ -29,6 +29,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; @@ -41,6 +42,7 @@ private ListConnectorsExecutor() { } @SuppressWarnings("OptionalGetWithoutIsPresent") public static Optional execute( final ConfiguredStatement configuredStatement, + final Map sessionProperties, final KsqlExecutionContext ksqlExecutionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutor.java index 1566c4dce7c8..a1e9bfbb22e6 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutor.java @@ -25,6 +25,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ private ListFunctionsExecutor() { } public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java index 85fda5588903..258ad638a2b4 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutor.java @@ -35,6 +35,7 @@ private ListPropertiesExecutor() { } public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java index c97c43a05454..90f1265d0a81 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java @@ -24,6 +24,7 @@ import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -33,6 +34,7 @@ private ListQueriesExecutor() { } public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java index 9d66179b59d2..6b2b03b6bb99 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java @@ -43,6 +43,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -81,6 +82,7 @@ private static Optional sourceDescriptionList( public static Optional streams( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { @@ -105,6 +107,7 @@ public static Optional streams( public static Optional tables( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { @@ -128,6 +131,7 @@ public static Optional tables( public static Optional columns( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java index c6e52f4fb48e..c7d34ed66cdc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java @@ -53,6 +53,7 @@ private ListTopicsExecutor() { public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTypesExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTypesExecutor.java index 51d76eea7d9c..fce0dc10d017 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTypesExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTypesExecutor.java @@ -26,6 +26,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.Iterator; +import java.util.Map; import java.util.Optional; public final class ListTypesExecutor { @@ -34,6 +35,7 @@ private ListTypesExecutor() { } public static Optional execute( final ConfiguredStatement configuredStatement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PropertyExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PropertyExecutor.java index 8862c14fcd8b..c7a1e69b6381 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PropertyExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PropertyExecutor.java @@ -22,6 +22,7 @@ import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.Map; import java.util.Optional; public final class PropertyExecutor { @@ -30,19 +31,21 @@ private PropertyExecutor() { } public static Optional set( final ConfiguredStatement statement, + final Map mutableScopedProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { - PropertyOverrider.set(statement); + PropertyOverrider.set(statement, mutableScopedProperties); return Optional.empty(); } public static Optional unset( final ConfiguredStatement statement, + final Map mutableScopedProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { - PropertyOverrider.unset(statement); + PropertyOverrider.unset(statement, mutableScopedProperties); return Optional.empty(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java index f76805c385d9..008a8c73f403 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java @@ -90,7 +90,8 @@ public KsqlEntityList execute( } else { final ConfiguredStatement configured = ConfiguredStatement.of( prepared, scopedPropertyOverrides, ksqlConfig); - executeStatement(serviceContext, configured, entities).ifPresent(entities::add); + executeStatement(serviceContext, configured, scopedPropertyOverrides, entities) + .ifPresent(entities::add); } } return entities; @@ -100,6 +101,7 @@ public KsqlEntityList execute( private Optional executeStatement( final ServiceContext serviceContext, final ConfiguredStatement configured, + final Map mutableScopedProperties, final KsqlEntityList entities ) { final Class statementClass = configured.getStatement().getClass(); @@ -110,6 +112,7 @@ private Optional executeStatement( return executor.execute( configured, + mutableScopedProperties, ksqlEngine, serviceContext ); @@ -129,5 +132,4 @@ private KsqlEntityList executeRunScript( return execute(serviceContext, ksqlEngine.parse(sql), propertyOverrides); } - } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StatementExecutor.java index 6bdb0aaf53d3..2ac2b562e835 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StatementExecutor.java @@ -20,6 +20,7 @@ import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.Map; import java.util.Optional; /** @@ -31,11 +32,16 @@ public interface StatementExecutor { /** * Executes the query against the parameterized {@code ksqlEngine}. * + * @param statement the statement to execute + * @param mutableScopedProperties the session properties + * @param executionContext the context in which to execute it + * @param serviceContext the services to use to execute it * @return the execution result, if present, else {@link Optional#empty()} */ Optional execute( ConfiguredStatement statement, + Map mutableScopedProperties, KsqlExecutionContext executionContext, - ServiceContext serviceContext); - + ServiceContext serviceContext + ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java index 016621955faa..3740f489e2f2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java @@ -113,6 +113,7 @@ private StaticQueryExecutor() { public static void validate( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { @@ -139,6 +140,7 @@ public static void validate( public static Optional execute( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java index 787cb164db45..684fba62616e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java @@ -116,8 +116,9 @@ private StatementValidator getValidator() { public void validate( final ConfiguredStatement statement, + final Map mutableScopedProperties, final KsqlExecutionContext executionContext, final ServiceContext serviceContext) throws KsqlException { - validator.validate(statement, executionContext, serviceContext); + validator.validate(statement, mutableScopedProperties, executionContext, serviceContext); } } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/PrintTopicValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/PrintTopicValidator.java index 85e248600cf5..b73f1840047f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/PrintTopicValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/PrintTopicValidator.java @@ -20,6 +20,7 @@ import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import java.util.Map; public final class PrintTopicValidator { @@ -27,6 +28,7 @@ private PrintTopicValidator() { } public static void validate( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext context, final ServiceContext serviceContext) { throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText())); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java index e61d8a679d33..c07602fa6b39 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java @@ -34,6 +34,7 @@ import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -97,6 +98,7 @@ public int validate( ) { requireSandbox(serviceContext); + final Map scopedPropertyOverrides = new HashMap<>(propertyOverrides); final KsqlExecutionContext ctx = requireSandbox(snapshotSupplier.apply(serviceContext)); final Injector injector = injectorFactory.apply(ctx, serviceContext); @@ -104,11 +106,11 @@ public int validate( for (ParsedStatement parsed : statements) { final PreparedStatement prepared = ctx.prepare(parsed); final ConfiguredStatement configured = ConfiguredStatement.of( - prepared, propertyOverrides, ksqlConfig); + prepared, scopedPropertyOverrides, ksqlConfig); numPersistentQueries += (prepared.getStatement() instanceof RunScript) - ? validateRunScript(serviceContext, configured, ctx) - : validate(serviceContext, configured, ctx, injector); + ? validateRunScript(serviceContext, configured, scopedPropertyOverrides, ctx) + : validate(serviceContext, configured, scopedPropertyOverrides, ctx, injector); } if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ctx, ksqlConfig, numPersistentQueries)) { @@ -127,6 +129,7 @@ public int validate( private int validate( final ServiceContext serviceContext, final ConfiguredStatement configured, + final Map mutableScopedProperties, final KsqlExecutionContext executionContext, final Injector injector ) throws KsqlStatementException { @@ -136,7 +139,8 @@ private int validate( customValidators.get(statementClass); if (customValidator != null) { - customValidator.validate(configured, executionContext, serviceContext); + customValidator + .validate(configured, mutableScopedProperties, executionContext, serviceContext); } else if (KsqlEngine.isExecutableStatement(configured.getStatement())) { final ConfiguredStatement statementInjected = injector.inject(configured); executionContext.execute(serviceContext, statementInjected); @@ -153,7 +157,9 @@ private int validate( private int validateRunScript( final ServiceContext serviceContext, final ConfiguredStatement statement, - final KsqlExecutionContext executionContext) { + final Map mutableScopedProperties, + final KsqlExecutionContext executionContext + ) { final String sql = (String) statement.getOverrides() .get(KsqlConstants.LEGACY_RUN_SCRIPT_STATEMENTS_CONTENT); @@ -166,7 +172,6 @@ private int validateRunScript( + "Note: RUN SCRIPT is deprecated and will be removed in the next major version. " + "statement: " + statement.getStatementText()); - return validate(serviceContext, executionContext.parse(sql), statement.getOverrides(), sql); + return validate(serviceContext, executionContext.parse(sql), mutableScopedProperties, sql); } - } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/StatementValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/StatementValidator.java index 8cccc8bdb5d2..3dbb8e497c60 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/StatementValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/StatementValidator.java @@ -20,6 +20,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; +import java.util.Map; /** * An interface that allows for arbitrary validation code of a prepared statement @@ -31,7 +32,7 @@ public interface StatementValidator { /** * A statement validator that does nothing. */ - StatementValidator NO_VALIDATION = (stmt, ectx, sctx) -> { }; + StatementValidator NO_VALIDATION = (stmt, props, ectx, sctx) -> { }; /** * Validates the statement against the given parameters, and throws an exception @@ -42,6 +43,7 @@ public interface StatementValidator { */ void validate( ConfiguredStatement statement, + Map mutableScopedProperties, KsqlExecutionContext executionContext, ServiceContext serviceContext) throws KsqlException; } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java index 29fe04355dad..e1f7c7ca11fc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java @@ -21,6 +21,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlStatementException; +import java.util.Map; public final class TerminateQueryValidator { @@ -28,6 +29,7 @@ private TerminateQueryValidator() { } public static void validate( final ConfiguredStatement statement, + final Map sessionProperties, final KsqlExecutionContext context, final ServiceContext serviceContext ) { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index 5ba0eca27e38..a6891e00fa3b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -485,6 +485,28 @@ public void shouldRunSetStatements() { ksqlConfig)); } + @Test + public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() { + // Given: + final PreparedStatement setProp = PreparedStatement.of("SET PROP", + new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")); + + final PreparedStatement cs = PreparedStatement.of("CS", + new CreateStream(SOME_NAME, SOME_ELEMENTS, false, JSON_PROPS)); + + givenQueryFileParsesTo(cs, setProp); + + // When: + standaloneExecutor.start(); + + // Then: + verify(ksqlEngine).execute(ConfiguredStatement.of( + cs, + ImmutableMap.of(), + ksqlConfig + )); + } + @Test public void shouldRunUnSetStatements() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 6d08ae2fc180..aa753cd7a0b5 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -111,7 +111,7 @@ public void setUp() throws InterruptedException { @Test public void shouldEnqueueSuccessfulCommand() throws InterruptedException { // When: - distributor.execute(EMPTY_STATEMENT, executionContext, serviceContext); + distributor.execute(EMPTY_STATEMENT, ImmutableMap.of(), executionContext, serviceContext); // Then: verify(queue, times(1)).enqueueCommand(eq(EMPTY_STATEMENT)); @@ -120,7 +120,7 @@ public void shouldEnqueueSuccessfulCommand() throws InterruptedException { @Test public void shouldInferSchemas() { // When: - distributor.execute(EMPTY_STATEMENT, executionContext, serviceContext); + distributor.execute(EMPTY_STATEMENT, ImmutableMap.of(), executionContext, serviceContext); // Then: verify(schemaInjector, times(1)).inject(eq(EMPTY_STATEMENT)); @@ -132,6 +132,7 @@ public void shouldReturnCommandStatus() { final CommandStatusEntity commandStatusEntity = (CommandStatusEntity) distributor.execute( EMPTY_STATEMENT, + ImmutableMap.of(), executionContext, serviceContext ) @@ -165,7 +166,7 @@ public void shouldThrowExceptionOnFailureToEnqueue() { expectedException.expectCause(is(cause)); // When: - distributor.execute(configured, executionContext, serviceContext); + distributor.execute(configured, ImmutableMap.of(), executionContext, serviceContext); } @Test @@ -182,7 +183,7 @@ public void shouldThrowFailureIfCannotInferSchema() { expectedException.expectMessage("Could not infer!"); // When: - distributor.execute(configured, executionContext, serviceContext); + distributor.execute(configured, ImmutableMap.of(), executionContext, serviceContext); } @Test @@ -200,7 +201,7 @@ public void shouldThrowExceptionIfUserServiceContextIsDeniedAuthorization() { expectedException.expect(KsqlTopicAuthorizationException.class); // When: - distributor.execute(configured, executionContext, userServiceContext); + distributor.execute(configured, ImmutableMap.of(), executionContext, userServiceContext); } @Test @@ -219,6 +220,6 @@ public void shouldThrowServerExceptionIfServerServiceContextIsDeniedAuthorizatio expectedException.expectCause(is(instanceOf(KsqlTopicAuthorizationException.class))); // When: - distributor.execute(configured, executionContext, userServiceContext); + distributor.execute(configured, ImmutableMap.of(), executionContext, userServiceContext); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java index f08f6043c542..eaf55364c347 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java @@ -24,10 +24,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.CreateConnector.Type; -import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.rest.entity.CreateConnectorEntity; import io.confluent.ksql.rest.entity.ErrorEntity; import io.confluent.ksql.rest.entity.KsqlEntity; @@ -78,7 +78,7 @@ public void shouldPassInCorrectArgsToConnectClient() { givenSuccess(); // When: - ConnectExecutor.execute(CREATE_CONNECTOR_CONFIGURED, null, serviceContext); + ConnectExecutor.execute(CREATE_CONNECTOR_CONFIGURED, ImmutableMap.of(), null, serviceContext); // Then: verify(connectClient).create("foo", ImmutableMap.of("foo", "bar")); @@ -91,7 +91,7 @@ public void shouldReturnConnectorInfoEntityOnSuccess() { // When: final Optional entity = ConnectExecutor - .execute(CREATE_CONNECTOR_CONFIGURED, null, serviceContext); + .execute(CREATE_CONNECTOR_CONFIGURED, ImmutableMap.of(), null, serviceContext); // Then: assertThat("Expected non-empty response", entity.isPresent()); @@ -105,7 +105,7 @@ public void shouldReturnErrorEntityOnError() { // When: final Optional entity = ConnectExecutor - .execute(CREATE_CONNECTOR_CONFIGURED, null, serviceContext); + .execute(CREATE_CONNECTOR_CONFIGURED, ImmutableMap.of(), null, serviceContext); // Then: assertThat("Expected non-empty response", entity.isPresent()); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java index 4abf0edea8c6..de5ba8fff6c6 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java @@ -160,7 +160,8 @@ public void setUp() { @Test public void shouldDescribeKnownConnector() { // When: - final Optional entity = executor.execute(describeStatement, engine, serviceContext); + final Optional entity = executor + .execute(describeStatement, ImmutableMap.of(), engine, serviceContext); // Then: assertThat("Expected a response", entity.isPresent()); @@ -184,7 +185,8 @@ public void shouldDescribeKnownConnectorIfTopicListFails() { when(topics.names()).thenReturn(fut); // When: - final Optional entity = executor.execute(describeStatement, engine, serviceContext); + final Optional entity = executor + .execute(describeStatement, ImmutableMap.of(), engine, serviceContext); // Then: assertThat("Expected a response", entity.isPresent()); @@ -202,7 +204,8 @@ public void shouldErrorIfConnectClientFailsStatus() { when(connectClient.describe(any())).thenReturn(ConnectResponse.failure("error", HttpStatus.SC_INTERNAL_SERVER_ERROR)); // When: - final Optional entity = executor.execute(describeStatement, engine, serviceContext); + final Optional entity = executor + .execute(describeStatement, ImmutableMap.of(), engine, serviceContext); // Then: verify(connectClient).status("connector"); @@ -217,7 +220,8 @@ public void shouldErrorIfConnectClientFailsDescribe() { when(connectClient.describe(any())).thenReturn(ConnectResponse.failure("error", HttpStatus.SC_INTERNAL_SERVER_ERROR)); // When: - final Optional entity = executor.execute(describeStatement, engine, serviceContext); + final Optional entity = executor + .execute(describeStatement, ImmutableMap.of(), engine, serviceContext); // Then: verify(connectClient).status("connector"); @@ -234,7 +238,8 @@ public void shouldWorkIfUnknownConnector() { executor = new DescribeConnectorExecutor(connectorFactory); // When: - final Optional entity = executor.execute(describeStatement, engine, serviceContext); + final Optional entity = executor + .execute(describeStatement, ImmutableMap.of(), engine, serviceContext); // Then: assertThat("Expected a response", entity.isPresent()); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutorTest.java index 00435dd86d41..fa949f7383f3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeFunctionExecutorTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.rest.entity.FunctionDescriptionList; import io.confluent.ksql.rest.entity.FunctionType; import io.confluent.ksql.rest.server.TemporaryEngine; @@ -38,6 +39,7 @@ public void shouldDescribeUDF() { final FunctionDescriptionList functionList = (FunctionDescriptionList) CustomExecutors.DESCRIBE_FUNCTION.execute( engine.configure("DESCRIBE FUNCTION CONCAT;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -63,6 +65,7 @@ public void shouldDescribeUDAF() { final FunctionDescriptionList functionList = (FunctionDescriptionList) CustomExecutors.DESCRIBE_FUNCTION.execute( engine.configure("DESCRIBE FUNCTION MAX;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java index 05dfa3546831..0a4290a95673 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DropConnectorExecutorTest.java @@ -16,7 +16,8 @@ package io.confluent.ksql.rest.server.execution; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -73,7 +74,7 @@ public void shouldPassInCorrectArgsToConnectClient() { .thenReturn(ConnectResponse.success("foo", HttpStatus.SC_OK)); // When: - DropConnectorExecutor.execute(DROP_CONNECTOR_CONFIGURED, null, serviceContext); + DropConnectorExecutor.execute(DROP_CONNECTOR_CONFIGURED, ImmutableMap.of(),null, serviceContext); // Then: verify(connectClient).delete("foo"); @@ -87,7 +88,7 @@ public void shouldReturnOnSuccess() { // When: final Optional response = DropConnectorExecutor - .execute(DROP_CONNECTOR_CONFIGURED, null, serviceContext); + .execute(DROP_CONNECTOR_CONFIGURED, ImmutableMap.of(),null, serviceContext); // Then: assertThat("expected response", response.isPresent()); @@ -102,7 +103,7 @@ public void shouldReturnErrorEntityOnError() { // When: final Optional entity = DropConnectorExecutor - .execute(DROP_CONNECTOR_CONFIGURED, null, serviceContext); + .execute(DROP_CONNECTOR_CONFIGURED, ImmutableMap.of(),null, serviceContext); // Then: assertThat("Expected non-empty response", entity.isPresent()); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java index 095ac6f9f358..dc51836e58f7 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; @@ -58,6 +59,7 @@ public void shouldExplainQueryId() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, + ImmutableMap.of(), engine, this.engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -77,6 +79,7 @@ public void shouldExplainPersistentStatement() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -97,6 +100,7 @@ public void shouldExplainStatement() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -116,6 +120,7 @@ public void shouldExplainStatementWithStructFieldDereference() { // When: final QueryDescriptionEntity query = (QueryDescriptionEntity) CustomExecutors.EXPLAIN.execute( explain, + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -134,6 +139,7 @@ public void shouldFailOnNonQueryExplain() { // When: CustomExecutors.EXPLAIN.execute( engine.configure("Explain SHOW TOPICS;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java index 1dbf004655fa..e411208eefee 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java @@ -87,7 +87,7 @@ public void shouldListValidConnector() { // When: final Optional entity = ListConnectorsExecutor - .execute(statement, engine, serviceContext); + .execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: assertThat("expected response!", entity.isPresent()); @@ -116,7 +116,7 @@ public void shouldFilterNonMatchingConnectors() { // When: final Optional entity = ListConnectorsExecutor - .execute(statement, engine, serviceContext); + .execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: assertThat("expected response!", entity.isPresent()); @@ -142,7 +142,7 @@ public void shouldListInvalidConnectorWithNoInfo() { // When: final Optional entity = ListConnectorsExecutor - .execute(statement, engine, serviceContext); + .execute(statement, ImmutableMap.of(), engine, serviceContext); // Then: assertThat("expected response!", entity.isPresent()); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutorTest.java index 10fe1db2eda1..1a182c0bb552 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListFunctionsExecutorTest.java @@ -40,6 +40,7 @@ public void shouldListFunctions() { // When: final FunctionNameList functionList = (FunctionNameList) CustomExecutors.LIST_FUNCTIONS.execute( engine.configure("LIST FUNCTIONS;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java index 209fc246bf90..164f3d7635b1 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListPropertiesExecutorTest.java @@ -44,6 +44,7 @@ public void shouldListProperties() { // When: final PropertiesList properties = (PropertiesList) CustomExecutors.LIST_PROPERTIES.execute( engine.configure("LIST PROPERTIES;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -60,6 +61,7 @@ public void shouldListPropertiesWithOverrides() { final PropertiesList properties = (PropertiesList) CustomExecutors.LIST_PROPERTIES.execute( engine.configure("LIST PROPERTIES;") .withProperties(ImmutableMap.of("ksql.streams.auto.offset.reset", "latest")), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -75,6 +77,7 @@ public void shouldNotListSslProperties() { // When: final PropertiesList properties = (PropertiesList) CustomExecutors.LIST_PROPERTIES.execute( engine.configure("LIST PROPERTIES;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java index 1ead9700b6d0..4203efe4d4e9 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.name.SourceName; @@ -49,6 +50,7 @@ public void shouldListQueriesEmpty() { // When final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( engine.configure("SHOW QUERIES;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -68,6 +70,7 @@ public void shouldListQueriesBasic() { // When final Queries queries = (Queries) CustomExecutors.LIST_QUERIES.execute( showQueries, + ImmutableMap.of(), engine, this.engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -91,6 +94,7 @@ public void shouldListQueriesExtended() { // When final QueryDescriptionList queries = (QueryDescriptionList) CustomExecutors.LIST_QUERIES.execute( showQueries, + ImmutableMap.of(), engine, this.engine.getServiceContext() ).orElseThrow(IllegalStateException::new); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java index 9e793f92e2f5..b239ecd93a31 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java @@ -81,6 +81,7 @@ public void shouldShowStreams() { final StreamsList descriptionList = (StreamsList) CustomExecutors.LIST_STREAMS.execute( engine.configure("SHOW STREAMS;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -111,6 +112,7 @@ public void shouldShowStreamsExtended() { final SourceDescriptionList descriptionList = (SourceDescriptionList) CustomExecutors.LIST_STREAMS.execute( engine.configure("SHOW STREAMS EXTENDED;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -145,6 +147,7 @@ public void shouldShowTables() { final TablesList descriptionList = (TablesList) CustomExecutors.LIST_TABLES.execute( engine.configure("LIST TABLES;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -177,6 +180,7 @@ public void shouldShowTablesExtended() { final SourceDescriptionList descriptionList = (SourceDescriptionList) CustomExecutors.LIST_TABLES.execute( engine.configure("LIST TABLES EXTENDED;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -224,6 +228,7 @@ public void shouldShowColumnsSource() { ImmutableMap.of(), engine.getKsqlConfig() ), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ).orElseThrow(IllegalStateException::new); @@ -251,6 +256,7 @@ public void shouldThrowOnDescribeMissingSource() { // When: CustomExecutors.SHOW_COLUMNS.execute( engine.configure("DESCRIBE S;"), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ); @@ -272,6 +278,7 @@ public void shouldNotCallTopicClientForExtendedDescription() { // When: CustomExecutors.LIST_STREAMS.execute( engine.configure("SHOW STREAMS;"), + ImmutableMap.of(), engine.getEngine(), serviceContext ).orElseThrow(IllegalStateException::new); @@ -328,6 +335,7 @@ public void shouldAddWarningsOnClientExceptionForStreamListing() { // When: final KsqlEntity entity = CustomExecutors.LIST_STREAMS.execute( engine.configure("SHOW STREAMS EXTENDED;"), + ImmutableMap.of(), engine.getEngine(), serviceContext ).orElseThrow(IllegalStateException::new); @@ -347,6 +355,7 @@ public void shouldAddWarningsOnClientExceptionForTopicListing() { // When: final KsqlEntity entity = CustomExecutors.LIST_TABLES.execute( engine.configure("SHOW TABLES EXTENDED;"), + ImmutableMap.of(), engine.getEngine(), serviceContext ).orElseThrow(IllegalStateException::new); @@ -365,6 +374,7 @@ public void shouldAddWarningOnClientExceptionForDescription() { // When: final KsqlEntity entity = CustomExecutors.SHOW_COLUMNS.execute( engine.configure("DESCRIBE EXTENDED STREAM1;"), + ImmutableMap.of(), engine.getEngine(), serviceContext ).orElseThrow(IllegalStateException::new); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java index 1252a8a88346..0f0d2c3b99f8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.rest.entity.KafkaTopicInfo; import io.confluent.ksql.rest.entity.KafkaTopicInfoExtended; import io.confluent.ksql.rest.entity.KafkaTopicsList; @@ -63,6 +64,7 @@ public void shouldListKafkaTopics() { final KafkaTopicsList topicsList = (KafkaTopicsList) CustomExecutors.LIST_TOPICS.execute( engine.configure("LIST TOPICS;"), + ImmutableMap.of(), engine.getEngine(), serviceContext ).orElseThrow(IllegalStateException::new); @@ -100,6 +102,7 @@ public void shouldListKafkaTopicsExtended() { final KafkaTopicsListExtended topicsList = (KafkaTopicsListExtended) CustomExecutors.LIST_TOPICS.execute( engine.configure("LIST TOPICS EXTENDED;"), + ImmutableMap.of(), engine.getEngine(), serviceContext ).orElseThrow(IllegalStateException::new); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTypesExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTypesExecutorTest.java index 42f610868e95..ada5ded0efc3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTypesExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTypesExecutorTest.java @@ -67,7 +67,9 @@ public void shouldListTypes() { ConfiguredStatement.of( PreparedStatement.of("statement", new ListTypes(Optional.empty())), ImmutableMap.of(), - KSQL_CONFIG), + KSQL_CONFIG + ), + ImmutableMap.of(), context, null); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PropertyExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PropertyExecutorTest.java index 068123dc7e69..16a3f66e0e20 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PropertyExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PropertyExecutorTest.java @@ -43,8 +43,8 @@ public void shouldSetProperty() { // When: CustomExecutors.SET_PROPERTY.execute( - engine.configure("SET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "' = 'none';") - .withProperties(properties), + engine.configure("SET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "' = 'none';"), + properties, engine.getEngine(), engine.getServiceContext() ); @@ -62,8 +62,8 @@ public void shouldUnSetProperty() { // When: CustomExecutors.UNSET_PROPERTY.execute( - engine.configure("UNSET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "';") - .withProperties(properties), + engine.configure("UNSET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "';"), + properties, engine.getEngine(), engine.getServiceContext() ); @@ -71,6 +71,4 @@ public void shouldUnSetProperty() { // Then: assertThat(properties, not(hasKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); } - - } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java index 6b37efa54b45..3e54152011c8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java @@ -83,7 +83,7 @@ public void setUp() { when(ksqlEngine.prepare(any())) .thenAnswer(invocation -> new DefaultKsqlParser().prepare(invocation.getArgument(0), metaStore)); - when(distributor.execute(any(), any(), any())).thenReturn(Optional.of(entity)); + when(distributor.execute(any(), any(), any(), any())).thenReturn(Optional.of(entity)); doNothing().when(sync).waitFor(any(), any()); } @@ -107,6 +107,7 @@ public void shouldUseCustomExecutor() { preparedStatement(instanceOf(CreateStream.class)), ImmutableMap.of(), ksqlConfig))), + eq(ImmutableMap.of()), eq(ksqlEngine), eq(serviceContext) ); @@ -129,6 +130,7 @@ public void shouldDefaultToDistributor() { preparedStatement(instanceOf(CreateStream.class)), ImmutableMap.of(), ksqlConfig))), + eq(ImmutableMap.of()), eq(ksqlEngine), eq(serviceContext) ); @@ -155,6 +157,7 @@ public void shouldDistributeProperties() { preparedStatement(instanceOf(CreateStream.class)), ImmutableMap.of("x", "y"), ksqlConfig))), + any(), eq(ksqlEngine), eq(serviceContext) ); @@ -210,8 +213,8 @@ public void shouldInlineRunScriptStatements() { // Then: verify(customExecutor, times(1)) .execute( - argThat(is( - configured(preparedStatementText(SOME_STREAM_SQL)))), + argThat(is(configured(preparedStatementText(SOME_STREAM_SQL)))), + any(), eq(ksqlEngine), eq(serviceContext) ); @@ -232,9 +235,10 @@ public void shouldOnlyReturnLastInRunScript() { CreateStream.class, entity1, entity2); givenRequestHandler(ImmutableMap.of(CreateStream.class, customExecutor)); - // When: final List statements = new DefaultKsqlParser() .parse("RUN SCRIPT '/some/script.sql';" ); + + // When: final KsqlEntityList result = handler.execute(serviceContext, statements, props); // Then: @@ -261,6 +265,7 @@ private StatementExecutor givenReturningExecutor( final StatementExecutor customExecutor = mock(StatementExecutor.class); when(customExecutor.execute( argThat(is(configured(preparedStatement(instanceOf(statementClass))))), + any(), eq(ksqlEngine), eq(serviceContext) )) @@ -268,7 +273,7 @@ private StatementExecutor givenReturningExecutor( return customExecutor; } - private Matcher hasItems(final KsqlEntity... items) { + private static Matcher hasItems(final KsqlEntity... items) { return new TypeSafeMatcher() { @Override protected boolean matchesSafely(KsqlEntityList actual) { @@ -290,5 +295,4 @@ public void describeTo(Description description) { } }; } - } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 82ad82c6789e..9781addd59ff 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -1302,6 +1302,25 @@ public void shouldSetProperty() { assertThat(results.get(0).getStatementText(), is(csas)); } + @Test + public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() { + // Given: + final String csas = "CREATE STREAM " + streamName + " AS SELECT * FROM test_stream;"; + + // When: + makeMultipleRequest( + csas + + "SET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "' = 'earliest';", + CommandStatusEntity.class); + + // Then: + verify(commandStore).enqueueCommand( + argThat(is(configured( + preparedStatementText(csas), + ImmutableMap.of(), + ksqlConfig)))); + } + @Test public void shouldFailSetPropertyOnInvalidPropertyName() { // When: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PrintTopicValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PrintTopicValidatorTest.java index 103da423054b..548c4289a750 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PrintTopicValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PrintTopicValidatorTest.java @@ -74,6 +74,7 @@ public void shouldThrowExceptionOnPrintTopic() { // When: CustomValidators.PRINT_TOPIC.validate( query, + ImmutableMap.of(), ksqlEngine, serviceContext ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PropertyOverriderTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PropertyOverriderTest.java index 0e1e268c1a46..8df33d5dcfb2 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PropertyOverriderTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/PropertyOverriderTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.UnsetProperty; @@ -55,7 +56,9 @@ public void shouldFailOnUnknownSetProperty() { "SET 'consumer.invalid'='value';", new SetProperty(Optional.empty(), "consumer.invalid", "value")), new HashMap<>(), - engine.getKsqlConfig()), + engine.getKsqlConfig() + ), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ); @@ -72,8 +75,10 @@ public void shouldAllowSetKnownProperty() { PreparedStatement.of( "SET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "' = 'earliest';", new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")), - properties, - engine.getKsqlConfig()), + ImmutableMap.of(), + engine.getKsqlConfig() + ), + properties, engine.getEngine(), engine.getServiceContext() ); @@ -95,8 +100,10 @@ public void shouldFailOnInvalidSetPropertyValue() { PreparedStatement.of( "SET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "' = 'invalid';", new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "invalid")), - new HashMap<>(), - engine.getKsqlConfig()), + ImmutableMap.of(), + engine.getKsqlConfig() + ), + new HashMap<>(), engine.getEngine(), engine.getServiceContext() ); @@ -115,7 +122,9 @@ public void shouldFailOnUnknownUnsetProperty() { "UNSET 'consumer.invalid';", new UnsetProperty(Optional.empty(), "consumer.invalid")), new HashMap<>(), - engine.getKsqlConfig()), + engine.getKsqlConfig() + ), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ); @@ -133,8 +142,10 @@ public void shouldAllowUnsetKnownProperty() { PreparedStatement.of( "UNSET '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "';", new UnsetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)), - properties, - engine.getKsqlConfig()), + ImmutableMap.of(), + engine.getKsqlConfig() + ), + properties, engine.getEngine(), engine.getServiceContext() ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/QueryValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/QueryValidatorTest.java index c246eec6a2b5..4e2b2a615d21 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/QueryValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/QueryValidatorTest.java @@ -63,6 +63,7 @@ public void shouldThrowExceptionOnQueryEndpoint() { // When: CustomValidators.QUERY_ENDPOINT.validate( query, + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java index 062e37cfdd6d..08d9e451c34b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java @@ -128,6 +128,7 @@ public void shouldCallStatementValidator() { // Then: verify(statementValidator, times(1)).validate( argThat(is(configured(preparedStatement(instanceOf(CreateStream.class))))), + eq(ImmutableMap.of()), eq(executionContext), any() ); @@ -156,7 +157,7 @@ public void shouldThrowExceptionIfValidationFails() { ImmutableMap.of(CreateStream.class, statementValidator) ); doThrow(new KsqlException("Fail")) - .when(statementValidator).validate(any(), any(), any()); + .when(statementValidator).validate(any(), any(), any(), any()); final List statements = givenParsed(SOME_STREAM_SQL); @@ -242,6 +243,7 @@ public void shouldValidateRunScript() { // Then: verify(statementValidator, times(1)).validate( argThat(is(configured(preparedStatement(instanceOf(CreateStream.class))))), + any(), eq(executionContext), any() ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java index a936d083cd0c..91ec449d3614 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java @@ -53,6 +53,7 @@ public void shouldFailOnTerminateUnknownQueryId() { ImmutableMap.of(), engine.getKsqlConfig() ), + ImmutableMap.of(), engine.getEngine(), engine.getServiceContext() ); @@ -72,6 +73,7 @@ public void shouldValidateKnownQueryId() { ImmutableMap.of(), engine.getKsqlConfig() ), + ImmutableMap.of(), mockEngine, engine.getServiceContext() );