From 04e206f1148da72080fb858b74eae9b8fea1cadd Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 6 Nov 2019 14:24:06 -0800 Subject: [PATCH] feat: add flag to disable pull queries (MINOR) (#3778) --- .../ksql/config/ImmutableProperties.java | 1 + .../io/confluent/ksql/util/KsqlConfig.java | 11 ++++ .../server/execution/StaticQueryExecutor.java | 10 ++++ .../ksql/rest/server/TemporaryEngine.java | 16 ++++-- .../execution/StaticQueryExecutorTest.java | 50 +++++++++++++++---- 5 files changed, 75 insertions(+), 13 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java index 14a3fcb158b3..77c3f5409523 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java @@ -29,6 +29,7 @@ public final class ImmutableProperties { .add(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) .add(KsqlConfig.KSQL_EXT_DIR) .add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG) + .add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG) .addAll(KsqlConfig.SSL_CONFIG_NAMES) .build(); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index d4733fba81b1..8f7e9c4dba7a 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -197,6 +197,11 @@ public class KsqlConfig extends AbstractConfig { + "whether the Kafka cluster supports the required API, and enables the validator if " + "it does."; + public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable"; + public static final String KSQL_PULL_QUERIES_ENABLE_DOC = + "Config to enable or disable transient pull queries on a specific KSQL server."; + public static final boolean KSQL_PULL_QUERIES_ENABLE_DEFAULT = true; + public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS = ImmutableList.of( new CompatibilityBreakingConfigDef( @@ -573,6 +578,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC + ).define( + KSQL_PULL_QUERIES_ENABLE_CONFIG, + Type.BOOLEAN, + KSQL_PULL_QUERIES_ENABLE_DEFAULT, + Importance.LOW, + KSQL_PULL_QUERIES_ENABLE_DOC ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java index 5c60b92da655..bb9406ffd452 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java @@ -119,6 +119,16 @@ public static void validate( ) { final Query queryStmt = statement.getStatement(); + if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)) { + throw new KsqlRestException( + Errors.badStatement( + "Pull queries are disabled on this KSQL server - please set " + + KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG + "=true to enable this feature. " + + "If you intended to issue a push query, resubmit the query with the " + + "EMIT CHANGES clause.", + statement.getStatementText())); + } + if (!queryStmt.isStatic()) { throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText())); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java index 488e92bd7a92..22421bb353c3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TemporaryEngine.java @@ -56,6 +56,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.junit.rules.ExternalResource; @@ -77,6 +78,7 @@ public class TemporaryEngine extends ExternalResource { private KsqlConfig ksqlConfig; private KsqlEngine engine; private ServiceContext serviceContext; + private Map configs = ImmutableMap.of(); @Override protected void before() { @@ -88,10 +90,11 @@ protected void before() { ksqlConfig = KsqlConfigTestUtil.create( "localhost:9092", - ImmutableMap.of( - "ksql.command.topic.suffix", "commands", - RestConfig.LISTENERS_CONFIG, "http://localhost:8088" - ) + ImmutableMap.builder() + .putAll(configs) + .put("ksql.command.topic.suffix", "commands") + .put( RestConfig.LISTENERS_CONFIG, "http://localhost:8088" ) + .build() ); final SqlTypeParser typeParser = SqlTypeParser.create(TypeRegistry.EMPTY); @@ -102,6 +105,11 @@ protected void before() { udtfLoader.loadUdtfFromClass(TestUdtf2.class, "whatever"); } + public TemporaryEngine withConfigs(final Map configs) { + this.configs = configs; + return this; + } + @Override protected void after() { engine.close(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java index f679af84cd31..bc13d2591dd5 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java @@ -30,24 +30,56 @@ import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.server.validation.CustomValidators; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; import org.eclipse.jetty.http.HttpStatus.Code; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; -@RunWith(MockitoJUnitRunner.class) +@RunWith(Enclosed.class) public class StaticQueryExecutorTest { - @Rule - public final TemporaryEngine engine = new TemporaryEngine(); + public static class Disabled { + @Rule + public final TemporaryEngine engine = new TemporaryEngine() + .withConfigs(ImmutableMap.of(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG, false)); - @Rule - public final ExpectedException expectedException = ExpectedException.none(); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); - @Test - public void shouldThrowExceptionOnQueryEndpoint() { + @Test + public void shouldThrowExceptionIfConfigDisabled() { + + testForFailure( + engine, + expectedException, + "Pull queries are disabled on this KSQL server" + ); + } + } + + public static class Enabled { + @Rule + public final TemporaryEngine engine = new TemporaryEngine(); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldThrowExceptionOnQueryEndpoint() { + testForFailure( + engine, + expectedException, + "The following statement types should be issued to the websocket endpoint '/query'" + ); + } + } + + private static void testForFailure( + TemporaryEngine engine, ExpectedException expectedException, String errorMessage + ) { // Given: final ConfiguredStatement query = ConfiguredStatement.of( PreparedStatement.of("SELECT * FROM test_table;", mock(Query.class)), @@ -59,7 +91,7 @@ public void shouldThrowExceptionOnQueryEndpoint() { expectedException.expect(KsqlRestException.class); expectedException.expect(exceptionStatusCode(is(Code.BAD_REQUEST))); expectedException.expect(exceptionStatementErrorMessage(errorMessage(containsString( - "The following statement types should be issued to the websocket endpoint '/query'" + errorMessage )))); expectedException.expect(exceptionStatementErrorMessage(statement(containsString( "SELECT * FROM test_table"))));