Support window bounds columns on windowed group by #4397
Comments
fixes: confluentinc#3871

Is needed to fix:
- confluentinc#3633
- confluentinc#4015

Before this change the version of `ROWKEY` copied to the value schema during processing of data in the Streams topology was always of type `STRING` regardless of the actual key type. This is because windowed keys had a `ROWKEY` in the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`. While `ROWKEY` in the value schema was a `STRING`, `ROWKEY` in the key schema was the actual type, e.g. `INT`. This is confusing and will lead to bugs. Also, the formatted string isn't very friendly for users.

This change looks to introduce the `WINDOWSTART` and `WINDOWEND` columns that were reserved in confluentinc#4388.

The obvious approach would be to add `WINDOWSTART` and `WINDOWEND` as columns in the key schema. Unfortunately, this would be a much bigger change as many parts of the code currently rely on there being only a single key column. The planned structured key work will resolve this.

For now, we only add the window bounds columns when we `LogicalSchema.withMetaAndKeyColsInValue(true)`. This is a bit of a temporary hack, but gets us where we need to be. This will be cleaned up as part of the structured key work.

With this change `ROWKEY` for windowed sources no longer has the format `<actual key> : Window{start=<windowStart>, end=<windowEnd>}`: `ROWKEY` is now only the _actual_ key and the window bounds can be accessed by `WINDOWSTART` and `WINDOWEND`. These two window bounds columns are included in a pull `SELECT *` query. Likewise a join will include the window bounds columns from both sides in the join result if the join is `SELECT *`.

## Examples:

### Push queries

* A select * on a windowed source will now include `WINDOWSTART` and `WINDOWEND`. `ROWKEY` will be the actual key, not a formatted string.

```
ksql> SELECT * FROM windowedSource emit changes

-- old output
+---------------+------------------------------------------------------+--------+---------+------+
| ROWTIME       | ROWKEY                                               | USERID | PAGEID  | TOTAL|
+---------------+------------------------------------------------------+--------+---------+------+
| 1557183929488 | User_9|+|Page_39 : Window{start=1557183900000 end=-} | User_9 | Page_39 | 1    |
| 1557183930211 | User_1|+|Page_79 : Window{start=1557183900000 end=-} | User_1 | Page_79 | 1    |

-- new output
+---------------+---------------+---------------+------------------+--------+---------+------+
| ROWTIME       | WINDOWSTART   | WINDOWEND     | ROWKEY           | USERID | PAGEID  | TOTAL|
+---------------+---------------+---------------+------------------+--------+---------+------+
| 1557183919786 | 1557183900000 | 1557183960000 | User_5|+|Page_12 | User_5 | Page_12 | 1    |
| 1557183929488 | 1557183900000 | 1557183960000 | User_9|+|Page_39 | User_9 | Page_39 | 1    |
```

* `WINDOWSTART` and `WINDOWEND` are available in the SELECT, GROUPBY, WHERE, HAVING clauses etc.

For example:

```sql
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss Z') FROM windowedSource emit changes;
```

However, don't get too excited just yet as there is a known limitation that drastically reduces the availability of this syntax:

**KNOWN LIMITATION**
Where a query builds a windowed source from a non-windowed source the window bounds columns are not available. For example:

```
-- won't yet work:
SELECT WINDOWSTART FROM someSource WINDOW TUMBLING (SIZE 1 SECOND) group by ROWKEY;
```

This issue is tracked by: confluentinc#4397

* Joins of windowed sources include the `WINDOWSTART` and `WINDOWEND` columns from both sides.

### Pull queries

**KNOWN LIMITATION**
Pull queries have not been updated yet. This will be done in a follow up PR confluentinc#3633. This is mainly to keep this PR manageable.

### Persistent queries

Persistent C*AS queries work similarly to push queries and have the same known limitation.

BREAKING CHANGE: Any query of a windowed source that uses `ROWKEY` in the SELECT projection will see the contents of `ROWKEY` change from a formatted `STRING` containing the underlying key and the window bounds, to just the underlying key. Queries can access the window bounds using `WINDOWSTART` and `WINDOWEND`.

BREAKING CHANGE: Joins on windowed sources now include `WINDOWSTART` and `WINDOWEND` columns from both sides on a `SELECT *`.
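For downstream consumers that still hold data written with the old formatted `ROWKEY`, the breaking change above can be illustrated with a small Python sketch that splits the legacy string into the three values now exposed as `ROWKEY`, `WINDOWSTART` and `WINDOWEND`. This is not part of ksqlDB: `parse_legacy_rowkey` and `WindowedKey` are hypothetical names, and the pattern assumes the layout shown in the old output above (an optional comma after the start value, and `-` for an unknown end).

```python
import re
from typing import NamedTuple, Optional

# Assumed legacy layout: "<actual key> : Window{start=<ms>[,] end=<ms or ->}"
_LEGACY_ROWKEY = re.compile(
    r"^(?P<key>.*) : Window\{start=(?P<start>\d+),? end=(?P<end>\d+|-)\}$"
)


class WindowedKey(NamedTuple):
    """Hypothetical container mirroring ROWKEY, WINDOWSTART and WINDOWEND."""
    key: str
    window_start: int
    window_end: Optional[int]  # None when the legacy string shows "end=-"


def parse_legacy_rowkey(rowkey: str) -> WindowedKey:
    """Split an old-style formatted ROWKEY into key and window bounds."""
    match = _LEGACY_ROWKEY.match(rowkey)
    if match is None:
        raise ValueError(f"not a legacy windowed ROWKEY: {rowkey!r}")
    end = match.group("end")
    return WindowedKey(
        key=match.group("key"),
        window_start=int(match.group("start")),
        window_end=None if end == "-" else int(end),
    )
```

With the first row of the old output, `parse_legacy_rowkey("User_9|+|Page_39 : Window{start=1557183900000 end=-}")` yields the key `User_9|+|Page_39`, a start of `1557183900000` and no end.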
#4401) * chore: support window bounds columns in persistent and pull queries
While it should be possible to make
The following places can not so easily be supported:
|
Partial fix, (for non-aggregate select expressions), available in #4450 |
Removing 0.7 release tags as the remaining work is not needed for 0.7 and, likely, requires a Streams change. |
To document the related Streams work:
Personally, I feel we first need to finish up: So my personal plan is to finish up KIP-478 and then tackle KIP-300. @big-andy-coates , do you think this captures the relevant needs, or do we need to create a different AK Jira issue? |
Hey @vvcephei, I'm not sure to be honest. KIP-300 seems to me to be fixing the limitation that it's not possible to build a windowed table from a windowed changelog. So I don't see how that's related. KAFKA-7777 also doesn't seem to be on-topic. What's needed is:
For example, The second thing we'd ideally need is a It doesn't seem any of the listed items above address either of these items, but I may be missing something. |
Aha! Thanks for the clarification, @big-andy-coates . That is indeed different than what I was thinking. Do you want to create a Kafka Jira ticket to track this desire? |
So after the upgrade we aren't able to use the UDAFs as they have been removed, and also the column names aren't accessible in GROUPBY, WHERE, or HAVING. Is there a workaround for this? Also seems like the fix won't be part of 0.11? |
Hi @muneebshahid, The UDAFs, when they existed, wouldn't have been usable in GROUP BY or WHERE, and I can't think of a use-case for using the old windowStart() and windowEnd() UDAFs in the HAVING clause, if that was supported. Are you saying you were using the UDAFs in the HAVING clause and have now lost this functionality? Sorry if that is the case. Can you explain your use-case and provide example SQL so we can understand what you're trying to achieve? Thanks. |
Hey @big-andy-coates thank you for the response. And yes we are using them in "HAVING" clause. Our use case is that we are tracking changes in the last x minutes. For this Hopping window was the most suitable one. And since the messages mostly arrive in order, we are mainly interested in the very first window (the window that extends most in the past from the current moment). For example, have a look at this script
then these five windows are emitted for a message. | MTS | WS | WE | Total To retain just the first one we are using
2*1000 to account for the window advance. One way for us would be to handle this outside ksql, but that means a lot of useless messages will be consumed, particularly for larger window sizes which then need to be filtered out. If you have some suggestions then please let me know. Thank you. |
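The "several windows per message, keep only the earliest" behaviour described in this comment can be sketched in Python. This is only an illustration under stated assumptions: windows are aligned to multiples of the advance and are start-inclusive, end-exclusive (as Kafka Streams hopping windows are), and the 5 second size and 1 second advance are hypothetical values chosen so that each message lands in five windows. The function names are made up for this sketch.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # (window start, window end) in epoch milliseconds


def hopping_windows_for(timestamp_ms: int, size_ms: int, advance_ms: int) -> List[Window]:
    """Return every hopping window [start, end) that contains timestamp_ms."""
    # Earliest aligned start whose window still covers the timestamp.
    start = (max(0, timestamp_ms - size_ms + advance_ms) // advance_ms) * advance_ms
    windows = []
    while start <= timestamp_ms:
        windows.append((start, start + size_ms))
        start += advance_ms
    return windows


def earliest_window(timestamp_ms: int, size_ms: int, advance_ms: int) -> Window:
    """The window that extends furthest into the past for this timestamp."""
    return hopping_windows_for(timestamp_ms, size_ms, advance_ms)[0]
```

For a message at `12_345` ms with the assumed size of `5_000` and advance of `1_000`, five windows are produced, starting at 8000, 9000, 10000, 11000 and 12000, and `earliest_window` returns `(8000, 13000)`. Filtering on the window bounds inside the query is what would avoid emitting the other four.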
Thanks for details of the use-case @muneebshahid. Sorry this functionality has been temporarily lost. I can appreciate this must be frustrating. Please be assured we are working towards reinstating such functionality with our work towards ksqlDB supporting structured keys. |
Hey @big-andy-coates , could you create an AK streams ticket for the needed changes so that we would not forget about it? |
+1 from community post requiring |
Hi all, We are experiencing the same restriction in the Financial Services domain. Is there any update on the issue? Should we expect support at short notice? Thanks. |
Any update on when this feature will be added? |
Also looking for update on this feature. |
Not currently planned on short term road map |
Any update on when this feature will be added? |
KSQL currently lets you take a non-windowed stream and perform a windowed group by:
Which is essentially grouping by not just `something`, but also implicitly by the window bounds.

This might be more correctly written with a Tumbling table function:
Where the Tumbling table function returns one row for each row in `S`, with the addition of the `windowstart` and `windowend` columns. (Note: Hopping and session table functions are also possible, though in the case of the latter the table function would also emit retractions).

In a correct SQL model `windowstart` and `windowend` would therefore be available as fields within the selection, e.g.
This would allow us to do away with `windowStart()` and `windowEnd()` udafs!!!!!

Unfortunately, this is not currently possible. Using the window bounds columns results in an unknown column error.
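To make the table-function model above concrete, here is a minimal Python sketch of the semantics, not of ksqlDB's implementation. It assumes each input row is a dict with a `rowtime` in milliseconds. `tumbling` and `count_by_window` are hypothetical names. The first function adds `windowstart` and `windowend` to every row. The second then groups by the key and the window bounds together, which is the implicit grouping a windowed group by performs.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple


def tumbling(rows: Iterable[dict], size_ms: int) -> Iterator[dict]:
    """Emit one row per input row, extended with its tumbling window bounds."""
    for row in rows:
        start = (row["rowtime"] // size_ms) * size_ms
        yield {**row, "windowstart": start, "windowend": start + size_ms}


def count_by_window(rows: Iterable[dict], size_ms: int, key: str) -> Dict[Tuple, int]:
    """COUNT(*) grouped by (key, windowstart, windowend)."""
    counts: Dict[Tuple, int] = defaultdict(int)
    for row in tumbling(rows, size_ms):
        # The window bounds are ordinary columns here, so they can be part
        # of the grouping key and of the result.
        counts[(row[key], row["windowstart"], row["windowend"])] += 1
    return dict(counts)
```

Because the bounds are plain fields of each row in this model, any projection, filter or aggregate could read them without dedicated UDAFs.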
Nor are the columns available to UDAFs, e.g.
QTT test:
Results in error: