From aade4fcf58b6c4e99bb99704a12595ec5409bb9d Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Mon, 28 Oct 2019 16:16:59 -0700 Subject: [PATCH 1/3] fix: quoted identifiers for source names --- .../quoted-identifiers.json | 13 ++++++ .../io/confluent/ksql/parser/SqlBase.g4 | 26 ++++++----- .../io/confluent/ksql/parser/AstBuilder.java | 43 ++++++++++--------- .../ksql/util/DataSourceExtractor.java | 18 ++++---- .../io/confluent/ksql/util/ParserUtil.java | 24 +++++++++++ .../confluent/ksql/parser/KsqlParserTest.java | 24 +++++++++++ 6 files changed, 108 insertions(+), 40 deletions(-) diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json b/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json index c2227068bd2d..cb11f606ae85 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json @@ -89,6 +89,19 @@ "outputs": [ {"topic": "JOINED", "key": "1", "value": { "field!": "A"}} ] + }, + { + "name": "source names requiring quotes", + "statements": [ + "CREATE STREAM `foo-source` (id VARCHAR) WITH (kafka_topic='foo-source', value_format='JSON');", + "CREATE STREAM `foo-too` AS SELECT * FROM `foo-source`;" + ], + "inputs": [ + {"topic": "foo-source", "value": {"id": "1"}} + ], + "outputs": [ + {"topic": "foo-too", "value": {"ID": "1"}} + ] } ] } \ No newline at end of file diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index c9947dc59db2..251922896ede 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -40,7 +40,7 @@ statement | (LIST | SHOW) FUNCTIONS #listFunctions | (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors | (LIST | SHOW) TYPES #listTypes - | DESCRIBE EXTENDED? identifier #showColumns + | DESCRIBE EXTENDED? sourceName #showColumns | DESCRIBE FUNCTION identifier #describeFunction | DESCRIBE CONNECTOR identifier #describeConnector | PRINT (identifier| STRING) printClause #printTopic @@ -48,22 +48,22 @@ statement | TERMINATE QUERY? identifier #terminateQuery | SET STRING EQ STRING #setProperty | UNSET STRING #unsetProperty - | CREATE STREAM (IF NOT EXISTS)? identifier + | CREATE STREAM (IF NOT EXISTS)? sourceName (tableElements)? (WITH tableProperties)? #createStream - | CREATE STREAM (IF NOT EXISTS)? identifier + | CREATE STREAM (IF NOT EXISTS)? sourceName (WITH tableProperties)? AS query (PARTITION BY identifier)? #createStreamAs - | CREATE TABLE (IF NOT EXISTS)? identifier + | CREATE TABLE (IF NOT EXISTS)? sourceName (tableElements)? (WITH tableProperties)? #createTable - | CREATE TABLE (IF NOT EXISTS)? identifier + | CREATE TABLE (IF NOT EXISTS)? sourceName (WITH tableProperties)? AS query #createTableAs | CREATE (SINK | SOURCE) CONNECTOR identifier WITH tableProperties #createConnector - | INSERT INTO identifier query (PARTITION BY identifier)? #insertInto - | INSERT INTO identifier (columns)? VALUES values #insertValues - | DROP STREAM (IF EXISTS)? identifier (DELETE TOPIC)? #dropStream - | DROP TABLE (IF EXISTS)? identifier (DELETE TOPIC)? #dropTable + | INSERT INTO sourceName query (PARTITION BY identifier)? #insertInto + | INSERT INTO sourceName (columns)? VALUES values #insertValues + | DROP STREAM (IF EXISTS)? sourceName (DELETE TOPIC)? #dropStream + | DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable | DROP CONNECTOR identifier #dropConnector | EXPLAIN (statement | identifier) #explain | RUN SCRIPT STRING #runScript @@ -200,7 +200,7 @@ joinCriteria ; aliasedRelation - : relationPrimary (AS? identifier)? + : relationPrimary (AS? sourceName)? ; columns @@ -208,7 +208,7 @@ columns ; relationPrimary - : identifier #tableName + : sourceName #tableName ; expression @@ -304,6 +304,10 @@ identifier | DIGIT_IDENTIFIER #digitIdentifier ; +sourceName + : identifier + ; + number : DECIMAL_VALUE #decimalLiteral | INTEGER_VALUE #integerLiteral diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index e1c1dfde1395..cdd93dd5bfdf 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -72,6 +72,7 @@ import io.confluent.ksql.parser.SqlBaseParser.ListTypesContext; import io.confluent.ksql.parser.SqlBaseParser.RegisterTypeContext; import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext; +import io.confluent.ksql.parser.SqlBaseParser.SourceNameContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext; import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; @@ -240,7 +241,7 @@ public Node visitCreateTable(final SqlBaseParser.CreateTableContext context) { return new CreateTable( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier())), + ParserUtil.getSourceName(context.sourceName()), TableElements.of(elements), context.EXISTS() != null, CreateSourceProperties.from(properties) @@ -257,7 +258,7 @@ public Node visitCreateStream(final SqlBaseParser.CreateStreamContext context) { return new CreateStream( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier())), + ParserUtil.getSourceName(context.sourceName()), TableElements.of(elements), context.EXISTS() != null, CreateSourceProperties.from(properties) @@ -272,11 +273,11 @@ public Node visitCreateStreamAs(final SqlBaseParser.CreateStreamAsContext contex return new CreateStreamAsSelect( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier(0))), + ParserUtil.getSourceName(context.sourceName()), query, context.EXISTS() != null, CreateSourceAsProperties.from(properties), - getPartitionBy(context.identifier(1)) + getPartitionBy(context.identifier()) ); } @@ -288,7 +289,7 @@ public Node visitCreateTableAs(final SqlBaseParser.CreateTableAsContext context) return new CreateTableAsSelect( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier())), + ParserUtil.getSourceName(context.sourceName()), query, context.EXISTS() != null, CreateSourceAsProperties.from(properties) @@ -312,8 +313,8 @@ public Node visitCreateConnector(final CreateConnectorContext context) { @Override public Node visitInsertInto(final SqlBaseParser.InsertIntoContext context) { - final SourceName targetName = SourceName.of(getIdentifierText(context.identifier(0))); - final Optional targetLocation = getLocation(context.identifier(0)); + final SourceName targetName = ParserUtil.getSourceName(context.sourceName()); + final Optional targetLocation = getLocation(context.sourceName()); final DataSource target = getSource(targetName, targetLocation); @@ -329,13 +330,13 @@ public Node visitInsertInto(final SqlBaseParser.InsertIntoContext context) { getLocation(context), targetName, query, - getPartitionBy(context.identifier(1))); + getPartitionBy(context.identifier())); } @Override public Node visitInsertValues(final InsertValuesContext context) { - final String targetName = getIdentifierText(context.identifier()); - final Optional targetLocation = getLocation(context.identifier()); + final SourceName targetName = ParserUtil.getSourceName(context.sourceName()); + final Optional targetLocation = getLocation(context.sourceName()); final List columns; if (context.columns() != null) { @@ -350,7 +351,7 @@ public Node visitInsertValues(final InsertValuesContext context) { return new InsertValues( targetLocation, - SourceName.of(targetName), + targetName, columns, visit(context.values().valueExpression(), Expression.class)); } @@ -359,7 +360,7 @@ public Node visitInsertValues(final InsertValuesContext context) { public Node visitDropTable(final SqlBaseParser.DropTableContext context) { return new DropTable( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier())), + ParserUtil.getSourceName(context.sourceName()), context.EXISTS() != null, context.DELETE() != null ); @@ -369,7 +370,7 @@ public Node visitDropTable(final SqlBaseParser.DropTableContext context) { public Node visitDropStream(final SqlBaseParser.DropStreamContext context) { return new DropStream( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier())), + ParserUtil.getSourceName(context.sourceName()), context.EXISTS() != null, context.DELETE() != null ); @@ -670,7 +671,7 @@ public Node visitListTypes(final ListTypesContext ctx) { public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) { return new TerminateQuery( getLocation(context), - context.identifier().getText() + ParserUtil.getIdentifierText(context.identifier()) ); } @@ -678,7 +679,7 @@ public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext contex public Node visitShowColumns(final SqlBaseParser.ShowColumnsContext context) { return new ShowColumns( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier())), + ParserUtil.getSourceName(context.sourceName()), context.EXTENDED() != null ); } @@ -781,19 +782,19 @@ public Node visitJoinRelation(final SqlBaseParser.JoinRelationContext context) { public Node visitAliasedRelation(final SqlBaseParser.AliasedRelationContext context) { final Relation child = (Relation) visit(context.relationPrimary()); - final String alias; + final SourceName alias; switch (context.children.size()) { case 1: final Table table = (Table) visit(context.relationPrimary()); - alias = table.getName().name(); + alias = table.getName(); break; case 2: - alias = context.children.get(1).getText(); + alias = ParserUtil.getSourceName((SourceNameContext) context.children.get(1)); break; case 3: - alias = context.children.get(2).getText(); + alias = ParserUtil.getSourceName((SourceNameContext) context.children.get(2)); break; default: @@ -803,14 +804,14 @@ public Node visitAliasedRelation(final SqlBaseParser.AliasedRelationContext cont ); } - return new AliasedRelation(getLocation(context), child, SourceName.of(alias.toUpperCase())); + return new AliasedRelation(getLocation(context), child, alias); } @Override public Node visitTableName(final SqlBaseParser.TableNameContext context) { return new Table( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier())) + ParserUtil.getSourceName(context.sourceName()) ); } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/DataSourceExtractor.java b/ksql-parser/src/main/java/io/confluent/ksql/util/DataSourceExtractor.java index 600c680e9936..624af9df29da 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/util/DataSourceExtractor.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/util/DataSourceExtractor.java @@ -25,6 +25,7 @@ import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.parser.SqlBaseBaseVisitor; import io.confluent.ksql.parser.SqlBaseParser; +import io.confluent.ksql.parser.SqlBaseParser.SourceNameContext; import io.confluent.ksql.parser.tree.AliasedRelation; import io.confluent.ksql.parser.tree.AstNode; import io.confluent.ksql.parser.tree.Table; @@ -117,25 +118,26 @@ public AstNode visitQuery(final SqlBaseParser.QueryContext ctx) { public AstNode visitTableName(final SqlBaseParser.TableNameContext context) { return new Table( getLocation(context), - SourceName.of(ParserUtil.getIdentifierText(context.identifier()))); + ParserUtil.getSourceName(context.sourceName()) + ); } @Override public AstNode visitAliasedRelation(final SqlBaseParser.AliasedRelationContext context) { final Table table = (Table) visit(context.relationPrimary()); - final String alias; + final SourceName alias; switch (context.children.size()) { case 1: - alias = table.getName().name().toUpperCase(); + alias = table.getName(); break; case 2: - alias = context.children.get(1).getText().toUpperCase(); + alias = ParserUtil.getSourceName((SourceNameContext) context.children.get(1)); break; case 3: - alias = context.children.get(2).getText().toUpperCase(); + alias = ParserUtil.getSourceName((SourceNameContext) context.children.get(2)); break; default: @@ -146,8 +148,8 @@ public AstNode visitAliasedRelation(final SqlBaseParser.AliasedRelationContext c } if (!isJoin) { - fromAlias = SourceName.of(alias); - fromName = SourceName.of(table.getName().name().toUpperCase()); + fromAlias = alias; + fromName = table.getName(); if (metaStore.getSource(fromName) == null) { throw new KsqlException(table.getName().name() + " does not exist."); } @@ -155,7 +157,7 @@ public AstNode visitAliasedRelation(final SqlBaseParser.AliasedRelationContext c return null; } - return new AliasedRelation(getLocation(context), table, SourceName.of(alias.toUpperCase())); + return new AliasedRelation(getLocation(context), table, alias); } @Override diff --git a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java index 374f4d670cf6..06e12d36180b 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/util/ParserUtil.java @@ -22,21 +22,45 @@ import io.confluent.ksql.execution.expression.tree.IntegerLiteral; import io.confluent.ksql.execution.expression.tree.Literal; import io.confluent.ksql.execution.expression.tree.LongLiteral; +import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.parser.ParsingException; import io.confluent.ksql.parser.SqlBaseParser; import io.confluent.ksql.parser.SqlBaseParser.IntegerLiteralContext; import io.confluent.ksql.parser.SqlBaseParser.NumberContext; +import io.confluent.ksql.parser.SqlBaseParser.SourceNameContext; +import io.confluent.ksql.parser.exception.ParseFailedException; import java.util.Optional; +import java.util.regex.Pattern; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.Token; import org.antlr.v4.runtime.tree.TerminalNode; public final class ParserUtil { + /** + * Source names must adhere to the kafka topic naming convention. We restrict + * it here instead of as a parser rule to allow for a more descriptive error + * message and to avoid duplicated rules. + * + * @see org.apache.kafka.streams.state.StoreBuilder#name + */ + private static final Pattern VALID_SOURCE_NAMES = Pattern.compile("[a-zA-Z0-9_-]*"); + private ParserUtil() { } + public static SourceName getSourceName(final SourceNameContext sourceName) { + final String text = getIdentifierText(sourceName.identifier()); + if (!VALID_SOURCE_NAMES.matcher(text).matches()) { + throw new ParseFailedException( + "Illegal argument at " + getLocation(sourceName).map(NodeLocation::toString).orElse("?") + + ". Source names may only contain alphanumeric values, '_' or '-'. Got: '" + + text + "'"); + } + return SourceName.of(text); + } + public static String getIdentifierText(final SqlBaseParser.IdentifierContext context) { return getIdentifierText(false, context); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 5530c9bbc34c..b5255606f415 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -76,6 +76,7 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.TableElements; +import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; @@ -458,6 +459,29 @@ public void testReservedRowKeyAlias() { KsqlParserTestUtil.buildSingleAst("SELECT C2 as ROWKEY FROM test1 t1;", metaStore); } + @Test + public void shouldThrowOnNonAlphanumericSourceName() { + // Expect: + expectedException.expect(ParseFailedException.class); + expectedException.expectMessage("Got: 'foo!bar'"); + + // When: + KsqlParserTestUtil.buildSingleAst( + "CREATE STREAM `foo!bar` WITH (kafka_topic='foo', value_format='AVRO');", + metaStore); + } + + @Test + public void shouldAllowEscapedTerminateQuery() { + // When: + final PreparedStatement statement = KsqlParserTestUtil + .buildSingleAst("TERMINATE QUERY `CSAS-foo_2`;", metaStore); + + // Then: + assertThat(statement.getStatement().getQueryId().getId(), is("CSAS-foo_2")); + } + + @Test public void testSelectAllJoin() { // When: From 9aaf625b5af695f2da1e1c7723231cf434d972ea Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 30 Oct 2019 08:36:10 -0700 Subject: [PATCH 2/3] test: add some more test coverage --- .../quoted-identifiers.json | 2 +- .../insert-values.json | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json b/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json index cb11f606ae85..dd7747a710c7 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json @@ -94,7 +94,7 @@ "name": "source names requiring quotes", "statements": [ "CREATE STREAM `foo-source` (id VARCHAR) WITH (kafka_topic='foo-source', value_format='JSON');", - "CREATE STREAM `foo-too` AS SELECT * FROM `foo-source`;" + "CREATE STREAM `foo-too` AS SELECT `foo-source`.id FROM `foo-source`;" ], "inputs": [ {"topic": "foo-source", "value": {"id": "1"}} diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json index 5cfdc22e3c77..e7b2c36fdadc 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json @@ -257,6 +257,21 @@ "outputs": [ {"topic": "test_topic", "key": null, "value": {"I": -1, "A": [1, 2, 3]}} ] + }, + { + "name": "should handle quoted identifiers", + "statements": [ + "CREATE STREAM `test` (`id!` INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "INSERT INTO `test` (ROWTIME, ROWKEY, `id!`) VALUES (1234, 'key', 10);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "timestamp": 1234, "key": "key", "value": {"id!": 10}} + ], + "responses": [ + {"@type": "currentStatus", "statementText": "{STATEMENT}"} + ] } ] } \ No newline at end of file From d0fd09984b07c05d53186119ce722fa8d2deb2ba Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 30 Oct 2019 10:56:00 -0700 Subject: [PATCH 3/3] fix: fix more tests --- .../src/main/java/io/confluent/ksql/parser/AstBuilder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index cdd93dd5bfdf..6bf4a9ffffef 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -671,7 +671,8 @@ public Node visitListTypes(final ListTypesContext ctx) { public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) { return new TerminateQuery( getLocation(context), - ParserUtil.getIdentifierText(context.identifier()) + // use case sensitive parsing here to maintain backwards compatibility + ParserUtil.getIdentifierText(true, context.identifier()) ); }