fix: partial push & persistent query support for window bounds columns #4401

Merged

Conversation

@big-andy-coates (Contributor) commented Jan 29, 2020

Description

fixes: #3871

Is needed to fix:

- #3633
- #4015

Before this change the version of `ROWKEY` copied to the value schema during processing of data in the Streams topology was always of type `STRING` regardless of the actual key type. This is because windowed keys had a `ROWKEY` in the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`. While `ROWKEY` in the value schema was a `STRING`, `ROWKEY` in the key schema was the actual type, e.g. `INT`. This is confusing and will lead to bugs. Also, the formatted string isn't very friendly for users.

This change introduces the `WINDOWSTART` and `WINDOWEND` columns that were reserved in #4388. The obvious approach would be to add `WINDOWSTART` and `WINDOWEND` as columns in the key schema. Unfortunately, this would be a much bigger change as many parts of the code currently rely on there being only a single key column. The planned structured key work will resolve this.

For now, we only add the window bounds columns when we call `LogicalSchema.withMetaAndKeyColsInValue(true)`. This is a bit of a temporary hack, but gets us where we need to be. This will be cleaned up as part of the structured key work.

With this change `ROWKEY` for windowed sources no longer has the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`: `ROWKEY` is now only the _actual_ key, and the window bounds can be accessed via `WINDOWSTART` and `WINDOWEND`. These two window bounds columns are included in a push `SELECT *` query. Likewise, a join will include the window bounds columns from both sides in the join result if the join is `SELECT *`.
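The old-versus-new representation can be illustrated with a small sketch (Python, not ksqlDB code; the formatted-string layout is taken from this PR's example output):

```python
import re

# Old behaviour: the window bounds were baked into a formatted STRING ROWKEY.
OLD_ROWKEY = "User_9|+|Page_39 : Window{start=1557183900000 end=-}"

def parse_old_rowkey(rowkey):
    """Recover the actual key and window bounds from the old formatted STRING."""
    m = re.fullmatch(r"(.+) : Window\{start=(\d+) end=(\d+|-)\}", rowkey)
    if m is None:
        raise ValueError("unexpected ROWKEY format: " + rowkey)
    key, start, end = m.groups()
    return key, int(start), None if end == "-" else int(end)

# New behaviour: the same information arrives as three separate, typed columns.
new_row = {"ROWKEY": "User_9|+|Page_39", "WINDOWSTART": 1557183900000, "WINDOWEND": 1557183960000}

key, start, end = parse_old_rowkey(OLD_ROWKEY)
assert key == new_row["ROWKEY"]
assert start == new_row["WINDOWSTART"]
```

Note the old session-window output used `end=-`, so the end bound wasn't even recoverable from the string for open windows; the new typed columns avoid all of this parsing.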

## Examples

### Push queries

* A `SELECT *` on a windowed source will now include `WINDOWSTART` and `WINDOWEND`. `ROWKEY` will be the actual key, not a formatted string.
```
ksql> SELECT * FROM windowedSource emit changes

-- old output
+---------------+------------------------------------------------------+--------+---------+------+
| ROWTIME       | ROWKEY                                               | USERID | PAGEID  | TOTAL|
+---------------+------------------------------------------------------+--------+---------+------+
| 1557183929488 | User_9|+|Page_39 : Window{start=1557183900000 end=-} | User_9 | Page_39 | 1    |
| 1557183930211 | User_1|+|Page_79 : Window{start=1557183900000 end=-} | User_1 | Page_79 | 1    |

-- new output
+---------------+---------------+---------------+------------------+--------+---------+------+
| ROWTIME       | WINDOWSTART   | WINDOWEND     | ROWKEY           | USERID | PAGEID  | TOTAL|
+---------------+---------------+---------------+------------------+--------+---------+------+
| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1    |
| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1    |
```
* `WINDOWSTART` and `WINDOWEND` are available in the SELECT, GROUP BY, WHERE, HAVING clauses, etc.

For example:

```sql
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss Z') FROM windowedSource EMIT CHANGES;
```

However, don't get too excited just yet as there is a known limitation that drastically reduces the availability of this syntax:

**KNOWN LIMITATION**
Where a query builds a windowed source from a non-windowed source the window bounds columns are not available. For example:

```sql
-- won't yet work:
SELECT WINDOWSTART FROM someSource WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY ROWKEY;
```

This issue is tracked by: #4397

* Joins of windowed sources include the `WINDOWSTART` and `WINDOWEND` columns from both sides.
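For tumbling windows, the bounds shown in the example output above are simple epoch-millisecond arithmetic on the record timestamp. A sketch (assuming the 60-second window size implied by the example bounds; illustrative arithmetic, not ksqlDB internals):

```python
WINDOW_SIZE_MS = 60_000  # assumed from the example bounds (1557183900000 .. 1557183960000)

def tumbling_bounds(ts_ms, size_ms=WINDOW_SIZE_MS):
    """Window containing a record timestamp: align down to the grid; end = start + size."""
    start = ts_ms - (ts_ms % size_ms)
    return start, start + size_ms

# ROWTIME values from the "new output" example both fall in the same window:
assert tumbling_bounds(1557183919786) == (1557183900000, 1557183960000)
assert tumbling_bounds(1557183929488) == (1557183900000, 1557183960000)
```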

### Pull queries

**KNOWN LIMITATION**
Pull queries have not been updated yet. This will be done in a follow-up PR (#3633), mainly to keep this PR manageable.

### Persistent queries

Persistent C*AS queries work similarly to push queries and have the same known limitation.

BREAKING CHANGE: Any query of a windowed source that uses `ROWKEY` in the SELECT projection will see the contents of `ROWKEY` change from a formatted `STRING` containing the underlying key and the window bounds, to just the underlying key. Queries can access the window bounds using `WINDOWSTART` and `WINDOWEND`.

BREAKING CHANGE: Joins on windowed sources now include `WINDOWSTART` and `WINDOWEND` columns from both sides on a `SELECT *`.

HOW TO REVIEW:

The main changes are:

  1. `LogicalSchema.withMetaAndKeyColsInValue` now adds `WINDOWSTART` and `WINDOWEND` columns into the VALUE schema for windowed sources.
  2. `SourceBuilder` now populates the new `WINDOWSTART` and `WINDOWEND` columns for windowed sources.
  3. Unfortunately, for this to work, a couple of places in `Analysis` and `Analyzer` had to be updated to call `LogicalSchema.withMetaAndKeyColsInValue` to ensure the window bounds columns are available. This is a bit hacky, but much less short-term work than adding `WINDOWSTART` and `WINDOWEND` to the key schema.
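The effect of step 1 can be sketched roughly as follows (hypothetical Python with illustrative names; the real logic lives in ksqlDB's Java `LogicalSchema`):

```python
def with_meta_and_key_cols_in_value(key_cols, value_cols, windowed):
    """Copy meta and key columns into the value schema; add window bounds if windowed."""
    meta_cols = ["ROWTIME"]
    bounds_cols = ["WINDOWSTART", "WINDOWEND"] if windowed else []
    return meta_cols + bounds_cols + key_cols + value_cols

# Column order matches the push-query example output above:
cols = with_meta_and_key_cols_in_value(["ROWKEY"], ["USERID", "PAGEID", "TOTAL"], windowed=True)
assert cols == ["ROWTIME", "WINDOWSTART", "WINDOWEND", "ROWKEY", "USERID", "PAGEID", "TOTAL"]
```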

Testing done

Manual and suitable QTT/RQTT tests added.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g., if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@big-andy-coates big-andy-coates requested review from JimGalasyn and a team as code owners January 29, 2020 11:25
@big-andy-coates big-andy-coates changed the title chore: support window bounds columns in persistent and pull queries fix: partial push & persistent query support for window bounds columns Jan 29, 2020
KSQL does support `<` operator for strings... revert doc change & add test
@big-andy-coates (author) left a comment:

Some notes on the docs and code changes.

```diff
-- Use the CREATE STREAM statement to create a stream from a Kafka
-  topic.
+- Use the CREATE STREAM statement to create a stream from an existing Kafka
+  topic, or a new Kafka topic.
```

While updating the docs to include window bounds columns and update ROWKEY, I noticed some inconsistencies, old formats, etc. I could have put these in their own PR, but it would have clashed, so I've lumped them into this PR (sorry). I'll call out my thinking for the changes...

Updated to reflect you can also create a stream, using CREATE STREAM, that will create its own backing topic in Kafka.

Comment on lines 106 to 108
> If you later use this column name to perform a join or a repartition KSQL
> knows no repartition is needed. In effect, the named column becomes an
> alias for ROWKEY.

Added clarification on why you'd want to set a key field in the WITH clause.

Comment on lines +187 to +211
Create a Stream backed by a new Kafka Topic
-------------------------------------------

Use the CREATE STREAM statement to create a stream without a preexisting
topic by providing the PARTITIONS count, and optionally the REPLICA count,
in the WITH clause.

Taking the example of the pageviews table from above, but where the underlying
Kafka topic does not already exist, you can create the stream by pasting
the following CREATE STREAM statement into the CLI:

```sql
CREATE STREAM pageviews
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        PARTITIONS=4,
        REPLICAS=3,
        VALUE_FORMAT='DELIMITED');
```

This will create the pageviews topic for you with the supplied partition and replica count.


Added an example of how to create a stream over a NEW Kafka topic, i.e. the command creates the topic.

```
CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20' EMIT CHANGES;
--------------------------------------------------------------------------------------------------------------------------------------------------------
```


Updated in line with current formatting.

Comment on lines 8 to 13
```diff
 In ksqlDB, you create tables from existing {{ site.aktm }} topics, create
 tables that will create new {{ site.aktm }} topics, or create tables of
 query results from other tables or streams.

-- Use the CREATE TABLE statement to create a table from a Kafka topic.
+- Use the CREATE TABLE statement to create a table from an existing Kafka topic,
+  or a new Kafka topic.
```

As above, KSQL can create tables that create their own topics.

Comment on lines +233 to +239
```java
private static LogicalSchema buildStreamsSchema(final AliasedDataSource s) {
  // Include metadata & key columns in the value schema to match the schema the streams
  // topology will use.
  return s.getDataSource()
      .getSchema()
      .withMetaAndKeyColsInValue(s.getDataSource().getKsqlTopic().getKeyFormat().isWindowed());
}
```

Need to do this to ensure the Analyser has access to window bounds columns. (Again, yes, this is a bit hacky).

Comment on lines -320 to -325
```java
final boolean allowWindowMetaFields = pullQuery
    && analysis.getFromDataSources().get(0)
        .getDataSource()
        .getKsqlTopic()
        .getKeyFormat()
        .isWindowed();
```

We can now remove old special handling of window bounds columns for pull queries.


```diff
 // Non-join persistent queries only require value columns on SELECT *
 // where as joins and transient queries require all columns in the select:
 // See https://github.com/confluentinc/ksql/issues/3731 for more info
 final List<Column> valueColumns = persistent && !analysis.isJoin()
     ? schema.value()
-    : schema.columns();
+    : schema.withMetaAndKeyColsInValue(windowed).value();
```

Again, a bit hacky, but ensures window bounds columns are available.

Comment on lines -95 to -98
```java
if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) {
  // window start doesn't have a source as its a special hacky column
  return Optional.empty();
}
```

Removal of previous hacks :)

Comment on lines +1 to +38
```json
{
  "tests": [
    {
      "name": "< operator",
      "statements": [
        "CREATE STREAM INPUT (text STRING) WITH (kafka_topic='test_topic', value_format='DELIMITED');",
        "CREATE STREAM OUTPUT AS select text, text < 'b2' from INPUT;"
      ],
      "inputs": [
        {"topic": "test_topic", "value": "a1"},
        {"topic": "test_topic", "value": "b1"},
        {"topic": "test_topic", "value": "B2"},
        {"topic": "test_topic", "value": "b2"},
        {"topic": "test_topic", "value": "b3"},
        {"topic": "test_topic", "value": "b10"},
        {"topic": "test_topic", "value": "b01"}
      ],
      "outputs": [
        {"topic": "OUTPUT", "value": "a1,true"},
        {"topic": "OUTPUT", "value": "b1,true"},
        {"topic": "OUTPUT", "value": "B2,true"},
        {"topic": "OUTPUT", "value": "b2,false"},
        {"topic": "OUTPUT", "value": "b3,false"},
        {"topic": "OUTPUT", "value": "b10,true"},
        {"topic": "OUTPUT", "value": "b01,true"}
      ],
      "post": {
        "sources": [
          {
            "name": "OUTPUT",
            "type": "stream",
            "schema": "`ROWKEY` STRING KEY, `TEXT` STRING, `KSQL_COL_1` BOOLEAN"
          }
        ]
      }
    }
  ]
}
```

Added to test something while I was messing with the examples.
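The expected outputs in this test follow from plain lexicographic comparison (character by character, so `'B' < 'b'` and `'b10' < 'b2'`); a quick check of the same semantics:

```python
# Inputs and expected results copied from the QTT test above.
cases = {
    "a1": True, "b1": True, "B2": True, "b2": False,
    "b3": False, "b10": True, "b01": True,
}
for text, expected in cases.items():
    # Python's '<' on str is lexicographic by code point, matching this test's STRING '<'
    assert (text < "b2") == expected
```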

@agavra (Contributor) left a comment:

LGTM - thanks @big-andy-coates! With this change can we avoid copying the key into the value schema altogether?

big-andy-coates added a commit to big-andy-coates/ksql that referenced this pull request Jan 29, 2020
fixes: confluentinc#3633

This change sees pull queries share more functionality and code around the window bounds columns `WINDOWSTART` and `WINDOWEND` introduced in confluentinc#4388 and confluentinc#4401.

* pull queries on time windowed sources, i.e. `TUMBLING` and `HOPPING`, now have a `WINDOWEND` in their schema, just like `SESSION` and the new push query functionality.
* window bound columns are now accessible within the projection of a pull query, e.g. `SELECT WINDOWSTART, WINDOWEND FROM FOO WHERE ROWKEY=1;`
@big-andy-coates (author) replied:

> LGTM - thanks @big-andy-coates! With this change can we avoid copying the key into the value schema altogether?

Nope. This actually causes the key to be copied into the value in more places :( Though those will go in time when structured keys come along.

I'm actually OK with the streams topology copying the key columns into the value, to make them more easily accessible, now that the logic around this is clean and the 'final selection' does the right thing, i.e. now that we've fixed the bugs around this.

topic by providing the PARTITIONS count, and optionally the REPLICA count,
in the WITH clause.

Taking the example of the pageviews table from above, but where the underlying

Suggested change:

```diff
-Taking the example of the pageviews table from above, but where the underlying
+Taking the example of the `pageviews` table from above, but where the underlying
```

```sql
EMIT CHANGES;
```

This will create the pageviews topics for you with the supplied partition and replica count.
@JimGalasyn (Member) commented Jan 29, 2020:

Suggested change:

```diff
-This will create the pageviews topics for you with the supplied partition and replica count.
+This statement creates the `pageviews` topic with the supplied partition and replica count.
```

topic by providing the PARTITIONS count, and optionally the REPLICA count,
in the WITH clause.

Taking the example of the users table from above, but where the underlying

Suggested change:

```diff
-Taking the example of the users table from above, but where the underlying
+Taking the example of the `users` table from above, but where the underlying
```

in the WITH clause.

Taking the example of the users table from above, but where the underlying
Kafka topic does not already exist, you can create the table by pasting

Suggested change:

```diff
-Kafka topic does not already exist, you can create the table by pasting
+Kafka topic does not already exist, you can create the table by using
```


Taking the example of the users table from above, but where the underlying
Kafka topic does not already exist, you can create the table by pasting
the following CREATE TABLE statement into the CLI:

Suggested change:

```diff
-the following CREATE TABLE statement into the CLI:
+the following CREATE TABLE statement.
```

```sql
KEY = 'userid');
```

This will create the users topics for you with the supplied partition and replica count.

Suggested change:

```diff
-This will create the users topics for you with the supplied partition and replica count.
+This statement creates the `users` topic with the supplied partition and replica count.
```

```
^CQuery terminated
```

!!! note
    It is possible for the same key to be output multiple time when emitting changes

Suggested change:

```diff
-It is possible for the same key to be output multiple time when emitting changes
+It is possible for the same key to be output multiple times when emitting changes
```

```
^CQuery terminated
```

!!! note
    It is possible for the same key to be output multiple time when emitting changes
    to the table. This is because each time the row in the table changes it will be emitted.

Suggested change:

```diff
-to the table. This is because each time the row in the table changes it will be emitted.
+to the table, because each time the row in the table changes, the key is emitted.
```


I think "it" here refers to the key.

@JimGalasyn (Member) left a comment:

This is awesome, thank you for the updates! LGTM with a few suggestions.

@big-andy-coates big-andy-coates merged commit 48aa6ec into confluentinc:master Jan 29, 2020
@big-andy-coates big-andy-coates deleted the push_window_bounds branch January 29, 2020 21:36
big-andy-coates added a commit that referenced this pull request Jan 30, 2020
* chore: add full window bounds support to pull queries

fixes: #3633

This change sees pull queries share more functionality and code around the window bounds columns `WINDOWSTART` and `WINDOWEND` introduced in #4388 and #4401.

* pull queries on time windowed sources, i.e. `TUMBLING` and `HOPPING`, now have a `WINDOWEND` in their schema, just like `SESSION` and the new push query functionality.
* window bound columns are now accessible within the projection of a pull query, e.g. `SELECT WINDOWSTART, WINDOWEND FROM FOO WHERE ROWKEY=1;`
Successfully merging this pull request may close these issues.

Push queries: Inconsistency with pull queries, return all rows per window