ksqlDB table incorrectly returns window-based field as null in pull query #4015
Likely, this is related to the special handling of the `WINDOWSTART()` and `WINDOWEND()` UDAFs. Workaround:
@big-andy-coates - there's no way to format
This workaround kind of works, but as @mikebin says, there's no way to format the field since it can't be referenced (#3633).
Yes, that's right.
fixes: confluentinc#3871

Is needed to fix:
- confluentinc#3633
- confluentinc#4015

Before this change, the version of `ROWKEY` copied to the value schema during processing of data in the Streams topology was always of type `STRING`, regardless of the actual key type. This is because windowed keys had a `ROWKEY` in the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`. While `ROWKEY` in the value schema was a `STRING`, `ROWKEY` in the key schema was the actual type, e.g. `INT`. This is confusing and will lead to bugs. Also, the formatted string isn't very friendly for users.

This change introduces the `WINDOWSTART` and `WINDOWEND` columns that were reserved in confluentinc#4388. The obvious approach would be to add `WINDOWSTART` and `WINDOWEND` as columns in the key schema. Unfortunately, this would be a much bigger change, as many parts of the code currently rely on there being only a single key column. The planned structured key work will resolve this.

For now, we only add the window bounds columns when we call `LogicalSchema.withMetaAndKeyColsInValue(true)`. This is a bit of a temporary hack, but gets us where we need to be. It will be cleaned up as part of the structured key work.

With this change, `ROWKEY` for windowed sources no longer has the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`: `ROWKEY` is now only the _actual_ key, and the window bounds can be accessed via `WINDOWSTART` and `WINDOWEND`. These two window bounds columns are included in a push `SELECT *` query. Likewise, a join will include the window bounds columns from both sides in the join result if the join is `SELECT *`.

## Examples:

### Push queries

* A `SELECT *` on a windowed source will now include `WINDOWSTART` and `WINDOWEND`. `ROWKEY` will be the actual key, not a formatted string.
```
ksql> SELECT * FROM windowedSource EMIT CHANGES;

-- old output
+---------------+------------------------------------------------------+--------+---------+-------+
| ROWTIME       | ROWKEY                                               | USERID | PAGEID  | TOTAL |
+---------------+------------------------------------------------------+--------+---------+-------+
| 1557183929488 | User_9|+|Page_39 : Window{start=1557183900000 end=-} | User_9 | Page_39 | 1     |
| 1557183930211 | User_1|+|Page_79 : Window{start=1557183900000 end=-} | User_1 | Page_79 | 1     |

-- new output
+---------------+---------------+---------------+------------------+--------+---------+-------+
| ROWTIME       | WINDOWSTART   | WINDOWEND     | ROWKEY           | USERID | PAGEID  | TOTAL |
+---------------+---------------+---------------+------------------+--------+---------+-------+
| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1     |
| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1     |
```

* `WINDOWSTART` and `WINDOWEND` are available in the `SELECT`, `GROUP BY`, `WHERE`, and `HAVING` clauses, etc. For example:

```sql
SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') FROM windowedSource EMIT CHANGES;
```

However, don't get too excited just yet, as there is a known limitation that drastically reduces the availability of this syntax:

**KNOWN LIMITATION**
Where a query builds a windowed source from a non-windowed source, the window bounds columns are not available. For example:

```
-- won't yet work:
SELECT WINDOWSTART FROM someSource WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY ROWKEY;
```

This issue is tracked by: confluentinc#4397

* Joins of windowed sources include the `WINDOWSTART` and `WINDOWEND` columns from both sides.

### Pull queries

**KNOWN LIMITATION**
Pull queries have not been updated yet. This will be done in a follow-up PR (confluentinc#3633). This is mainly to keep this PR manageable.

### Persistent queries

Persistent C*AS queries work similarly to push queries and have the same known limitation.
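In the push query output above, each row's `WINDOWEND` is 60000 ms after its `WINDOWSTART`, which suggests a one-minute tumbling window. The following Python sketch reproduces those bounds from `ROWTIME`, assuming epoch-aligned tumbling windows (as Kafka Streams uses for tumbling `TimeWindows`), and renders the start roughly the way `TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z')` would on a server whose default time zone is UTC. The helper names are illustrative and not part of ksqlDB.

```python
from datetime import datetime, timezone
from typing import Tuple

# Assumption: window size inferred from the example output (WINDOWEND - WINDOWSTART).
WINDOW_SIZE_MS = 60_000

def tumbling_window_bounds(rowtime_ms: int, size_ms: int = WINDOW_SIZE_MS) -> Tuple[int, int]:
    """Return (window_start, window_end) in epoch millis for an epoch-aligned tumbling window."""
    start = rowtime_ms - (rowtime_ms % size_ms)
    return start, start + size_ms

def format_ts(epoch_ms: int) -> str:
    """Approximate TIMESTAMPTOSTRING(ts, 'yyyy-MM-dd HH:mm:ss Z'), assuming a UTC server time zone."""
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S %z")

# ROWTIME values taken from the "new output" rows above.
for rowtime in (1557183919786, 1557183929488):
    start, end = tumbling_window_bounds(rowtime)
    print(rowtime, start, end, format_ts(start))
```

For `ROWTIME` 1557183919786 this yields a start of 1557183900000 and an end of 1557183960000, matching the first row of the new output.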
BREAKING CHANGE: Any query of a windowed source that uses `ROWKEY` in the SELECT projection will see the contents of `ROWKEY` change from a formatted `STRING` containing the underlying key and the window bounds, to just the underlying key. Queries can access the window bounds using `WINDOWSTART` and `WINDOWEND`.

BREAKING CHANGE: Joins on windowed sources now include `WINDOWSTART` and `WINDOWEND` columns from both sides on a `SELECT *`.
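Client code that parsed the old formatted `ROWKEY` is affected by this breaking change. As a hedged sketch, a hypothetical Python helper `split_legacy_rowkey` could split a legacy value into the underlying key and the window bounds, i.e. the values that now arrive separately as `ROWKEY`, `WINDOWSTART`, and `WINDOWEND`. The regular expression assumes the exact layout shown in the old output above, allows the optional comma from the format quoted in the commit message, and treats `end=-` as "end not printed".

```python
import re
from typing import NamedTuple, Optional

# Legacy layout, e.g. "User_9|+|Page_39 : Window{start=1557183900000 end=-}".
# The optional comma covers "<key> : Window{start=<start>, end=<end>}".
_LEGACY_ROWKEY = re.compile(
    r"^(?P<key>.*) : Window\{start=(?P<start>\d+),? end=(?P<end>-|\d+)\}$"
)

class SplitKey(NamedTuple):
    rowkey: str
    window_start: int
    window_end: Optional[int]  # None when the legacy string printed "end=-"

def split_legacy_rowkey(formatted: str) -> SplitKey:
    """Hypothetical migration helper: parse a pre-change windowed ROWKEY string."""
    m = _LEGACY_ROWKEY.match(formatted)
    if m is None:
        raise ValueError(f"not a legacy windowed ROWKEY: {formatted!r}")
    end = m.group("end")
    return SplitKey(m.group("key"), int(m.group("start")), None if end == "-" else int(end))

print(split_legacy_rowkey("User_9|+|Page_39 : Window{start=1557183900000 end=-}"))
```

After the change, such parsing should be unnecessary: the key and the bounds can be selected directly as separate columns.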
FYI, once #4404 is merged, the workaround will allow formatting of the window bounds:
chore: support window bounds columns in persistent and pull queries (#4401)
Can be recreated with the following RQTT test:

```json
{
  "name": "windowStart and windowEnd UDAFs",
  "statements": [
    "CREATE STREAM ALL_TWEETS (LANG STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
    "CREATE TABLE TWEET_LANG AS SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_TS, LANG, COUNT(*) AS TWEET_COUNT FROM ALL_TWEETS WINDOW TUMBLING (SIZE 5 MINUTES) GROUP BY LANG;",
    "SELECT WINDOW_TS, LANG, TWEET_COUNT FROM TWEET_LANG WHERE ROWKEY='en';"
  ],
  "inputs": [
    {"topic": "test_topic", "timestamp": 1580223282123, "value": {"lang": "en"}},
    {"topic": "test_topic", "timestamp": 1580223282323, "value": {"lang": "en"}},
    {"topic": "test_topic", "timestamp": 1580223283123, "value": {"lang": "en"}}
  ],
  "responses": [
    {"admin": {"@type": "currentStatus"}},
    {"admin": {"@type": "currentStatus"}},
    {"query": [
      {"header": {"schema": "`WINDOW_TS` STRING, `LANG` STRING, `TWEET_COUNT` BIGINT"}},
      {"row": {"columns": ["2020-01-28 14:54:42 +0000", "en", 1]}},
      {"row": {"columns": ["2020-01-28 14:54:42 +0000", "en", 2]}}
    ]}
  ]
}
```

Which fails because the row returned is:
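For reference, the following Python sketch models the test table's windowed `COUNT(*)` over its three inputs, to show what a non-null `WINDOW_TS` would be expected to hold. It assumes epoch-aligned five-minute tumbling windows and UTC rendering; the test's `TIMESTAMPTOSTRING` call passes no time zone, so the real string depends on the server's default. The function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SIZE_MS = 5 * 60 * 1000  # WINDOW TUMBLING (SIZE 5 MINUTES)

# (timestamp, value) pairs copied from the test's "inputs" section.
INPUTS = [
    (1580223282123, {"lang": "en"}),
    (1580223282323, {"lang": "en"}),
    (1580223283123, {"lang": "en"}),
]

def window_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Epoch-aligned tumbling window start (assumption)."""
    return ts_ms - ts_ms % size_ms

def count_by_lang_and_window(inputs):
    """Emit (lang, window_start, running_count) per input, like a changelog of COUNT(*)."""
    counts = defaultdict(int)
    updates = []
    for ts, value in inputs:
        key = (value["lang"], window_start(ts))
        counts[key] += 1
        updates.append((key[0], key[1], counts[key]))
    return updates

def fmt(ts_ms: int) -> str:
    """Render like 'yyyy-MM-dd HH:mm:ss Z', assuming UTC."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %z")

for lang, start, count in count_by_lang_and_window(INPUTS):
    print(fmt(start), lang, count)
```

Under these assumptions, all three inputs fall into the window starting at 1580223000000 (`2020-01-28 14:50:00 +0000`), with running counts 1, 2, and 3.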
fixes: confluentinc#4015

At present, the `WindowStart()` and `WindowEnd()` UDAFs are special-cased. This special casing was not being applied to pull queries. This change corrects this.

Consider:

```sql
-- setup:
CREATE STREAM ALL_TWEETS (LANG STRING) WITH (kafka_topic='test_topic', value_format='JSON');

CREATE TABLE TWEET_LANG AS
  SELECT
    TIMESTAMPTOSTRING(WINDOWSTART(), 'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WSTART,
    TIMESTAMPTOSTRING(WINDOWEND(), 'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WEND,
    LANG,
    COUNT(*) AS TWEET_COUNT
  FROM ALL_TWEETS
  WINDOW TUMBLING (SIZE 1 SECOND)
  GROUP BY LANG;

-- pull query:
SELECT WSTART, WEND, LANG, TWEET_COUNT FROM TWEET_LANG WHERE ROWKEY='en';
```

Before this change, the pull query would return `null` values for `WSTART` and `WEND`, even though the data was correctly populated in the `TWEET_LANG` topic. With this change, the correct values for `WSTART` and `WEND` are returned.

The main fix is in `StreamAggregateBuilder`, which now applies a suitable `map` call that applies the special processing to any pull query.
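The commit describes the fix only at a high level. The following Python sketch is a toy model consistent with the described symptom, not ksqlDB's code: it assumes the aggregation itself leaves the `WINDOWSTART()`/`WINDOWEND()` output columns empty and a separate post-aggregation map step fills them from the window. If a read path skips that step, as pull queries did before the fix, the columns stay null. `apply_window_bounds`, `pull_query`, the column lists, and the stored row (including its made-up 1-second window) are all hypothetical, and the `TIMESTAMPTOSTRING` formatting is omitted for brevity.

```python
from typing import Dict, Optional, Tuple

Window = Tuple[int, int]           # (start_ms, end_ms)
Row = Dict[str, Optional[object]]  # column name -> value

# Hypothetical: output columns produced by WINDOWSTART()/WINDOWEND().
WINDOW_START_COLS = ("WSTART",)
WINDOW_END_COLS = ("WEND",)

def apply_window_bounds(window: Window, row: Row) -> Row:
    """Post-aggregation map: copy the window bounds into the placeholder columns."""
    start, end = window
    out = dict(row)  # do not mutate the stored aggregate
    for col in WINDOW_START_COLS:
        out[col] = start
    for col in WINDOW_END_COLS:
        out[col] = end
    return out

# Toy state store keyed by (key, window); placeholders left null by the aggregation.
stored: Dict[Tuple[str, Window], Row] = {
    ("en", (1000, 2000)): {"WSTART": None, "WEND": None, "LANG": "en", "TWEET_COUNT": 3},
}

def pull_query(key: str, fixed: bool):
    """Read rows for a key; only the 'fixed' path applies the window-bounds map."""
    rows = []
    for (k, window), row in stored.items():
        if k == key:
            rows.append(apply_window_bounds(window, row) if fixed else row)
    return rows

print(pull_query("en", fixed=False))
print(pull_query("en", fixed=True))
```

Keeping the map as a separate step means the push, persistent, and pull paths can share one function, which mirrors the idea of applying the same special processing everywhere.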
ksqlDB 0.6
Create table:
Table:
There's data:
✅ Select just some columns (PUSH):
⛔️ Select just some columns (PULL): shows `null` for `WINDOW_TS` where there should be a value.

Tangentially related: #4000