
Implement exact median, add AggregateState #3009

Merged: 23 commits into apache:master, Aug 5, 2022

Conversation

@andygrove (Member) commented Aug 1, 2022

Which issue does this PR close?

Closes #2925

Rationale for this change

Needed for h2o benchmarks.

What changes are included in this PR?

  • Add the ability for accumulators to return either arrays or scalar values
  • Implement new median aggregate
  • Remove a few calls to unwrap

Are there any user-facing changes?

Yes, if implementing UDAFs.
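
Concretely, the shape of that change for UDAF authors is roughly the following (a sketch based on the discussion in this PR, not the exact merged code; the trait name below is abbreviated for illustration):

use arrow::array::ArrayRef;
use datafusion_common::{Result, ScalarValue};

// Sketch: a state entry can now be a whole array, not only a scalar.
#[derive(Debug)]
pub enum AggregateState {
    Scalar(ScalarValue),
    Array(ArrayRef),
}

// Accumulator::state previously returned Result<Vec<ScalarValue>>;
// after this PR it returns the richer state type instead.
pub trait AccumulatorStateSketch {
    fn state(&self) -> Result<Vec<AggregateState>>;
}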

@andygrove added the "api change" label (Changes the API exposed to users of the crate) Aug 1, 2022
@github-actions bot added the "core" (Core DataFusion crate), "logical-expr" (Logical plan and expressions), "physical-expr" (Physical Expressions), and "sql" (SQL Planner) labels Aug 1, 2022
@codecov-commenter commented Aug 1, 2022

Codecov Report

Merging #3009 (ca9d6a9) into master (c7fa789) will decrease coverage by 0.00%.
The diff coverage is 79.79%.

@@            Coverage Diff             @@
##           master    #3009      +/-   ##
==========================================
- Coverage   85.81%   85.81%   -0.01%     
==========================================
  Files         282      286       +4     
  Lines       51531    51790     +259     
==========================================
+ Hits        44219    44441     +222     
- Misses       7312     7349      +37     
Impacted Files Coverage Δ
.../physical-expr/src/aggregate/array_agg_distinct.rs 80.18% <0.00%> (ø)
datafusion/physical-expr/src/aggregate/mod.rs 25.00% <ø> (ø)
datafusion/physical-expr/src/expressions/mod.rs 100.00% <ø> (ø)
datafusion/proto/src/from_proto.rs 35.53% <0.00%> (-0.05%) ⬇️
datafusion/proto/src/lib.rs 93.47% <0.00%> (ø)
datafusion/proto/src/to_proto.rs 53.03% <0.00%> (-0.19%) ⬇️
datafusion/physical-expr/src/aggregate/median.rs 63.85% <63.85%> (ø)
datafusion/physical-expr/src/aggregate/build_in.rs 89.92% <66.66%> (-0.26%) ⬇️
datafusion/expr/src/accumulator.rs 77.77% <77.77%> (ø)
...tafusion/core/src/physical_plan/aggregates/hash.rs 92.95% <100.00%> (ø)
... and 29 more


@andygrove changed the title from "WIP: Implement exact median" to "Implement exact median" Aug 3, 2022
@andygrove marked this pull request as ready for review August 3, 2022 02:00
@andygrove requested a review from alamb August 3, 2022 02:27
@alamb (Contributor) commented Aug 3, 2022

I plan to review this carefully later today

@alamb changed the title from "Implement exact median" to "Implement exact median, add AggregateState" Aug 3, 2022
alamb previously approved these changes Aug 3, 2022

@alamb (Contributor) left a comment:

Thanks @andygrove -- I think this is great!

I think the way that other aggregates (like distinct) have handled the need to hold multiple values is to encode them in ScalarValue::List -- I suspect this approach will be much higher performance (and I can imagine adding other extension variants here, like Box<dyn Any> or something, to allow people to encode their aggregate state using whatever custom type they wanted 🤔).
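
The extension variant being imagined might look something like this (purely hypothetical; it is not part of this PR):

use arrow::array::ArrayRef;
use datafusion_common::ScalarValue;

// Hypothetical extension of AggregateState, not implemented in this PR:
// a Custom variant would let a UDAF carry arbitrary state between
// accumulators, at the cost of needing its own serialization story.
pub enum AggregateState {
    Scalar(ScalarValue),
    Array(ArrayRef),
    Custom(Box<dyn std::any::Any + Send + Sync>),
}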

I am still somewhat worried about the split in Grouping that we have: a Row and a Column one -- e.g. the Row accumulator does not support median. However, I think this is tracked in #2723 and I don't think anything new is needed for this PR

https://github.com/apache/arrow-datafusion/blob/0ff59de810f344b197b2e9491a0a9aefca52d88f/datafusion/physical-expr/src/aggregate/row_accumulator.rs

But may

@@ -44,3 +44,27 @@ pub trait Accumulator: Send + Sync + Debug {
/// returns its value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;
}

#[derive(Debug)]
pub enum AggregateState {
Contributor:
This is a very elegant idea. Can you please add docstrings to AggregateState explaining what is going on?

I think it would be worth updating the docstrings in the accumulator trait with some discussion / examples of how to use the Array state.
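
A docstring-annotated version of the enum might read something like this (a sketch only, assuming the Scalar/Array variants shown in the diff above):

use arrow::array::ArrayRef;
use datafusion_common::ScalarValue;

/// Intermediate state returned by `Accumulator::state`.
///
/// Most aggregates keep a small, fixed-size state (a running sum, a count)
/// and return it as `Scalar`. Aggregates such as MEDIAN, which must retain
/// every input value until `evaluate` is called, can return their buffered
/// values as an `Array` instead of one `ScalarValue` per row.
#[derive(Debug)]
pub enum AggregateState {
    /// A single scalar value.
    Scalar(ScalarValue),
    /// An array of values, e.g. all inputs buffered so far.
    Array(ArrayRef),
}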

datafusion/expr/src/accumulator.rs (outdated; resolved)
datafusion/physical-expr/src/aggregate/count_distinct.rs (outdated; resolved)
use std::sync::Arc;

/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
/// stored in memory before a result can be computed. If an approximation is sufficient
Contributor:

I wonder if it is worth (perhaps as a follow-on PR) putting a cap on the number of values DataFusion will try to buffer to compute the median, and throwing a runtime error if that number is exceeded 🤔 That way we could avoid OOM kills
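
Such a cap could be little more than a size check in update_batch, for example (a sketch; the limit constant and error message are made up for illustration):

use datafusion_common::{DataFusionError, Result};

// Hypothetical cap on the number of values MEDIAN will buffer before
// giving up with a runtime error instead of risking an OOM kill.
const MAX_BUFFERED_ROWS: usize = 10_000_000;

fn check_buffer_limit(buffered_rows: usize, incoming_rows: usize) -> Result<()> {
    if buffered_rows + incoming_rows > MAX_BUFFERED_ROWS {
        return Err(DataFusionError::Execution(format!(
            "MEDIAN would buffer more than {} values; consider APPROX_MEDIAN",
            MAX_BUFFERED_ROWS
        )));
    }
    Ok(())
}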

@alamb dismissed their stale review August 3, 2022 20:30

I clicked "submit" too early

@alamb (Contributor) left a comment:

I think it looks good to me (this time I actually finished the review!)

I left some suggestions on how the implementation might be made better, but I think any of them could be done as follow-ons.

The only thing I think we should really do prior to merging this PR is to add additional coverage in the sql_integration test.

#[derive(Debug)]
struct MedianAccumulator {
data_type: DataType,
all_values: Vec<ArrayRef>,
Contributor:
I wonder if you would be better served here by using an ArrayBuilder (though I realize they are strongly typed, so it might be more awkward -- though it is likely faster)
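
A builder-based state might look roughly like this, specialized to Int64 for illustration (a sketch against a recent arrow-rs builder API; older arrow versions take a capacity argument and return Result from append_value):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, Int64Builder};

// Sketch of the suggestion: buffer values into a typed builder instead of
// keeping one small ArrayRef per input batch.
struct TypedMedianState {
    builder: Int64Builder,
}

impl TypedMedianState {
    fn update(&mut self, values: &Int64Array) {
        // iter() yields Option<i64>; flatten() skips the nulls.
        for v in values.iter().flatten() {
            self.builder.append_value(v);
        }
    }

    fn finish(&mut self) -> ArrayRef {
        Arc::new(self.builder.finish())
    }
}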

.map(|v| AggregateState::Array(v.clone()))
.collect();
if vec.is_empty() {
match self.data_type {
Contributor:
Is it correct to produce a single [0] element array? Wouldn't that mean that the 0 is now included in the median calculation even though it was not in the original data?

Member Author:
These arrays have length 0. I just pushed a refactor to clean this up and make it more obvious.


fn evaluate(&self) -> Result<ScalarValue> {
match self.all_values[0].data_type() {
DataType::Int8 => median!(self, arrow::datatypes::Int8Type, Int8, 2),
Contributor:
Instead of using a macro here, I wonder if you could use the concat and take kernels

https://docs.rs/arrow/19.0.0/arrow/compute/kernels/concat/index.html
https://docs.rs/arrow/19.0.0/arrow/compute/kernels/take/index.html

Something like (untested):

let sorted = sort(concat(&self.all_values));
let len = sorted.len();
let mid = len / 2;
if len % 2 == 0 {
  let indexes: UInt64Array = [mid-1, mid].into_iter().collect();
  // 🤔  Not sure how to do an average:
  let values = average(take(sorted, indexes)) 
  ScalarValue::try_from_array(values, 0)
} else {
  ScalarValue::try_from_array(sorted, mid)
} 

But the need for an average stymies that - though I guess we could implement an average kernel in datafusion and then put it back into arrow
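
One way around the missing average kernel is to pull the two middle values out and average them in plain Rust after concatenating, roughly like this (an untested sketch specialized to Int64; the PR itself uses a macro to cover all supported numeric types):

use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::compute::concat;
use datafusion_common::{DataFusionError, Result, ScalarValue};

fn median_i64(all_values: &[ArrayRef]) -> Result<ScalarValue> {
    // Concatenate the buffered batches into one array.
    let refs: Vec<&dyn Array> = all_values.iter().map(|a| a.as_ref()).collect();
    let combined = concat(&refs)?;
    let combined = combined
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| DataFusionError::Internal("expected Int64Array".to_string()))?;

    // Collect the non-null values and sort them in plain Rust.
    let mut values: Vec<i64> = combined.iter().flatten().collect();
    values.sort_unstable();

    let len = values.len();
    if len == 0 {
        return Ok(ScalarValue::Int64(None));
    }
    let mid = len / 2;
    let median = if len % 2 == 0 {
        // Integer average of the two middle values.
        (values[mid - 1] + values[mid]) / 2
    } else {
        values[mid]
    };
    Ok(ScalarValue::Int64(Some(median)))
}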

}

/// Combine all non-null values from provided arrays into a single array
fn combine_arrays<T: ArrowPrimitiveType>(arrays: &[ArrayRef]) -> Result<ArrayRef> {
Contributor:
You might be able to do this with concat and take as well

Untested

let final_array = concat(arrays);
let indexes = final_array.iter().enumerate().filter_map(|(i, v)| v.map(|_| i)).collect();
take(final_array, indexes)
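
A runnable variant of that idea, using a null mask and the filter kernel after concatenating (a sketch, not the code merged in this PR):

use arrow::array::{Array, ArrayRef};
use arrow::compute::{concat, filter, is_not_null};
use datafusion_common::Result;

/// Combine all non-null values from the provided arrays into a single array.
fn combine_non_null(arrays: &[ArrayRef]) -> Result<ArrayRef> {
    let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect();
    let combined = concat(&refs)?;
    // Build a boolean mask that is true for non-null slots, then filter on it.
    let mask = is_not_null(combined.as_ref())?;
    Ok(filter(combined.as_ref(), &mask)?)
}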

// specific language governing permissions and limitations
// under the License.

//! Utilities used in aggregates
Contributor:
👍

@@ -221,7 +221,7 @@ async fn csv_query_stddev_6() -> Result<()> {
}

#[tokio::test]
async fn csv_query_median_1() -> Result<()> {
Contributor:
If possible, I would recommend adding a basic SQL test for median for all the data types that are supported -- not just on aggregate_test_100, but a dedicated test setup with known data (maybe the integers 10, 9, 8, ..., 0).

Member Author:
Added in ef1effd

@andygrove (Member Author) commented:
Thanks for the review @alamb. I will work on addressing the feedback over the next couple of days.

@yjshen (Member) commented Aug 4, 2022

The main reason row_aggregate was separated from aggregate is the intention to do in-place updates for row-based states. I assume we could store pointers in RowLayout::WordAligned for varlena states during accumulation, and finalize and inline them into the row state when spilling later. UDAFs with a Box<dyn Any> state would require more, perhaps an extra serde mechanism.

For the median state store, a Map<value, value_occurrence_count> is likely to be more space-efficient, though it may require more computation than the current approach and likely would not work with arrow compute kernels.
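
To make that concrete, a count-based state for an integer type could look roughly like this (a sketch only; float keys would need extra care since f64 is not Ord, as discussed below):

use std::collections::BTreeMap;

/// Sketch of a count-based median state: space grows with the number of
/// distinct values rather than the number of rows.
#[derive(Debug, Default)]
struct CountedMedianState {
    counts: BTreeMap<i64, u64>,
    total: u64,
}

impl CountedMedianState {
    fn insert(&mut self, v: i64) {
        *self.counts.entry(v).or_insert(0) += 1;
        self.total += 1;
    }

    /// Walk the sorted counts until the middle position is reached.
    /// For simplicity this returns the lower of the two middle values
    /// when the total count is even.
    fn median(&self) -> Option<i64> {
        if self.total == 0 {
            return None;
        }
        let target = (self.total - 1) / 2;
        let mut seen = 0u64;
        for (value, count) in &self.counts {
            seen += count;
            if seen > target {
                return Some(*value);
            }
        }
        None
    }
}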

@andygrove (Member Author) commented:

For the median state store, a Map<value, value_occurrence_count> is likely to be more space-efficient, though it may require more computation than the current approach and likely would not work with arrow compute kernels.

Clever idea. I'm not sure that would work for floating-point types?

@andygrove (Member Author) commented:

I suspect this approach will be much higher performance

@alamb It isn't clear to me which approach you are referring to here. I assume you are saying that the approach in this PR of using Array rather than ScalarValue::List is likely more performant?

@alamb (Contributor) commented Aug 4, 2022

@alamb It isn't clear to me which approach you are referring to here. I assume you are saying that the approach in this PR of using Array rather than ScalarValue::List is likely more performant?

I guess I was saying that making lots of small Arrays and concatenating them together may not be all that much more performant than a ScalarValue::List, but I haven't measured it.

Clever idea. I'm not sure that would work for floating-point types?

It would probably work but likely take up more space 😆

In general, @yjshen's approach would be good for low cardinality (relatively few distinct values) and not as good for high cardinality (relatively many distinct values) -- floating-point data often happens to be quite high cardinality.

@andygrove (Member Author) commented:

@alamb I think this is ready for another look. I added tests and filed a couple of follow-on issues:

}

#[tokio::test]
async fn median_u8() -> Result<()> {
Contributor:
nice

"approx_median",
DataType::Float64,
Arc::new(Float64Array::from(vec![1.1, f64::NAN, f64::NAN, f64::NAN])),
"NaN", // probably not the desired behavior? - see https://github.com/apache/arrow-datafusion/issues/3039
Contributor:
testing for the win!

@@ -127,7 +127,13 @@ where
l.as_ref().parse::<f64>().unwrap(),
r.as_str().parse::<f64>().unwrap(),
);
assert!((l - r).abs() <= 2.0 * f64::EPSILON);
if l.is_nan() || r.is_nan() {
Contributor:
👍
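
The NaN handling in the test helper presumably ends up with a shape like this (a sketch of the check, not the exact merged code):

// Treat two NaNs as equal for result-comparison purposes; otherwise fall
// back to the approximate equality check used before this change.
fn assert_floats_match(l: f64, r: f64) {
    if l.is_nan() || r.is_nan() {
        assert!(l.is_nan() && r.is_nan(), "{} != {}", l, r);
    } else {
        assert!((l - r).abs() <= 2.0 * f64::EPSILON, "{} != {}", l, r);
    }
}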

@andygrove merged commit 245def0 into apache:master Aug 5, 2022
@andygrove deleted the median branch August 5, 2022 19:56
@ursabot commented Aug 5, 2022

Benchmark runs are scheduled for baseline = 581934d and contender = 245def0. 245def0 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
