Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flag to disable pull queries (MINOR) #3778

Merged
merged 3 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class ImmutableProperties {
.add(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ public class KsqlConfig extends AbstractConfig {
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";

public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable";
public static final String KSQL_PULL_QUERIES_ENABLE_DOC =
"Config to enable or disable transient pull queries on a specific KSQL server.";
public static final boolean KSQL_PULL_QUERIES_ENABLE_DEFAULT = true;

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of(
new CompatibilityBreakingConfigDef(
Expand Down Expand Up @@ -560,6 +565,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
"",
Importance.LOW,
METRIC_REPORTER_CLASSES_DOC
).define(
KSQL_PULL_QUERIES_ENABLE_CONFIG,
Type.BOOLEAN,
KSQL_PULL_QUERIES_ENABLE_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_ENABLE_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ public static void validate(
) {
final Query queryStmt = statement.getStatement();

if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)) {
throw new KsqlRestException(
Errors.badStatement(
"Pull queries are disabled on this KSQL server - please set "
+ KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG + "=true to enable this feature. "
+ "If you intended to issue a push query, resubmit the query with the "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not just a matter of resubmitting the query with EMIT CHANGES, right? I thought push queries went to the /query endpoint, whereas pull queries went to the /ksql endpoint.

Copy link
Contributor Author

@agavra agavra Nov 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes and no - the CLI will handle that for you. if you submit the query to the wrong endpoint, it will also complain and tell you which endpoint you should be hitting. since all this is changing with #3742, I'd rather not bake too much of it into the error message

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. As long as our error messages if pull/push queries are submitted to the wrong endpoint are good enough for non-CLI users to know how to respond, that works for me.

+ "EMIT CHANGES clause.",
statement.getStatementText()));
}

if (!queryStmt.isStatic()) {
throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.rules.ExternalResource;

Expand All @@ -75,6 +76,7 @@ public class TemporaryEngine extends ExternalResource {
private KsqlConfig ksqlConfig;
private KsqlEngine engine;
private ServiceContext serviceContext;
private Map<String, Object> configs = ImmutableMap.of();

@Override
protected void before() {
Expand All @@ -86,10 +88,11 @@ protected void before() {

ksqlConfig = KsqlConfigTestUtil.create(
"localhost:9092",
ImmutableMap.of(
"ksql.command.topic.suffix", "commands",
RestConfig.LISTENERS_CONFIG, "http://localhost:8088"
)
ImmutableMap.<String, Object>builder()
.putAll(configs)
.put("ksql.command.topic.suffix", "commands")
.put( RestConfig.LISTENERS_CONFIG, "http://localhost:8088" )
.build()
);

final SqlTypeParser typeParser = SqlTypeParser.create(TypeRegistry.EMPTY);
Expand All @@ -100,6 +103,11 @@ protected void before() {
udtfLoader.loadUdtfFromClass(TestUdtf2.class, "whatever");
}

public TemporaryEngine withConfigs(final Map<String, Object> configs) {
this.configs = configs;
return this;
}

@Override
protected void after() {
engine.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,57 @@
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import org.eclipse.jetty.http.HttpStatus.Code;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unused Mockito imports.


@RunWith(MockitoJUnitRunner.class)
@RunWith(Enclosed.class)
public class StaticQueryExecutorTest {

@Rule
public final TemporaryEngine engine = new TemporaryEngine();
public static class Disabled {
@Rule
public final TemporaryEngine engine = new TemporaryEngine()
.withConfigs(ImmutableMap.of(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG, false));

@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldThrowExceptionOnQueryEndpoint() {
@Test
public void shouldThrowExceptionIfConfigDisabled() {

testForFailure(
engine,
expectedException,
"Pull queries are disabled on this KSQL server"
);
}
}

public static class Enabled {
@Rule
public final TemporaryEngine engine = new TemporaryEngine();

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldThrowExceptionOnQueryEndpoint() {
testForFailure(
engine,
expectedException,
"The following statement types should be issued to the websocket endpoint '/query'"
);
}
}

private static void testForFailure(
TemporaryEngine engine, ExpectedException expectedException, String errorMessage
) {
// Given:
final ConfiguredStatement<Query> query = ConfiguredStatement.of(
PreparedStatement.of("SELECT * FROM test_table;", mock(Query.class)),
Expand All @@ -59,7 +92,7 @@ public void shouldThrowExceptionOnQueryEndpoint() {
expectedException.expect(KsqlRestException.class);
expectedException.expect(exceptionStatusCode(is(Code.BAD_REQUEST)));
expectedException.expect(exceptionStatementErrorMessage(errorMessage(containsString(
"The following statement types should be issued to the websocket endpoint '/query'"
errorMessage
))));
expectedException.expect(exceptionStatementErrorMessage(statement(containsString(
"SELECT * FROM test_table"))));
Expand Down