fix: partial push & persistent query support for window bounds columns (#4401)

* chore: support window bounds columns in persistent and pull queries

fixes: #3871

Is needed to fix:
- #3633
- #4015

Before this change, the version of `ROWKEY` copied to the value schema during processing of data in the Streams topology was always of type `STRING`, regardless of the actual key type. This is because windowed keys had a `ROWKEY` in the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`. So while `ROWKEY` in the value schema was a `STRING`, `ROWKEY` in the key schema was the actual type, e.g. `INT`. This is confusing and liable to lead to bugs. Also, the formatted string isn't very friendly for users.

This change looks to introduce the `WINDOWSTART` and `WINDOWEND` columns that were reserved in #4388. The obvious approach would be to add `WINDOWSTART` and `WINDOWEND` as columns in the key schema. Unfortunately, this would be a much bigger change as many parts of the code currently rely on there being only a single key column. The planned structured key work will resolve this.

For now, we only add the window bounds columns when we call `LogicalSchema.withMetaAndKeyColsInValue(true)`. This is a bit of a temporary hack, but it gets us where we need to be. It will be cleaned up as part of the structured key work.

With this change, `ROWKEY` for windowed sources no longer has the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`: `ROWKEY` is now only the _actual_ key, and the window bounds can be accessed via the `WINDOWSTART` and `WINDOWEND` columns. These two window bounds columns are included in a push `SELECT *` query. Likewise, a join will include the window bounds columns from both sides in the join result if the join is `SELECT *`.
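For example (a sketch; `windowedSource` is the hypothetical windowed source used in the examples below):

```sql
-- ROWKEY is now the plain key; the bounds come from dedicated columns
SELECT ROWKEY, WINDOWSTART, WINDOWEND, TOTAL
FROM windowedSource
EMIT CHANGES;
```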

## Examples:

### Push queries

* A `SELECT *` on a windowed source will now include `WINDOWSTART` and `WINDOWEND`. `ROWKEY` will be the actual key, not a formatted string.

```
ksql> SELECT * FROM windowedSource EMIT CHANGES;

-- old output
+---------------+------------------------------------------------------+--------+---------+------+
| ROWTIME       | ROWKEY                                               | USERID | PAGEID  | TOTAL|
+---------------+------------------------------------------------------+--------+---------+------+
| 1557183929488 | User_9|+|Page_39 : Window{start=1557183900000 end=-} | User_9 | Page_39 | 1    |
| 1557183930211 | User_1|+|Page_79 : Window{start=1557183900000 end=-} | User_1 | Page_79 | 1    |

-- new output
+---------------+---------------+---------------+------------------+--------+---------+------+
| ROWTIME       | WINDOWSTART   | WINDOWEND     | ROWKEY           | USERID | PAGEID  | TOTAL|
+---------------+---------------+---------------+------------------+--------+---------+------+
| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1    |
| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1    |
```

* `WINDOWSTART` and `WINDOWEND` are available in the SELECT, GROUP BY, WHERE, HAVING clauses, etc.

For example:

```sql
SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') FROM windowedSource EMIT CHANGES;
```
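The same columns work in predicates; a sketch, again assuming the hypothetical `windowedSource`:

```sql
-- Restrict results to a single window
SELECT * FROM windowedSource
WHERE WINDOWSTART = 1557183900000
EMIT CHANGES;
```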

However, don't get too excited just yet as there is a known limitation that drastically reduces the availability of this syntax:

**KNOWN LIMITATION**
Where a query builds a windowed source from a non-windowed source, the window bounds columns are not available. For example:

```sql
-- won't yet work:
SELECT WINDOWSTART FROM someSource WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY ROWKEY;
```

This issue is tracked by: #4397

* Joins of windowed sources include the `WINDOWSTART` and `WINDOWEND` columns from both sides.
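A sketch of such a join, with hypothetical windowed sources `leftSource` and `rightSource`; the exact aliasing of the duplicated bounds columns in the result is not shown here:

```sql
-- Both sides' WINDOWSTART/WINDOWEND appear in the result of SELECT *
SELECT * FROM leftSource l
  JOIN rightSource r ON l.ROWKEY = r.ROWKEY
  EMIT CHANGES;
```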

### Pull queries

**KNOWN LIMITATION**
Pull queries have not been updated yet. This will be done in a follow-up PR, tracked by #3633. This is mainly to keep this PR manageable.

### Persistent queries

Persistent C*AS queries work similarly to push queries and have the same known limitation.
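A sketch of such a persistent query, assuming an already-windowed table like the `pageviews_table` from the docs below and a hypothetical sink name:

```sql
-- The bounds columns can be carried into the sink of a C*AS query
CREATE TABLE busy_pages AS
  SELECT WINDOWSTART, WINDOWEND, TOTAL
  FROM pageviews_table
  WHERE TOTAL > 10
  EMIT CHANGES;
```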

BREAKING CHANGE: Any query of a windowed source that uses `ROWKEY` in the SELECT projection will see the contents of `ROWKEY` change from a formatted `STRING` containing the underlying key and the window bounds, to just the underlying key.  Queries can access the window bounds using `WINDOWSTART` and `WINDOWEND`.
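For example, a filter that previously matched against the formatted string would now look like this (a sketch, with a hypothetical windowed table `agg`):

```sql
-- Before: ROWKEY = 'User_9 : Window{start=1557183900000 end=-}'
-- After: match the real key and, if needed, the bounds columns
SELECT * FROM agg
WHERE ROWKEY = 'User_9' AND WINDOWSTART = 1557183900000
EMIT CHANGES;
```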

BREAKING CHANGE: Joins on windowed sources now include `WINDOWSTART` and `WINDOWEND` columns from both sides on a `SELECT *`.
big-andy-coates authored Jan 29, 2020
1 parent fd3b751 commit 48aa6ec
Showing 19 changed files with 520 additions and 249 deletions.
47 changes: 38 additions & 9 deletions docs-md/developer-guide/create-a-stream.md
@@ -5,22 +5,23 @@ tagline: Create a Stream from a Kafka topic
description: Learn how to use the CREATE STREAM statement on a Kafka topic
---

In ksqlDB, you create streams from {{ site.aktm }} topics, and you create
streams of query results from other streams.
In ksqlDB, you create streams from existing {{ site.aktm }} topics, create
streams that will create new {{ site.aktm }} topics, or create streams of
query results from other streams.

- Use the CREATE STREAM statement to create a stream from a Kafka
topic.
- Use the CREATE STREAM statement to create a stream from an existing Kafka
topic, or a new Kafka topic.
- Use the CREATE STREAM AS SELECT statement to create a query stream
from an existing stream.

!!! note
Creating tables is similar to creating streams. For more information,
see [Create a ksqlDB Table](create-a-table.md).

Create a Stream from a Kafka topic
----------------------------------
Create a Stream from an existing Kafka topic
--------------------------------------------

Use the CREATE STREAM statement to create a stream from an underlying
Use the CREATE STREAM statement to create a stream from an existing underlying
Kafka topic. The Kafka topic must exist already in your Kafka cluster.

The following examples show how to create streams from a Kafka topic,
@@ -102,6 +103,9 @@ The previous SQL statement makes no assumptions about the Kafka message
key in the underlying Kafka topic. If the value of the message key in
the topic is the same as one of the columns defined in the stream, you
can specify the key in the WITH clause of the CREATE STREAM statement.
If you use this column name later to perform a join or a repartition, ksqlDB
knows that no repartition is needed. In effect, the named column becomes an
alias for ROWKEY.

For example, if the Kafka message key has the same value as the `pageid`
column, you can write the CREATE STREAM statement like this:
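A sketch of such a statement, assuming the `pageviews` schema used earlier on this page:

```sql
CREATE STREAM pageviews
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        VALUE_FORMAT='DELIMITED',
        KEY='pageid');
```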
@@ -180,6 +184,31 @@ Kafka topic : pageviews (partitions: 1, replication: 1)
[...]
```

Create a Stream backed by a new Kafka Topic
-------------------------------------------

Use the CREATE STREAM statement to create a stream without a preexisting
topic by providing the PARTITIONS count, and optionally the REPLICAS count,
in the WITH clause.

Taking the example of the pageviews stream from above, but where the underlying
Kafka topic does not already exist, you can create the stream by pasting
the following CREATE STREAM statement into the CLI:

```sql
CREATE STREAM pageviews
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        PARTITIONS=4,
        REPLICAS=3,
        VALUE_FORMAT='DELIMITED');
```

This will create the pageviews topic for you with the supplied partition and replica counts.
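You can then inspect the result; a sketch (the output format varies by version):

```sql
-- Shows the new stream and its backing topic's partition/replica details
DESCRIBE EXTENDED pageviews;
```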

Create a Persistent Streaming Query from a Stream
-------------------------------------------------

@@ -276,9 +305,9 @@ Your output should resemble:

```
Query ID | Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------------------------------------
CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20' EMIT CHANGES;
--------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
```

97 changes: 69 additions & 28 deletions docs-md/developer-guide/create-a-table.md
@@ -5,22 +5,24 @@ tagline: Create a Table from a Kafka topic
description: Learn how to use the CREATE TABLE statement on a Kafka topic
---

In ksqlDB, you create tables from {{ site.aktm }} topics, and you create
tables of query results from other tables or streams.
In ksqlDB, you create tables from existing {{ site.aktm }} topics, create
tables that will create new {{ site.ak }} topics, or create tables of
query results from other tables or streams.

- Use the CREATE TABLE statement to create a table from a Kafka topic.
- Use the CREATE TABLE statement to create a table from an existing Kafka topic,
or a new Kafka topic.
- Use the CREATE TABLE AS SELECT statement to create a table with
query results from an existing table or stream.

!!! note
Creating streams is similar to creating tables. For more information,
see [Create a ksqlDB Stream](create-a-stream.md).

Create a Table from a Kafka Topic
---------------------------------
Create a Table from an existing Kafka Topic
-------------------------------------------

Use the CREATE TABLE statement to create a table from an underlying
Kafka topic. The Kafka topic must exist already in your Kafka cluster.
Use the CREATE TABLE statement to create a table from an existing
underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster.

The following examples show how to create tables from a Kafka topic,
named `users`. To see these examples in action, create the `users` topic
@@ -109,13 +111,16 @@ SELECT statement:
SELECT * FROM users EMIT CHANGES;
```

Your output should resemble:
Assuming the table has content, your output should resemble:

```
1541439611069 | User_2 | 1498028899054 | User_2 | MALE | Region_1
1541439611320 | User_6 | 1505677113995 | User_6 | FEMALE | Region_7
1541439611396 | User_5 | 1491338621627 | User_5 | OTHER | Region_2
1541439611536 | User_9 | 1492621173463 | User_9 | FEMALE | Region_3
+---------------+--------+---------------+--------+--------+----------+
| ROWTIME       | ROWKEY | REGISTERTIME  | USERID | GENDER | REGIONID |
+---------------+--------+---------------+--------+--------+----------+
| 1541439611069 | User_2 | 1498028899054 | User_2 | MALE   | Region_1 |
| 1541439611320 | User_6 | 1505677113995 | User_6 | FEMALE | Region_7 |
| 1541439611396 | User_5 | 1491338621627 | User_5 | OTHER  | Region_2 |
| 1541439611536 | User_9 | 1492621173463 | User_9 | FEMALE | Region_3 |
^CQuery terminated
```

@@ -124,6 +129,33 @@ Press Ctrl+C to stop printing the query results.
The table values update continuously with the most recent records,
because the underlying `users` topic receives new messages continuously.

Create a Table backed by a new Kafka Topic
------------------------------------------

Use the CREATE TABLE statement to create a table without a preexisting
topic by providing the PARTITIONS count, and optionally the REPLICAS count,
in the WITH clause.

Taking the example of the users table from above, but where the underlying
Kafka topic does not already exist, you can create the table by pasting
the following CREATE TABLE statement into the CLI:

```sql
CREATE TABLE users
  (registertime BIGINT,
   userid VARCHAR,
   gender VARCHAR,
   regionid VARCHAR)
  WITH (KAFKA_TOPIC = 'users',
        VALUE_FORMAT='JSON',
        PARTITIONS=4,
        REPLICAS=3,
        KEY = 'userid');
```

This will create the users topic for you with the supplied partition and replica counts.
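You can then confirm the backing topic exists; a sketch (output varies by version):

```sql
-- Lists topics visible to ksqlDB, including the new 'users' topic
SHOW TOPICS;
```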


Create a ksqlDB Table with Streaming Query Results
--------------------------------------------------

@@ -206,9 +238,9 @@ Your output should resemble:

```
Query ID | Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------------------------------------------------
CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE' EMIT CHANGES;
------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
```

@@ -225,9 +257,9 @@ to include a function like COUNT(*) in the SELECT clause.

```sql
CREATE TABLE pageviews_table AS
SELECT viewtime, userid, pageid, COUNT(*) AS TOTAL
SELECT userid, pageid, COUNT(*) AS TOTAL
FROM pageviews_original WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY viewtime, userid, pageid
GROUP BY userid, pageid
EMIT CHANGES;
```

@@ -250,30 +282,39 @@ SELECT * FROM pageviews_table EMIT CHANGES;
Your output should resemble:

```
1557183929488 | 1557183929488|+|User_9|+|Page_39 : Window{start=1557183900000 end=-} | 1557183929488 | User_9 | Page_39 | 1
1557183930211 | 1557183930211|+|User_1|+|Page_79 : Window{start=1557183900000 end=-} | 1557183930211 | User_1 | Page_79 | 1
1557183930687 | 1557183930687|+|User_9|+|Page_34 : Window{start=1557183900000 end=-} | 1557183930687 | User_9 | Page_34 | 1
1557183929786 | 1557183929786|+|User_5|+|Page_12 : Window{start=1557183900000 end=-} | 1557183929786 | User_5 | Page_12 | 1
1557183931095 | 1557183931095|+|User_3|+|Page_43 : Window{start=1557183900000 end=-} | 1557183931095 | User_3 | Page_43 | 1
1557183930184 | 1557183930184|+|User_1|+|Page_29 : Window{start=1557183900000 end=-} | 1557183930184 | User_1 | Page_29 | 1
1557183930727 | 1557183930726|+|User_6|+|Page_93 : Window{start=1557183900000 end=-} | 1557183930726 | User_6 | Page_93 | 1
+---------------+---------------+---------------+------------------+--------+---------+------+
| ROWTIME       | WINDOWSTART   | WINDOWEND     | ROWKEY           | USERID | PAGEID  | TOTAL|
+---------------+---------------+---------------+------------------+--------+---------+------+
| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1    |
| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1    |
| 1557183930211 | 1557183900000 | 1557183960000 | User_1|+|Page_79 | User_1 | Page_79 | 1    |
| 1557183930687 | 1557183900000 | 1557183960000 | User_9|+|Page_34 | User_9 | Page_34 | 1    |
| 1557183929786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 2    |
| 1557183931095 | 1557183900000 | 1557183960000 | User_3|+|Page_43 | User_3 | Page_43 | 1    |
| 1557183930184 | 1557183900000 | 1557183960000 | User_1|+|Page_29 | User_1 | Page_29 | 1    |
| 1557183930727 | 1557183900000 | 1557183960000 | User_6|+|Page_93 | User_6 | Page_93 | 3    |
^CQuery terminated
```

!!! note
It is possible for the same key to be output multiple times when emitting changes
to the table, because the row is emitted again each time it changes.

Look up the value for a specific key within the table by using a SELECT
statement.

```sql
SELECT * FROM pageviews_table WHERE ROWKEY='1557183929488|+|User_9|+|Page_39';
SELECT * FROM pageviews_table WHERE ROWKEY='User_9|+|Page_39';
```

Your output should resemble:

```
ROWKEY STRING KEY | WINDOWSTART BIGINT KEY | VIEWTIME BIGINT | USERID STRING | PAGEID STRING | TOTAL BIGINT
----------------------------------------------------------------------------------------------------------------------------
1557183929488|+|User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1
----------------------------------------------------------------------------------------------------------------------------
+------------------+---------------+---------------+--------+---------+-------+
| ROWKEY           | WINDOWSTART   | ROWTIME       | USERID | PAGEID  | TOTAL |
+------------------+---------------+---------------+--------+---------+-------+
| User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1     |
Query terminated
```


20 changes: 10 additions & 10 deletions docs-md/tutorials/clickstream-docker.md
@@ -305,11 +305,11 @@ SELECT * FROM EVENTS_PER_MIN EMIT CHANGES LIMIT 5;
Your output should resemble:

```
1536662819576 | 24 : Window{start=1536662760000 end=-} | 24 | 12
1536662819685 | 4 : Window{start=1536662760000 end=-} | 4 | 19
1536662847582 | 4 : Window{start=1536662820000 end=-} | 4 | 75
1536662847586 | 24 : Window{start=1536662820000 end=-} | 24 | 101
1536662879959 | 29 : Window{start=1536662820000 end=-} | 29 | 2
1536662819576 | 1536662760000 | 1536662765000 | 24 | 24 | 12
1536662819685 | 1536662760000 | 1536662765000 | 4 | 4 | 19
1536662847582 | 1536662820000 | 1536662825000 | 4 | 4 | 75
1536662847586 | 1536662820000 | 1536662825000 | 24 | 24 | 101
1536662879959 | 1536662820000 | 1536662825000 | 29 | 29 | 2
Limit Reached
Query terminated
```
@@ -323,11 +323,11 @@ SELECT * FROM PAGES_PER_MIN EMIT CHANGES LIMIT 5;
Your output should resemble:

```
1536662784977 | 21 : Window{start=1536662725000 end=-} | 21 | 2
1536662789353 | 21 : Window{start=1536662730000 end=-} | 21 | 7
1536662793715 | 21 : Window{start=1536662735000 end=-} | 21 | 20
1536662799627 | 21 : Window{start=1536662740000 end=-} | 21 | 35
1536662804534 | 21 : Window{start=1536662745000 end=-} | 21 | 40
1536662784977 | 1536662725000 | 1536662730000 | 21 | 21 | 2
1536662789353 | 1536662730000 | 1536662735000 | 21 | 21 | 7
1536662793715 | 1536662735000 | 1536662740000 | 21 | 21 | 20
1536662799627 | 1536662740000 | 1536662745000 | 21 | 21 | 35
1536662804534 | 1536662745000 | 1536662750000 | 21 | 21 | 40
Limit Reached
Query terminated
```
10 changes: 5 additions & 5 deletions ksql-clickstream-demo/non-docker-clickstream.md
@@ -247,11 +247,11 @@ These steps will guide you through how to setup your environment and run the cli
Your output should resemble:
```bash
1503585475000 | 4 : Window{start=1503585475000 end=-} | 4 | 14
1503585480000 | 25 : Window{start=1503585480000 end=-} | 25 | 9
1503585480000 | 16 : Window{start=1503585480000 end=-} | 16 | 6
1503585475000 | 25 : Window{start=1503585475000 end=-} | 25 | 20
1503585480000 | 37 : Window{start=1503585480000 end=-} | 37 | 6
1503585475000 | 1503585475000 | 1503585480000 | 4 | 4 | 14
1503585480000 | 1503585480000 | 1503585485000 | 25 | 25 | 9
1503585480000 | 1503585480000 | 1503585485000 | 16 | 16 | 6
1503585475000 | 1503585475000 | 1503585480000 | 25 | 25 | 20
1503585480000 | 1503585480000 | 1503585485000 | 37 | 37 | 6
LIMIT reached
Query terminated
```
@@ -136,9 +136,8 @@ public boolean valueContainsAny(final Set<ColumnName> names) {
*
* <p>If the columns already exist in the value schema the function returns the same schema.
*
* @param windowed indicates that the source is windowed; meaning key column copied to
* value will be of type {@link SqlTypes#STRING}, inline with how {@code SourceBuilder}
* creates a {@code String} {@code ROWKEY} for windowed sources.
* @param windowed indicates that the source is windowed; meaning {@code WINDOWSTART} and {@code
* WINDOWEND} columns will be added to the value schema to represent the window bounds.
* @return the new schema.
*/
public LogicalSchema withMetaAndKeyColsInValue(final boolean windowed) {
@@ -244,16 +243,27 @@ private LogicalSchema rebuild(
builder.add(Column.of(c.name(), c.type(), Namespace.VALUE, valueIndex++));
}

if (windowedKey) {
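// Windowed sources get explicit BIGINT window bounds columns in the value
// schema, replacing the old formatted-string copy of ROWKEY.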
builder.add(
Column.of(SchemaUtil.WINDOWSTART_NAME, SqlTypes.BIGINT, Namespace.VALUE, valueIndex++));
builder.add(
Column.of(SchemaUtil.WINDOWEND_NAME, SqlTypes.BIGINT, Namespace.VALUE, valueIndex++));
}

for (final Column c : key) {
final SqlType type = windowedKey ? SqlTypes.STRING : c.type();
builder.add(Column.of(c.name(), type, Namespace.VALUE, valueIndex++));
builder.add(Column.of(c.name(), c.type(), Namespace.VALUE, valueIndex++));
}
}

for (final Column c : value) {
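// The window bounds are re-added above for windowed sources, so skip any
// copies of them already present in the value columns to avoid duplicates.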
if (c.name().equals(SchemaUtil.WINDOWSTART_NAME)
|| c.name().equals(SchemaUtil.WINDOWEND_NAME)
) {
continue;
}

if (findColumnMatching(
(withNamespace(Namespace.META).or(withNamespace(Namespace.KEY))
.and(withRef(c.ref()))
(withNamespace(Namespace.META).or(withNamespace(Namespace.KEY)).and(withRef(c.ref()))
)).isPresent()) {
continue;
}
