From 42c26bdd5a3ac24bbe3a4101ee252c072373efe6 Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Thu, 23 Feb 2017 15:44:55 -0800
Subject: [PATCH 1/8] changes

---
 .../apache/spark/ml/stat/Correlations.scala   | 25 ++++++
 .../org/apache/spark/ml/stat/Statistics.scala | 86 +++++++++++++++++++
 2 files changed, 111 insertions(+)
 create mode 100644 mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
 create mode 100644 mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
new file mode 100644
index 0000000000000..34df85e52fd60
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+/**
+ *
+ */
+object Correlations {
+
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
new file mode 100644
index 0000000000000..8b3b0c8427aa5
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
+import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+
+/**
+ * API for statistical functions in MLlib, compatible with Dataframes and Datasets.
+ *
+ * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset.stat]]
+ * to MLlib's Vector types.
+ */
+@Since("2.2.0")
+object Statistics {
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Methods currently supported: `pearson` (default), `spearman`.
+   *
+   * @param dataset a dataset or a dataframe
+   * @param column the name of the column of vectors for which the correlation coefficient needs
+   *               to be computed. This must be a column of the dataset, and it must contain
+   *               Vector objects.
+   * @param method String specifying the method to use for computing correlation.
+   *               Supported: `pearson` (default), `spearman`
+   * @return A dataframe that contains the correlation matrix of the column of vectors. This
+   *         dataframe contains a single row and a single column of name
+   *         '$METHODNAME($COLUMN)'.
+   * @throws IllegalArgumentException if the column is not a valid column in the dataset, or if
+   *                                  the content of this column is not of type Vector.
+   *
+   * Here is how to access the correlation coefficient:
+   * {{{
+   *   val data: Dataset[Vector] = ...
+   *   val Row(coeff: Matrix) = Statistics.corr(data, "value").head
+   *   // coeff now contains the Pearson correlation matrix.
+   * }}}
+   *
+   * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage.
+   */
+  // TODO: how do we handle missing values?
+  @Since("2.2.0")
+  def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {
+    val rdd = dataset.select(column).map { case Row(v: Vector) => OldVectors.fromML(v)} .rdd
+    val oldM = OldStatistics.corr(rdd, method)
+    val name = s"$method($column)"
+    val schema = StructType(Array(StructField(name, new VectorUDT, nullable = true)))
+    dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema)
+  }
+
+  /**
+   * Compute the correlation matrix for the input Dataset of Vectors.
+   * @param dataset a dataset or dataframe
+   * @param column a column of this dataset
+   * @return
+   */
+  @Since("2.2.0")
+  def corr(dataset: Dataset[_], column: String): DataFrame = {
+    corr(dataset, column, "pearson")
+  }
+}

From d9f6a6c6c2457920fbf1002da5da62ff7d29b46c Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Tue, 28 Feb 2017 16:31:19 -0800
Subject: [PATCH 2/8] commit

---
 .../org/apache/spark/ml/stat/Statistics.scala |  12 ++-
 .../spark/ml/stat/StatisticsSuite.scala       | 102 ++++++++++++++++++
 2 files changed, 110 insertions(+), 4 deletions(-)
 create mode 100644 mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
index 8b3b0c8427aa5..45a14d9cb7b03 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ml.stat
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Since
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg.{SQLDataTypes, Vector}
 import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
-import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
  * API for statistical functions in MLlib, compatible with Dataframes and Datasets.
@@ -66,10 +67,13 @@ object Statistics {
   // TODO: how do we handle missing values?
   @Since("2.2.0")
   def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {
-    val rdd = dataset.select(column).map { case Row(v: Vector) => OldVectors.fromML(v)} .rdd
+    val rdd = dataset.select(column).rdd.map {
+      case Row(v: Vector) => OldVectors.fromML(v)
+//      case r: GenericRowWithSchema => OldVectors.fromML(r.getAs[Vector](0))
+    }
     val oldM = OldStatistics.corr(rdd, method)
     val name = s"$method($column)"
-    val schema = StructType(Array(StructField(name, new VectorUDT, nullable = true)))
+    val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = true)))
     dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema)
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala
new file mode 100644
index 0000000000000..8555098cf8d39
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.linalg.Matrix
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.{DataFrame, Row}
+
+
+class StatisticsSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
+
+  import StatisticsSuite._
+
+  val xData = Array(1.0, 0.0, -2.0)
+  val yData = Array(4.0, 5.0, 3.0)
+  val zeros = new Array[Double](3)
+  val data = Seq(
+    Vectors.dense(1.0, 0.0, 0.0, -2.0),
+    Vectors.dense(4.0, 5.0, 0.0, 3.0),
+    Vectors.dense(6.0, 7.0, 0.0, 8.0),
+    Vectors.dense(9.0, 0.0, 0.0, 1.0)
+  )
+
+  private def X = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
+
+  private def extract(df: DataFrame): BDM[Double] = {
+    val Array(Row(mat: Matrix)) = df.collect()
+    mat.asBreeze.toDenseMatrix
+  }
+
+
+  test("corr(X) default, pearson") {
+    val defaultMat = Statistics.corr(X, "features")
+    val pearsonMat = Statistics.corr(X, "features", "pearson")
+    // scalastyle:off
+    val expected = BDM(
+      (1.00000000, 0.05564149, Double.NaN, 0.4004714),
+      (0.05564149, 1.00000000, Double.NaN, 0.9135959),
+      (Double.NaN, Double.NaN, 1.00000000, Double.NaN),
+      (0.40047142, 0.91359586, Double.NaN, 1.0000000))
+    // scalastyle:on
+
+    assert(matrixApproxEqual(extract(defaultMat), expected))
+    assert(matrixApproxEqual(extract(pearsonMat), expected))
+  }
+
+  test("corr(X) spearman") {
+    val spearmanMat = Statistics.corr(X, "features", "spearman")
+    // scalastyle:off
+    val expected = BDM(
+      (1.0000000, 0.1054093, Double.NaN, 0.4000000),
+      (0.1054093, 1.0000000, Double.NaN, 0.9486833),
+      (Double.NaN, Double.NaN, 1.00000000, Double.NaN),
+      (0.4000000, 0.9486833, Double.NaN, 1.0000000))
+    // scalastyle:on
+    assert(matrixApproxEqual(extract(spearmanMat), expected))
+  }
+
+}
+
+
+object StatisticsSuite extends Logging {
+
+  def approxEqual(v1: Double, v2: Double, threshold: Double = 1e-6): Boolean = {
+    if (v1.isNaN) {
+      v2.isNaN
+    } else {
+      math.abs(v1 - v2) <= threshold
+    }
+  }
+
+  def matrixApproxEqual(A: BM[Double], B: BM[Double], threshold: Double = 1e-6): Boolean = {
+    for (i <- 0 until A.rows; j <- 0 until A.cols) {
+      if (!approxEqual(A(i, j), B(i, j), threshold)) {
+        logInfo("i, j = " + i + ", " + j + " actual: " + A(i, j) + " expected:" + B(i, j))
+        return false
+      }
+    }
+    true
+  }
+
+}
\ No newline at end of file

From 7d4ccfef4e6d3a7b65c3cca149afb250414aea4c Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Tue, 28 Feb 2017 16:31:34 -0800
Subject: [PATCH 3/8] Cleanup

---
 mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
index 45a14d9cb7b03..3b5b34a938115 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
@@ -24,7 +24,6 @@ package org.apache.spark.ml.stat
 import org.apache.spark.ml.linalg.{SQLDataTypes, Vector}
 import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**

From a85a889b341b99393793b7558691d4b3029157ed Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Wed, 15 Mar 2017 16:25:05 -0700
Subject: [PATCH 4/8] moving code

---
 .../org/apache/spark/ml/stat/Statistics.scala | 15 +++---
 .../spark/ml/stat/StatisticsSuite.scala       | 28 ++--------
 .../apache/spark/ml/util/LinalgUtils.scala    | 54 +++++++++++++++++++
 3 files changed, 64 insertions(+), 33 deletions(-)
 create mode 100644 mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
index 3b5b34a938115..d9ac493468df9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.stat
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.annotation.Since
+import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.linalg.{SQLDataTypes, Vector}
 import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
@@ -30,17 +30,18 @@
  * API for statistical functions in MLlib, compatible with Dataframes and Datasets.
  *
  * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset.stat]]
- * to MLlib's Vector types.
+ * to spark.ml's Vector types.
  */
 @Since("2.2.0")
+@Experimental
 object Statistics {
 
   /**
    * Compute the correlation matrix for the input RDD of Vectors using the specified method.
    * Methods currently supported: `pearson` (default), `spearman`.
    *
-   * @param dataset a dataset or a dataframe
-   * @param column the name of the column of vectors for which the correlation coefficient needs
+   * @param dataset A dataset or a dataframe
+   * @param column The name of the column of vectors for which the correlation coefficient needs
    *               to be computed. This must be a column of the dataset, and it must contain
    *               Vector objects.
    * @param method String specifying the method to use for computing correlation.
   *               Supported: `pearson` (default), `spearman`
@@ -63,12 +64,10 @@ object Statistics {
    * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
    * avoid recomputing the common lineage.
    */
-  // TODO: how do we handle missing values?
   @Since("2.2.0")
   def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {
     val rdd = dataset.select(column).rdd.map {
       case Row(v: Vector) => OldVectors.fromML(v)
-//      case r: GenericRowWithSchema => OldVectors.fromML(r.getAs[Vector](0))
     }
     val oldM = OldStatistics.corr(rdd, method)
     val name = s"$method($column)"
@@ -78,8 +77,8 @@ object Statistics {
 
   /**
    * Compute the correlation matrix for the input Dataset of Vectors.
-   * @param dataset a dataset or dataframe
-   * @param column a column of this dataset
+   * @param dataset A dataset or dataframe
+   * @param column A column of this dataset
    * @return
    */
   @Since("2.2.0")
   def corr(dataset: Dataset[_], column: String): DataFrame = {
     corr(dataset, column, "pearson")
   }
 }

diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala
index 8555098cf8d39..a97f4eb3ad3a9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala
@@ -17,19 +17,20 @@
 
 package org.apache.spark.ml.stat
 
-import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
+import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.linalg.Matrix
 import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.util.LinalgUtils
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row}
 
 
 class StatisticsSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
 
-  import StatisticsSuite._
+  import LinalgUtils._
 
   val xData = Array(1.0, 0.0, -2.0)
   val yData = Array(4.0, 5.0, 3.0)
@@ -77,26 +78,3 @@ class StatisticsSuite extends SparkFunSuite with MLlibTestSparkContext with Logg
   }
 
 }
-
-
-object StatisticsSuite extends Logging {
-
-  def approxEqual(v1: Double, v2: Double, threshold: Double = 1e-6): Boolean = {
-    if (v1.isNaN) {
-      v2.isNaN
-    } else {
-      math.abs(v1 - v2) <= threshold
-    }
-  }
-
-  def matrixApproxEqual(A: BM[Double], B: BM[Double], threshold: Double = 1e-6): Boolean = {
-    for (i <- 0 until A.rows; j <- 0 until A.cols) {
-      if (!approxEqual(A(i, j), B(i, j), threshold)) {
-        logInfo("i, j = " + i + ", " + j + " actual: " + A(i, j) + " expected:" + B(i, j))
-        return false
-      }
-    }
-    true
-  }
-
-}
\ No newline at end of file
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala
new file mode 100644
index 0000000000000..ee3a97a90ce93
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.util
+
+import breeze.linalg.{Matrix => BM}
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Utility test methods for linear algebra.
+ */
+object LinalgUtils extends Logging {
+
+
+  /**
+   * Returns true if two numbers are equal up to some tolerance.
+   */
+  def approxEqual(v1: Double, v2: Double, threshold: Double = 1e-6): Boolean = {
+    if (v1.isNaN) {
+      v2.isNaN
+    } else {
+      math.abs(v1 - v2) <= threshold
+    }
+  }
+
+  /**
+   * Returns true if two numbers are equal coefficient-wise up to some tolerance.
+   */
+  def matrixApproxEqual(A: BM[Double], B: BM[Double], threshold: Double = 1e-6): Boolean = {
+    for (i <- 0 until A.rows; j <- 0 until A.cols) {
+      if (!approxEqual(A(i, j), B(i, j), threshold)) {
+        logInfo("i, j = " + i + ", " + j + " actual: " + A(i, j) + " expected:" + B(i, j))
+        return false
+      }
+    }
+    true
+  }
+
+}

From 2aeb6ee142fe8836eecaac1414f46715fd36cc24 Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Wed, 15 Mar 2017 16:27:39 -0700
Subject: [PATCH 5/8] renaming

---
 .../apache/spark/ml/stat/Correlations.scala   | 63 +++++++++++++
 .../org/apache/spark/ml/stat/Statistics.scala | 88 -------------------
 ...icsSuite.scala => CorrelationsSuite.scala} |  8 +-
 3 files changed, 67 insertions(+), 92 deletions(-)
 delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
 rename mllib/src/test/scala/org/apache/spark/ml/stat/{StatisticsSuite.scala => CorrelationsSuite.scala} (90%)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
index 34df85e52fd60..f272125c75864 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
@@ -17,9 +17,72 @@
 
 package org.apache.spark.ml.stat
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.linalg.{SQLDataTypes, Vector}
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
+import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.types.{StructField, StructType}
+
 /**
+ * API for statistical functions in MLlib, compatible with Dataframes and Datasets.
  *
+ * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset.stat]]
+ * to spark.ml's Vector types.
  */
+@Since("2.2.0")
+@Experimental
 object Correlations {
 
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Methods currently supported: `pearson` (default), `spearman`.
+   *
+   * @param dataset A dataset or a dataframe
+   * @param column The name of the column of vectors for which the correlation coefficient needs
+   *               to be computed. This must be a column of the dataset, and it must contain
+   *               Vector objects.
+   * @param method String specifying the method to use for computing correlation.
+   *               Supported: `pearson` (default), `spearman`
+   * @return A dataframe that contains the correlation matrix of the column of vectors. This
+   *         dataframe contains a single row and a single column of name
+   *         '$METHODNAME($COLUMN)'.
+   * @throws IllegalArgumentException if the column is not a valid column in the dataset, or if
+   *                                  the content of this column is not of type Vector.
+   *
+   * Here is how to access the correlation coefficient:
+   * {{{
+   *   val data: Dataset[Vector] = ...
+   *   val Row(coeff: Matrix) = Statistics.corr(data, "value").head
+   *   // coeff now contains the Pearson correlation matrix.
+   * }}}
+   *
+   * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage.
+   */
+  @Since("2.2.0")
+  def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {
+    val rdd = dataset.select(column).rdd.map {
+      case Row(v: Vector) => OldVectors.fromML(v)
+    }
+    val oldM = OldStatistics.corr(rdd, method)
+    val name = s"$method($column)"
+    val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = true)))
+    dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema)
+  }
+
+  /**
+   * Compute the correlation matrix for the input Dataset of Vectors.
+   * @param dataset A dataset or dataframe
+   * @param column A column of this dataset
+   * @return
+   */
+  @Since("2.2.0")
+  def corr(dataset: Dataset[_], column: String): DataFrame = {
+    corr(dataset, column, "pearson")
+  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
deleted file mode 100644
index d9ac493468df9..0000000000000
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Statistics.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml.stat
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.ml.linalg.{SQLDataTypes, Vector}
-import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
-import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.types.{StructField, StructType}
-
-/**
- * API for statistical functions in MLlib, compatible with Dataframes and Datasets.
- *
- * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset.stat]]
- * to spark.ml's Vector types.
- */
-@Since("2.2.0")
-@Experimental
-object Statistics {
-
-  /**
-   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
-   * Methods currently supported: `pearson` (default), `spearman`.
-   *
-   * @param dataset A dataset or a dataframe
-   * @param column The name of the column of vectors for which the correlation coefficient needs
-   *               to be computed. This must be a column of the dataset, and it must contain
-   *               Vector objects.
-   * @param method String specifying the method to use for computing correlation.
-   *               Supported: `pearson` (default), `spearman`
-   * @return A dataframe that contains the correlation matrix of the column of vectors. This
-   *         dataframe contains a single row and a single column of name
-   *         '$METHODNAME($COLUMN)'.
-   * @throws IllegalArgumentException if the column is not a valid column in the dataset, or if
-   *                                  the content of this column is not of type Vector.
-   *
-   * Here is how to access the correlation coefficient:
-   * {{{
-   *   val data: Dataset[Vector] = ...
-   *   val Row(coeff: Matrix) = Statistics.corr(data, "value").head
-   *   // coeff now contains the Pearson correlation matrix.
-   * }}}
-   *
-   * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
-   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
-   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
-   * avoid recomputing the common lineage.
-   */
-  @Since("2.2.0")
-  def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {
-    val rdd = dataset.select(column).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
-    }
-    val oldM = OldStatistics.corr(rdd, method)
-    val name = s"$method($column)"
-    val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = true)))
-    dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema)
-  }
-
-  /**
-   * Compute the correlation matrix for the input Dataset of Vectors.
-   * @param dataset A dataset or dataframe
-   * @param column A column of this dataset
-   * @return
-   */
-  @Since("2.2.0")
-  def corr(dataset: Dataset[_], column: String): DataFrame = {
-    corr(dataset, column, "pearson")
-  }
-}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationsSuite.scala
similarity index 90%
rename from mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala
rename to mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationsSuite.scala
index 8555098cf8d39..e184fc4ad25e2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/stat/StatisticsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row}
 
 
-class StatisticsSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
+class CorrelationsSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
 
   import LinalgUtils._
 
@@ -51,8 +51,8 @@ class CorrelationsSuite extends SparkFunSuite with MLlibTestSparkContext with Lo
 
 
   test("corr(X) default, pearson") {
-    val defaultMat = Statistics.corr(X, "features")
-    val pearsonMat = Statistics.corr(X, "features", "pearson")
+    val defaultMat = Correlations.corr(X, "features")
+    val pearsonMat = Correlations.corr(X, "features", "pearson")
     // scalastyle:off
     val expected = BDM(
       (1.00000000, 0.05564149, Double.NaN, 0.4004714),
@@ -66,7 +66,7 @@ class CorrelationsSuite extends SparkFunSuite with MLlibTestSparkContext with Lo
   }
 
   test("corr(X) spearman") {
-    val spearmanMat = Statistics.corr(X, "features", "spearman")
+    val spearmanMat = Correlations.corr(X, "features", "spearman")
     // scalastyle:off
     val expected = BDM(
       (1.0000000, 0.1054093, Double.NaN, 0.4000000),

From 6040e4c4f624392171bfca295c59555ddced6b15 Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Wed, 22 Mar 2017 12:06:04 -0700
Subject: [PATCH 6/8] comments

---
 .../apache/spark/ml/util/TestingUtils.scala   |  8 +++++
 .../{Correlations.scala => Correlation.scala} | 10 +++---
 ...ionsSuite.scala => CorrelationSuite.scala} | 29 +++++++++----------
 3 files changed, 25 insertions(+), 22 deletions(-)
 rename mllib/src/main/scala/org/apache/spark/ml/stat/{Correlations.scala => Correlation.scala} (94%)
 rename mllib/src/test/scala/org/apache/spark/ml/stat/{CorrelationsSuite.scala => CorrelationSuite.scala} (71%)

diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala b/mllib-local/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala
index 2327917e2cad7..30edd00fb53e1 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala
@@ -32,6 +32,10 @@ object TestingUtils {
    * the relative tolerance is meaningless, so the exception will be raised to warn users.
    */
   private def RelativeErrorComparison(x: Double, y: Double, eps: Double): Boolean = {
+    // Special case for NaNs
+    if (x.isNaN && y.isNaN) {
+      return true
+    }
     val absX = math.abs(x)
     val absY = math.abs(y)
     val diff = math.abs(x - y)
@@ -49,6 +53,10 @@ object TestingUtils {
    * Private helper function for comparing two values using absolute tolerance.
    */
   private def AbsoluteErrorComparison(x: Double, y: Double, eps: Double): Boolean = {
+    // Special case for NaNs
+    if (x.isNaN && y.isNaN) {
+      return true
+    }
     math.abs(x - y) < eps
   }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
similarity index 94%
rename from mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
rename to mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
index f272125c75864..5a7257a845b7d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlations.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
@@ -27,16 +27,17 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
- * API for statistical functions in MLlib, compatible with Dataframes and Datasets.
+ * API for correlation functions in MLlib, compatible with Dataframes and Datasets.
  *
  * The functions in this package generalize the functions in [[org.apache.spark.sql.Dataset.stat]]
  * to spark.ml's Vector types.
  */
 @Since("2.2.0")
 @Experimental
-object Correlations {
+object Correlation {
 
   /**
+   * :: Experimental ::
    * Compute the correlation matrix for the input RDD of Vectors using the specified method.
    * Methods currently supported: `pearson` (default), `spearman`.
    *
@@ -71,15 +72,12 @@ object Correlations {
     }
     val oldM = OldStatistics.corr(rdd, method)
     val name = s"$method($column)"
-    val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = true)))
+    val schema = StructType(Array(StructField(name, SQLDataTypes.MatrixType, nullable = false)))
     dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema)
   }
 
   /**
    * Compute the correlation matrix for the input Dataset of Vectors.
-   * @param dataset A dataset or dataframe
-   * @param column A column of this dataset
-   * @return
    */
   @Since("2.2.0")
   def corr(dataset: Dataset[_], column: String): DataFrame = {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationSuite.scala
similarity index 71%
rename from mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationsSuite.scala
rename to mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationSuite.scala
index e184fc4ad25e2..7d935e651f220 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/stat/CorrelationSuite.scala
@@ -21,16 +21,13 @@ import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.linalg.Matrix
-import org.apache.spark.ml.linalg.Vectors
-import org.apache.spark.ml.util.LinalgUtils
+import org.apache.spark.ml.linalg.{Matrices, Matrix, Vectors}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row}
 
 
-class CorrelationsSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
-
-  import LinalgUtils._
+class CorrelationSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
 
   val xData = Array(1.0, 0.0, -2.0)
   val yData = Array(4.0, 5.0, 3.0)
@@ -51,30 +48,30 @@
 
 
   test("corr(X) default, pearson") {
-    val defaultMat = Correlations.corr(X, "features")
-    val pearsonMat = Correlations.corr(X, "features", "pearson")
+    val defaultMat = Correlation.corr(X, "features")
+    val pearsonMat = Correlation.corr(X, "features", "pearson")
     // scalastyle:off
-    val expected = BDM(
+    val expected = Matrices.fromBreeze(BDM(
      (1.00000000, 0.05564149, Double.NaN, 0.4004714),
      (0.05564149, 1.00000000, Double.NaN, 0.9135959),
      (Double.NaN, Double.NaN, 1.00000000, Double.NaN),
-      (0.40047142, 0.91359586, Double.NaN, 1.0000000))
+      (0.40047142, 0.91359586, Double.NaN, 1.0000000)))
     // scalastyle:on
 
-    assert(matrixApproxEqual(extract(defaultMat), expected))
-    assert(matrixApproxEqual(extract(pearsonMat), expected))
+    assert(Matrices.fromBreeze(extract(defaultMat)) ~== expected absTol 1e-4)
+    assert(Matrices.fromBreeze(extract(pearsonMat)) ~== expected absTol 1e-4)
   }
 
   test("corr(X) spearman") {
-    val spearmanMat = Correlations.corr(X, "features", "spearman")
+    val spearmanMat = Correlation.corr(X, "features", "spearman")
     // scalastyle:off
-    val expected = BDM(
+    val expected = Matrices.fromBreeze(BDM(
      (1.0000000, 0.1054093, Double.NaN, 0.4000000),
      (0.1054093, 1.0000000, Double.NaN, 0.9486833),
      (Double.NaN, Double.NaN, 1.00000000, Double.NaN),
-      (0.4000000, 0.9486833, Double.NaN, 1.0000000))
+      (0.4000000, 0.9486833, Double.NaN, 1.0000000)))
     // scalastyle:on
-    assert(matrixApproxEqual(extract(spearmanMat), expected))
+    assert(Matrices.fromBreeze(extract(spearmanMat)) ~== expected absTol 1e-4)
   }
 
 }

From 2151e8a0204d628f3e77c782e03d4f17e1674109 Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Wed, 22 Mar 2017 12:07:25 -0700
Subject: [PATCH 7/8] unused file

---
 .../apache/spark/ml/util/LinalgUtils.scala    | 54 -------------
 1 file changed, 54 deletions(-)
 delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala
deleted file mode 100644
index ee3a97a90ce93..0000000000000
--- a/mllib/src/test/scala/org/apache/spark/ml/util/LinalgUtils.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml.util
-
-import breeze.linalg.{Matrix => BM}
-
-import org.apache.spark.internal.Logging
-
-/**
- * Utility test methods for linear algebra.
- */
-object LinalgUtils extends Logging {
-
-
-  /**
-   * Returns true if two numbers are equal up to some tolerance.
-   */
-  def approxEqual(v1: Double, v2: Double, threshold: Double = 1e-6): Boolean = {
-    if (v1.isNaN) {
-      v2.isNaN
-    } else {
-      math.abs(v1 - v2) <= threshold
-    }
-  }
-
-  /**
-   * Returns true if two numbers are equal coefficient-wise up to some tolerance.
-   */
-  def matrixApproxEqual(A: BM[Double], B: BM[Double], threshold: Double = 1e-6): Boolean = {
-    for (i <- 0 until A.rows; j <- 0 until A.cols) {
-      if (!approxEqual(A(i, j), B(i, j), threshold)) {
-        logInfo("i, j = " + i + ", " + j + " actual: " + A(i, j) + " expected:" + B(i, j))
-        return false
-      }
-    }
-    true
-  }
-
-}

From 7c540e5080aa10894d33cfa9924b65bd551375ab Mon Sep 17 00:00:00 2001
From: Timothy Hunter
Date: Thu, 23 Mar 2017 13:51:18 -0700
Subject: [PATCH 8/8] comment

---
 mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
index 5a7257a845b7d..a7243ccbf28cc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
@@ -77,7 +77,7 @@ object Correlation {
   }
 
   /**
-   * Compute the correlation matrix for the input Dataset of Vectors.
+   * Compute the Pearson correlation matrix for the input Dataset of Vectors.
    */
   @Since("2.2.0")
   def corr(dataset: Dataset[_], column: String): DataFrame = {
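
Once the series above is applied, Correlation.corr is the single entry point. A minimal usage sketch of that final API follows; the object name CorrelationExample, the SparkSession setup, and the column name "features" are assumptions made for illustration, not code taken from the patches.

import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{Row, SparkSession}

object CorrelationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CorrelationExample").getOrCreate()

    // Correlation.corr expects a single column of spark.ml Vectors.
    val data = Seq(
      Vectors.dense(1.0, 0.0, -2.0),
      Vectors.dense(4.0, 5.0, 3.0),
      Vectors.dense(6.0, 7.0, 8.0),
      Vectors.dense(9.0, 0.0, 1.0)
    )
    val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

    // Pearson is the default; the result is a one-row, one-column DataFrame
    // whose single cell holds the correlation Matrix.
    val Row(pearson: Matrix) = Correlation.corr(df, "features").head
    println(s"Pearson correlation matrix:\n$pearson")

    // Spearman must be requested explicitly; cache the input first, as the
    // corr scaladoc advises, to avoid recomputing the lineage during ranking.
    df.cache()
    val Row(spearman: Matrix) = Correlation.corr(df, "features", "spearman").head
    println(s"Spearman correlation matrix:\n$spearman")

    spark.stop()
  }
}

Note the result shape: because the schema is built as StructField(s"$method($column)", MatrixType), the output column is named "pearson(features)" or "spearman(features)", and its one cell is the Matrix, which is why the Row(m: Matrix) pattern extracts it.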