Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered #6034
Conversation
# Conflicts: # datafusion/core/src/physical_optimizer/repartition.rs # datafusion/core/src/physical_plan/joins/sort_merge_join.rs # datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs # datafusion/core/src/physical_plan/mod.rs # datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs # datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs # datafusion/core/src/physical_plan/windows/window_agg_exec.rs # datafusion/physical-expr/src/utils.rs
# Conflicts: # datafusion/core/src/test_util/mod.rs # datafusion/physical-expr/src/utils.rs
# Conflicts: # datafusion/common/src/utils.rs
# Conflicts: # datafusion/core/src/physical_optimizer/repartition.rs # datafusion/core/src/physical_optimizer/sort_pushdown.rs
# Conflicts: # datafusion/core/src/physical_plan/aggregates/mod.rs
# Conflicts: # datafusion/core/src/physical_plan/aggregates/row_hash.rs
# Conflicts: # datafusion/core/src/physical_plan/aggregates/mod.rs # datafusion/core/src/physical_plan/aggregates/row_hash.rs # datafusion/core/src/test_util/mod.rs # datafusion/core/tests/sql/window.rs
# Conflicts: # datafusion/core/tests/sqllogictests/test_files/window.slt
# Conflicts: # datafusion/common/src/utils.rs # datafusion/core/src/physical_optimizer/sort_enforcement.rs # datafusion/core/src/physical_plan/streaming.rs # datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs # datafusion/core/src/physical_plan/windows/window_agg_exec.rs # datafusion/physical-expr/src/utils.rs
Since most of the code is common with existing
I ran some preliminary benchmarks against this branch and it seems like some queries have gotten slightly slower:
The script I used is here: https://github.com/alamb/datafusion-benchmarking/blob/628151e3e3d27ff6e5242052d017f71dcd0d80ef/bench.sh. I am rerunning the numbers to see if I can reproduce the results.
I think the structure to make a single GroupHash operator support both ordered and unordered data is very clever. Thank you @mustafasrepo
However, I suspect the overhead of this tracking is slowing down existing aggregation.
If this is in fact slowing down queries, then I think making separate operators for streaming and non-streaming aggregation (as @mingmwang suggests, and as I tried to say in #5133 (comment)) would make sense.
I will post my next benchmark run shortly
.last()
.map(|item| item.ordered_columns.clone());

if let Some(last_ordered_columns) = last_ordered_columns {
I may be misunderstanding this code, but it seems like it is tracking, per group, whether the group can be emitted or not. As I understand the "Partial Streaming" / "Partitioned Streaming"
section of https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit#heading=h.uapxuhfa9wyi,
the entire hash table could be flushed each time a new value of date is seen,
perhaps with the obvious vectorization of only checking on record batch boundaries, or something.
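Not part of this PR, but a minimal sketch of that flushing idea, with made-up names and a plain `HashMap` standing in for the real accumulator state:

```rust
// Sketch only: assumes keys arrive sorted across batches; `i64` keys and a
// running count stand in for the real group values and accumulators.
use std::collections::HashMap;

struct StreamingGroups {
    last_key: Option<i64>,
    groups: HashMap<i64, u64>,
}

impl StreamingGroups {
    fn new() -> Self {
        Self { last_key: None, groups: HashMap::new() }
    }

    /// Consume one record batch of keys and return groups that can no longer
    /// receive rows. The check happens only at the batch boundary: if the
    /// batch starts with a key different from the last one seen, the entire
    /// hash table is flushed.
    fn process_batch(&mut self, keys: &[i64]) -> Vec<(i64, u64)> {
        let mut emitted = Vec::new();
        if let (Some(last), Some(&first)) = (self.last_key, keys.first()) {
            if first != last {
                emitted.extend(self.groups.drain());
            }
        }
        for &k in keys {
            *self.groups.entry(k).or_insert(0) += 1;
            self.last_key = Some(k);
        }
        emitted
    }
}

fn main() {
    let mut state = StreamingGroups::new();
    assert!(state.process_batch(&[1, 1, 2]).is_empty());
    // The next batch starts with 3 != 2, so groups for 1 and 2 are flushed.
    let mut flushed = state.process_batch(&[3, 3]);
    flushed.sort();
    assert_eq!(flushed, vec![(1, 2), (2, 1)]);
}
```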
I got similar results in my next performance run:
For reference, here is the same benchmark run against
This is very helpful. The variance seems larger than one expects. It seems there may be a tiny slow-down of magnitude noise variance / 2 (in high cardinality cases?). @mustafasrepo and I just had a meeting to go over why it could be. He will respond explaining our theory in greater detail, answer your other questions and maybe even suggest a fix/improvement. Based on the discussion afterwards and the final numbers, we can reach a consensus on whether we should have two impls with some code duplication, or use the current structure -- we will then take the necessary steps accordingly. Thanks for all the reviews!
# Conflicts: # datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs # datafusion/core/tests/sql/group_by.rs # datafusion/core/tests/sqllogictests/test_files/window.slt
I strongly suggest having a separate implementation (Exec) for Streaming Aggregation. This is similar to how we separate the
/// The state that is built for each output group.
#[derive(Debug)]
pub struct GroupState {
    /// The actual group by values, stored sequentially
    group_by_values: OwnedRow,

    ordered_columns: Option<Vec<ScalarValue>>,
I think this is not efficient. We should avoid using Vec-of-Vec structures in the critical data structs. The `Vec` itself is actually a pointer which points to some other memory address.
Because the `GroupState` is held in a global `Vec`, if we store `ordered_columns`
in another `Vec`, when the code accesses those members, the memory access pattern will be very random.
You can implement something similar to arrow's `GenericStringArray`
to achieve a better memory layout.
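A minimal sketch (assumed names, with `i64` standing in for `ScalarValue`) of the kind of layout `GenericStringArray` uses: one flat values buffer plus an offsets vector, so reads stay within two contiguous allocations instead of chasing a pointer per group:

```rust
// Sketch only, not DataFusion code. Group i's ordered-column values live at
// values[offsets[i]..offsets[i + 1]].
struct FlatOrderedColumns {
    values: Vec<i64>,
    offsets: Vec<usize>,
}

impl FlatOrderedColumns {
    fn new() -> Self {
        Self { values: Vec::new(), offsets: vec![0] }
    }

    /// Append the ordered-column values of one new group.
    fn push_group(&mut self, group_values: &[i64]) {
        self.values.extend_from_slice(group_values);
        self.offsets.push(self.values.len());
    }

    /// Read back group `i` without a per-group heap allocation.
    fn group(&self, i: usize) -> &[i64] {
        &self.values[self.offsets[i]..self.offsets[i + 1]]
    }
}

fn main() {
    let mut cols = FlatOrderedColumns::new();
    cols.push_group(&[1]);
    cols.push_group(&[2]);
    assert_eq!(cols.group(0), &[1_i64][..]);
    assert_eq!(cols.group(1), &[2_i64][..]);
}
```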
/// Prune the groups from the `self.aggr_state.group_states` which are in
/// `GroupStatus::Emitted` status (this status means that the result of this group was already emitted/outputted, and
/// we are sure that these groups cannot receive new rows).
fn prune(&mut self) {
I think it is good to keep the global `group_states`
at a relatively small size.
But is it possible that in some cases only a small percentage is pruned, and we pay for more copies? I'm not sure when the prune will be triggered.
`ordered_columns` stores the section of the group by expression that defines the ordering in `GroupState`. When a different `ordered_columns` is received, we are sure that previous groups with different `ordered_columns` are finalized (they will no longer receive new values). At the end of `group_aggregate_batch` we iterate over `self.aggr_state.group_states` and mark the groups whose `ordered_columns` differ from the `ordered_columns` of the most recent (last) group as prunable.

As an example, if the table is like below, and we know that it satisfies `ORDER BY a ASC`

| a |
|---|
| 1 |
| 1 |
| 2 |
| 2 |
| 3 |
| 3 |

and the group by clause is `GROUP BY a`, the groups with `ordered_columns = Some(vec![1])` and `ordered_columns = Some(vec![2])` will be pruned, since they are different from `ordered_columns = Some(vec![3])`. However, the last group is not pruned because we can still receive values with 3 for column `a`.
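A simplified sketch of the marking step described above (not the PR's exact code; `Vec<i64>` stands in for the real `Vec<ScalarValue>` and names are illustrative):

```rust
// Sketch only: after a batch, any group whose ordered columns differ from
// those of the most recent group can no longer receive rows, because the
// input is sorted on those columns.
#[derive(Debug, PartialEq)]
enum Status {
    InProgress,
    CanBeEmitted,
}

struct Group {
    ordered_columns: Vec<i64>,
    status: Status,
}

fn mark_prunable(group_states: &mut [Group]) {
    // The last group holds the most recent ordered-column values.
    let Some(last) = group_states.last().map(|g| g.ordered_columns.clone()) else {
        return;
    };
    for group in group_states.iter_mut() {
        if group.ordered_columns != last {
            // This group is final and can be emitted, then pruned.
            group.status = Status::CanBeEmitted;
        }
    }
}

fn main() {
    let mut groups = vec![
        Group { ordered_columns: vec![1], status: Status::InProgress },
        Group { ordered_columns: vec![2], status: Status::InProgress },
        Group { ordered_columns: vec![3], status: Status::InProgress },
    ];
    mark_prunable(&mut groups);
    assert_eq!(groups[0].status, Status::CanBeEmitted);
    assert_eq!(groups[1].status, Status::CanBeEmitted);
    assert_eq!(groups[2].status, Status::InProgress);
}
```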
Regarding the performance downgrade: I have examined the code to see where the downgrade occurs. In the new implementation we extend
However, I cannot say definitely that this has fixed the problem.
My judgement is as follows for each case:
Cons of first approach
Pros of 2nd approach
Pros of 3rd approach
Cons of 3rd approach
We can pursue any one of the above approaches. If the community has a preference, we can pursue that approach.
@alamb, can you try measuring again on your end? I wonder if you will find a result similar to @mustafasrepo's after his last change. If you also see no (or very little) performance change, I propose we get the ball rolling by merging this. We can always refactor the code to the 2nd approach with a follow-on PR.
# Conflicts: # datafusion/core/src/physical_plan/aggregates/mod.rs
I am running the benchmarks again
I am running with https://github.com/alamb/datafusion-benchmarking/blob/87ee101b70b15dd4529f124d65189b0fb87e09b7/bench.sh

Running on a GCP machine. `cat /proc/cpuinfo` output:
...
processor : 7
vendor_id : GenuineIntel
cpu family : 6
model : 79
model name : Intel(R) Xeon(R) CPU @ 2.20GHz
stepping : 0
microcode : 0xffffffff
cpu MHz : 2200.164
cache size : 56320 KB
physical id : 0
siblings : 8
core id : 3
cpu cores : 4
apicid : 7
initial apicid : 7
fpu : yes
fpu_exception : yes
cpuid level : 13
wp : yes
flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt arat md_clear arch_capabilities
bugs : cpu_meltdown spectre_v1 spectre_v2 spec_store_bypass l1tf mds swapgs taa mmio_stale_data retbleed
bogomips : 4400.32
clflush size : 64
cache_alignment : 64
address sizes : 46 bits physical, 48 bits virtual
power management:

I am hoping to make the benchmarks easier to run / reproduce. I also plan to take another close look at this PR tomorrow.
@tustvold do you have any thoughts about why this code / PR could be making the grouping operator seemingly slow down?
This is what makes sense to me, though I am not sure what @mingmwang thinks.
FWIW an empty vector just contains a

I'm not very familiar with the group by code anymore, but the use of per-group allocations does immediately stand out to me as at risk of thrashing the memory allocator, and consequently making the code not just slow but also wildly unpredictable. I'm aware this isn't something introduced by this PR, but revisiting this design (see #4973) may make it easier to make changes in this area without introducing regressions.
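As a grounded aside on the first point (a standalone snippet, not from this PR): a newly created `Vec` holds only its pointer, length, and capacity and performs no heap allocation until something is pushed, so the allocator pressure comes from the populated per-group vectors rather than the empty ones.

```rust
// Vec::new is guaranteed not to allocate; allocation happens on first push.
fn main() {
    let empty: Vec<u64> = Vec::new();
    assert_eq!(empty.capacity(), 0); // no heap allocation behind this Vec

    let populated = vec![1_u64, 2, 3]; // this one does allocate
    assert!(populated.capacity() >= 3);
}
```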
I agree, we will be happy to take part in that effort. Going back to the scope of this PR: If everyone agrees, we will generate another changeset with Approach 2, and @alamb can check how the performance looks there (since our local benchmarks do not show a difference even now). We can then discuss any trade-offs w.r.t. code re-use/duplication and performance differences. We can move forward with either version based on the outcome of that discussion. We can also talk about the next steps for subsequent performance work in this context. Sounds good?
Yes, I am OK with this, with Option 2 as the following ticket.
I have implemented version 2. You can find it here. I have measured its performance; results can be found below.
Which issue does this PR close?
Closes #5133.
Rationale for this change
As discussed in the document, if some of the expressions in the `GROUP BY` clause are already ordered, we can generate aggregator results without breaking the pipeline. Consider a query that groups by columns `a, b`.

If the source is ordered by `a, b`, we can calculate group by values in a streaming fashion (this corresponds to `Fully Streaming` in the document). When the value of the `a, b` columns changes, the corresponding group will not receive any more values (otherwise it would contradict the ordering).

If the source were ordered by `a` only, we can still calculate group by values in a streaming fashion (this corresponds to `Partial Streaming` in the document). In this case, however, results would be generated when the value of column `a` changes.
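A small self-contained illustration of these two cases (plain integers stand in for the group-by columns; this is not the PR's code):

```rust
// With input ordered on (a, b), a group finalizes as soon as the (a, b) pair
// changes (fully streaming). With input ordered only on a, all groups sharing
// the previous value of a finalize when a changes (partial streaming).
fn fully_streaming_emit_points(rows: &[(i64, i64)]) -> Vec<usize> {
    // Row indices at which previously accumulated groups can be emitted.
    rows.windows(2)
        .enumerate()
        .filter(|(_, w)| w[0] != w[1])
        .map(|(i, _)| i + 1)
        .collect()
}

fn partial_streaming_emit_points(rows: &[(i64, i64)]) -> Vec<usize> {
    // Only a change in the ordered column `a` lets groups be emitted.
    rows.windows(2)
        .enumerate()
        .filter(|(_, w)| w[0].0 != w[1].0)
        .map(|(i, _)| i + 1)
        .collect()
}

fn main() {
    let rows = [(1, 10), (1, 10), (1, 20), (2, 10), (2, 10)];
    // Ordered by (a, b): groups (1,10) and (1,20) finalize at rows 2 and 3.
    assert_eq!(fully_streaming_emit_points(&rows), vec![2, 3]);
    // Ordered only by a: all groups with a == 1 finalize at row 3.
    assert_eq!(partial_streaming_emit_points(&rows), vec![3]);
}
```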
What changes are included in this PR?
This PR enables us to produce non-pipeline-breaking results for the `Fully Streaming` and `Partial Streaming` conditions. Please see the document for a more detailed discussion of what these terms refer to.

Since the behavior of the executor changes with the existing ordering (if the result can be calculated in a streaming fashion the output will have an ordering, otherwise it will not), this functionality requires us to calculate `output_ordering` dynamically. For this reason, this PR accompanies the API change from `fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>` to `fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>>`, to support dynamic calculation of the output ordering. See the corresponding PR for more information.
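A toy sketch of what the new return type enables (simplified `SortExpr` type and made-up fields, not DataFusion's actual trait): the owned `Vec` lets the operator assemble its output ordering at call time rather than borrow it from a pre-computed field.

```rust
// Sketch only: `SortExpr`, `AggregateNode`, and their fields are illustrative
// stand-ins, not DataFusion types.
#[derive(Clone, Debug, PartialEq)]
struct SortExpr {
    column: String,
    ascending: bool,
}

struct AggregateNode {
    group_by: Vec<SortExpr>,
    /// How many leading GROUP BY columns are covered by the input ordering.
    ordered_prefix_len: usize,
}

impl AggregateNode {
    /// Returning an owned Vec means the ordering can be computed per call;
    /// a borrowed `&[SortExpr]` would have to live in a struct field.
    fn output_ordering(&self) -> Option<Vec<SortExpr>> {
        if self.ordered_prefix_len == 0 {
            None
        } else {
            Some(self.group_by[..self.ordered_prefix_len].to_vec())
        }
    }
}

fn main() {
    let streaming = AggregateNode {
        group_by: vec![SortExpr { column: "a".into(), ascending: true }],
        ordered_prefix_len: 1,
    };
    let hashed = AggregateNode { group_by: vec![], ordered_prefix_len: 0 };
    assert_eq!(streaming.output_ordering().map(|o| o.len()), Some(1));
    assert_eq!(hashed.output_ordering(), None);
}
```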
Are these changes tested?
Yes. The `aggregate_fuzz.rs` file contains random tests that check whether the streamed results and the existing version produce the same result. Also, when run on benchmarks, I have verified that there is no regression. Benchmark results can be found below.
Are there any user-facing changes?
API change.