Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stddev and variance #1525

Merged
merged 22 commits into from
Jan 10, 2022
Merged

Add stddev and variance #1525

merged 22 commits into from
Jan 10, 2022

Conversation

realno
Copy link
Contributor

@realno realno commented Jan 6, 2022

Which issue does this PR close?

This PR covers one of the functions (stddev) in #1486

Rationale for this change

This change is to add functions to calculate standard deviation and variance to DataFusion.

What changes are included in this PR?

Two aggregation functions variance and stddev are added to DataFusion.
Also added some arithmetic functions (add, mul and div) to ScalarValue since they are likely to be commonly used by many other operators. There are limitations to the current form of the functions - please see the code. It may be worth creating a new issue to capture the work properly.

The algrithm used is an online implementation and numerically stable. It is based on this paper:
Welford, B. P. (1962). "Note on a method for calculating corrected sums of squares and products". Technometrics. 4 (3): 419–420. doi:10.2307/1266577. JSTOR 1266577.

Another benefit of the algorithm is it is parallelizable.

It has been analyzed here:
Ling, Robert F. (1974). "Comparison of Several Algorithms for Computing Sample Means and Variances". Journal of the American Statistical Association. 69 (348): 859–866. doi:10.2307/2286154. JSTOR 2286154.

Some limitations:

  1. The implemetation doesn't support decimal type for now. Here are the reasons after some evaluation:

There is yet an officially stable Decimal library for Rust. There are a few: bigdecimal, rust-decimal but it is not clear if they are battle tested. Also the interface is slightly different than the Decimal defined in Arrow.

It feels more natrual to include some of the arithmetic operators either in core Arrow or somewhere in ScalarValue. I think it deserves a discussion before start the implemention.

  1. batch_update and batch_merge use default implementation as for now because of the nature of the algorithm so there may be room for performance improvement with batch mode.

Are there any user-facing changes?

No change to existing functionalities or APIs. Six new functions are added and are available through SQL interface.

The functions added are following:

var(col1): calculate the variance (sample) of col1
var_samp(col1): calculate the variance (sample) of col1
var_pop(col1): calculate the variance (population) of col1
stddev(col1): calculate the standard deviation (sample) of col1
stddev_samp(col1): calculate the standard deviation (sample) of col1
stddev_pop(col1): calculate the standard deviation (population) of col1

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Jan 6, 2022
@houqp houqp added the enhancement New feature or request label Jan 7, 2022
Copy link
Contributor

@liukun4515 liukun4515 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will review it carefully again after learn the algorithm about these two functions.

@matthewmturner
Copy link
Contributor

Perhaps @Dandandan would be interested in this as he was involved in db-benchmark which this will help

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really nice piece of work @realno - thank you so much ❤️

I especially like the thorough testing.

I did not review the referenced papers, but I did run some basic tests against postgres and I got some strange results

cargo run datafusion-cli

Sometimes the answers are different from query to query

❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
+--------------------+
| STDDEV(sq.column1) |
+--------------------+
| 0.7760297817881877 |
+--------------------+
1 row in set. Query took 0.008 seconds.
❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
+--------------------+
| STDDEV(sq.column1) |
+--------------------+
| NaN                |
+--------------------+

And neither of those answers matches postgres:

alamb=# select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
         stddev         
------------------------
 0.95043849529221686157
(1 row)

Postgres and google sheets do match
Screen Shot 2022-01-07 at 4 15 53 PM

datafusion/src/scalar.rs Show resolved Hide resolved
datafusion/src/scalar.rs Show resolved Hide resolved
datafusion/src/scalar.rs Show resolved Hide resolved
datafusion/src/scalar.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/expressions/mod.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/expressions/mod.rs Outdated Show resolved Hide resolved

