Skip to content

Commit

Permalink
[MINOR][ML] Fix some PySpark & SparkR flaky tests
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
Some PySpark & SparkR tests run with tiny dataset and tiny ```maxIter```, which means they are not converged. I don’t think checking intermediate result during iteration make sense, and these intermediate result may vulnerable and not stable, so we should switch to check the converged result. We hit this issue at #17746 when we upgrade breeze to 0.13.1.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang <[email protected]>

Closes #17757 from yanboliang/flaky-test.
  • Loading branch information
yanboliang committed Apr 26, 2017
1 parent 7fecf51 commit dbb06c6
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 50 deletions.
17 changes: 2 additions & 15 deletions R/pkg/inst/tests/testthat/test_mllib_classification.R
Original file line number Diff line number Diff line change
Expand Up @@ -284,22 +284,11 @@ test_that("spark.mlp", {
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

# test initialWeights
model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
model <- spark.mlp(df, label ~ features, layers = c(4, 3), initialWeights =
c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "0.0", "1.0", "0.0"))
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

# Test formula works well
df <- suppressWarnings(createDataFrame(iris))
Expand All @@ -310,8 +299,6 @@ test_that("spark.mlp", {
expect_equal(summary$numOfOutputs, 3)
expect_equal(summary$layers, c(4, 3))
expect_equal(length(summary$weights), 15)
expect_equal(head(summary$weights, 5), list(-0.5793153, -4.652961, 6.216155, -6.649478,
-10.51147), tolerance = 1e-3)
})

test_that("spark.naiveBayes", {
Expand Down
71 changes: 36 additions & 35 deletions python/pyspark/ml/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,34 +185,33 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
>>> from pyspark.sql import Row
>>> from pyspark.ml.linalg import Vectors
>>> bdf = sc.parallelize([
... Row(label=1.0, weight=2.0, features=Vectors.dense(1.0)),
... Row(label=0.0, weight=2.0, features=Vectors.sparse(1, [], []))]).toDF()
>>> blor = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight")
... Row(label=1.0, weight=1.0, features=Vectors.dense(0.0, 5.0)),
... Row(label=0.0, weight=2.0, features=Vectors.dense(1.0, 2.0)),
... Row(label=1.0, weight=3.0, features=Vectors.dense(2.0, 1.0)),
... Row(label=0.0, weight=4.0, features=Vectors.dense(3.0, 3.0))]).toDF()
>>> blor = LogisticRegression(regParam=0.01, weightCol="weight")
>>> blorModel = blor.fit(bdf)
>>> blorModel.coefficients
DenseVector([5.4...])
DenseVector([-1.080..., -0.646...])
>>> blorModel.intercept
-2.63...
>>> mdf = sc.parallelize([
... Row(label=1.0, weight=2.0, features=Vectors.dense(1.0)),
... Row(label=0.0, weight=2.0, features=Vectors.sparse(1, [], [])),
... Row(label=2.0, weight=2.0, features=Vectors.dense(3.0))]).toDF()
>>> mlor = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight",
... family="multinomial")
3.112...
>>> data_path = "data/mllib/sample_multiclass_classification_data.txt"
>>> mdf = spark.read.format("libsvm").load(data_path)
>>> mlor = LogisticRegression(regParam=0.1, elasticNetParam=1.0, family="multinomial")
>>> mlorModel = mlor.fit(mdf)
>>> mlorModel.coefficientMatrix
DenseMatrix(3, 1, [-2.3..., 0.2..., 2.1...], 1)
SparseMatrix(3, 4, [0, 1, 2, 3], [3, 2, 1], [1.87..., -2.75..., -0.50...], 1)
>>> mlorModel.interceptVector
DenseVector([2.1..., 0.6..., -2.8...])
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
DenseVector([0.04..., -0.42..., 0.37...])
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 1.0))]).toDF()
>>> result = blorModel.transform(test0).head()
>>> result.prediction
0.0
1.0
>>> result.probability
DenseVector([0.99..., 0.00...])
DenseVector([0.02..., 0.97...])
>>> result.rawPrediction
DenseVector([8.12..., -8.12...])
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
DenseVector([-3.54..., 3.54...])
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
>>> blorModel.transform(test1).head().prediction
1.0
>>> blor.setParams("vector")
Expand All @@ -222,8 +221,8 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
>>> lr_path = temp_path + "/lr"
>>> blor.save(lr_path)
>>> lr2 = LogisticRegression.load(lr_path)
>>> lr2.getMaxIter()
5
>>> lr2.getRegParam()
0.01
>>> model_path = temp_path + "/lr_model"
>>> blorModel.save(model_path)
>>> model2 = LogisticRegressionModel.load(model_path)
Expand Down Expand Up @@ -1480,31 +1479,33 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):
>>> from pyspark.sql import Row
>>> from pyspark.ml.linalg import Vectors
>>> df = sc.parallelize([
... Row(label=0.0, features=Vectors.dense(1.0, 0.8)),
... Row(label=1.0, features=Vectors.sparse(2, [], [])),
... Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF()
>>> lr = LogisticRegression(maxIter=5, regParam=0.01)
>>> data_path = "data/mllib/sample_multiclass_classification_data.txt"
>>> df = spark.read.format("libsvm").load(data_path)
>>> lr = LogisticRegression(regParam=0.01)
>>> ovr = OneVsRest(classifier=lr)
>>> model = ovr.fit(df)
>>> [x.coefficients for x in model.models]
[DenseVector([4.9791, 2.426]), DenseVector([-4.1198, -5.9326]), DenseVector([-3.314, 5.2423])]
>>> model.models[0].coefficients
DenseVector([0.5..., -1.0..., 3.4..., 4.2...])
>>> model.models[1].coefficients
DenseVector([-2.1..., 3.1..., -2.6..., -2.3...])
>>> model.models[2].coefficients
DenseVector([0.3..., -3.4..., 1.0..., -1.1...])
>>> [x.intercept for x in model.models]
[-5.06544..., 2.30341..., -1.29133...]
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF()
[-2.7..., -2.5..., -1.3...]
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0, 1.0, 1.0))]).toDF()
>>> model.transform(test0).head().prediction
1.0
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
>>> model.transform(test1).head().prediction
0.0
>>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF()
>>> model.transform(test2).head().prediction
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(4, [0], [1.0]))]).toDF()
>>> model.transform(test1).head().prediction
2.0
>>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4, 0.3, 0.2))]).toDF()
>>> model.transform(test2).head().prediction
0.0
>>> model_path = temp_path + "/ovr_model"
>>> model.save(model_path)
>>> model2 = OneVsRestModel.load(model_path)
>>> model2.transform(test0).head().prediction
1.0
0.0
.. versionadded:: 2.0.0
"""
Expand Down

0 comments on commit dbb06c6

Please sign in to comment.