-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: append additional fields to row values in KsqlMaterialization.java
to fix pull query filtering
#7336
Conversation
@confluentinc It looks like @cprasad1 just signed our Contributor License Agreement. 👍 Always at your service, clabot |
...c/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java
Show resolved
Hide resolved
@@ -174,7 +174,7 @@ public MaterializedWindowedTable windowed() { | |||
final Builder<WindowedRow> builder = ImmutableList.builder(); | |||
|
|||
for (final WindowedRow row : result) { | |||
filterAndTransform(row.windowedKey(), row.value(), row.rowTime()) | |||
filterAndTransform(row.windowedKey(), getIntermediateRow(row), row.rowTime()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unsure if we even need to append the extra columns for windowed rows. Windowed state stores might need more testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate a bit more here? Is that because the windowed key may have the needed columns for filtering?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guozhangwang my understanding is that windowed tables always need an aggregation, so they have a state store backing the aggregated table. If that is the case, then we don't need to append this extra metadata as KsqlMaterialization
is sophisticated enough to handle those cases (we have test cases for that). I noticed that we follow a similar pattern for windowed rows in ProjectOperator
and SelectOperator
of generating intermediate rows on which filters and transformations can be applied. I added these fields as a hedge against potential cases that we might miss (obviously it comes at a cost). That being said, I have a couple of questions for you:
- Are there any type of Windowed tables that are not queryable today that we want to be able to query?
- Can Windowed tables be derived without doing any aggregations? (Specifically,
GROUP BY
aggregations)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't tested it by try a windowed query with group by and having clause. I think that might trigger KsqlMaterialization
. In that case, presumably you could say HAVING WINDOWSTART > 20
and therefore need to have those columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to @AlanConfluent , I think today if you have a WINDOW BY + GROUP BY + HAVING
you would have a first windowed table from windowBy+groupBy
aggregations, and then a second windowed table from having
as a filtering condition. So in that sense not all windowed tables should be generated with aggregations, they can also be generated from table statless operators from other existing windowed tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to have this as a RQTT test where the having clause mentions windowstart or windowend
{"row":{"columns":["F"]}} | ||
|
||
]} | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, there are no additional tests for Windowed materialization. Did we intend to make any new new type of Windowed table queryable with all these changes @AlanConfluent ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could always tested a windowed case like
"CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID HAVING COUNT(1) > 2;",
We don't have a lot of tests with the having clause for pull queries and I think it might trigger the KsqlMaterialization
transform logic to apply the filtering. Since they can apply for group bys I assume they work for windowed tables as well. Normal where clauses presumably don't touch KsqlMaterialization
when a group by is in place since the materialization happens after the filter has been applied, so that might not need additional testing.
But in general, you're right that a lot of windowed logic has already been tested fairly well in https://github.com/confluentinc/ksql/blob/master/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json since windows require a group by.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are good cases. It would be good to do a windowed table + group by + having clause mentioning windowstart. That's one last case I don't see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't allow WINDOWSTART
that at the moment, so it's not a testable case. The specific error message is Window bounds column WINDOWSTART can only be used in the SELECT clause of windowed aggregations and can not be passed to aggregate functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related: #4397
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still wondering, if @AlanConfluent 's query is possible:
CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID HAVING COUNT(1) > 2;
note it does not try to group by window-start/end, while we should still be able to pull query it with conditions on other columns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wondering what's the difference for windowed stores, for my own education. Otherwise the fix lgtm.
@@ -174,7 +174,7 @@ public MaterializedWindowedTable windowed() { | |||
final Builder<WindowedRow> builder = ImmutableList.builder(); | |||
|
|||
for (final WindowedRow row : result) { | |||
filterAndTransform(row.windowedKey(), row.value(), row.rowTime()) | |||
filterAndTransform(row.windowedKey(), getIntermediateRow(row), row.rowTime()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate a bit more here? Is that because the windowed key may have the needed columns for filtering?
...s/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java
Show resolved
Hide resolved
...s/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java
Show resolved
Hide resolved
...c/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java
Outdated
Show resolved
Hide resolved
materialization = new KsqlMaterialization( | ||
inner, | ||
SCHEMA, | ||
ImmutableList.of(project, filter) | ||
ImmutableList.of(filter, project) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you swap these because this is a more realistic ordering?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
{"row":{"columns":["F"]}} | ||
|
||
]} | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could always tested a windowed case like
"CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID HAVING COUNT(1) > 2;",
We don't have a lot of tests with the having clause for pull queries and I think it might trigger the KsqlMaterialization
transform logic to apply the filtering. Since they can apply for group bys I assume they work for windowed tables as well. Normal where clauses presumably don't touch KsqlMaterialization
when a group by is in place since the materialization happens after the filter has been applied, so that might not need additional testing.
But in general, you're right that a lot of windowed logic has already been tested fairly well in https://github.com/confluentinc/ksql/blob/master/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json since windows require a group by.
@@ -174,7 +174,7 @@ public MaterializedWindowedTable windowed() { | |||
final Builder<WindowedRow> builder = ImmutableList.builder(); | |||
|
|||
for (final WindowedRow row : result) { | |||
filterAndTransform(row.windowedKey(), row.value(), row.rowTime()) | |||
filterAndTransform(row.windowedKey(), getIntermediateRow(row), row.rowTime()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't tested it by try a windowed query with group by and having clause. I think that might trigger KsqlMaterialization
. In that case, presumably you could say HAVING WINDOWSTART > 20
and therefore need to have those columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice PR and good test cases
...s/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the added testing coverage!
] | ||
}, | ||
{ | ||
"name": "persistent query with KEY filter and projection +++ pull query table scan and single key lookup ***FAILURE***", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we explicitly state the the error root cause here? Since otherwise the name is exactly the same as above except we say it is a ***FAILURE***
case.
{"row":{"columns":["F"]}} | ||
|
||
]} | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still wondering, if @AlanConfluent 's query is possible:
CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID HAVING COUNT(1) > 2;
note it does not try to group by window-start/end, while we should still be able to pull query it with conditions on other columns?
…ava` to fix pull query filtering (confluentinc#7336) * passes unit tests * semantic * start adding mores RQTT * start adding mores RQTT 2 * start adding mores RQTT 3 all greeeeen * start adding mores RQTT 3 all greeeeen MAX COMPLEX * FINISH RQTT * added windowed table tests * modified tests * add small comment * add small comment fix checkstyle Co-authored-by: Chittaranjan Prasad <>
"name": "windowed - select star with HAVING filter", | ||
"statements": [ | ||
"CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", | ||
"CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID HAVING COUNT(1) > 1;", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guozhangwang is this the test similar to what you are interested in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, thanks!
…ava` to fix pull query filtering (#7336) (#7342) * passes unit tests * semantic * start adding mores RQTT * start adding mores RQTT 2 * start adding mores RQTT 3 all greeeeen * start adding mores RQTT 3 all greeeeen MAX COMPLEX * FINISH RQTT * added windowed table tests * modified tests * add small comment * add small comment fix checkstyle Co-authored-by: Chittaranjan Prasad <>
Description
Fixes #7312 and other similar situations by appending additional fields to row values in
KsqlMaterialization.java
Testing done
Reviewer checklist