Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-14352][SQL] approxQuantile should support multi columns #12135

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

import sys
import warnings
import random

if sys.version >= '3':
Expand Down Expand Up @@ -1348,7 +1347,7 @@ def replace(self, to_replace, value, subset=None):
@since(2.0)
def approxQuantile(self, col, probabilities, relativeError):
"""
Calculates the approximate quantiles of a numerical column of a
Calculates the approximate quantiles of numerical columns of a
DataFrame.

The result of this algorithm has the following deterministic bound:
Expand All @@ -1365,18 +1364,41 @@ def approxQuantile(self, col, probabilities, relativeError):
Space-efficient Online Computation of Quantile Summaries]]
by Greenwald and Khanna.

:param col: the name of the numerical column
Note that rows containing any null values will be removed before calculation.

:param col: str, list.
Can be a single column name, or a list of names for multiple columns.
:param probabilities: a list of quantile probabilities
Each number must belong to [0, 1].
For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
:param relativeError: The relative target precision to achieve
(>= 0). If set to zero, the exact quantiles are computed, which
could be very expensive. Note that values greater than 1 are
accepted but give the same result as 1.
:return: the approximate quantiles at the given probabilities
:return: the approximate quantiles at the given probabilities. If
the input `col` is a string, the output is a list of float. If the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

float should be pluralized (e.g. is a list of floats)

input `col` is a list or tuple of strings, the output is also a
list, but each element in it is a list of float, i.e., the output
is a list of list of float.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to have a versionchanged:: directive to explain that in 2.1 we added support for multiple columns - but it probably shouldn't block or anything like that just could maybe make things a bit clearer for people reading the docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed. I have added a brief versionchanged

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also float -> floats here


.. versionchanged:: 2.2
Added support for multiple columns.
"""
if not isinstance(col, str):
raise ValueError("col should be a string.")

if not isinstance(col, (str, list, tuple)):
raise ValueError("col should be a string, list or tuple, but got %r" % type(col))

isStr = isinstance(col, str)

if isinstance(col, tuple):
col = list(col)
elif isinstance(col, str):
col = [col]

for c in col:
if not isinstance(c, str):
raise ValueError("columns should be strings, but got %r" % type(c))
col = _to_list(self._sc, col)

if not isinstance(probabilities, (list, tuple)):
raise ValueError("probabilities should be a list or tuple")
Expand All @@ -1392,7 +1414,8 @@ def approxQuantile(self, col, probabilities, relativeError):
relativeError = float(relativeError)

jaq = self._jdf.stat().approxQuantile(col, probabilities, relativeError)
return list(jaq)
jaq_list = [list(j) for j in jaq]
return jaq_list[0] if isStr else jaq_list

@since(1.4)
def corr(self, col1, col2, method=None):
Expand Down
23 changes: 22 additions & 1 deletion python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,11 +874,32 @@ def test_first_last_ignorenulls(self):
self.assertEqual([Row(a=None, b=1, c=None, d=98)], df3.collect())

def test_approxQuantile(self):
df = self.sc.parallelize([Row(a=i) for i in range(10)]).toDF()
df = self.sc.parallelize([Row(a=i, b=i+10) for i in range(10)]).toDF()
aq = df.stat.approxQuantile("a", [0.1, 0.5, 0.9], 0.1)
self.assertTrue(isinstance(aq, list))
self.assertEqual(len(aq), 3)
self.assertTrue(all(isinstance(q, float) for q in aq))
aqs = df.stat.approxQuantile(["a", "b"], [0.1, 0.5, 0.9], 0.1)
self.assertTrue(isinstance(aqs, list))
self.assertEqual(len(aqs), 2)
self.assertTrue(isinstance(aqs[0], list))
self.assertEqual(len(aqs[0]), 3)
self.assertTrue(all(isinstance(q, float) for q in aqs[0]))
self.assertTrue(isinstance(aqs[1], list))
self.assertEqual(len(aqs[1]), 3)
self.assertTrue(all(isinstance(q, float) for q in aqs[1]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually one minor thing on the Python side, since we do check the type and explicitily throw an error, maybe it would be good to have a test that asserts we get the error we are expecting ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk Good point. I add the error checking in the tests, BTW I add some tests for the tuple input type. Thanks for reviewing!

aqt = df.stat.approxQuantile(("a", "b"), [0.1, 0.5, 0.9], 0.1)
self.assertTrue(isinstance(aqt, list))
self.assertEqual(len(aqt), 2)
self.assertTrue(isinstance(aqt[0], list))
self.assertEqual(len(aqt[0]), 3)
self.assertTrue(all(isinstance(q, float) for q in aqt[0]))
self.assertTrue(isinstance(aqt[1], list))
self.assertEqual(len(aqt[1]), 3)
self.assertTrue(all(isinstance(q, float) for q in aqt[1]))
self.assertRaises(ValueError, lambda: df.stat.approxQuantile(123, [0.1, 0.9], 0.1))
self.assertRaises(ValueError, lambda: df.stat.approxQuantile(("a", 123), [0.1, 0.9], 0.1))
self.assertRaises(ValueError, lambda: df.stat.approxQuantile(["a", 123], [0.1, 0.9], 0.1))

def test_corr(self):
import math
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.stat._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}

Expand Down Expand Up @@ -74,14 +75,44 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
Seq(col), probabilities, relativeError).head.toArray
}

