Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: better error message when users enter old style syntax for query #3397

Merged
merged 16 commits into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
4fde2c3
fix: better error message when users enter old style syntax for query
big-andy-coates Sep 23, 2019
d19b3a0
chore: improve msg
big-andy-coates Sep 23, 2019
6c23675
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/Continuou…
big-andy-coates Sep 23, 2019
cc14fa9
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
2d68bb6
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
4b53686
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
22d69cf
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
49d21b4
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
ebaede2
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
736f9f7
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
9f105ac
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
e4d8a7d
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
0209834
Update ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQue…
big-andy-coates Sep 23, 2019
e61dcba
chore: jim's requested changes
big-andy-coates Sep 23, 2019
8b4586f
chore: fix test
big-andy-coates Sep 23, 2019
1ea12a2
Merge branch 'master' into static_help
big-andy-coates Sep 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.planner.plan.JoinNode;
Expand All @@ -40,14 +41,14 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;

public class Analysis {

private final ResultMaterialization resultMaterialization;
private Optional<Into> into = Optional.empty();
private final List<AliasedDataSource> fromDataSources = new ArrayList<>();
private Optional<JoinInfo> joinInfo = Optional.empty();
Expand All @@ -61,6 +62,14 @@ public class Analysis {
private OptionalInt limitClause = OptionalInt.empty();
private CreateSourceAsProperties withProperties = CreateSourceAsProperties.none();

public Analysis(final ResultMaterialization resultMaterialization) {
this.resultMaterialization = requireNonNull(resultMaterialization, "resultMaterialization");
}

ResultMaterialization getResultMaterialization() {
return resultMaterialization;
}

void addSelectItem(final Expression expression, final String alias) {
selectExpressions.add(SelectExpression.of(alias, expression));
}
Expand Down Expand Up @@ -177,7 +186,7 @@ public Set<SerdeOption> getSerdeOptions() {
}

void setProperties(final CreateSourceAsProperties properties) {
withProperties = Objects.requireNonNull(properties, "properties");
withProperties = requireNonNull(properties, "properties");
}

public CreateSourceAsProperties getProperties() {
Expand Down Expand Up @@ -232,8 +241,8 @@ public static final class AliasedDataSource {
final String alias,
final DataSource<?> dataSource
) {
this.alias = Objects.requireNonNull(alias, "alias");
this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
this.alias = requireNonNull(alias, "alias");
this.dataSource = requireNonNull(dataSource, "dataSource");

if (alias.trim().isEmpty()) {
throw new IllegalArgumentException("Alias or name can not be empty: '" + alias + "'");
Expand Down Expand Up @@ -264,10 +273,10 @@ public static final class JoinInfo {
final Optional<WithinExpression> withinExpression

) {
this.leftJoinField = Objects.requireNonNull(leftJoinField, "leftJoinField");
this.rightJoinField = Objects.requireNonNull(rightJoinField, "rightJoinField");
this.type = Objects.requireNonNull(type, "type");
this.withinExpression = Objects.requireNonNull(withinExpression, "withinExpression");
this.leftJoinField = requireNonNull(leftJoinField, "leftJoinField");
this.rightJoinField = requireNonNull(rightJoinField, "rightJoinField");
this.type = requireNonNull(type, "type");
this.withinExpression = requireNonNull(withinExpression, "withinExpression");

if (leftJoinField.trim().isEmpty()) {
throw new IllegalArgumentException("left join field name can not be empty");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
final Visitor visitor = new Visitor(query.isStatic());
final Visitor visitor = new Visitor(query);
visitor.process(query, null);

sink.ifPresent(visitor::analyzeNonStdOutSink);
Expand All @@ -145,13 +145,14 @@ Analysis analyze(
private final class Visitor extends DefaultTraversalVisitor<AstNode, Void> {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final Analysis analysis = new Analysis();
private final Analysis analysis;
private final boolean staticQuery;
private boolean isJoin = false;
private boolean isGroupBy = false;

Visitor(final boolean staticQuery) {
this.staticQuery = staticQuery;
Visitor(final Query query) {
this.staticQuery = query.isStatic();
this.analysis = new Analysis(query.getResultMaterialization());
}

private void analyzeNonStdOutSink(final Sink sink) {
Expand Down Expand Up @@ -286,7 +287,6 @@ protected AstNode visitQuery(
final Query node,
final Void context
) {

process(node.getFrom(), context);

process(node.getSelect(), context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,14 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;

public class ContinuousQueryValidator implements QueryValidator {

@Override
public void preValidate(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
throw new IllegalArgumentException("static");
public void validate(final Analysis analysis) {
if (analysis.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new IllegalArgumentException("Continuous queries don't support `EMIT FINAL`.");
}

if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
throw new KsqlException("Continuous queries do not yet support `EMIT FINAL`. "
+ "Consider changing to `EMIT CHANGES`."
+ QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
);
}
}

@Override
public void postValidate(final Analysis analysis) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,6 @@

public class QueryAnalyzer {

static final String NEW_QUERY_SYNTAX_HELP = System.lineSeparator()
+ "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+ System.lineSeparator()
+ "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+ "static queries, i.e. they query the current state of the system and return a final "
+ "result."
+ System.lineSeparator()
+ "To turn a static query into a streaming query, as was the default in older versions "
+ "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+ System.lineSeparator()
+ "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+ "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+ "as a this will be required in a future release.";

private final Analyzer analyzer;
private final MetaStore metaStore;
private final QueryValidator continuousValidator;
Expand Down Expand Up @@ -90,18 +76,12 @@ public Analysis analyze(
final Query query,
final Optional<Sink> sink
) {
if (query.isStatic()) {
staticValidator.preValidate(query, sink);
} else {
continuousValidator.preValidate(query, sink);
}

final Analysis analysis = analyzer.analyze(query, sink);

if (query.isStatic()) {
staticValidator.postValidate(analysis);
staticValidator.validate(analysis);
} else {
continuousValidator.postValidate(analysis);
continuousValidator.validate(analysis);
}

return analysis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,10 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import java.util.Optional;

/**
* Validator used by {@link QueryAnalyzer}.
*/
interface QueryValidator {

void preValidate(Query query, Optional<Sink> sink);

void postValidate(Analysis analysis);
void validate(Analysis analysis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,110 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Query;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

public class StaticQueryValidator implements QueryValidator {

@Override
public void preValidate(
final Query query,
final Optional<Sink> sink
) {
if (!query.isStatic()) {
throw new IllegalArgumentException("not static");
}

if (query.getResultMaterialization() != ResultMaterialization.FINAL) {
throw new KsqlException("Static queries do not yet support `EMIT CHANGES`. "
+ "Consider removing 'EMIT CHANGES' to any bare query."
+ QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
);
}
private static final String NEW_QUERY_SYNTAX_HELP = " "
+ "Did you mean to execute a continuous query? Add an 'EMIT CHANGES' clause to do so."
+ System.lineSeparator()
+ System.lineSeparator()
+ "Query syntax in KSQL has changed. There are now two broad categories of queries:"
+ System.lineSeparator()
+ "- Static queries: query the current state of the system, return a result, and terminate "
+ "the query."
+ System.lineSeparator()
+ "- Streaming queries: query the state of the system in motion and continue to output "
+ "results until they meet a LIMIT clause condition or the user terminates the query."
+ System.lineSeparator()
+ System.lineSeparator()
+ "Use 'EMIT CHANGES' to indicate that a query is continuous and outputs all changes. "
+ "To convert a static query into a streaming query, which was the default behavior in older "
+ "versions of KSQL, add `EMIT CHANGES` to the end of the statement before any LIMIT clause."
+ System.lineSeparator()
+ System.lineSeparator()
+ "For example, the following are static queries:"
+ System.lineSeparator()
+ "\t'SELECT * FROM X WHERE ROWKEY=Y;' (non-windowed table)"
+ System.lineSeparator()
+ "\t'SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>=Z;' (windowed table)"
+ System.lineSeparator()
+ System.lineSeparator()
+ "The following is a streaming query:"
+ System.lineSeparator()
+ "\t'SELECT * FROM X EMIT CHANGES;'"
+ System.lineSeparator()
+ System.lineSeparator()
+ "Note: Persistent queries, like `CREATE TABLE AS ...`, have an implicit "
+ "`EMIT CHANGES`, but we recommend adding `EMIT CHANGES` to these statements.";

if (sink.isPresent()) {
throw new IllegalArgumentException("static queries should not have a sink");
}
}
private static final List<Rule> RULES = ImmutableList.of(
Rule.of(
analysis -> analysis.getResultMaterialization() == ResultMaterialization.FINAL,
"Static queries don't support `EMIT CHANGES`."
),
Rule.of(
analysis -> !analysis.getInto().isPresent(),
"Static queries don't support output to sinks."
),
Rule.of(
analysis -> !analysis.isJoin(),
"Static queries don't support JOIN clauses."
),
Rule.of(
analysis -> !analysis.getWindowExpression().isPresent(),
"Static queries don't support WINDOW clauses."
),
Rule.of(
analysis -> analysis.getGroupByExpressions().isEmpty(),
"Static queries don't support GROUP BY clauses."
),
Rule.of(
analysis -> !analysis.getPartitionBy().isPresent(),
"Static queries don't support PARTITION BY clauses."
),
Rule.of(
analysis -> !analysis.getHavingExpression().isPresent(),
"Static queries don't support HAVING clauses."
),
Rule.of(
analysis -> !analysis.getLimitClause().isPresent(),
"Static queries don't support LIMIT clauses."
)
);

@Override
public void postValidate(final Analysis analysis) {
if (analysis.getInto().isPresent()) {
throw new KsqlException("Static queries do not support outputting to sinks.");
public void validate(final Analysis analysis) {
try {
RULES.forEach(rule -> rule.check(analysis));
} catch (final KsqlException e) {
throw new KsqlException(e.getMessage() + NEW_QUERY_SYNTAX_HELP, e);
}
}

if (analysis.isJoin()) {
throw new KsqlException("Static queries do not support joins.");
}
private static final class Rule {

if (analysis.getWindowExpression().isPresent()) {
throw new KsqlException("Static queries do not support WINDOW clauses.");
}

if (!analysis.getGroupByExpressions().isEmpty()) {
throw new KsqlException("Static queries do not support GROUP BY clauses.");
}
private final Predicate<Analysis> condition;
private final String failureMsg;

if (analysis.getPartitionBy().isPresent()) {
throw new KsqlException("Static queries do not support PARTITION BY clauses.");
private static Rule of(final Predicate<Analysis> condition, final String failureMsg) {
return new Rule(condition, failureMsg);
}

if (analysis.getHavingExpression().isPresent()) {
throw new KsqlException("Static queries do not support HAVING clauses.");
private Rule(final Predicate<Analysis> condition, final String failureMsg) {
this.condition = Objects.requireNonNull(condition, "condition");
this.failureMsg = Objects.requireNonNull(failureMsg, "failureMsg");
}

if (analysis.getLimitClause().isPresent()) {
throw new KsqlException("Static queries do not support LIMIT clauses.");
public void check(final Analysis analysis) {
if (!condition.test(analysis)) {
throw new KsqlException(failureMsg);
}
}
}
}
Loading