From 631751dd40bc01c2e9a3254c3528d1443c9b28a6 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 6 Nov 2019 07:47:41 -0800 Subject: [PATCH 1/3] feat: add flag to disable pull queries (MINOR) --- .../ksql/config/ImmutableProperties.java | 1 + .../io/confluent/ksql/util/KsqlConfig.java | 11 +++++ .../server/execution/StaticQueryExecutor.java | 9 ++++ .../ksql/rest/server/TemporaryEngine.java | 16 ++++-- .../execution/StaticQueryExecutorTest.java | 49 ++++++++++++++++--- 5 files changed, 74 insertions(+), 12 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java index 14a3fcb158b3..77c3f5409523 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java @@ -29,6 +29,7 @@ public final class ImmutableProperties { .add(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) .add(KsqlConfig.KSQL_EXT_DIR) .add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG) + .add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG) .addAll(KsqlConfig.SSL_CONFIG_NAMES) .build(); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 2afcf2c4e206..623ac5559758 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -196,6 +196,11 @@ public class KsqlConfig extends AbstractConfig { + "whether the Kafka cluster supports the required API, and enables the validator if " + "it does."; + public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable"; + public static final String KSQL_PULL_QUERIES_ENABLE_DOC = + "Config to enable or disable transient pull queries on a specific KSQL server."; + public static final boolean KSQL_PULL_QUERIES_ENABLE_DEFAULT = true; + public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS = ImmutableList.of( new CompatibilityBreakingConfigDef( @@ -560,6 +565,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC + ).define( + KSQL_PULL_QUERIES_ENABLE_CONFIG, + Type.BOOLEAN, + KSQL_PULL_QUERIES_ENABLE_DEFAULT, + Importance.LOW, + KSQL_PULL_QUERIES_ENABLE_DOC ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java index b216613dbe30..d626fe5f85ae 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java @@ -119,6 +119,15 @@ public static void validate( ) { final Query queryStmt = statement.getStatement(); + if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)) { + throw new KsqlRestException( + Errors.badStatement( + "Pull queries are disabled on this KSQL server - please set " + + KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG + "=true to enable this feature. " + + "If you intended to issue a push query, resubmit the query with EMIT CHANGES.", + statement.getStatementText())); + } + if (!queryStmt.isStatic()) { throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText())); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java index 6267656c5fee..1c5738fd500b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java @@ -56,6 +56,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.junit.rules.ExternalResource; @@ -75,6 +76,7 @@ public class TemporaryEngine extends ExternalResource { private KsqlConfig ksqlConfig; private KsqlEngine engine; private ServiceContext serviceContext; + private Map configs = ImmutableMap.of(); @Override protected void before() { @@ -86,10 +88,11 @@ protected void before() { ksqlConfig = KsqlConfigTestUtil.create( "localhost:9092", - ImmutableMap.of( - "ksql.command.topic.suffix", "commands", - RestConfig.LISTENERS_CONFIG, "http://localhost:8088" - ) + ImmutableMap.builder() + .putAll(configs) + .put("ksql.command.topic.suffix", "commands") + .put( RestConfig.LISTENERS_CONFIG, "http://localhost:8088" ) + .build() ); final SqlTypeParser typeParser = SqlTypeParser.create(TypeRegistry.EMPTY); @@ -100,6 +103,11 @@ protected void before() { udtfLoader.loadUdtfFromClass(TestUdtf2.class, "whatever"); } + public TemporaryEngine withConfigs(final Map configs) { + this.configs = configs; + return this; + } + @Override protected void after() { engine.close(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java index f679af84cd31..0c9de3c89c0a 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java @@ -30,24 +30,57 @@ import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.server.validation.CustomValidators; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; import org.eclipse.jetty.http.HttpStatus.Code; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; -@RunWith(MockitoJUnitRunner.class) +@RunWith(Enclosed.class) public class StaticQueryExecutorTest { - @Rule - public final TemporaryEngine engine = new TemporaryEngine(); + public static class Disabled { + @Rule + public final TemporaryEngine engine = new TemporaryEngine() + .withConfigs(ImmutableMap.of(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG, false)); - @Rule - public final ExpectedException expectedException = ExpectedException.none(); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); - @Test - public void shouldThrowExceptionOnQueryEndpoint() { + @Test + public void shouldThrowExceptionIfConfigDisabled() { + + testForFailure( + engine, + expectedException, + "Pull queries are disabled on this KSQL server" + ); + } + } + + public static class Enabled { + @Rule + public final TemporaryEngine engine = new TemporaryEngine(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldThrowExceptionOnQueryEndpoint() { + testForFailure( + engine, + expectedException, + "The following statement types should be issued to the websocket endpoint '/query'" + ); + } + } + + private static void testForFailure( + TemporaryEngine engine, ExpectedException expectedException, String errorMessage + ) { // Given: final ConfiguredStatement query = ConfiguredStatement.of( PreparedStatement.of("SELECT * FROM test_table;", mock(Query.class)), @@ -59,7 +92,7 @@ public void shouldThrowExceptionOnQueryEndpoint() { expectedException.expect(KsqlRestException.class); expectedException.expect(exceptionStatusCode(is(Code.BAD_REQUEST))); expectedException.expect(exceptionStatementErrorMessage(errorMessage(containsString( - "The following statement types should be issued to the websocket endpoint '/query'" + errorMessage )))); expectedException.expect(exceptionStatementErrorMessage(statement(containsString( "SELECT * FROM test_table")))); From 2c21ccf9da345fdb45b10819607b7c045fcb64bc Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 6 Nov 2019 11:04:11 -0800 Subject: [PATCH 2/3] chore: update the error message --- .../ksql/rest/server/execution/StaticQueryExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java index d626fe5f85ae..9a6cf053e311 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java @@ -124,7 +124,8 @@ public static void validate( Errors.badStatement( "Pull queries are disabled on this KSQL server - please set " + KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG + "=true to enable this feature. " - + "If you intended to issue a push query, resubmit the query with EMIT CHANGES.", + + "If you intended to issue a push query, resubmit the query with the " + + "EMIT CHANGES clause.", statement.getStatementText())); } From 0daffc2afc760784e4e75f40a642babedd3842cb Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 6 Nov 2019 11:33:06 -0800 Subject: [PATCH 3/3] chore: remove unused imports --- .../ksql/rest/server/execution/StaticQueryExecutorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java index 0c9de3c89c0a..bc13d2591dd5 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java @@ -37,7 +37,6 @@ import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; @RunWith(Enclosed.class) public class StaticQueryExecutorTest {