/**
* Calculates the approximate quantiles of numerical columns of a DataFrame.
* @see [[DataFrameStatsFunctions.approxQuantile(col:Str* approxQuantile]] for
* detailed description.
*
* Note that rows containing any null or NaN values values will be removed before
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

values values -> values

@zhengruifeng Could you submit a follow-up PR to add test cases for null values?

* calculation.
* @param cols the names of the numerical columns
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened if the users provide the number that is not in this boundary? Do we have a test case to verify the behavior?

* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (>= 0).
Copy link
Member

@HyukjinKwon HyukjinKwon Feb 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a kind comment for the future changes and to inform as I know it is super easy for javadoc8 to be broken, It seems javadoc8 complains it as below:

[error] .../spark/sql/core/target/java/org/apache/spark/sql/DataFrameStatFunctions.java:43: error: unexpected content
[error]    * @see {@link DataFrameStatsFunctions.approxQuantile(col:Str* approxQuantile} for
[error]      ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/DataFrameStatFunctions.java:52: error: bad use of '>'
[error]    * @param relativeError The relative target precision to achieve (>= 0).
[error]

We could do this as

@param relativeError The relative target precision to achieve (greater or equal to 0).

and fix the link as below If there is no better choice:

@see `DataFrameStatsFunctions.approxQuantile` for detailed description.

Just FYI, there are several cases in #16013

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these just warnings generated? It would be nice to know during Jenkins testing if javadoc8 (or scaladoc for that matter) breaks.

The 2nd case links nicely to the single-arg version of the method, which contains the detailed doc, in Scaladoc. Pity it won't work with javadoc - is there another way to link it correctly? I suspect that what will work for javadoc will break the link for scaladoc...

Copy link
Member

@HyukjinKwon HyukjinKwon Feb 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea.. so, kindly @jkbradley opened a JIRA here - http://issues.apache.org/jira/browse/SPARK-18692

Actually, they are errors that make documentation building failed in javadoc8. I and many guys had a hard time to figure that out a good way AKAIK (honestly, I would like to say that I have tried all the combination I could think. To make it worse, it seems case-by-case up to my observation and tests) and it kind of ended up with the one above.. as we are anyway going to drop Java 7 support in near future up to my knowledge.

Copy link
Member

@HyukjinKwon HyukjinKwon Feb 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I will ping you if I happen to find another good way to make some links for both.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(BTW, IMHO, at least for now, building javadoc everytime might be good to do but not required. We can avoid them at our best in our PRs and then sweep them when the release is close or in other related PRs if there are.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we create an issue to build javadoc with Java 8 to Jenkins then?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that JIRA is actually here - https://issues.apache.org/jira/browse/SPARK-18692 if we are talking about the same thing :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, sorry the comments imply building it separately from the main jenkins build, but if we want to avoid breaking Java 8 unidoc I was thinking building it as part of the normal PR build process would be better. Regardless lets move discussion over to that JIRA :)

* If set to zero, the exact quantiles are computed, which could be very expensive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is also missing.

Actually, you also need to consider the illegal cases, like negative values.

* Note that values greater than 1 are accepted but give the same result as 1.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like you did not add any test case to verify it.

* @return the approximate quantiles at the given probabilities of each column
*
* @note Rows containing any NaN values will be removed before calculation
*
* @since 2.2.0
*/
def approxQuantile(
cols: Array[String],
probabilities: Array[Double],
relativeError: Double): Array[Array[Double]] = {
StatFunctions.multipleApproxQuantiles(df.select(cols.map(col): _*).na.drop(), cols,
probabilities, relativeError).map(_.toArray).toArray
}


/**
* Python-friendly version of [[approxQuantile()]]
*/
private[spark] def approxQuantile(
col: String,
cols: List[String],
probabilities: List[Double],
relativeError: Double): java.util.List[Double] = {
approxQuantile(col, probabilities.toArray, relativeError).toList.asJava
relativeError: Double): java.util.List[java.util.List[Double]] = {
approxQuantile(cols.toArray, probabilities.toArray, relativeError)
.map(_.toList.asJava).toList.asJava
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indent is not right.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,26 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
assert(math.abs(s2 - q2 * n) < error_single)
assert(math.abs(d1 - 2 * q1 * n) < error_double)
assert(math.abs(d2 - 2 * q2 * n) < error_double)

// Multiple columns
val Array(Array(ms1, ms2), Array(md1, md2)) =
df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), epsilon)

assert(math.abs(ms1 - q1 * n) < error_single)
assert(math.abs(ms2 - q2 * n) < error_single)
assert(math.abs(md1 - 2 * q1 * n) < error_double)
assert(math.abs(md2 - 2 * q2 * n) < error_double)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just add a multi-column NaN test too.

}
// test approxQuantile on NaN values
val dfNaN = Seq(Double.NaN, 1.0, Double.NaN, Double.NaN).toDF("input")
val resNaN = dfNaN.stat.approxQuantile("input", Array(q1, q2), epsilons.head)
assert(resNaN.count(_.isNaN) === 0)
// test approxQuantile on multi-column NaN values
val dfNaN2 = Seq((Double.NaN, 1.0), (1.0, 1.0), (-1.0, Double.NaN), (Double.NaN, Double.NaN))
.toDF("input1", "input2")
val resNaN2 = dfNaN2.stat.approxQuantile(Array("input1", "input2"),
Array(q1, q2), epsilons.head)
assert(resNaN2.flatten.count(_.isNaN) === 0)
}

test("crosstab") {
Expand Down