
fix: unify behavior for PARTITION BY and GROUP BY #3982

Merged

1 commit merged into confluentinc:master on Dec 2, 2019

Conversation

agavra
Contributor

@agavra agavra commented Nov 26, 2019

BREAKING CHANGE: this change makes it so that PARTITION BY statements
use the source schema, not the value/projection schema, when selecting
the value to partition by. This is consistent with GROUP BY, and
standard SQL for GROUP BY. Any statement that previously used PARTITION
BY may need to be reworked.

Fixes #2701

Description

This change does two things: (1) it pulls PARTITION BY into the Query object so that it mimics the GROUP BY behavior and makes it clear that it is a function of the source data (not something post-projection), and (2) it introduces a RepartitionNode in the Logical Plan that handles partitioning before anything else, instead of using the projection schema.

A nice side effect is that the syntax is now ... PARTITION BY FOO EMIT CHANGES instead of ... EMIT CHANGES PARTITION BY FOO
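To make the breaking change concrete, here is a minimal sketch (hypothetical stream and column names, assuming the resolution rules described above):

-- old behavior: the projection alias K could be used as the partitioning column
CREATE STREAM S2 AS SELECT COL0 AS K, COL1 FROM S1 PARTITION BY K;

-- new behavior: PARTITION BY is resolved against the source schema of S1,
-- so the source column must be named directly
CREATE STREAM S2 AS SELECT COL0 AS K, COL1 FROM S1 PARTITION BY COL0;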

Testing done

Updated existing test coverage to reflect the new behavior.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@hjafarpour
Contributor

@agavra it seems that the reason for this change is more because of implementation details than user experience! It's more intuitive to partition by one of the columns in the current schema rather than a field that may not exist in the result schema! Consider the following example:

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY new_col1;

We know that the result schema is new_col1, new_col2 and it is partitioned by the first column, new_col1.
But if you make the proposed change, the query would be the following:

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY col1;

and the result would be partitioned based on a column that no longer exists!

@agavra
Contributor Author

agavra commented Nov 27, 2019

@hjafarpour - originally, I thought the same thing, but this isn't motivated by implementation. Your argument would stand for GROUP BY as well, yet databases don't behave that way (if I SELECT COUNT(*) GROUP BY foo, it isn't necessary that foo appears in the result schema). It's not that the value doesn't exist anymore; it just becomes the ROWKEY column in ksqlDB. And I think it makes sense to follow the lead of other databases in this example.

Also, @big-andy-coates convinced me: GROUP BY defines what the key is set to, and similarly PARTITION BY defines what the key is set to. A GROUP BY is implicitly also a PARTITION BY (because of the nature of setting the key). This means they should have consistent behavior. Andy's #2701 (comment) here makes a lot of sense as a way of thinking about it.
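As a sketch of that analogy (hypothetical stream and column names):

-- customer_id comes from the source schema of orders and becomes the key,
-- even though it is not listed in the projection
SELECT COUNT(*) AS cnt FROM orders GROUP BY customer_id EMIT CHANGES;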

@hjafarpour
Contributor

@agavra I'm still not convinced about making PARTITION BY behave like GROUP BY. These are different operations for different purposes. GROUP BY is used to indicate grouping on the data source to perform aggregation on. On the other hand, PARTITION BY is used to repartition the result of a query.
In addition, you now remove the ability to repartition based on the result columns unless you add support for complex expressions in the PARTITION BY clause. For instance, we won't be able to have the following query:

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY new_col1;

and it should be written as the following:

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY col1*col2;

which we don't support yet!

Member

@JimGalasyn JimGalasyn left a comment

LGTM

@big-andy-coates
Contributor

A nice side effect is that the syntax is now ... PARTITION BY FOO EMIT CHANGES instead of ... EMIT CHANGES PARTITION BY FOO

@agavra worth calling ^^^ out in the breaking change description?

@big-andy-coates
Contributor

big-andy-coates commented Nov 28, 2019

I'm still not convinced about making PARTITION BY behave like GROUP BY. These are different operations for different purposes

A GROUP BY is effectively a PARTITION BY (selectKey in KS parlance?), followed by an aggregation. The two should behave in a consistent manner.

Consider the following example:

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY new_col1;

We know that the result schema is new_col1, new_col2 and it is partitioned by the first column, new_col1.

@hjafarpour, I think you're still thinking in terms of only the value columns being part of the schema. We're moving away from this with the work on primitive keys (almost there) and structured keys. With this work done, the key columns are as much a part of the schema as the value columns.

If you take into account that the key columns are in a schema, then the example starts to look more like:

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY col1*col2 AS Id;

And the resulting schema is something like the following (made-up types): ID INT KEY, NEW_COL1 INT, NEW_COL2 INT.

But actually, this is duplicating the key column in the value, which is a waste of space if the downstream system doesn't need it, so your initial query might be better re-written as:

CREATE STREAM foo AS SELECT col3*100 as new_col2 
FROM bar 
PARTITION BY col1*col2 AS new_col1;

with a result schema of NEW_COL1 INT KEY, NEW_COL2 INT.

However, I agree we'll need expression support in the PARTITION BY -> @agavra, do we get that for free with your change promoting this to a proper logical plan node? Or is this something we'll need to add? If it's the former, we should add some QTT tests covering this.

Contributor

@big-andy-coates big-andy-coates left a comment

Thanks @agavra, this looks great.

One thing to check through ... we should avoid the RepartitionNode if it's PARTITION BY ROWKEY.

And as discussed, we'd ideally need expression support in PARTITION BY, which has been requested anyway and brings things in line with GROUP BY. (Separate PR -> maybe create a GitHub issue to track and add to the project)

@@ -188,6 +188,10 @@ protected AstNode visitQuery(final Query node, final C context) {
final Optional<GroupBy> groupBy = node.getGroupBy()
.map(exp -> ((GroupBy) rewriter.apply(exp, context)));

// don't rewrite the partitionBy because we expect it to be
// exactly as it was (a single, un-aliased, column reference)
final Optional<Expression> partitionBy = node.getPartitionBy();
Contributor

As above, may need to change...

+ "| Logger: InsertQuery_1.S1"));
assertThat(lines[2], equalTo("\t\t\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING"
assertThat(lines[2],
containsString("[ REKEY ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] "
Contributor

Why is the schema now prefixed?

Contributor Author

It just uses the source schema, which is prefixed for some reason; you can look at the rest of this test class to see this. As long as the sink schema is the same, it should be fine, I think.

Changing this is out of scope, though we should probably do it.

@@ -78,6 +77,7 @@ query
(WINDOW windowExpression)?
(WHERE where=booleanExpression)?
(GROUP BY groupBy)?
(PARTITION BY partitionBy=identifier)?
Contributor

much nicer!

@hjafarpour
Contributor

@big-andy-coates yes, you are right in describing my thinking. Key in the current model is more of a meta field than an equivalent part of the schema. I know that with structured key support this will change, and this change in PARTITION BY behavior will be easier to justify, as you explained. The question is: when will we have structured keys supported?
I know you are working on primitive key support. With KSQL rebranding to ksqlDB, I think this is a good time to make major breaking changes such as structured key support. Do you have plans to add structured key support soon after primitive keys?

BREAKING CHANGE: this change makes it so that PARTITION BY statements
use the _source_ schema, not the value/projection schema, when selecting
the value to partition by. This is consistent with GROUP BY, and
standard SQL for GROUP BY. Any statement that previously used PARTITION
BY may need to be reworked.
BREAKING CHANGE: when querying with EMIT CHANGES and PARTITION BY, the
PARTITION BY clause should now come before EMIT CHANGES.
@agavra
Contributor Author

agavra commented Dec 2, 2019

Thanks for the review @big-andy-coates!

One thing to check through ... we should avoid the RepartitionNode if it's PARTITION BY ROWKEY.

I don't think we should do that - even with this implementation, we don't repartition, because of (your) optimization that's done inside the repartition node (SchemaKStream#selectKey). This way the logical plan still represents exactly what the user told us to do, and we don't duplicate this checking logic in two places. Also, with the changes around keys (and with supporting expressions in PARTITION BY), I think it's better to keep the logical plan simple.
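A minimal sketch of the case being discussed (hypothetical stream and column names; behavior as described in this thread):

-- PARTITION BY ROWKEY keeps a RepartitionNode in the logical plan, but no
-- physical repartition happens because the key is already ROWKEY
CREATE STREAM COPY AS SELECT COL0, COL1 FROM SRC PARTITION BY ROWKEY;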

@agavra agavra merged commit 67d3f8c into confluentinc:master Dec 2, 2019
@agavra agavra deleted the partition_by branch December 2, 2019 19:35
@big-andy-coates
Contributor

One thing to check through ... we should avoid the RepartitionNode if it's PARTITION BY ROWKEY.

I think the logical plan should not have a repartition step if it's PARTITION BY ROWKEY, because logically that step is not needed.

If anything, the check should only exist in the logical plan, and the physical plan should throw if a redundant repartition still reaches it.

@vpapavas
Member

vpapavas commented Dec 3, 2019

Hey @big-andy-coates ,

I am confused about this:

If you take into account that the key columns are in a schema, then the example starts to look more like:

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY col1*col2 AS Id; 

Why does the fact that the key columns will be part of the schema play a role here and change the example query?

Also, I feel that we add more functionality to the PARTITION BY operator than we should. It should not control what columns will be in the output, much like GROUP BY doesn't. If you have a GROUP BY col1 but don't include col1 in the projection, then the column will not be in the output schema. The same should hold for the PARTITION BY.

If we agree on this, why would the example query @hjafarpour wrote

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY new_col1;

be written as

CREATE STREAM foo AS SELECT col1*col2 AS new_col1, col3*100 as new_col2 
FROM bar 
PARTITION BY col1*col2 AS Id;

There should not be an AS term in the PARTITION BY clause, it should only be in the SELECT clause, no?

Finally, the rewriting of the query you suggest for avoiding the duplication of columns should be part of the optimizer. But if we don't allow the PARTITION BY to control the output schema (as in adding extra columns), then this would not be an issue.

@agavra
Contributor Author

agavra commented Dec 3, 2019

Thanks for looking into this @vpapavas!

Why does the fact that the key columns will be part of the schema play a role here and change the example query?

PARTITION BY is the mechanism that we've given for defining the ROWKEY. If ROWKEY becomes multiple columns in the future, we need a way to define those. PARTITION BY makes it explicit that these columns will cause a repartitioning. So it will be part of the output schema, even if we don't give it a name.

It should not control what columns will be in the output, much like GROUP BY doesn't. If you have a GROUP BY col1 but don't include col1 in the projection, then the column will not be in the output schema. The same should hold for the PARTITION BY.

I don't think this is necessarily true; today, the result of the GROUP BY becomes the ROWKEY of the result schema (so it does define the output schema in the same way PARTITION BY does). With structured keys, it will be a set of key fields if you group by more than one column.

There should not be an AS term in the PARTITION BY clause, it should only be in the SELECT clause, no?

Today there is an "implicit" AS - AS ROWKEY. You can't have AS ROWKEY in the SELECT clause (if I understand correctly).
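To make the implicit aliasing concrete, a hedged sketch (hypothetical stream and column names):

-- the value of COL1 becomes ROWKEY in the result, as if the statement read
-- PARTITION BY COL1 AS ROWKEY; COL1 also stays in the value schema because
-- it is still listed in the projection
CREATE STREAM S2 AS SELECT COL0, COL1 FROM S1 PARTITION BY COL1;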


Successfully merging this pull request may close these issues.

Breaking Change: Inconsistency between GROUP BY and PARTITION BY's handling of aliases