-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize count agg expr with null column statistics #1063
Optimize count agg expr with null column statistics #1063
Conversation
@Dandandan @rdettai FYI - I took a first cut at this. Still reviewing on my end - but wanted to at least open the PR in case either of you had specific thoughts. |
{ | ||
return Some(( | ||
ScalarValue::UInt64(Some((num_rows - val) as u64)), | ||
"COUNT(Uint8(1))".to_string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"COUNT(Uint8(1))".to_string(), | |
"COUNT(UInt8(1))".to_string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be the column name instead of UInt8
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My naive interpretation was that UInt8(1) was a required placeholder to go with the num_rows
value in the line before it. Meaning if the column was put there then it would actually perform the count calculation - which we dont want since were using stats.
Would def be good to get a better explanation though!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be the column name here indeed for it to work.
The current way hash aggregates work is by accessing the value of the aggregate in a projection by the name generated by the expression in the aggregate (e.g. COUNT(UInt8(1))
for count(*)
but COUNT(col)
for a non null count. So by rewriting and giving it the same name, this avoids computing the aggregate while still being compatible with the usage of the aggregate in the projection.
I think the approach looks good @matthewmturner Code could be simplified a bit or take more cases into account (as mentioned in TODO) |
To confirm, are you referring to the "Optimize with exprs other than Column"? |
It would be very reassuring if these optimizations came with a more systematic testing routine. I agree that unit tests in this precise scenario are not very powerful. Currently we have a test here: https://github.com/apache/arrow-datafusion/blob/558d8ecfe2d1b0737de1e2548c5fbedc0ea17ada/datafusion/tests/sql.rs#L3020 that ensures that the count optimizer rule kicks in properly with |
I'm happy to do this. Do you think better to make that file on a separate PR or maybe i could just add it for these count optimizations for now and a separate PR for the other optimizations? |
Thanks for your motivation! In this PR only tests for this feature are required. |
@rdettai @Dandandan sry for delay on this. ive been on vacation with very limited internet / computer access. Will pick this up early next week when im back. |
It seems that something strange happened with your git modules, hence the modifications that are shown in the diff for |
@rdettai @Dandandan - I got this passing CI. Will start looking into tests I can do for this feature now. I'm going to need to study the structure a bit though to figure out the right way to do it - if you have anything specific in mind let me know. |
@Dandandan @rdettai FYI I added a few tests. |
let conf = ExecutionConfig::new(); | ||
let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; | ||
|
||
assert!(optimized.as_any().is::<ProjectionExec>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a comment here that the added ProjectionExec
is a sign the optimization was applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure - added it.
@Dandandan @rdettai can you provide some color on why we have different return expressions for the Specifically, the expression for |
I'm not sure this is a very well defined thing in the SQL standards, but usually in query engines, projected columns get default names. For instance the result value of If you start up the Datafusion CLI as explained in > SELECT count(*) FROM foo;
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 1 |
+-----------------+ > SELECT min(a) FROM foo;
+------------+
| MIN(foo.a) |
+------------+
| 1 |
+------------+ So what would you expect the column name to be for Also, it would be pretty nice if the response column names were the same whether the plan went through an optimization or not 😄 Side note: I checked on Athena (if I remember correctly you are used to that engine), and their choice for example is to simply name the column |
@rdettai thank you for the explanation - it makes sense and you remember correctly :) I can create a separate issue to look into the column names with / without optimization point. |
I think the following two specs are related to the discussion here: https://arrow.apache.org/datafusion/specification/invariants.html#physical-schema-is-invariant-under-physical-optimization Note that the existing specs are not set in stone, so we can propose changes to them if we can back them with good reasons. |
@Dandandan @rdettai anything else needed on this PR? |
@@ -311,6 +372,33 @@ mod tests { | |||
Ok(()) | |||
} | |||
|
|||
#[tokio::test] | |||
async fn test_count_partial_with_nulls_direct_child() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not testing the code that you have added, it tests that take_optimizable_count
also works if there are nulls in the source dataset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thx for picking that up. Looking into it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay. If I understand correctly, you have copied all the existing tests, just changing the datasource for one that has nulls in it:
- this is very verbose
- it isn't really testing anything new
- it is not testing the code you have added
These optimizations only kick in in corner cases. My feeling is that they are only worth it if we manage to write them in a clean and well tested fashion, otherwise we take the benefit/risk ratio might quickly become bad 😃. Of course, @Dandandan might have another opinion as he opened the issue initially.
@rdettai thank you for the feedback. It seems I underestimated the task. I will check it out more on my side - of course any specific guidance welcome as well :) One follow up question, on your point that these optimizations only kick in in corner cases. Can you just clarify how a count or count on data with nulls is considered a corner case? I would have thought that would be considered a standard / common scenario. Thx again. |
@rdettai @Dandandan one follow up question I realize I should have asked before. Was the 'take_optimized_count' meant as an optimization for |
@rdettai @Dandandan thank you for your patience on this. I've made a number of updates - would you be able to check it out? Regarding the point on taking into account more cases - should that be done in a separate PR and done in conjunction with updating |
@Dandandan @rdettai do you have any thoughts on this? |
@alamb do you have any thoughts on this PR / how to move forward? |
Sorry for the delay @matthewmturner -- let me look more carefully at this PR. Note I think @rdettai is out this week and next which may explain the slow response. I have no such excuse however :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran out of time to do a more thorough review, but I left some hints -- hopefully they make sense @matthewmturner ?
let conf = ExecutionConfig::new(); | ||
let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; | ||
|
||
let (col, count) = match nulls { | ||
false => (Field::new("COUNT(Uint8(1))", DataType::UInt64, false), 3), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why the column name output is different for columns with NULLs (and columns that don't have nulls)
I think the difference is if the aggregate is COUNT(*)
--> COUNT(UInt8(1))
and COUNT(col)
--> COUNT(col)
This can be seen in the datafusion-cli on master
❯ create table foo as select * from (values (1, NULL), (2, 2), (3,3)) as sq;
0 rows in set. Query took 0.007 seconds.
❯ select * from foo;
+---------+---------+
| column1 | column2 |
+---------+---------+
| 1 | |
| 2 | 2 |
| 3 | 3 |
+---------+---------+
3 rows in set. Query took 0.003 seconds.
❯ select count(column1) from foo;
+--------------------+
| COUNT(foo.column1) |
+--------------------+
| 3 |
+--------------------+
1 row in set. Query took 0.004 seconds.
❯ select count(column2) from foo;
+--------------------+
| COUNT(foo.column2) |
+--------------------+
| 2 |
+--------------------+
1 row in set. Query took 0.004 seconds.
❯ select count(*) from foo;
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 3 |
+-----------------+
1 row in set. Query took 0.002 seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this is just a case of a poorly named variable. it should really be something like col_count
i.e. if the test is on a table count or column count. when using datafusion-cli on this branch i get the same output as what you showed. i will update.
&stats.column_statistics, | ||
agg_expr.as_any().downcast_ref::<expressions::Count>(), | ||
) { | ||
if casted_expr.expressions().len() == 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like this code handles count(col)
whereas the code above only handles count(*)
-- that seems strange -- perhaps we should update it so both can handle count(col)
and count(*)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding is that COUNT(*)
doesnt need to have a separate handler for nulls - assuming we expect same behavior as psql. For example in psql when i do the following:
postgres=# create table foo as select * from (values (1,NULL),(NULL,2),(3,3)) as sq;
SELECT 3
postgres=# select * from foo;
column1 | column2
---------+---------
1 |
| 2
3 | 3
(3 rows)
postgres=# select count(*) from foo;
count
-------
3
(1 row)
Does it make sense to reframe these optimizations as the following:
take_optimizable_table_count
(current take_optimizable_count
)=> comes from COUNT(*)
and returns num_rows
take_optimizable_column_count
(current take_optimizable_count_with_nulls
) => comes from COUNT(col)
and return num_rows - null_count
for col
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think those names make more sense to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok - ive updated. Let me know if anything else needed.
@alamb no apology necessary, thank you for the feedback. i will look into your comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@matthewmturner something seems to have gone wrong with the most recent merge (it has 10k lines added 🤔 )
I am not sure what is going on here.
Perhaps it is time to rebase this PR (I am happy to help do so) -- this PR has been hanging out for so long I want to help get it in
d0cd143
to
1c65b6a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this code will need some refactoring before being able to cover more cases of aggregation simplification, but this is good enough for now.
let conf = ExecutionConfig::new(); | ||
let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; | ||
|
||
let (col, count) = match nulls { | ||
false => (Field::new("COUNT(Uint8(1))", DataType::UInt64, false), 3), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
false => (Field::new("COUNT(Uint8(1))", DataType::UInt64, false), 3), | |
false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3), |
@rdettai are you referring to handling more than just column expressions? if so, i can do a follow on pr for updating this and the min/max optimizations (which this was based on). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me -- thank you for sticking with it @matthewmturner !
Which issue does this PR close?
Closes #904
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?