diff --git a/docs-md/developer-guide/create-a-stream.md b/docs-md/developer-guide/create-a-stream.md index cf2f99fae120..a697a800c93b 100644 --- a/docs-md/developer-guide/create-a-stream.md +++ b/docs-md/developer-guide/create-a-stream.md @@ -5,11 +5,12 @@ tagline: Create a Stream from a Kafka topic description: Learn how to use the CREATE STREAM statement on a Kafka topic --- -In ksqlDB, you create streams from {{ site.aktm }} topics, and you create -streams of query results from other streams. +In ksqlDB, you create streams from existing {{ site.aktm }} topics, create +streams that create new {{ site.aktm }} topics, or create streams of +query results from other streams. -- Use the CREATE STREAM statement to create a stream from a Kafka - topic. +- Use the CREATE STREAM statement to create a stream from an existing Kafka + topic, or a new Kafka topic. - Use the CREATE STREAM AS SELECT statement to create a query stream from an existing stream. @@ -17,10 +18,10 @@ streams of query results from other streams. Creating tables is similar to creating streams. For more information, see [Create a ksqlDB Table](create-a-table.md). -Create a Stream from a Kafka topic ---------------------------------- +Create a Stream from an existing Kafka topic +-------------------------------------------- -Use the CREATE STREAM statement to create a stream from an underlying +Use the CREATE STREAM statement to create a stream from an existing underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster. The following examples show how to create streams from a Kafka topic, @@ -102,6 +103,9 @@ The previous SQL statement makes no assumptions about the Kafka message key in the underlying Kafka topic. If the value of the message key in the topic is the same as one of the columns defined in the stream, you can specify the key in the WITH clause of the CREATE STREAM statement. +If you use this column name later to perform a join or a repartition, ksqlDB +knows that no repartition is needed. In effect, the named column becomes an +alias for ROWKEY. For example, if the Kafka message key has the same value as the `pageid` column, you can write the CREATE STREAM statement like this: @@ -180,6 +184,31 @@ Kafka topic : pageviews (partitions: 1, replication: 1) [...] ``` +Create a Stream backed by a new Kafka Topic +------------------------------------------- + +Use the CREATE STREAM statement to create a stream without a preexisting +topic by providing the PARTITIONS count, and optionally the REPLICAS count, +in the WITH clause. + +Taking the example of the pageviews stream from above, but where the underlying +Kafka topic does not already exist, you can create the stream by pasting +the following CREATE STREAM statement into the CLI: + +```sql +CREATE STREAM pageviews + (viewtime BIGINT, + userid VARCHAR, + pageid VARCHAR) + WITH (KAFKA_TOPIC='pageviews', + PARTITIONS=4, + REPLICAS=3, + VALUE_FORMAT='DELIMITED'); +``` + +This creates the pageviews topic for you, with the supplied partition and replica counts.
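+If you want to confirm that ksqlDB created the stream and its backing topic with
+the settings you asked for, one way is to describe the new stream. This is a
+minimal check; the exact output depends on your cluster configuration.
+
+```sql
+DESCRIBE EXTENDED pageviews;
+```
+
+The `Kafka topic` line of the output reports the partition and replica counts of
+the backing topic, in the same format shown earlier on this page.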
+ Create a Persistent Streaming Query from a Stream ------------------------------------------------- @@ -276,9 +305,9 @@ Your output should resemble: ``` Query ID | Kafka Topic | Query String - -------------------------------------------------------------------------------------------------------------------------------------------------------- + CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20' EMIT CHANGES; - -------------------------------------------------------------------------------------------------------------------------------------------------------- + For detailed information on a Query run: EXPLAIN ; ``` diff --git a/docs-md/developer-guide/create-a-table.md b/docs-md/developer-guide/create-a-table.md index deb2c58768ec..41fdfdce98ad 100644 --- a/docs-md/developer-guide/create-a-table.md +++ b/docs-md/developer-guide/create-a-table.md @@ -5,10 +5,12 @@ tagline: Create a Table from a Kafka topic description: Learn how to use the CREATE TABLE statement on a Kafka topic --- -In ksqlDB, you create tables from {{ site.aktm }} topics, and you create -tables of query results from other tables or streams. +In ksqlDB, you create tables from existing {{ site.aktm }} topics, create +tables that will create new {{ site.ak }} topics, or create tables of +query results from other tables or streams. -- Use the CREATE TABLE statement to create a table from a Kafka topic. +- Use the CREATE TABLE statement to create a table from an existing Kafka topic, + or a new Kafka topic. - Use the CREATE TABLE AS SELECT statement to create a table with query results from an existing table or stream. @@ -16,11 +18,11 @@ tables of query results from other tables or streams. Creating streams is similar to creating tables. For more information, see [Create a ksqlDB Stream](create-a-stream.md). -Create a Table from a Kafka Topic ---------------------------------- +Create a Table from an existing Kafka Topic +------------------------------------------- -Use the CREATE TABLE statement to create a table from an underlying -Kafka topic. The Kafka topic must exist already in your Kafka cluster. +Use the CREATE TABLE statement to create a table from an existing +underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster. The following examples show how to create tables from a Kafka topic, named `users`. To see these examples in action, create the `users` topic @@ -109,13 +111,16 @@ SELECT statement: SELECT * FROM users EMIT CHANGES; ``` -Your output should resemble: +Assuming the table has content, your output should resemble: ``` -1541439611069 | User_2 | 1498028899054 | User_2 | MALE | Region_1 -1541439611320 | User_6 | 1505677113995 | User_6 | FEMALE | Region_7 -1541439611396 | User_5 | 1491338621627 | User_5 | OTHER | Region_2 -1541439611536 | User_9 | 1492621173463 | User_9 | FEMALE | Region_3 ++---------------+--------+---------------+--------+--------+----------+ +| ROWTIME | ROWKEY | REGISTERTIME | USERID | GENDER | REGIONID | ++---------------+--------+---------------+--------+--------+----------+ +| 1541439611069 | User_2 | 1498028899054 | User_2 | MALE | Region_1 | +| 1541439611320 | User_6 | 1505677113995 | User_6 | FEMALE | Region_7 | +| 1541439611396 | User_5 | 1491338621627 | User_5 | OTHER | Region_2 | +| 1541439611536 | User_9 | 1492621173463 | User_9 | FEMALE | Region_3 | ^CQuery terminated ``` @@ -124,6 +129,33 @@ Press Ctrl+C to stop printing the query results. 
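+You can also filter the rows that the push query emits while it runs. For
+example, the following variation on the previous query, which isn't used by the
+later examples, emits only the rows for a single user:
+
+```sql
+SELECT * FROM users WHERE userid = 'User_2' EMIT CHANGES;
+```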
The table values update continuously with the most recent records, because the underlying `users` topic receives new messages continuously. +Create a Table backed by a new Kafka Topic +------------------------------------------ + +Use the CREATE TABLE statement to create a table without a preexisting +topic by providing the PARTITIONS count, and optionally the REPLICAS count, +in the WITH clause. + +Taking the example of the users table from above, but where the underlying +Kafka topic does not already exist, you can create the table by pasting +the following CREATE TABLE statement into the CLI: + +```sql +CREATE TABLE users + (registertime BIGINT, + userid VARCHAR, + gender VARCHAR, + regionid VARCHAR) + WITH (KAFKA_TOPIC = 'users', + VALUE_FORMAT='JSON', + PARTITIONS=4, + REPLICAS=3, + KEY = 'userid'); +``` + +This creates the users topic for you, with the supplied partition and replica counts. + + Create a ksqlDB Table with Streaming Query Results -------------------------------------------------- @@ -206,9 +238,9 @@ Your output should resemble: ``` Query ID | Kafka Topic | Query String ------------------------------------------------------------------------------------------------------------------------------------------------------- + CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE' EMIT CHANGES; ------------------------------------------------------------------------------------------------------------------------------------------------------- + For detailed information on a Query run: EXPLAIN <Query ID>; ``` @@ -225,9 +257,9 @@ to include a function like COUNT(*) in the SELECT clause. ```sql CREATE TABLE pageviews_table AS - SELECT viewtime, userid, pageid, COUNT(*) AS TOTAL + SELECT userid, pageid, COUNT(*) AS TOTAL FROM pageviews_original WINDOW TUMBLING (SIZE 1 MINUTES) - GROUP BY viewtime, userid, pageid + GROUP BY userid, pageid EMIT CHANGES; ``` @@ -250,30 +282,39 @@ SELECT * FROM pageviews_table EMIT CHANGES; Your output should resemble: ``` -1557183929488 | 1557183929488|+|User_9|+|Page_39 : Window{start=1557183900000 end=-} | 1557183929488 | User_9 | Page_39 | 1 -1557183930211 | 1557183930211|+|User_1|+|Page_79 : Window{start=1557183900000 end=-} | 1557183930211 | User_1 | Page_79 | 1 -1557183930687 | 1557183930687|+|User_9|+|Page_34 : Window{start=1557183900000 end=-} | 1557183930687 | User_9 | Page_34 | 1 -1557183929786 | 1557183929786|+|User_5|+|Page_12 : Window{start=1557183900000 end=-} | 1557183929786 | User_5 | Page_12 | 1 -1557183931095 | 1557183931095|+|User_3|+|Page_43 : Window{start=1557183900000 end=-} | 1557183931095 | User_3 | Page_43 | 1 -1557183930184 | 1557183930184|+|User_1|+|Page_29 : Window{start=1557183900000 end=-} | 1557183930184 | User_1 | Page_29 | 1 -1557183930727 | 1557183930726|+|User_6|+|Page_93 : Window{start=1557183900000 end=-} | 1557183930726 | User_6 | Page_93 | 1 ++---------------+---------------+---------------+------------------+--------+---------+------+ +| ROWTIME | WINDOWSTART | WINDOWEND | ROWKEY | USERID | PAGEID | TOTAL| ++---------------+---------------+---------------+------------------+--------+---------+------+ +| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1 | +| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1 | +| 1557183930211 | 1557183900000 | 1557183960000 | User_1|+|Page_79 | User_1 | Page_79 | 1 | +| 1557183930687 | 1557183900000 | 1557183960000 | User_9|+|Page_34 | 
User_9 | Page_34 | 1 | +| 1557183929786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 2 | +| 1557183931095 | 1557183900000 | 1557183960000 | User_3|+|Page_43 | User_3 | Page_43 | 1 | +| 1557183930184 | 1557183900000 | 1557183960000 | User_1|+|Page_29 | User_1 | Page_29 | 1 | +| 1557183930727 | 1557183900000 | 1557183960000 | User_6|+|Page_93 | User_6 | Page_93 | 3 | ^CQuery terminated ``` +!!! note + It is possible for the same key to be output multiple time when emitting changes + to the table. This is because each time the row in the table changes it will be emitted. + Look up the value for a specific key within the table by using a SELECT statement. ```sql -SELECT * FROM pageviews_table WHERE ROWKEY='1557183929488|+|User_9|+|Page_39'; +SELECT * FROM pageviews_table WHERE ROWKEY='User_9|+|Page_39'; ``` Your output should resemble: ``` - ROWKEY STRING KEY | WINDOWSTART BIGINT KEY | VIEWTIME BIGINT | USERID STRING | PAGEID STRING | TOTAL BIGINT ----------------------------------------------------------------------------------------------------------------------------- - 1557183929488|+|User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1 ----------------------------------------------------------------------------------------------------------------------------- ++-----------------+---------------+---------------+--------+---------+-------+ + ROWKEY | WINDOWSTART | ROWTIME | USERID | PAGEID | TOTAL | +------------------+---------------+---------------+--------+---------+-------+ + User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1 | +Query terminated ``` diff --git a/docs-md/tutorials/clickstream-docker.md b/docs-md/tutorials/clickstream-docker.md index 8afa3e4e8fad..be609ebe3a18 100644 --- a/docs-md/tutorials/clickstream-docker.md +++ b/docs-md/tutorials/clickstream-docker.md @@ -305,11 +305,11 @@ SELECT * FROM EVENTS_PER_MIN EMIT CHANGES LIMIT 5; Your output should resemble: ``` -1536662819576 | 24 : Window{start=1536662760000 end=-} | 24 | 12 -1536662819685 | 4 : Window{start=1536662760000 end=-} | 4 | 19 -1536662847582 | 4 : Window{start=1536662820000 end=-} | 4 | 75 -1536662847586 | 24 : Window{start=1536662820000 end=-} | 24 | 101 -1536662879959 | 29 : Window{start=1536662820000 end=-} | 29 | 2 +1536662819576 | 1536662760000 | 1536662765000 | 24 | 24 | 12 +1536662819685 | 1536662760000 | 1536662765000 | 4 | 4 | 19 +1536662847582 | 1536662820000 | 1536662825000 | 4 | 4 | 75 +1536662847586 | 1536662820000 | 1536662825000 | 24 | 24 | 101 +1536662879959 | 1536662820000 | 1536662825000 | 29 | 29 | 2 Limit Reached Query terminated ``` @@ -323,11 +323,11 @@ SELECT * FROM PAGES_PER_MIN EMIT CHANGES LIMIT 5; Your output should resemble: ``` -1536662784977 | 21 : Window{start=1536662725000 end=-} | 21 | 2 -1536662789353 | 21 : Window{start=1536662730000 end=-} | 21 | 7 -1536662793715 | 21 : Window{start=1536662735000 end=-} | 21 | 20 -1536662799627 | 21 : Window{start=1536662740000 end=-} | 21 | 35 -1536662804534 | 21 : Window{start=1536662745000 end=-} | 21 | 40 +1536662784977 | 1536662725000 | 1536662730000 | 21 | 21 | 2 +1536662789353 | 1536662730000 | 1536662735000 | 21 | 21 | 7 +1536662793715 | 1536662735000 | 1536662740000 | 21 | 21 | 20 +1536662799627 | 1536662740000 | 1536662745000 | 21 | 21 | 35 +1536662804534 | 1536662745000 | 1536662750000 | 21 | 21 | 40 Limit Reached Query terminated ``` diff --git a/ksql-clickstream-demo/non-docker-clickstream.md b/ksql-clickstream-demo/non-docker-clickstream.md index 
c1e9659b2c58..14741b7dce0b 100644 --- a/ksql-clickstream-demo/non-docker-clickstream.md +++ b/ksql-clickstream-demo/non-docker-clickstream.md @@ -247,11 +247,11 @@ These steps will guide you through how to setup your environment and run the cli Your output should resemble: ```bash - 1503585475000 | 4 : Window{start=1503585475000 end=-} | 4 | 14 - 1503585480000 | 25 : Window{start=1503585480000 end=-} | 25 | 9 - 1503585480000 | 16 : Window{start=1503585480000 end=-} | 16 | 6 - 1503585475000 | 25 : Window{start=1503585475000 end=-} | 25 | 20 - 1503585480000 | 37 : Window{start=1503585480000 end=-} | 37 | 6 + 1503585475000 | 1503585475000 | 1503585480000 | 4 | 4 | 14 + 1503585480000 | 1503585480000 | 1503585485000 | 25 | 25 | 9 + 1503585480000 | 1503585480000 | 1503585485000 | 16 | 16 | 6 + 1503585475000 | 1503585475000 | 1503585480000 | 25 | 25 | 20 + 1503585480000 | 1503585480000 | 1503585485000 | 37 | 37 | 6 LIMIT reached Query terminated ``` diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index a3ad16ad39cd..45bd8a347004 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -136,9 +136,8 @@ public boolean valueContainsAny(final Set names) { * *

If the columns already exist in the value schema the function returns the same schema. * - * @param windowed indicates that the source is windowed; meaning key column copied to - * value will be of type {@link SqlTypes#STRING}, inline with how {@code SourceBuilder} - * creates a {@code String} {@code ROWKEY} for windowed sources. + * @param windowed indicates that the source is windowed; meaning {@code WINDOWSTART} and {@code + * WINDOWEND} columns will added to the value schema to represent the window bounds. * @return the new schema. */ public LogicalSchema withMetaAndKeyColsInValue(final boolean windowed) { @@ -244,16 +243,27 @@ private LogicalSchema rebuild( builder.add(Column.of(c.name(), c.type(), Namespace.VALUE, valueIndex++)); } + if (windowedKey) { + builder.add( + Column.of(SchemaUtil.WINDOWSTART_NAME, SqlTypes.BIGINT, Namespace.VALUE, valueIndex++)); + builder.add( + Column.of(SchemaUtil.WINDOWEND_NAME, SqlTypes.BIGINT, Namespace.VALUE, valueIndex++)); + } + for (final Column c : key) { - final SqlType type = windowedKey ? SqlTypes.STRING : c.type(); - builder.add(Column.of(c.name(), type, Namespace.VALUE, valueIndex++)); + builder.add(Column.of(c.name(), c.type(), Namespace.VALUE, valueIndex++)); } } for (final Column c : value) { + if (c.name().equals(SchemaUtil.WINDOWSTART_NAME) + || c.name().equals(SchemaUtil.WINDOWEND_NAME) + ) { + continue; + } + if (findColumnMatching( - (withNamespace(Namespace.META).or(withNamespace(Namespace.KEY)) - .and(withRef(c.ref())) + (withNamespace(Namespace.META).or(withNamespace(Namespace.KEY)).and(withRef(c.ref())) )).isPresent()) { continue; } diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index ffa879ac13aa..3a44f70de10e 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -48,7 +48,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -@SuppressWarnings({"UnstableApiUsage","unchecked", "OptionalGetWithoutIsPresent"}) +@SuppressWarnings({"UnstableApiUsage","unchecked"}) public class LogicalSchemaTest { private static final ColumnName K0 = ColumnName.of("k0"); @@ -66,6 +66,11 @@ public class LogicalSchemaTest { .valueColumn(F1, BIGINT) .build(); + // Constants used to represent column counts: + private static final int ROWTIME = 1; + private static final int ROWKEY = 1; + private static final int WINDOW_BOUNDS = 2; + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -399,15 +404,16 @@ public void shouldAddMetaAndKeyColumns() { .withMetaAndKeyColsInValue(false); // Then: - assertThat(result.value(), hasSize(schema.value().size() + 2)); + assertThat(result.value(), hasSize(schema.value().size() + ROWTIME + ROWKEY)); assertThat(result.value().get(0).name(), is(SchemaUtil.ROWTIME_NAME)); assertThat(result.value().get(0).type(), is(BIGINT)); assertThat(result.value().get(1).name(), is(SchemaUtil.ROWKEY_NAME)); assertThat(result.value().get(1).type(), is(STRING)); + assertThat(result.value().get(2).name(), is(F0)); } @Test - public void shouldAddKeyAsStringWhenAddingToValue() { + public void shouldAddWindowedMetaAndKeyColumns() { // Given: final LogicalSchema schema = LogicalSchema.builder() .keyColumn(SchemaUtil.ROWKEY_NAME, DOUBLE) @@ -420,9 +426,16 @@ public void shouldAddKeyAsStringWhenAddingToValue() { .withMetaAndKeyColsInValue(true); // 
Then: - assertThat(result.value().get(1).name(), is(SchemaUtil.ROWKEY_NAME)); - assertThat(result.value().get(1).type(), is(STRING)); - assertThat(result.key().get(0).type(), is(DOUBLE)); + assertThat(result.value(), hasSize(schema.value().size() + ROWTIME + WINDOW_BOUNDS + ROWKEY)); + assertThat(result.value().get(0).name(), is(SchemaUtil.ROWTIME_NAME)); + assertThat(result.value().get(0).type(), is(BIGINT)); + assertThat(result.value().get(1).name(), is(SchemaUtil.WINDOWSTART_NAME)); + assertThat(result.value().get(1).type(), is(BIGINT)); + assertThat(result.value().get(2).name(), is(SchemaUtil.WINDOWEND_NAME)); + assertThat(result.value().get(2).type(), is(BIGINT)); + assertThat(result.value().get(3).name(), is(SchemaUtil.ROWKEY_NAME)); + assertThat(result.value().get(3).type(), is(DOUBLE)); + assertThat(result.value().get(4).name(), is(F0)); } @Test @@ -484,6 +497,26 @@ public void shouldRemoveMetaColumnsFromValue() { )); } + @Test + public void shouldRemoveWindowedMetaColumnsFromValue() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, BIGINT) + .valueColumn(F1, BIGINT) + .build() + .withMetaAndKeyColsInValue(true); + + // When + final LogicalSchema result = schema.withoutMetaAndKeyColsInValue(); + + // Then: + assertThat(result, is(LogicalSchema.builder() + .valueColumn(F0, BIGINT) + .valueColumn(F1, BIGINT) + .build() + )); + } + @Test public void shouldRemoveMetaColumnsWhereEverTheyAre() { // Given: diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java index d1ed4d840ef0..7ab0b694a083 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java @@ -184,7 +184,7 @@ public SourceSchemas getFromSourceSchemas() { final Map schemaBySource = fromDataSources.stream() .collect(Collectors.toMap( AliasedDataSource::getAlias, - s -> s.getDataSource().getSchema() + Analysis::buildStreamsSchema )); return new SourceSchemas(schemaBySource); @@ -230,6 +230,14 @@ public List getTableFunctions() { return tableFunctions; } + private static LogicalSchema buildStreamsSchema(final AliasedDataSource s) { + // Include metadata & key columns in the value schema to match the schema the streams + // topology will use. 
+ return s.getDataSource() + .getSchema() + .withMetaAndKeyColsInValue(s.getDataSource().getKsqlTopic().getKeyFormat().isWindowed()); + } + @Immutable public static final class Into { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 3e00b591967f..2a2d55ca4d9f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -313,26 +313,19 @@ private void throwOnUnknownColumnReference() { new ExpressionAnalyzer(analysis.getFromSourceSchemas()); for (final SelectExpression selectExpression : analysis.getSelectExpressions()) { - expressionAnalyzer.analyzeExpression(selectExpression.getExpression(), false); + expressionAnalyzer.analyzeExpression(selectExpression.getExpression()); } analysis.getWhereExpression().ifPresent(where -> { - final boolean allowWindowMetaFields = pullQuery - && analysis.getFromDataSources().get(0) - .getDataSource() - .getKsqlTopic() - .getKeyFormat() - .isWindowed(); - - expressionAnalyzer.analyzeExpression(where, allowWindowMetaFields); + expressionAnalyzer.analyzeExpression(where); }); for (final Expression expression : analysis.getGroupByExpressions()) { - expressionAnalyzer.analyzeExpression(expression, false); + expressionAnalyzer.analyzeExpression(expression); } analysis.getHavingExpression().ifPresent(having -> - expressionAnalyzer.analyzeExpression(having, false) + expressionAnalyzer.analyzeExpression(having) ); } @@ -356,16 +349,14 @@ protected AstNode visitJoin(final Join node, final Void context) { throw new KsqlException("Only equality join criteria is supported."); } - final Set srcsUsedInLeft = - new ExpressionAnalyzer(analysis.getFromSourceSchemas()).analyzeExpression( - comparisonExpression.getLeft(), - false - ); - final Set srcsUsedInRight = - new ExpressionAnalyzer(analysis.getFromSourceSchemas()).analyzeExpression( - comparisonExpression.getRight(), - false - ); + final ExpressionAnalyzer expressionAnalyzer = + new ExpressionAnalyzer(analysis.getFromSourceSchemas()); + + final Set srcsUsedInLeft = expressionAnalyzer + .analyzeExpression(comparisonExpression.getLeft()); + + final Set srcsUsedInRight = expressionAnalyzer + .analyzeExpression(comparisonExpression.getRight()); final SourceName leftSourceName = getOnlySourceForJoin( comparisonExpression.getLeft(), comparisonExpression, srcsUsedInLeft); @@ -587,13 +578,14 @@ private void visitSelectStar(final AllColumns allColumns) { : ""; final LogicalSchema schema = source.getDataSource().getSchema(); + final boolean windowed = source.getDataSource().getKsqlTopic().getKeyFormat().isWindowed(); // Non-join persistent queries only require value columns on SELECT * // where as joins and transient queries require all columns in the select: // See https://github.com/confluentinc/ksql/issues/3731 for more info final List valueColumns = persistent && !analysis.isJoin() ? 
schema.value() - : schema.columns(); + : schema.withMetaAndKeyColsInValue(windowed).value(); for (final Column column : valueColumns) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java index 04b1187237d9..0b41c386cf20 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java @@ -42,12 +42,9 @@ class ExpressionAnalyzer { this.sourceSchemas = Objects.requireNonNull(sourceSchemas, "sourceSchemas"); } - Set analyzeExpression( - final Expression expression, - final boolean allowWindowMetaFields - ) { + Set analyzeExpression(final Expression expression) { final Set referencedSources = new HashSet<>(); - final SourceExtractor extractor = new SourceExtractor(allowWindowMetaFields, referencedSources); + final SourceExtractor extractor = new SourceExtractor(referencedSources); extractor.process(expression, null); return referencedSources; } @@ -55,14 +52,9 @@ Set analyzeExpression( private final class SourceExtractor extends TraversalExpressionVisitor { private final Set referencedSources; - private final boolean allowWindowMetaFields; - SourceExtractor( - final boolean allowWindowMetaFields, - final Set referencedSources - ) { - this.allowWindowMetaFields = allowWindowMetaFields; - this.referencedSources = referencedSources; + SourceExtractor(final Set referencedSources) { + this.referencedSources = Objects.requireNonNull(referencedSources, "referencedSources"); } @Override @@ -90,13 +82,7 @@ private Optional getSource( final ColumnRef name ) { final Set sourcesWithField = sourceSchemas.sourcesWithField(sourceName, name); - if (sourcesWithField.isEmpty()) { - if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) { - // window start doesn't have a source as its a special hacky column - return Optional.empty(); - } - throw new KsqlException("Column '" + sourceName.map(n -> n.name() + KsqlConstants.DOT + name.name().name()) .orElse(name.name().name()) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java index 4f7e134ab7b5..47b7a61903ea 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java @@ -23,8 +23,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import io.confluent.ksql.execution.expression.tree.ComparisonExpression; -import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.QualifiedColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.StringLiteral; @@ -67,39 +65,6 @@ public void setUp() { analyzer = new ExpressionAnalyzer(sourceSchemas); } - @Test - public void shouldNotThrowOnWindowStartIfAllowed() { - // Given: - final Expression expression = new ComparisonExpression( - Type.EQUAL, - WINDOW_START_EXP, - OTHER_EXP - ); - - // When: - analyzer.analyzeExpression(expression, true); - - // Then: did not throw - } - - @Test - public void shouldThrowOnWindowStartIfNotAllowed() { - // Given: - final Expression expression = new ComparisonExpression( - Type.EQUAL, - WINDOW_START_EXP, - OTHER_EXP - ); - - // 
Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Column 'WINDOWSTART' cannot be resolved."); - - // When: - analyzer.analyzeExpression(expression, false); - } - @Test public void shouldGetSourceForUnqualifiedColumnRef() { // Given: @@ -112,7 +77,7 @@ public void shouldGetSourceForUnqualifiedColumnRef() { when(sourceSchemas.sourcesWithField(any(), any())).thenReturn(sourceNames("something")); // When: - analyzer.analyzeExpression(expression, true); + analyzer.analyzeExpression(expression); // Then: verify(sourceSchemas).sourcesWithField(Optional.of(SourceName.of("fully")), column); @@ -134,7 +99,7 @@ public void shouldThrowOnMultipleSources() { "Column 'just-name' is ambiguous. Could be any of: multiple.just-name, sources.just-name"); // When: - analyzer.analyzeExpression(expression, true); + analyzer.analyzeExpression(expression); } @Test @@ -149,7 +114,7 @@ public void shouldGetSourceForQualifiedColumnRef() { .thenReturn(ImmutableSet.of(SourceName.of("something"))); // When: - final Set columnRefs = analyzer.analyzeExpression(expression, true); + final Set columnRefs = analyzer.analyzeExpression(expression); // Then: verify(sourceSchemas).sourcesWithField( @@ -177,7 +142,7 @@ public void shouldThrowOnNoSources() { "Column 'just-name' cannot be resolved."); // When: - analyzer.analyzeExpression(expression, true); + analyzer.analyzeExpression(expression); } private static Set sourceNames(final String... names) { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index 8f49b7297d23..14db162b6fcb 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -423,11 +423,11 @@ { "name": "non-join leaves aliased system columns in output's value schema", "statements": [ - "CREATE STREAM INPUT (F0 INT) WITH (kafka_topic='input', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT F0, ROWTIME AS TIME, ROWKEY AS KEY FROM INPUT;" + "CREATE STREAM INPUT (F0 INT) WITH (kafka_topic='input', value_format='JSON', window_type='session');", + "CREATE STREAM OUTPUT AS SELECT F0, ROWTIME AS TIME, WINDOWSTART AS WSTART, WINDOWEND AS WEND, ROWKEY AS KEY FROM INPUT;" ], - "inputs": [{"topic": "input", "key": "k", "value": {"F0": 4}, "timestamp": 1}], - "outputs": [{"topic": "OUTPUT", "key": "k", "value": {"F0": 4, "TIME": 1, "KEY": "k"}, "timestamp": 1}] + "inputs": [{"topic": "input", "key": "k", "value": {"F0": 4}, "timestamp": 1, "window": {"start": 12, "end": 465, "type": "session"}}], + "outputs": [{"topic": "OUTPUT", "key": "k", "value": {"F0": 4, "TIME": 1, "WSTART": 12, "WEND": 465, "KEY": "k"}, "timestamp": 1, "window": {"start": 12, "end": 465, "type": "session"}}] }, { "name": "join should reject ROWTIME in projection", @@ -482,13 +482,13 @@ "statements": [ "CREATE STREAM LEFT_STREAM (F0 INT) WITH (kafka_topic='left', value_format='JSON', window_type='tumbling', window_size='1 second');", "CREATE STREAM RIGHT_STREAM (F1 INT) WITH (kafka_topic='right', value_format='JSON', window_type='tumbling', window_size='1 second');", - "CREATE STREAM OUTPUT as SELECT l.ROWTIME AS TIME, l.ROWKEY AS KEY, f0, f1 FROM left_stream l join right_stream r WITHIN 1 seconds ON l.rowkey = r.rowkey;" + "CREATE STREAM OUTPUT as SELECT l.ROWTIME AS TIME, l.WINDOWSTART AS WSTART, l.WINDOWEND AS WEND, l.ROWKEY AS KEY, f0, f1 FROM left_stream l 
join right_stream r WITHIN 1 seconds ON l.rowkey = r.rowkey;" ], "inputs": [ {"topic": "left", "key": "k", "value": {"F0": 4}, "timestamp": 1, "window": {"start": 0, "end": 1000, "type": "time"}}, {"topic": "right", "key": "k", "value": {"F1": 6}, "timestamp": 2, "window": {"start": 0, "end": 1000, "type": "time"}} ], - "outputs": [{"topic": "OUTPUT", "key": "k", "value": {"F0": 4, "F1": 6, "TIME": 1, "KEY": "k : Window{start=0 end=-}"}, "timestamp": 2, "window": {"start": 0, "end": 1000, "type": "time"}}] + "outputs": [{"topic": "OUTPUT", "key": "k", "value": {"F0": 4, "F1": 6, "TIME": 1, "WSTART": 0, "WEND": 1000, "KEY": "k"}, "timestamp": 2, "window": {"start": 0, "end": 1000, "type": "time"}}] }, { "name": "group-by rejects ROWKEY in projection", diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/hopping-windows.json b/ksql-functional-tests/src/test/resources/query-validation-tests/hopping-windows.json index cc6e84b4e80d..50d31c51f573 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/hopping-windows.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/hopping-windows.json @@ -135,24 +135,34 @@ "name": "import hopping stream", "statements": [ "CREATE STREAM TEST (ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', WINDOW_TYPE='HoPping', WINDOW_SIZE='30 seconds');", - "CREATE STREAM S2 as SELECT *, ROWKEY as KEY FROM test;" + "CREATE STREAM S2 as SELECT *, WINDOWSTART as WSTART, WINDOWEND as WEND, ROWKEY as RKEY FROM test;" ], "inputs": [ - {"topic": "test_topic", "key": "0", "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "test_topic", "key": "0", "value": "0,0", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "test_topic", "key": "0", "value": "0,5", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "test_topic", "key": "100", "value": "100,100", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "test_topic", "key": "100", "value": "100,100", "timestamp": 30000, "window": {"start": 20000, "end": 50000, "type": "time"}}, - {"topic": "test_topic", "key": "100", "value": "100,100", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} + {"topic": "test_topic", "key": "a", "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "test_topic", "key": "a", "value": "0,0", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "test_topic", "key": "a", "value": "0,5", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "test_topic", "key": "b", "value": "100,100", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "test_topic", "key": "b", "value": "100,100", "timestamp": 30000, "window": {"start": 20000, "end": 50000, "type": "time"}}, + {"topic": "test_topic", "key": "b", "value": "100,100", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} ], "outputs": [ - {"topic": "S2", "key": "0", "value": "0,0,0 : Window{start=0 end=-}", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": "0", "value": "0,0,0 : Window{start=0 end=-}", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": "0", "value": "0,5,0 : Window{start=10000 end=-}", 
"timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "S2", "key": "100", "value": "100,100,100 : Window{start=10000 end=-}", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "S2", "key": "100", "value": "100,100,100 : Window{start=20000 end=-}", "timestamp": 30000, "window": {"start": 20000, "end": 50000, "type": "time"}}, - {"topic": "S2", "key": "100", "value": "100,100,100 : Window{start=30000 end=-}", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} - ] + {"topic": "S2", "key": "a", "value": "0,0,0,30000,a", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": "a", "value": "0,0,0,30000,a", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": "a", "value": "0,5,10000,40000,a", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "S2", "key": "b", "value": "100,100,10000,40000,b", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "S2", "key": "b", "value": "100,100,20000,50000,b", "timestamp": 30000, "window": {"start": 20000, "end": 50000, "type": "time"}}, + {"topic": "S2", "key": "b", "value": "100,100,30000,60000,b", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} + ], + "post": { + "sources": [ + { + "name": "S2", + "type": "stream", + "keyFormat": {"format": "KAFKA", "windowType": "HOPPING", "windowSize": 30000}, + "schema": "`ROWKEY` STRING KEY, `ID` BIGINT, `VALUE` BIGINT, `WSTART` BIGINT, `WEND` BIGINT, `RKEY` STRING" + } + ] + } }, { "name": "import table with invalid window size", @@ -168,24 +178,34 @@ "name": "import hopping stream with non-STRING key", "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', WINDOW_TYPE='HoPping', WINDOW_SIZE='30 seconds');", - "CREATE STREAM S2 as SELECT *, ROWKEY as KEY FROM test;" + "CREATE STREAM S2 as SELECT *, WINDOWSTART as WSTART, WINDOWEND as WEND, ROWKEY as KEY FROM test;" ], "inputs": [ - {"topic": "test_topic", "key": 0,"value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "test_topic", "key": 0,"value": "0,0", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "test_topic", "key": 0,"value": "0,5", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "test_topic", "key": 100,"value": "100,100", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "test_topic", "key": 100,"value": "100,100", "timestamp": 30000, "window": {"start": 20000, "end": 50000, "type": "time"}}, - {"topic": "test_topic", "key": 100,"value": "100,100", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} + {"topic": "test_topic", "key": 0, "value": "1,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "test_topic", "key": 0, "value": "2,0", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "test_topic", "key": 0, "value": "3,5", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "test_topic", "key": 100, "value": "4,100", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "test_topic", "key": 100, "value": "5,100", "timestamp": 30000, "window": 
{"start": 20000, "end": 50000, "type": "time"}}, + {"topic": "test_topic", "key": 100, "value": "6,100", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} ], "outputs": [ - {"topic": "S2", "key": 0,"value": "0,0,0 : Window{start=0 end=-}", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,0,0 : Window{start=0 end=-}", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,5,0 : Window{start=10000 end=-}", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,100,100 : Window{start=10000 end=-}", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,100,100 : Window{start=20000 end=-}", "timestamp": 30000, "window": {"start": 20000, "end": 50000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,100,100 : Window{start=30000 end=-}", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} - ] + {"topic": "S2", "key": 0, "value": "1,0,0,30000,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "2,0,0,30000,0", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": "3,5,10000,40000,0", "timestamp": 10000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "4,100,10000,40000,100", "timestamp": 30000, "window": {"start": 10000, "end": 40000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "5,100,20000,50000,100", "timestamp": 30000, "window": {"start": 20000, "end": 50000, "type": "time"}}, + {"topic": "S2", "key": 100, "value": "6,100,30000,60000,100", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} + ], + "post": { + "sources": [ + { + "name": "S2", + "type": "stream", + "keyFormat": {"format": "KAFKA", "windowType": "HOPPING", "windowSize": 30000}, + "schema": "`ROWKEY` BIGINT KEY, `ID` BIGINT, `VALUE` BIGINT, `WSTART` BIGINT, `WEND` BIGINT, `KEY` BIGINT" + } + ] + } } ] } \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json index 594f1ac4ac64..c23b4a91c99c 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json @@ -1911,7 +1911,7 @@ "statements": [ "CREATE STREAM S1 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');", "CREATE STREAM S2 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='2 SECOND');", - "CREATE STREAM OUTPUT as SELECT S1.ID, S2.ID FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;" + "CREATE STREAM OUTPUT as SELECT * FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;" ], "inputs": [ {"topic": "left_topic", "key": 1, "value": {"ID": 1}, "timestamp": 0, "window": {"start": 0, "end": 5000, "type": "time"}}, @@ -1921,8 +1921,8 @@ {"topic": "right_topic", "key": 1, "value": {"ID": 5}, "timestamp": 2000, "window": {"start": 2000, "end": 4000, "type": "time"}} ], "outputs": [ - {"topic": "OUTPUT", "key": 1, "value": {"S1_ID": 1, "S2_ID": 4}, "timestamp": 0, "window": 
{"start": 0, "end":5000, "type": "time"}}, - {"topic": "OUTPUT", "key": 1, "value": {"S1_ID": 3, "S2_ID": 5}, "timestamp": 2000, "window": {"start": 2000, "end":7000, "type": "time"}} + {"topic": "OUTPUT", "key": 1, "value": {"S1_ROWTIME": 0, "S1_WINDOWSTART": 0, "S1_WINDOWEND": 5000, "S1_ROWKEY": 1, "S1_ID": 1, "S2_ROWTIME": 0, "S2_WINDOWSTART": 0, "S2_WINDOWEND": 2000, "S2_ROWKEY": 1, "S2_ID": 4}, "timestamp": 0, "window": {"start": 0, "end":5000, "type": "time"}}, + {"topic": "OUTPUT", "key": 1, "value": {"S1_ROWTIME": 2000, "S1_WINDOWSTART": 2000, "S1_WINDOWEND": 7000, "S1_ROWKEY": 1, "S1_ID": 3, "S2_ROWTIME": 2000, "S2_WINDOWSTART": 2000, "S2_WINDOWEND": 4000, "S2_ROWKEY": 1, "S2_ID": 5}, "timestamp": 2000, "window": {"start": 2000, "end":7000, "type": "time"}} ], "post": { "sources": [ @@ -1930,7 +1930,7 @@ "name": "OUTPUT", "type": "stream", "keyFormat": {"format": "KAFKA", "windowType": "HOPPING", "windowSize": 5000}, - "schema": "ROWKEY INT KEY, S1_ID BIGINT, S2_ID BIGINT" + "schema": "`ROWKEY` INTEGER KEY, `S1_ROWTIME` BIGINT, `S1_WINDOWSTART` BIGINT, `S1_WINDOWEND` BIGINT, `S1_ROWKEY` INTEGER, `S1_ID` BIGINT, `S2_ROWTIME` BIGINT, `S2_WINDOWSTART` BIGINT, `S2_WINDOWEND` BIGINT, `S2_ROWKEY` INTEGER, `S2_ID` BIGINT" } ] } diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json b/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json index f0a3b60eeff4..b6a71a5219de 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json @@ -40,19 +40,60 @@ "CREATE STREAM S2 as SELECT *, ROWKEY as Key FROM test;" ], "inputs": [ - {"topic": "test_topic", "key": "0", "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, - {"topic": "test_topic", "key": "0", "value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, - {"topic": "test_topic", "key": "0", "value": "0,5", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, - {"topic": "test_topic", "key": "1", "value": "1,100", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, - {"topic": "test_topic", "key": "1", "value": "1,200", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} + {"topic": "test_topic", "key": "a", "value": "1,0", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "test_topic", "key": "a", "value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "test_topic", "key": "a", "value": "2,5", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, + {"topic": "test_topic", "key": "b", "value": "3,100", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, + {"topic": "test_topic", "key": "b", "value": "4,200", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} ], "outputs": [ - {"topic": "S2", "key": "0", "value": "0,0,0 : Window{start=0 end=0}", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, - {"topic": "S2", "key": "0", "value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, - {"topic": "S2", "key": "0", "value": "0,5,0 : Window{start=0 end=10000}", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, - {"topic": "S2", "key": "1", "value": "1,100,1 : Window{start=10000 
end=10000}", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, - {"topic": "S2", "key": "1", "value": "1,200,1 : Window{start=10000 end=40000}", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} - ] + {"topic": "S2", "key": "a", "value": "1,0,a", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "S2", "key": "a", "value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "S2", "key": "a", "value": "2,5,a", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, + {"topic": "S2", "key": "b", "value": "3,100,b", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, + {"topic": "S2", "key": "b", "value": "4,200,b", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} + ], + "post": { + "sources": [ + { + "name": "S2", + "type": "stream", + "keyFormat": {"format": "KAFKA", "windowType": "SESSION", "windowSize": null}, + "schema": "ROWKEY STRING KEY, ID BIGINT, VALUE BIGINT, KEY STRING" + } + ] + } + }, + { + "name": "import session stream with non-STRING key", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', WINDOW_TYPE='SESSION');", + "CREATE STREAM S2 as SELECT *, WINDOWSTART as WSTART, WINDOWEND as WEND, ROWKEY as Key FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "1", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "test_topic", "key": 0, "value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "test_topic", "key": 0, "value": "2", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, + {"topic": "test_topic", "key": 1, "value": "3", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, + {"topic": "test_topic", "key": 1, "value": "4", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "1,0,0,0", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "S2", "key": 0, "value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "S2", "key": 0, "value": "2,0,10000,0", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": "3,10000,10000,1", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, + {"topic": "S2", "key": 1, "value": "4,10000,40000,1", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} + ], + "post": { + "sources": [ + { + "name": "S2", + "type": "stream", + "keyFormat": {"format": "KAFKA", "windowType": "SESSION", "windowSize": null}, + "schema": "`ROWKEY` INTEGER KEY, `ID` BIGINT, `WSTART` BIGINT, `WEND` BIGINT, `KEY` INTEGER" + } + ] + } }, { "name": "import table with invalid window size", diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/string.json b/ksql-functional-tests/src/test/resources/query-validation-tests/string.json new file mode 100644 index 000000000000..34a26bc09b6b --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/string.json @@ -0,0 +1,38 @@ +{ + "tests": [ + { + "name": "< operator", + "statements": [ + "CREATE STREAM INPUT (text STRING) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + 
"CREATE STREAM OUTPUT AS select text, text < 'b2' from INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "value": "a1"}, + {"topic": "test_topic", "value": "b1"}, + {"topic": "test_topic", "value": "B2"}, + {"topic": "test_topic", "value": "b2"}, + {"topic": "test_topic", "value": "b3"}, + {"topic": "test_topic", "value": "b10"}, + {"topic": "test_topic", "value": "b01"} + ], + "outputs": [ + {"topic": "OUTPUT", "value": "a1,true"}, + {"topic": "OUTPUT", "value": "b1,true"}, + {"topic": "OUTPUT", "value": "B2,true"}, + {"topic": "OUTPUT", "value": "b2,false"}, + {"topic": "OUTPUT", "value": "b3,false"}, + {"topic": "OUTPUT", "value": "b10,true"}, + {"topic": "OUTPUT", "value": "b01,true"} + ], + "post": { + "sources": [ + { + "name": "OUTPUT", + "type": "stream", + "schema": "`ROWKEY` STRING KEY, `TEXT` STRING, `KSQL_COL_1` BOOLEAN" + } + ] + } + } + ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/tumbling-windows.json b/ksql-functional-tests/src/test/resources/query-validation-tests/tumbling-windows.json index 75f54108a2a1..a95877de8ce1 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/tumbling-windows.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/tumbling-windows.json @@ -110,17 +110,56 @@ "CREATE STREAM S2 as SELECT *, ROWKEY as KEY FROM test;" ], "inputs": [ - {"topic": "test_topic", "key": "0", "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "test_topic", "key": "0", "value": "0,100", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "test_topic", "key": "0", "value": "0,10", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "test_topic", "key": "0", "value": "0,50", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} + {"topic": "test_topic", "key": "a", "value": "1,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "test_topic", "key": "a", "value": "2,100", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "test_topic", "key": "a", "value": "3,10", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "test_topic", "key": "a", "value": "4,50", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} ], "outputs": [ - {"topic": "S2", "key": "0", "value": "0,0,0 : Window{start=0 end=-}", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": "0", "value": "0,100,0 : Window{start=0 end=-}", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": "0", "value": "0,10,0 : Window{start=0 end=-}", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": "0", "value": "0,50,0 : Window{start=30000 end=-}", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} - ] + {"topic": "S2", "key": "a", "value": "1,0,a", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": "a", "value": "2,100,a", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": "a", "value": "3,10,a", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": "a", "value": "4,50,a", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}} + ], + "post": { + "sources": [ + 
          {
+            "name": "S2",
+            "type": "stream",
+            "keyFormat": {"format": "KAFKA", "windowType": "TUMBLING", "windowSize": 30000},
+            "schema": "ROWKEY STRING KEY, ID BIGINT, VALUE BIGINT, KEY STRING"
+          }
+        ]
+      }
+    },
+    {
+      "name": "import tumbling stream with non-STRING key",
+      "statements": [
+        "CREATE STREAM TEST (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', WINDOW_TYPE='Tumbling', WINDOW_SIZE='30 seconds');",
+        "CREATE STREAM S2 as SELECT *, WINDOWSTART as WSTART, WINDOWEND as WEND, ROWKEY as KEY FROM test;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "key": 0, "value": "1", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
+        {"topic": "test_topic", "key": 0, "value": "2", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
+        {"topic": "test_topic", "key": 0, "value": "3", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
+        {"topic": "test_topic", "key": 0, "value": "4", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}}
+      ],
+      "outputs": [
+        {"topic": "S2", "key": 0, "value": "1,0,30000,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
+        {"topic": "S2", "key": 0, "value": "2,0,30000,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
+        {"topic": "S2", "key": 0, "value": "3,0,30000,0", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}},
+        {"topic": "S2", "key": 0, "value": "4,30000,60000,0", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}}
+      ],
+      "post": {
+        "sources": [
+          {
+            "name": "S2",
+            "type": "stream",
+            "keyFormat": {"format": "KAFKA", "windowType": "TUMBLING", "windowSize": 30000},
+            "schema": "ROWKEY INT KEY, ID BIGINT, WSTART BIGINT, WEND BIGINT, KEY INT"
+          }
+        ]
+      }
    }
  ]
}
\ No newline at end of file
diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/push-queries.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/push-queries.json
index 9f0a76e58c9c..aa3d00aad089 100644
--- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/push-queries.json
+++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/push-queries.json
@@ -98,9 +98,9 @@
         {"admin": {"@type": "currentStatus"}},
         {"admin": {"@type": "currentStatus"}},
         {"query": [
-          {"header":{"schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `COUNT` BIGINT"}},
-          {"row":{"columns":[12345, "11 : Window{start=12000 end=-}", 1]}},
-          {"row":{"columns":[12365, "11 : Window{start=12000 end=-}", 2]}},
+          {"header":{"schema":"`ROWTIME` BIGINT, `WINDOWSTART` BIGINT, `WINDOWEND` BIGINT, `ROWKEY` STRING, `COUNT` BIGINT"}},
+          {"row":{"columns":[12345, 12000, 13000, "11", 1]}},
+          {"row":{"columns":[12365, 12000, 13000, "11", 2]}},
           {"finalMessage":"Limit Reached"}
         ]}
       ]
@@ -120,9 +120,9 @@
         {"admin": {"@type": "currentStatus"}},
         {"admin": {"@type": "currentStatus"}},
         {"query": [
-          {"header":{"schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `COUNT` BIGINT"}},
-          {"row":{"columns":[12345, "11 : Window{start=12000 end=-}", 1]}},
-          {"row":{"columns":[12365, "11 : Window{start=12000 end=-}", 2]}},
+          {"header":{"schema":"`ROWTIME` BIGINT, `WINDOWSTART` BIGINT, `WINDOWEND` BIGINT, `ROWKEY` INTEGER, `COUNT` BIGINT"}},
+          {"row":{"columns":[12345, 12000, 13000, 11, 1]}},
+          {"row":{"columns":[12365, 12000, 13000, 11, 2]}},
           {"finalMessage":"Limit Reached"}
         ]}
       ]
@@ -142,9 +142,9 @@
         {"admin": {"@type": "currentStatus"}},
         {"admin": {"@type": "currentStatus"}},
         {"query": [
-          {"header":{"schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `COUNT` BIGINT"}},
-          {"row":{"columns":[12345, "11 : Window{start=12000 end=-}", 1]}},
-          {"row":{"columns":[12365, "11 : Window{start=12000 end=-}", 2]}},
+          {"header":{"schema":"`ROWTIME` BIGINT, `WINDOWSTART` BIGINT, `WINDOWEND` BIGINT, `ROWKEY` BIGINT, `COUNT` BIGINT"}},
+          {"row":{"columns":[12345, 12000, 13000, 11, 1]}},
+          {"row":{"columns":[12365, 12000, 13000, 11, 2]}},
           {"finalMessage":"Limit Reached"}
         ]}
       ]
@@ -164,9 +164,9 @@
         {"admin": {"@type": "currentStatus"}},
         {"admin": {"@type": "currentStatus"}},
         {"query": [
-          {"header":{"schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `COUNT` BIGINT"}},
-          {"row":{"columns":[12345, "11.1 : Window{start=12000 end=-}", 1]}},
-          {"row":{"columns":[12365, "11.1 : Window{start=12000 end=-}", 2]}},
+          {"header":{"schema":"`ROWTIME` BIGINT, `WINDOWSTART` BIGINT, `WINDOWEND` BIGINT, `ROWKEY` DOUBLE, `COUNT` BIGINT"}},
+          {"row":{"columns":[12345, 12000, 13000, 11.1, 1]}},
+          {"row":{"columns":[12365, 12000, 13000, 11.1, 2]}},
           {"finalMessage":"Limit Reached"}
         ]}
       ]
@@ -225,8 +225,8 @@
       "responses": [
         {"admin": {"@type": "currentStatus"}},
         {"query": [
-          {"header":{"schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `ID` INTEGER"}},
-          {"row":{"columns":[34555, "11 : Window{start=12345 end=34555}", 100]}},
+          {"header":{"schema":"`ROWTIME` BIGINT, `WINDOWSTART` BIGINT, `WINDOWEND` BIGINT, `ROWKEY` BIGINT, `ID` INTEGER"}},
+          {"row":{"columns":[34555, 12345, 34555, 11, 100]}},
           {"finalMessage":"Limit Reached"}
         ]}
       ]
@@ -235,16 +235,67 @@
       "name": "imported windowed table - DOUBLE KEY",
      "statements": [
        "CREATE TABLE INPUT (ROWKEY DOUBLE KEY, ID INT) WITH (kafka_topic='test_topic', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='1 SECOND');",
-        "SELECT * FROM INPUT EMIT CHANGES LIMIT 1;"
+        "SELECT * FROM INPUT WHERE ROWKEY = 11.1 AND WINDOWSTART=12000 EMIT CHANGES LIMIT 1;"
      ],
      "inputs": [
+        {"topic": "test_topic", "timestamp": 12345, "key": 11.1, "value": {"id": 100}, "window": {"start": 11000, "end": 12000, "type": "time"}},
+        {"topic": "test_topic", "timestamp": 12345, "key": 10.1, "value": {"id": 100}, "window": {"start": 12000, "end": 13000, "type": "time"}},
        {"topic": "test_topic", "timestamp": 12345, "key": 11.1, "value": {"id": 100}, "window": {"start": 12000, "end": 13000, "type": "time"}}
      ],
      "responses": [
        {"admin": {"@type": "currentStatus"}},
        {"query": [
-          {"header":{"schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `ID` INTEGER"}},
-          {"row":{"columns":[12345, "11.1 : Window{start=12000 end=-}", 100]}},
+          {"header":{"schema":"`ROWTIME` BIGINT, `WINDOWSTART` BIGINT, `WINDOWEND` BIGINT, `ROWKEY` DOUBLE, `ID` INTEGER"}},
+          {"row":{"columns":[12345, 12000, 13000, 11.1, 100]}},
+          {"finalMessage":"Limit Reached"}
+        ]}
+      ]
+    },
+    {
+      "name": "windowed table - access window bounds in select",
+      "statements": [
+        "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;",
+        "SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WSTART, TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss Z') AS WEND, COUNT FROM AGGREGATE EMIT CHANGES LIMIT 2;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "timestamp": 1580223282123, "key": "11", "value": {"id": 100}},
+        {"topic": "test_topic", "timestamp": 1580223282456, "key": "11", "value": {"id": 101}}
+      ],
+      "responses": [
+        {"admin": {"@type": "currentStatus"}},
+        {"admin": {"@type": "currentStatus"}},
+        {"query": [
+          {"header":{"schema":"`WSTART` STRING, `WEND` STRING, `COUNT` BIGINT"}},
+          {"row":{"columns":["2020-01-28 14:54:42 +0000", "2020-01-28 14:54:43 +0000", 1]}},
+          {"row":{"columns":["2020-01-28 14:54:42 +0000", "2020-01-28 14:54:43 +0000", 2]}},
+          {"finalMessage":"Limit Reached"}
+        ]}
+      ]
+    },
+    {
+      "name": "windowed join",
+      "statements": [
+        "CREATE STREAM S1 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');",
+        "CREATE STREAM S2 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='2 SECOND');",
+        "CREATE STREAM OUTPUT as SELECT * FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;",
+        "SELECT * FROM OUTPUT EMIT CHANGES LIMIT 2;"
+      ],
+      "inputs": [
+        {"topic": "left_topic", "key": 1, "value": {"ID": 1}, "timestamp": 0, "window": {"start": 0, "end": 5000, "type": "time"}},
+        {"topic": "left_topic", "key": 1, "value": {"ID": 2}, "timestamp": 1000, "window": {"start": 1000, "end": 6000, "type": "time"}},
+        {"topic": "left_topic", "key": 1, "value": {"ID": 3}, "timestamp": 2000, "window": {"start": 2000, "end": 7000, "type": "time"}},
+        {"topic": "right_topic", "key": 1, "value": {"ID": 4}, "timestamp": 0, "window": {"start": 0, "end": 2000, "type": "time"}},
+        {"topic": "right_topic", "key": 1, "value": {"ID": 5}, "timestamp": 2000, "window": {"start": 2000, "end": 4000, "type": "time"}}
+      ],
+      "responses": [
+        {"admin": {"@type": "currentStatus"}},
+        {"admin": {"@type": "currentStatus"}},
+        {"admin": {"@type": "currentStatus"}},
+        {"query": [
+          {"header":{"schema":"`ROWTIME` BIGINT, `WINDOWSTART` BIGINT, `WINDOWEND` BIGINT, `ROWKEY` INTEGER, `S1_ROWTIME` BIGINT, `S1_WINDOWSTART` BIGINT, `S1_WINDOWEND` BIGINT, `S1_ROWKEY` INTEGER, `S1_ID` BIGINT, `S2_ROWTIME` BIGINT, `S2_WINDOWSTART` BIGINT, `S2_WINDOWEND` BIGINT, `S2_ROWKEY` INTEGER, `S2_ID` BIGINT"}},
+          {"row":{"columns":[0, 0, 5000, 1, 0, 0, 5000, 1, 1, 0, 0, 2000, 1, 4]}},
+          {"row":{"columns":[2000, 2000, 7000, 1, 2000, 2000, 7000, 1, 3, 2000, 2000, 4000, 1, 5]}},
          {"finalMessage":"Limit Reached"}
        ]}
      ]
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java
index 4cbe8a7f743d..cc8408cb30ca 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java
@@ -36,6 +36,9 @@
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import io.confluent.ksql.serde.WindowInfo;
 import io.confluent.ksql.util.KsqlConfig;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
@@ -54,13 +57,16 @@
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;

 public final class SourceBuilder {

+  private static final Collection NULL_WINDOWED_KEY_COLUMNS = Collections.unmodifiableList(
+      Arrays.asList(null, null, null)
+  );
+
   private SourceBuilder() {
   }

@@ -269,7 +275,7 @@ private static KStream buildKStream(
       final SourceStep streamSource,
       final KsqlQueryBuilder queryBuilder,
       final Consumed consumed,
-      final Function rowKeyGenerator
+      final Function> rowKeyGenerator
   ) {
     final KStream stream = queryBuilder.getStreamsBuilder()
         .stream(streamSource.getTopicName(), consumed);
@@ -282,7 +288,7 @@ private static KTable buildKTable(
       final SourceStep streamSource,
       final KsqlQueryBuilder queryBuilder,
       final Consumed consumed,
-      final Function rowKeyGenerator,
+      final Function> rowKeyGenerator,
       final Materialized> materialized
   ) {
     final KTable table = queryBuilder.getStreamsBuilder()
@@ -346,43 +352,41 @@ private static String tableChangeLogOpName(final ExecutionStepPropertiesV1 props
     return StreamsUtil.buildOpName(stacker.push("Reduce").getQueryContext());
   }

-  private static Function, Object> windowedRowKeyGenerator(
+  private static Function, Collection> windowedRowKeyGenerator(
       final LogicalSchema schema
   ) {
     final org.apache.kafka.connect.data.Field keyField = getKeySchemaSingleField(schema);

     return windowedKey -> {
       if (windowedKey == null) {
-        return null;
+        return NULL_WINDOWED_KEY_COLUMNS;
       }

       final Window window = windowedKey.window();
-      final long start = window.start();
-      final String end = window instanceof SessionWindow ? String.valueOf(window.end()) : "-";
       final Object key = windowedKey.key().get(keyField);
-      return String.format("%s : Window{start=%d end=%s}", key, start, end);
+      return Arrays.asList(window.start(), window.end(), key);
     };
   }

-  private static Function nonWindowedRowKeyGenerator(
+  private static Function> nonWindowedRowKeyGenerator(
       final LogicalSchema schema
   ) {
     final org.apache.kafka.connect.data.Field keyField = getKeySchemaSingleField(schema);

     return key -> {
       if (key == null) {
-        return null;
+        return Collections.singletonList(null);
       }

-      return key.get(keyField);
+      return Collections.singletonList(key.get(keyField));
     };
   }

   private static class AddKeyAndTimestampColumns implements ValueTransformerWithKeySupplier {

-    private final Function rowKeyGenerator;
+    private final Function> rowKeyGenerator;

-    AddKeyAndTimestampColumns(final Function rowKeyGenerator) {
+    AddKeyAndTimestampColumns(final Function> rowKeyGenerator) {
       this.rowKeyGenerator = requireNonNull(rowKeyGenerator, "rowKeyGenerator");
     }

@@ -403,7 +407,7 @@ public GenericRow transform(final K key, final GenericRow row) {
       }

       row.getColumns().add(0, processorContext.timestamp());
-      row.getColumns().add(1, rowKeyGenerator.apply(key));
+      row.getColumns().addAll(1, rowKeyGenerator.apply(key));

       return row;
     }
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java
index ec307da36b62..1eaecfc6f9e1 100644
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java
@@ -56,6 +56,7 @@
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.serde.WindowInfo;
 import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.SchemaUtil;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Optional;
@@ -97,18 +98,28 @@ public class SourceBuilderTest {

   private static final LogicalSchema SOURCE_SCHEMA = LogicalSchema.builder()
+      .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT)
       .valueColumn(ColumnName.of("field1"), SqlTypes.STRING)
       .valueColumn(ColumnName.of("field2"), SqlTypes.BIGINT)
       .build();
+
   private static final Schema KEY_SCHEMA = SchemaBuilder.struct()
-      .field("k1", Schema.OPTIONAL_STRING_SCHEMA)
+      .field(SchemaUtil.ROWKEY_NAME.name(), Schema.OPTIONAL_FLOAT64_SCHEMA)
       .build();
-  private static final Struct KEY = new Struct(KEY_SCHEMA).put("k1", "foo");
+
+  private static final double A_KEY = 10.11;
+
+  private static final Struct KEY = new Struct(KEY_SCHEMA)
+      .put(SchemaUtil.ROWKEY_NAME.name(), A_KEY);
+
   private static final SourceName ALIAS = SourceName.of("alias");

   private static final LogicalSchema SCHEMA = SOURCE_SCHEMA
       .withMetaAndKeyColsInValue(false);

+  private static final LogicalSchema WINDOWED_SCHEMA = SOURCE_SCHEMA
+      .withMetaAndKeyColsInValue(true);
+
   private static final KsqlConfig KSQL_CONFIG = new KsqlConfig(ImmutableMap.of());

@@ -118,6 +129,9 @@ private static final Optional TIMESTAMP_COLUMN = Optional.of(
       )
   );

+  private static final long A_WINDOW_START = 10L;
+  private static final long A_WINDOW_END = 20L;
+
   private final Set SERDE_OPTIONS = new HashSet<>();
   private final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema.from(SOURCE_SCHEMA, SERDE_OPTIONS);
   private static final String TOPIC_NAME = "topic";
@@ -158,7 +172,7 @@
   @Mock
   private Materialized> materialized;
   @Captor
-  private ArgumentCaptor transformSupplierCaptor;
+  private ArgumentCaptor> transformSupplierCaptor;
   @Captor
   private ArgumentCaptor timestampExtractorCaptor;
   private final GenericRow row = new GenericRow(new LinkedList<>(ImmutableList.of("baz", 123)));
@@ -399,7 +413,7 @@ public void shouldReturnCorrectSchemaForWindowedSourceStream() {
     final KStreamHolder builtKstream = windowedStreamSource.build(planBuilder);

     // Then:
-    assertThat(builtKstream.getSchema(), is(SCHEMA));
+    assertThat(builtKstream.getSchema(), is(WINDOWED_SCHEMA));
   }

   @Test
@@ -411,7 +425,7 @@ public void shouldReturnCorrectSchemaForWindowedSourceTable() {
     final KTableHolder> builtKTable = windowedTableSource.build(planBuilder);

     // Then:
-    assertThat(builtKTable.getSchema(), is(SCHEMA));
+    assertThat(builtKTable.getSchema(), is(WINDOWED_SCHEMA));
   }

   @Test
@@ -449,7 +463,7 @@ public void shouldAddRowTimeAndRowKeyColumnsToNonWindowedStream() {
     final GenericRow withTimestamp = transformer.transform(KEY, row);

     // Then:
-    assertThat(withTimestamp, equalTo(new GenericRow(456L, "foo", "baz", 123)));
+    assertThat(withTimestamp, equalTo(new GenericRow(456L, A_KEY, "baz", 123)));
   }

   @Test
@@ -463,7 +477,7 @@ public void shouldAddRowTimeAndRowKeyColumnsToNonWindowedTable() {
     final GenericRow withTimestamp = transformer.transform(KEY, row);

     // Then:
-    assertThat(withTimestamp, equalTo(new GenericRow(456L, "foo", "baz", 123)));
+    assertThat(withTimestamp, equalTo(new GenericRow(456L, A_KEY, "baz", 123)));
   }

   @Test
@@ -491,7 +505,7 @@ public void shouldAddRowTimeAndTimeWindowedRowKeyColumnsToStream() {
     final Windowed key = new Windowed<>(
         KEY,
-        new TimeWindow(10L, 20L)
+        new TimeWindow(A_WINDOW_START, A_WINDOW_END)
     );

     // When:
@@ -499,7 +513,7 @@ public void shouldAddRowTimeAndTimeWindowedRowKeyColumnsToStream() {
     // Then:
     assertThat(withTimestamp,
-        equalTo(new GenericRow(456L, "foo : Window{start=10 end=-}", "baz", 123)));
+        equalTo(new GenericRow(456L, A_WINDOW_START, A_WINDOW_END, A_KEY, "baz", 123)));
   }

   @Test
@@ -511,7 +525,7 @@ public void shouldAddRowTimeAndTimeWindowedRowKeyColumnsToTable() {
     final Windowed key = new Windowed<>(
         KEY,
-        new TimeWindow(10L, 20L)
+        new TimeWindow(A_WINDOW_START, A_WINDOW_END)
     );

     // When:
@@ -519,7 +533,7 @@ public void shouldAddRowTimeAndTimeWindowedRowKeyColumnsToTable() {
     // Then:
     assertThat(withTimestamp,
-        equalTo(new GenericRow(456L, "foo : Window{start=10 end=-}", "baz", 123)));
+        is(new GenericRow(456L, A_WINDOW_START, A_WINDOW_END, A_KEY, "baz", 123)));
   }

   @Test
@@ -531,7 +545,7 @@ public void shouldAddRowTimeAndSessionWindowedRowKeyColumnsToStream() {
     final Windowed key = new Windowed<>(
         KEY,
-        new SessionWindow(10L, 20L)
+        new SessionWindow(A_WINDOW_START, A_WINDOW_END)
     );

     // When:
@@ -539,7 +553,7 @@ public void shouldAddRowTimeAndSessionWindowedRowKeyColumnsToStream() {
     // Then:
     assertThat(withTimestamp,
-        equalTo(new GenericRow(456L, "foo : Window{start=10 end=20}", "baz", 123)));
+        equalTo(new GenericRow(456L, A_WINDOW_START, A_WINDOW_END, A_KEY, "baz", 123)));
   }

   @Test
@@ -551,7 +565,7 @@ public void shouldAddRowTimeAndSessionWindowedRowKeyColumnsToTable() {
     final Windowed key = new Windowed<>(
         KEY,
-        new SessionWindow(10L, 20L)
+        new SessionWindow(A_WINDOW_START, A_WINDOW_END)
     );

     // When:
@@ -559,7 +573,7 @@ public void shouldAddRowTimeAndSessionWindowedRowKeyColumnsToTable() {
     // Then:
     assertThat(withTimestamp,
-        equalTo(new GenericRow(456L, "foo : Window{start=10 end=20}", "baz", 123)));
+        equalTo(new GenericRow(456L, A_WINDOW_START, A_WINDOW_END, A_KEY, "baz", 123)));
   }

   @Test