
[WIP] [SPARK-1328] Add vector statistics #268

Closed
wants to merge 38 commits

Conversation

yinxusen
Contributor

With the new vector system in MLlib, we find it useful to add some new APIs to process RDD[Vector]. Besides, the former implementation of computeStat is not numerically stable: it can lose precision and may produce NaN in scientific computing, as described in SPARK-1328.

APIs contain:

  • rowMeans(): RDD[Double]
  • rowNorm2(): RDD[Double]
  • rowSDs(): RDD[Double]
  • colMeans(): Vector
  • colMeans(size: Int): Vector
  • colNorm2(): Vector
  • colNorm2(size: Int): Vector
  • colSDs(): Vector
  • colSDs(size: Int): Vector
  • maxOption((Vector, Vector) => Boolean): Option[Vector]
  • minOption((Vector, Vector) => Boolean): Option[Vector]
  • rowShrink(): RDD[Vector]
  • colShrink(): RDD[Vector]

This is a work in progress, and more APIs will be added to LabeledPoint. Moreover, the implicit declaration will be moved from MLUtils to MLContext later.
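For illustration, a rough usage sketch of the proposed API (hypothetical: it assumes the implicit conversion on RDD[Vector] is imported from MLUtils as described above, and uses only the method names listed here):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.MLUtils._ // assumed to bring the implicit RDD[Vector] wrapper into scope

// `sc` is an existing SparkContext
val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0)))

val means: Vector = data.colMeans() // column-wise means
val sds: Vector = data.colSDs()     // column-wise standard deviations
```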

@AmplabJenkins

Merged build triggered. Build is starting -or- tests failed to complete.

@AmplabJenkins

Merged build started. Build is starting -or- tests failed to complete.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13565/

@mengxr
Contributor

mengxr commented Mar 29, 2014

@yinxusen Thanks for working on this! I don't think row statistics are important, because they aggregate values of different features. For column statistics, instead of implementing each statistic separately, we can compute all common statistics such as (n, nnz, mean, variance, max, min) in a single job. This adds little overhead to the computation.

Btw, Vector.toArray is an expensive operation for SparseVector. You should use breeze's axpy to aggregate vectors.
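For illustration, a minimal sketch of that idea (not the PR's code, and simplified to operate on breeze vectors directly): column sums accumulated in a single pass, with axpy doing the in-place accumulation so a SparseVector never has to be expanded with toArray.

```scala
import breeze.linalg.{axpy, DenseVector => BDV, Vector => BV}
import org.apache.spark.rdd.RDD

// Sketch: one-pass column sums. axpy(1.0, v, sum) adds v into sum in place.
def columnSums(data: RDD[BV[Double]], d: Int): BV[Double] =
  data.aggregate(BDV.zeros[Double](d): BV[Double])(
    (sum, v) => { axpy(1.0, v, sum); sum }, // accumulate each vector into the running sum
    (s1, s2) => s1 + s2                     // combine partial sums from different partitions
  )
```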

@yinxusen
Contributor Author

Yes, @mengxr, I find it is a good idea to merge them together and compute the SD in a single pass, as that page describes. I'll fix it ASAP.
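For reference, the single-pass update and its parallel merge look roughly like this (my illustration, not the code in this PR): keep (count, mean, M2) per column and merge partial results, which avoids the unstable sum-of-squares formula behind the precision loss mentioned in SPARK-1328.

```scala
// Welford-style accumulator for one column (sketch).
case class Stat(count: Long = 0L, mean: Double = 0.0, m2: Double = 0.0) {
  def add(x: Double): Stat = {
    val n = count + 1
    val delta = x - mean
    val newMean = mean + delta / n
    Stat(n, newMean, m2 + delta * (x - newMean))
  }
  // Merge two partial results (Chan et al. parallel formula).
  def merge(o: Stat): Stat =
    if (count == 0L) o
    else if (o.count == 0L) this
    else {
      val n = count + o.count
      val delta = o.mean - mean
      Stat(n, mean + delta * o.count / n,
        m2 + o.m2 + delta * delta * count * o.count / n)
    }
  def variance: Double = if (count > 1) m2 / (count - 1) else 0.0
  def sd: Double = math.sqrt(variance)
}

// usage: rdd.aggregate(Stat())((s, x) => s.add(x), (a, b) => a.merge(b))
```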

@AmplabJenkins

Build triggered. Build is starting -or- tests failed to complete.

@AmplabJenkins

Build started. Build is starting -or- tests failed to complete.

@yinxusen
Contributor Author

I removed all the APIs and kept a single new one called statistics(), which computes six statistical quantities in a single pass.

Row-wise statistical quantities are removed because they are meaningless.

Column-wise and row-wise shrinkage are also removed, because they belong to feature/data selection. We should reconsider them and implement them in another semantic class such as LabeledPoint.

@AmplabJenkins

Build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13585/

* }}},
* with the size of Vector as input parameter.
*/
def statistics(size: Int): (Vector, Vector, Double, Vector, Vector, Vector) = {
Contributor

Please wrap the return type in a class called VectorRDDStatisticalSummary that provides mean, variance, count: Long, min, max, and std, so we can add more summary statistics to it later. I would also recommend renaming the method to summarize or summarizeStatistics.
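Something along these lines, sketched from the comment above (the field set and names follow the suggestion, not the final MLlib interface):

```scala
import org.apache.spark.mllib.linalg.Vector

// Sketch of the suggested wrapper for the column statistics.
trait VectorRDDStatisticalSummary {
  def mean: Vector     // column-wise mean
  def variance: Vector // column-wise variance
  def count: Long      // number of rows
  def min: Vector      // column-wise minimum
  def max: Vector      // column-wise maximum
  def std: Vector      // element-wise sqrt of variance
}
```

The interface that was eventually merged evolved along these lines (see the "update multivariate statistical summary interface" commit further down).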

@mengxr
Contributor

mengxr commented Mar 30, 2014

@yinxusen I want to see whether we can improve the performance. First of all, the complexity should be O(nnz) instead of O(n * d). This is a little bit tricky: basically, we don't need to update zeros for a sparse vector. Suppose we have the following column:

1.0
0.0
2.0
0.0
3.0
0.0
0.0

To apply the formula for mean/variance, you need to do the update at the second row. This is actually not necessary because the summary statistics are invariant to the ordering. Imagine we have this column re-arranged:

1.0
2.0
3.0
0.0
0.0
0.0
0.0

which still has the same summary statistics. We can skip all zeros and do the mean/variance updates based on the non-zero count instead of the global count. After the aggregation, we know we need to accumulate 4 zeros to the column, which is a constant-time operation.

I'm okay if you don't have enough time to make the change in this PR. Put a TODO and we can fix it later.
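A sketch of that trick for a single column (illustration only, not the PR's code): aggregate over the non-zero entries, and afterwards fold in the skipped zeros with one constant-time update.

```scala
// Partial statistics computed from the non-zero entries of one column (sketch).
case class NonZeroStat(nnz: Long, mean: Double, m2: Double, min: Double, max: Double)

// Adjust for the zeros that were skipped, in O(1) per column.
def withZeros(s: NonZeroStat, totalCount: Long): (Double, Double, Double, Double) = {
  val numZeros = totalCount - s.nnz
  val delta = 0.0 - s.mean // the zero block has mean 0 and M2 0
  val mean = s.mean * s.nnz / totalCount
  val m2 = s.m2 + delta * delta * s.nnz * numZeros / totalCount
  val variance = if (totalCount > 1) m2 / (totalCount - 1) else 0.0
  val min = if (numZeros > 0) math.min(s.min, 0.0) else s.min
  val max = if (numZeros > 0) math.max(s.max, 0.0) else s.max
  (mean, variance, min, max)
}
```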

@yinxusen
Contributor Author

@mengxr I am not quite sure about the sparse vector case. In your example, do you mean the column is Vector(1.0, 0.0, 2.0, 0.0, 3.0, 0.0, 0.0), or
RDD( Vector(1.0), Vector(0.0), Vector(2.0), Vector(0.0), Vector(3.0), Vector(0.0), Vector(0.0) )?

If it is case 1, then it is easy to rewrite in O(nnz); otherwise it will be difficult, because we cannot tell whether a column is sparse before we count the nnz. If case 1 is what you mean, then I think I need to treat sparse vectors differently from dense ones with something like:

rdd.first() match { case dv: DenseVector => ... case sv: SparseVector => ... }

@mengxr
Contributor

mengxr commented Mar 31, 2014

@yinxusen I mean a column. You don't need to check the type as you already use activeIterator to get the non-zero elements. The approach I suggested is to only update mean/variance/min/max for non-zeros. After we scan all the items, we know how many zeros we need to append to each column, and then we do a constant-time update for each column.

@yinxusen
Contributor Author

yinxusen commented Apr 1, 2014

@mengxr Ah... I totally understand what you mean now. Code is on the way.

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@AmplabJenkins

Build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13630/

math.abs(lhs - rhs) / denominator < 0.3
}
}

Contributor Author

The build failure might be caused by this trailing empty line. I'll fix it in the next commit along with the other problems.
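For context, the `math.abs(lhs - rhs) / denominator < 0.3` line above looks like a relative-tolerance comparison from the test suite. A hypothetical, self-contained version of such a helper (the name and the choice of denominator are my assumptions) might be:

```scala
// Hypothetical test helper: values are "close enough" when their relative
// difference is below the tolerance; the floor guards against dividing by zero.
def relativelyEqual(lhs: Double, rhs: Double, tolerance: Double = 0.3): Boolean = {
  val denominator = math.max(math.abs(lhs), math.abs(rhs)).max(1e-12)
  math.abs(lhs - rhs) / denominator < tolerance
}
```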

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14025/

@yinxusen
Contributor Author

Conflict in MLUtils and RowMatrix. I think it is OK now.

On 2014-4-11 5:31 AM, "Matei Zaharia" <[email protected]> wrote:

> Hey, unfortunately this no longer merges cleanly. Do you mind rebasing it?
> I think some conflicting changes happened in MLUtils.

@pwendell
Contributor

Thanks, merged!

@asfgit asfgit closed this in fdfb45e Apr 12, 2014
asfgit pushed a commit that referenced this pull request Apr 12, 2014
With the new vector system in MLlib, we find it useful to add some new APIs to process `RDD[Vector]`. Besides, the former implementation of `computeStat` is not numerically stable: it can lose precision and may produce `NaN` in scientific computing, as described in [SPARK-1328](https://spark-project.atlassian.net/browse/SPARK-1328).

APIs contain:

* rowMeans(): RDD[Double]
* rowNorm2(): RDD[Double]
* rowSDs(): RDD[Double]
* colMeans(): Vector
* colMeans(size: Int): Vector
* colNorm2(): Vector
* colNorm2(size: Int): Vector
* colSDs(): Vector
* colSDs(size: Int): Vector
* maxOption((Vector, Vector) => Boolean): Option[Vector]
* minOption((Vector, Vector) => Boolean): Option[Vector]
* rowShrink(): RDD[Vector]
* colShrink(): RDD[Vector]

This is a work in progress, and more APIs will be added to `LabeledPoint`. Moreover, the implicit declaration will be moved from `MLUtils` to `MLContext` later.

Author: Xusen Yin <[email protected]>
Author: Xiangrui Meng <[email protected]>

Closes #268 from yinxusen/vector-statistics and squashes the following commits:

d61363f [Xusen Yin] rebase to latest master
16ae684 [Xusen Yin] fix minor error and remove useless method
10cf5d3 [Xusen Yin] refine some return type
b064714 [Xusen Yin] remove computeStat in MLUtils
cbbefdb [Xiangrui Meng] update multivariate statistical summary interface and clean tests
4eaf28a [Xusen Yin] merge VectorRDDStatistics into RowMatrix
48ee053 [Xusen Yin] fix minor error
e624f93 [Xusen Yin] fix scala style error
1fba230 [Xusen Yin] merge while loop together
69e1f37 [Xusen Yin] remove lazy eval, and minor memory footprint
548e9de [Xusen Yin] minor revision
86522c4 [Xusen Yin] add comments on functions
dc77e38 [Xusen Yin] test sparse vector RDD
18cf072 [Xusen Yin] change def to lazy val to make sure that the computations in function be evaluated only once
f7a3ca2 [Xusen Yin] fix the corner case of maxmin
967d041 [Xusen Yin] full revision with Aggregator class
138300c [Xusen Yin] add new Aggregator class
1376ff4 [Xusen Yin] rename variables and adjust code
4a5c38d [Xusen Yin] add scala doc, refine code and comments
036b7a5 [Xusen Yin] fix the bug of Nan occur
f6e8e9a [Xusen Yin] add sparse vectors test
4cfbadf [Xusen Yin] fix bug of min max
4e4fbd1 [Xusen Yin] separate seqop and combop out as independent functions
a6d5a2e [Xusen Yin] rewrite for only computing non-zero elements
3980287 [Xusen Yin] rename variables
62a2c3e [Xusen Yin] use axpy and in-place if possible
9a75ebd [Xusen Yin] add case class to wrap return values
d816ac7 [Xusen Yin] remove useless APIs
c4651bb [Xusen Yin] remove row-wise APIs and refine code
1338ea1 [Xusen Yin] all-in-one version test passed
cc65810 [Xusen Yin] add parallel mean and variance
9af2e95 [Xusen Yin] refine the code style
ad6c82d [Xusen Yin] add shrink test
e09d5d2 [Xusen Yin] add scala docs and refine shrink method
8ef3377 [Xusen Yin] pass all tests
28cf060 [Xusen Yin] fix error of column means
54b19ab [Xusen Yin] add new API to shrink RDD[Vector]
8c6c0e1 [Xusen Yin] add basic statistics
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
mccheah pushed a commit to mccheah/spark that referenced this pull request Oct 12, 2017
Igosuki pushed a commit to Adikteev/spark that referenced this pull request Jul 31, 2018
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
peter-toth added a commit to peter-toth/spark that referenced this pull request Nov 26, 2024
…nt filters

This is a WIP version of apache#37630 at commit 83c59ab5e7e2abfaf83abe7ec418f30a5c7a41ea, but it introduces the `spark.cloudera.sql.advancedSubqueryMerge.enabled` config (default `true`) so the feature can be disabled if needed.

After apache#32298 we were able to merge scalar subquery plans. This PR is a follow-up improvement to the merging logic to be able to combine `Filter` nodes with different conditions if those conditions can be merged in an ancestor `Aggregate` node.

Consider the following query with 2 subqueries:
```
SELECT
  (SELECT avg(a) FROM t WHERE c = 1),
  (SELECT sum(b) FROM t WHERE c = 2)
```
where the subqueries can be merged to:
```
SELECT
  avg(a) FILTER (WHERE c = 1),
  sum(b) FILTER (WHERE c = 2)
FROM t
WHERE c = 1 OR c = 2
```
After this PR the 2 subqueries are merged to this optimized form:
```
== Optimized Logical Plan ==
Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L]
:  :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :  +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:  :     +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :        +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :           +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
:  +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:     +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
:        +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:           +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:              +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
+- OneRowRelation
```
and physical form:
```
== Physical Plan ==
*(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L]
:  :- Subquery scalar-subquery#260, [id=#148]
:  :  +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
:  :     +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L])
:  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143]
:  :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L])
:  :              +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
:  :                 +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
:  :                    +- *(1) ColumnarToRow
:  :                       +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int>
:  +- ReusedSubquery Subquery scalar-subquery#260, [id=#148]
+- *(1) Scan OneRowRelation[]
```

The optimization in this PR doesn't kick in if the filters (`c = 1`, `c = 2`) are partition or bucket filters (to avoid possible performance degradation), but allows merging pushed-down data filters depending on a new `spark.sql.planMerge.ignorePushedDataFilters` config value (default `true`).
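For reference, toggling the behaviour described above could look like this (a sketch; `spark` is an existing SparkSession and only the two config keys named in this description are used):

```scala
// Disable the whole advanced subquery-merge feature (default: enabled).
spark.conf.set("spark.cloudera.sql.advancedSubqueryMerge.enabled", "false")
// Keep the feature, but do not merge plans that differ only in pushed-down data filters.
spark.conf.set("spark.sql.planMerge.ignorePushedDataFilters", "false")
```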

Performance improvement.
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - Merge different filters off                   9526           9634          97          0.0   244257993.6       1.0X
[info] q9 - Merge different filters on                    3798           3881         133          0.0    97381735.1       2.5X
```
The performance improvement in the case of `q9` comes from merging 15 subqueries into 1 subquery (apache#32298 was able to merge the 15 subqueries into 5).

No.

Existing and new UTs.

Change-Id: Ibeab5772549660ed217707f9b7cdac39491bf096