diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 2db5dce8b143a..d4771d779f9f4 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -25,6 +25,7 @@ from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \ _deserialize_double_vector, _dot, _squared_distance from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.regression import LabeledPoint from pyspark.tests import PySparkTestCase @@ -41,16 +42,21 @@ class VectorTests(unittest.TestCase): def test_serialize(self): sv = SparseVector(4, {1: 1, 3: 2}) dv = array([1., 2., 3., 4.]) + lst = [1, 2, 3, 4] self.assertTrue(sv is _convert_vector(sv)) self.assertTrue(dv is _convert_vector(dv)) + self.assertTrue(array_equal(dv, _convert_vector(lst))) self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv))) self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv)))) + self.assertTrue(array_equal(dv, + _deserialize_double_vector(_serialize_double_vector(lst)))) def test_dot(self): sv = SparseVector(4, {1: 1, 3: 2}) dv = array([1., 2., 3., 4.]) + lst = [1, 2, 3, 4] mat = array([[1., 2., 3., 4.], [1., 2., 3., 4.], [1., 2., 3., 4.], @@ -59,10 +65,109 @@ def test_dot(self): self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat))) self.assertEquals(30.0, _dot(dv, dv)) self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat))) + self.assertEquals(30.0, _dot(lst, dv)) + self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat))) + + def test_squared_distance(self): + sv = SparseVector(4, {1: 1, 3: 2}) + dv = array([1., 2., 3., 4.]) + lst = [4, 3, 2, 1] + self.assertEquals(15.0, _squared_distance(sv, dv)) + self.assertEquals(25.0, _squared_distance(sv, lst)) + self.assertEquals(20.0, _squared_distance(dv, lst)) + self.assertEquals(15.0, _squared_distance(dv, sv)) + self.assertEquals(25.0, _squared_distance(lst, sv)) + self.assertEquals(20.0, _squared_distance(lst, dv)) + self.assertEquals(0.0, _squared_distance(sv, sv)) + self.assertEquals(0.0, _squared_distance(dv, dv)) + self.assertEquals(0.0, _squared_distance(lst, lst)) + + +class ListTests(PySparkTestCase): + """ + Test MLlib algorithms on plain lists, to make sure they're passed through + as NumPy arrays. + """ + + def test_clustering(self): + from pyspark.mllib.clustering import KMeans + data = [ + [0, 1.1], + [0, 1.2], + [1.1, 0], + [1.2, 0], + ] + clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||") + self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) + self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) + + def test_classification(self): + from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes + data = [ + LabeledPoint(0.0, [1, 0]), + LabeledPoint(1.0, [0, 1]), + LabeledPoint(0.0, [2, 0]), + LabeledPoint(1.0, [0, 2]) + ] + rdd = self.sc.parallelize(data) + features = [p.features.tolist() for p in data] + + lr_model = LogisticRegressionWithSGD.train(rdd) + self.assertTrue(lr_model.predict(features[0]) <= 0) + self.assertTrue(lr_model.predict(features[1]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) + + svm_model = SVMWithSGD.train(rdd) + self.assertTrue(svm_model.predict(features[0]) <= 0) + self.assertTrue(svm_model.predict(features[1]) > 0) + self.assertTrue(svm_model.predict(features[2]) <= 0) + self.assertTrue(svm_model.predict(features[3]) > 0) + + nb_model = NaiveBayes.train(rdd) + self.assertTrue(nb_model.predict(features[0]) <= 0) + self.assertTrue(nb_model.predict(features[1]) > 0) + self.assertTrue(nb_model.predict(features[2]) <= 0) + self.assertTrue(nb_model.predict(features[3]) > 0) + + def test_regression(self): + from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \ + RidgeRegressionWithSGD + data = [ + LabeledPoint(-1.0, [0, -1]), + LabeledPoint(1.0, [0, 1]), + LabeledPoint(-1.0, [0, -2]), + LabeledPoint(1.0, [0, 2]) + ] + rdd = self.sc.parallelize(data) + features = [p.features.tolist() for p in data] + + lr_model = LinearRegressionWithSGD.train(rdd) + self.assertTrue(lr_model.predict(features[0]) <= 0) + self.assertTrue(lr_model.predict(features[1]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) + + lasso_model = LassoWithSGD.train(rdd) + self.assertTrue(lasso_model.predict(features[0]) <= 0) + self.assertTrue(lasso_model.predict(features[1]) > 0) + self.assertTrue(lasso_model.predict(features[2]) <= 0) + self.assertTrue(lasso_model.predict(features[3]) > 0) + + rr_model = RidgeRegressionWithSGD.train(rdd) + self.assertTrue(rr_model.predict(features[0]) <= 0) + self.assertTrue(rr_model.predict(features[1]) > 0) + self.assertTrue(rr_model.predict(features[2]) <= 0) + self.assertTrue(rr_model.predict(features[3]) > 0) @unittest.skipIf(not _have_scipy, "SciPy not installed") class SciPyTests(PySparkTestCase): + """ + Test both vector operations and MLlib algorithms with SciPy sparse matrices, + if SciPy is available. + """ + def test_serialize(self): from scipy.sparse import lil_matrix lil = lil_matrix((4, 1)) @@ -132,63 +237,61 @@ def test_clustering(self): def test_classification(self): from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes data = [ - self.scipy_matrix(3, {0: 0.0, 2: 0.0}), - self.scipy_matrix(3, {0: 1.0, 2: 1.0}), - self.scipy_matrix(3, {0: 0.0, 2: 0.0}), - self.scipy_matrix(3, {0: 1.0, 2: 2.0}) + LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})), + LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0})) ] rdd = self.sc.parallelize(data) - features = [ - self.scipy_matrix(2, {1: 0.0}), - self.scipy_matrix(2, {1: 1.0}), - self.scipy_matrix(2, {1: 2.0}), - ] + features = [p.features for p in data] lr_model = LogisticRegressionWithSGD.train(rdd) self.assertTrue(lr_model.predict(features[0]) <= 0) self.assertTrue(lr_model.predict(features[1]) > 0) - self.assertTrue(lr_model.predict(features[2]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) svm_model = SVMWithSGD.train(rdd) self.assertTrue(svm_model.predict(features[0]) <= 0) self.assertTrue(svm_model.predict(features[1]) > 0) - self.assertTrue(svm_model.predict(features[2]) > 0) + self.assertTrue(svm_model.predict(features[2]) <= 0) + self.assertTrue(svm_model.predict(features[3]) > 0) nb_model = NaiveBayes.train(rdd) self.assertTrue(nb_model.predict(features[0]) <= 0) self.assertTrue(nb_model.predict(features[1]) > 0) - self.assertTrue(nb_model.predict(features[2]) > 0) + self.assertTrue(nb_model.predict(features[2]) <= 0) + self.assertTrue(nb_model.predict(features[3]) > 0) def test_regression(self): from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \ RidgeRegressionWithSGD data = [ - self.scipy_matrix(3, {0: -1.0, 2: -1.0}), - self.scipy_matrix(3, {0: 1.0, 2: 1.0}), - self.scipy_matrix(3, {0: -1.0, 2: -2.0}), - self.scipy_matrix(3, {0: 1.0, 2: 2.0}) + LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})), + LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0})) ] rdd = self.sc.parallelize(data) - features = [ - self.scipy_matrix(2, {1: -1.0}), - self.scipy_matrix(2, {1: 1.0}), - self.scipy_matrix(2, {1: 2.0}), - ] + features = [p.features for p in data] lr_model = LinearRegressionWithSGD.train(rdd) self.assertTrue(lr_model.predict(features[0]) <= 0) self.assertTrue(lr_model.predict(features[1]) > 0) - self.assertTrue(lr_model.predict(features[2]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) lasso_model = LassoWithSGD.train(rdd) self.assertTrue(lasso_model.predict(features[0]) <= 0) self.assertTrue(lasso_model.predict(features[1]) > 0) - self.assertTrue(lasso_model.predict(features[2]) > 0) + self.assertTrue(lasso_model.predict(features[2]) <= 0) + self.assertTrue(lasso_model.predict(features[3]) > 0) rr_model = RidgeRegressionWithSGD.train(rdd) self.assertTrue(rr_model.predict(features[0]) <= 0) self.assertTrue(rr_model.predict(features[1]) > 0) - self.assertTrue(rr_model.predict(features[2]) > 0) + self.assertTrue(rr_model.predict(features[2]) <= 0) + self.assertTrue(rr_model.predict(features[3]) > 0) if __name__ == "__main__": diff --git a/python/run-tests b/python/run-tests index abd5efcf69a94..7bbf10d05a817 100755 --- a/python/run-tests +++ b/python/run-tests @@ -60,6 +60,7 @@ run_test "pyspark/mllib/clustering.py" run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" +run_test "pyspark/mllib/tests.py" if [[ $FAILED == 0 ]]; then echo -en "\033[32m" # Green