Skip to content

Commit

Permalink
feat: add flag to disable pull queries (MINOR) (#3778)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Nov 6, 2019
1 parent 86209c9 commit 04e206f
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class ImmutableProperties {
.add(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
11 changes: 11 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ public class KsqlConfig extends AbstractConfig {
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";

public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable";
public static final String KSQL_PULL_QUERIES_ENABLE_DOC =
"Config to enable or disable transient pull queries on a specific KSQL server.";
public static final boolean KSQL_PULL_QUERIES_ENABLE_DEFAULT = true;

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of(
new CompatibilityBreakingConfigDef(
Expand Down Expand Up @@ -573,6 +578,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
"",
Importance.LOW,
METRIC_REPORTER_CLASSES_DOC
).define(
KSQL_PULL_QUERIES_ENABLE_CONFIG,
Type.BOOLEAN,
KSQL_PULL_QUERIES_ENABLE_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_ENABLE_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ public static void validate(
) {
final Query queryStmt = statement.getStatement();

if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)) {
throw new KsqlRestException(
Errors.badStatement(
"Pull queries are disabled on this KSQL server - please set "
+ KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG + "=true to enable this feature. "
+ "If you intended to issue a push query, resubmit the query with the "
+ "EMIT CHANGES clause.",
statement.getStatementText()));
}

if (!queryStmt.isStatic()) {
throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.rules.ExternalResource;

Expand All @@ -77,6 +78,7 @@ public class TemporaryEngine extends ExternalResource {
private KsqlConfig ksqlConfig;
private KsqlEngine engine;
private ServiceContext serviceContext;
private Map<String, Object> configs = ImmutableMap.of();

@Override
protected void before() {
Expand All @@ -88,10 +90,11 @@ protected void before() {

ksqlConfig = KsqlConfigTestUtil.create(
"localhost:9092",
ImmutableMap.of(
"ksql.command.topic.suffix", "commands",
RestConfig.LISTENERS_CONFIG, "http://localhost:8088"
)
ImmutableMap.<String, Object>builder()
.putAll(configs)
.put("ksql.command.topic.suffix", "commands")
.put( RestConfig.LISTENERS_CONFIG, "http://localhost:8088" )
.build()
);

final SqlTypeParser typeParser = SqlTypeParser.create(TypeRegistry.EMPTY);
Expand All @@ -102,6 +105,11 @@ protected void before() {
udtfLoader.loadUdtfFromClass(TestUdtf2.class, "whatever");
}

public TemporaryEngine withConfigs(final Map<String, Object> configs) {
this.configs = configs;
return this;
}

@Override
protected void after() {
engine.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,56 @@
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import org.eclipse.jetty.http.HttpStatus.Code;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
@RunWith(Enclosed.class)
public class StaticQueryExecutorTest {

@Rule
public final TemporaryEngine engine = new TemporaryEngine();
public static class Disabled {
@Rule
public final TemporaryEngine engine = new TemporaryEngine()
.withConfigs(ImmutableMap.of(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG, false));

@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldThrowExceptionOnQueryEndpoint() {
@Test
public void shouldThrowExceptionIfConfigDisabled() {

testForFailure(
engine,
expectedException,
"Pull queries are disabled on this KSQL server"
);
}
}

public static class Enabled {
@Rule
public final TemporaryEngine engine = new TemporaryEngine();

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldThrowExceptionOnQueryEndpoint() {
testForFailure(
engine,
expectedException,
"The following statement types should be issued to the websocket endpoint '/query'"
);
}
}

private static void testForFailure(
TemporaryEngine engine, ExpectedException expectedException, String errorMessage
) {
// Given:
final ConfiguredStatement<Query> query = ConfiguredStatement.of(
PreparedStatement.of("SELECT * FROM test_table;", mock(Query.class)),
Expand All @@ -59,7 +91,7 @@ public void shouldThrowExceptionOnQueryEndpoint() {
expectedException.expect(KsqlRestException.class);
expectedException.expect(exceptionStatusCode(is(Code.BAD_REQUEST)));
expectedException.expect(exceptionStatementErrorMessage(errorMessage(containsString(
"The following statement types should be issued to the websocket endpoint '/query'"
errorMessage
))));
expectedException.expect(exceptionStatementErrorMessage(statement(containsString(
"SELECT * FROM test_table"))));
Expand Down

0 comments on commit 04e206f

Please sign in to comment.