if !is_empty {
let new_count = self.count + 1;
let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using ScalarValues like this to accumulate each operation is likely to be quite slow during runtime. However, I think it would be fine to put in as a first initial implementation and then implement an optimized version using update_batch and arrow compute kernels as a follow on PR

    /// updates the accumulator's state from a vector of arrays.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely agree. I have plan to investigate this as followup PR. The current challenge is the algorithm will loose parallelizability if using a batch friendly algorithm. And I need to spend more time to understand the code. One question I have is will there be a chance update and batch_update can be used in the same job, i.e. if one job can call update on some data and batch_update on some other data. Reason for that is the online version of the algorithm requires an intermediate value to be calculated so it is not compatible with batch mode, that is, we can only do all batch or all online.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand the discussion here about online and parallelism with respect to update and update_batch

update_batch is actually what the HashAggregator calls during execution. The default implementation of update_batch simply iterates over each row of the input array, converting it to a ScalarValue, and calls update convert the https://github.com/apache/arrow-datafusion/blob/415c5e124af18a05500514f78604366d860dcf5a/datafusion/src/physical_plan/mod.rs#L572-L583

My mental picture of aggregation is like this:

┌───────────────────────┐             ┌──────────────────────┐                                       
│                       │             │                      │                                       
│  Input RecordBatch 0  │ update_batch│     Accumulator      │                                       
│                       │────────────▶│                      │───────┐                               
│                       │             │                      │       │                               
└───────────────────────┘             └──────────────────────┘       │                               
                                                                     │                               
                                                                     │     merge                     
                                                                     │                               
┌───────────────────────┐             ┌──────────────────────┐       │       ┌──────────────────────┐
│                       │             │                      │       │       │                      │
│  Input RecordBatch 1  │ update_batch│     Accumulator      │       │       │     Accumulator      │
│                       │────────────▶│                      │───────┼──────▶│                      │
│                       │             │                      │       │       │                      │
└───────────────────────┘             └──────────────────────┘       │       └──────────────────────┘
                                                                     │                               
                                                                     │                               
                                                                     │                               
                                                                     │                               
           ...                                   ...                 │                               
                                                                     │                               
                                                                     │                               
                                                                     │                               
┌───────────────────────┐             ┌──────────────────────┐       │                               
│                       │             │                      │       │                               
│  Input RecordBatch N  │ update_batch│     Accumulator      │       │                               
│                       │────────────▶│                      │───────┘                               
│                       │             │                      │                                       
└───────────────────────┘             └──────────────────────┘                                       

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry let me try to explain it a bit better.

The online algorithm uses an intermediate value for each record called m2. Here is how m2 is calculated:
m2(n) = m2(n-1) + (delta1*delta2)
So the calculation of m2(n) depends on the value of m2(n-1). And it is similar for delta1 and delta2 - they depends on mean(n-1). i.e. the algorithm is iterative. So what I meant was this algorithm can't benefit from complete vectorization - for each batch it has to be iterative.

But I see your point that we can avoid per-record type conversion and matching since in batch_update we can use the raw array. I think I can make the change in this PR. Thanks for pointing it out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what I meant was this algorithm can't benefit from complete vectorization - for each batch it has to be iterative.

Ah that makes sense 👍

@realno realno closed this Jan 7, 2022
@realno realno reopened this Jan 7, 2022
@realno
Copy link
Contributor Author

realno commented Jan 8, 2022

@alamb All the comments are addressed, there are some questions/discussion please take a look when you have time. Otherwise the PR is ready.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at the changes and ran some new tests -- very nice @realno

I think this PR is good enough to merge as is -- it is well tested and adds a new feature.

My only concern is the introduction of ScalarValue arithmetic functions rather than using the arrow compute kernels. I would love to know what @jimexist or @Dandandan thinks.

The way to avoid adding more logic to ScalarValue is to implement update_batch for VarianceAccumulator in terms of Arrays (and change update to convert its values into arrays). Perhaps we could merge this PR in with this approach, and file a follow on ticket (I would be happy to do so) to remove scalar value math and improve the performance


if !is_empty {
let new_count = self.count + 1;
let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand the discussion here about online and parallelism with respect to update and update_batch

update_batch is actually what the HashAggregator calls during execution. The default implementation of update_batch simply iterates over each row of the input array, converting it to a ScalarValue, and calls update convert the https://github.com/apache/arrow-datafusion/blob/415c5e124af18a05500514f78604366d860dcf5a/datafusion/src/physical_plan/mod.rs#L572-L583

My mental picture of aggregation is like this:

┌───────────────────────┐             ┌──────────────────────┐                                       
│                       │             │                      │                                       
│  Input RecordBatch 0  │ update_batch│     Accumulator      │                                       
│                       │────────────▶│                      │───────┐                               
│                       │             │                      │       │                               
└───────────────────────┘             └──────────────────────┘       │                               
                                                                     │                               
                                                                     │     merge                     
                                                                     │                               
┌───────────────────────┐             ┌──────────────────────┐       │       ┌──────────────────────┐
│                       │             │                      │       │       │                      │
│  Input RecordBatch 1  │ update_batch│     Accumulator      │       │       │     Accumulator      │
│                       │────────────▶│                      │───────┼──────▶│                      │
│                       │             │                      │       │       │                      │
└───────────────────────┘             └──────────────────────┘       │       └──────────────────────┘
                                                                     │                               
                                                                     │                               
                                                                     │                               
                                                                     │                               
           ...                                   ...                 │                               
                                                                     │                               
                                                                     │                               
                                                                     │                               
┌───────────────────────┐             ┌──────────────────────┐       │                               
│                       │             │                      │       │                               
│  Input RecordBatch N  │ update_batch│     Accumulator      │       │                               
│                       │────────────▶│                      │───────┘                               
│                       │             │                      │                                       
└───────────────────────┘             └──────────────────────┘                                       

datafusion/src/scalar.rs Show resolved Hide resolved
@realno
Copy link
Contributor Author

realno commented Jan 8, 2022

@alamb thanks for the thorough review! After it is merged I will open a followup PR to add batch_update and batch_merge as discussed.

I want to discuss the aggregator interface a bit further. The update function doesn't seem to provide much value with the current architecture, it seems under all circumstances batch_update will perform better. Would it be a reasonable choice to just keep batch_update (and remove update)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if update can provide additional value such as lower latency in streaming context.

With that said I am still new to the code base and could have missed something here. It would be nice to hear more people's opinion. Also please let me know if there is a more appropriate forum to discuss this.

@alamb
Copy link
Contributor

alamb commented Jan 9, 2022

I want to discuss the aggregator interface a bit further. The update function doesn't seem to provide much value with the current architecture, it seems under all circumstances batch_update will perform better. Would it be a reasonable choice to just keep batch_update (and remove update)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if update can provide additional value such as lower latency in streaming context.

I another other rationale for update is to lower the barrier to entry for new users to write Accumulators (otherwise you have to understand Arrow arrays and compute kernels, etc to write one), as well as being backwards compatible with the original interface

Perhaps we could add some comments to update clarifying that for aggregators should prefer to implement batch_update and update is present only for backwards compatibility / ease of use?

@realno
Copy link
Contributor Author

realno commented Jan 9, 2022

I want to discuss the aggregator interface a bit further. The update function doesn't seem to provide much value with the current architecture, it seems under all circumstances batch_update will perform better. Would it be a reasonable choice to just keep batch_update (and remove update)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if update can provide additional value such as lower latency in streaming context.

I another other rationale for update is to lower the barrier to entry for new users to write Accumulators (otherwise you have to understand Arrow arrays and compute kernels, etc to write one), as well as being backwards compatible with the original interface

Perhaps we could add some comments to update clarifying that for aggregators should prefer to implement batch_update and update is present only for backwards compatibility / ease of use?

I see, that makes sense. It is a good idea to add some comments.

@alamb
Copy link
Contributor

alamb commented Jan 10, 2022

Unless anyone has another other thoughts on this PR I'll plan to merge it later today

@alamb alamb merged commit 90de12a into apache:master Jan 10, 2022
@alamb
Copy link
Contributor

alamb commented Jan 10, 2022

Thanks again @realno -- would you like me to file a ticket for the the follow on work?

(specifically removing ScalarValue::add etc and implementing update_batch instead?

@alamb
Copy link
Contributor

alamb commented Jan 10, 2022

Perhaps we could add some comments to update clarifying that for aggregators should prefer to implement batch_update and update is present only for backwards compatibility / ease of use?

I see, that makes sense. It is a good idea to add some comments.

I proposed some doc clarification in #1542

@alamb alamb changed the title Add stddev operator Add stddev and variance Jan 10, 2022
@realno
Copy link
Contributor Author

realno commented Jan 11, 2022

Thanks again @realno -- would you like me to file a ticket for the the follow on work?

(specifically removing ScalarValue::add etc and implementing update_batch instead?

I have the changes ready, will open a PR today. If you prefer to have a ticket as reference I can create it too.

@realno
Copy link
Contributor Author

realno commented Jan 11, 2022

FYI @alamb , the PR is ready: #1547

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants