[SPARK-29235][ML][PYSPARK] Support avgMetrics in read/write of CrossValidatorModel

### What changes were proposed in this pull request?
Currently, PySpark does not write or read `avgMetrics` in `CrossValidatorModel`, whereas the Scala implementation supports it.

### Why are the changes needed?
Steps to reproduce:

```
# Assumes an active SparkSession named `spark`, as in the pyspark shell.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder

dataset = spark.createDataFrame([(Vectors.dense([0.0]), 0.0),
                                 (Vectors.dense([0.4]), 1.0),
                                 (Vectors.dense([0.5]), 0.0),
                                 (Vectors.dense([0.6]), 1.0),
                                 (Vectors.dense([1.0]), 1.0)] * 10,
                                ["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2)
cvModel = cv.fit(dataset)
cvModel.write().save("/tmp/model")
cvModel2 = CrossValidatorModel.read().load("/tmp/model")
print(cvModel.avgMetrics)   # prints a non-empty list, as expected
print(cvModel2.avgMetrics)  # Bug: prints an empty list
```
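The root cause, as the diff below shows, is that `CrossValidatorModel._to_java` hard-coded an empty list for the metrics when building the Java model, and `_from_java` never read them back on load. A minimal before/after sketch of the core change:

```
# In CrossValidatorModel._to_java -- before this patch, the metrics were dropped:
_java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidatorModel",
                                     self.uid,
                                     self.bestModel._to_java(),
                                     _py2java(sc, []))

# After this patch -- the Python-side metrics are written out...
_java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidatorModel",
                                     self.uid,
                                     self.bestModel._to_java(),
                                     _py2java(sc, self.avgMetrics))

# ...and _from_java reads them back when loading:
avgMetrics = _java2py(sc, java_stage.avgMetrics())
```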

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Manually tested

Before patch:
```
>>> cvModel.write().save("/tmp/model_0")
>>> cvModel2 = CrossValidatorModel.read().load("/tmp/model_0")
>>> print(cvModel2.avgMetrics)
[]
```

After patch:
```
>>> cvModel2 = CrossValidatorModel.read().load("/tmp/model_2")
>>> print(cvModel2.avgMetrics[0])
0.5
```
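The same gap existed for `TrainValidationSplitModel.validationMetrics`, which this patch fixes in the same way (see the second half of the diff). A quick check analogous to the one above -- the path is illustrative, and `tvsModel` is assumed to be a fitted `TrainValidationSplitModel` as in the new doctest below:

```
from pyspark.ml.tuning import TrainValidationSplitModel

tvsModel.write().save("/tmp/tvs_model")  # hypothetical path
tvsModel2 = TrainValidationSplitModel.read().load("/tmp/tvs_model")
print(tvsModel2.validationMetrics)  # non-empty after this patch, e.g. [0.5, ...
```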

Closes #26038 from shahidki31/avgMetrics.

Authored-by: shahid <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
shahidki31 authored and srowen committed Oct 19, 2019
1 parent ab92e17 commit 4a6005c
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions python/pyspark/ml/tuning.py
@@ -22,7 +22,7 @@
 
 from pyspark import since, keyword_only
 from pyspark.ml import Estimator, Model
-from pyspark.ml.common import _py2java
+from pyspark.ml.common import _py2java, _java2py
 from pyspark.ml.param import Params, Param, TypeConverters
 from pyspark.ml.param.shared import HasCollectSubModels, HasParallelism, HasSeed
 from pyspark.ml.util import *
@@ -216,6 +216,8 @@ class CrossValidator(Estimator, _CrossValidatorParams, HasParallelism, HasCollec
     >>> from pyspark.ml.classification import LogisticRegression
     >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
     >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.ml.tuning import CrossValidatorModel
+    >>> import tempfile
     >>> dataset = spark.createDataFrame(
     ...     [(Vectors.dense([0.0]), 0.0),
     ...      (Vectors.dense([0.4]), 1.0),
@@ -233,6 +235,12 @@ class CrossValidator(Estimator, _CrossValidatorParams, HasParallelism, HasCollec
     3
     >>> cvModel.avgMetrics[0]
     0.5
+    >>> path = tempfile.mkdtemp()
+    >>> model_path = path + "/model"
+    >>> cvModel.write().save(model_path)
+    >>> cvModelRead = CrossValidatorModel.read().load(model_path)
+    >>> cvModelRead.avgMetrics
+    [0.5, ...
     >>> evaluator.evaluate(cvModel.transform(dataset))
     0.8333...
@@ -483,10 +491,12 @@ def _from_java(cls, java_stage):
         Given a Java CrossValidatorModel, create and return a Python wrapper of it.
         Used for ML persistence.
         """
+        sc = SparkContext._active_spark_context
         bestModel = JavaParams._from_java(java_stage.bestModel())
+        avgMetrics = _java2py(sc, java_stage.avgMetrics())
         estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage)
 
-        py_stage = cls(bestModel=bestModel).setEstimator(estimator)
+        py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics).setEstimator(estimator)
         py_stage = py_stage.setEstimatorParamMaps(epms).setEvaluator(evaluator)
 
         if java_stage.hasSubModels():
@@ -505,11 +515,10 @@ def _to_java(self):
         """
 
         sc = SparkContext._active_spark_context
-        # TODO: persist average metrics as well
         _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidatorModel",
                                              self.uid,
                                              self.bestModel._to_java(),
-                                             _py2java(sc, []))
+                                             _py2java(sc, self.avgMetrics))
         estimator, epms, evaluator = super(CrossValidatorModel, self)._to_java_impl()
 
         _java_obj.set("evaluator", evaluator)
@@ -551,6 +560,8 @@ class TrainValidationSplit(Estimator, _TrainValidationSplitParams, HasParallelis
     >>> from pyspark.ml.classification import LogisticRegression
     >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
     >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.ml.tuning import TrainValidationSplitModel
+    >>> import tempfile
     >>> dataset = spark.createDataFrame(
     ...     [(Vectors.dense([0.0]), 0.0),
     ...      (Vectors.dense([0.4]), 1.0),
@@ -566,6 +577,14 @@ class TrainValidationSplit(Estimator, _TrainValidationSplitParams, HasParallelis
     >>> tvsModel = tvs.fit(dataset)
     >>> tvsModel.getTrainRatio()
     0.75
+    >>> tvsModel.validationMetrics
+    [0.5, ...
+    >>> path = tempfile.mkdtemp()
+    >>> model_path = path + "/model"
+    >>> tvsModel.write().save(model_path)
+    >>> tvsModelRead = TrainValidationSplitModel.read().load(model_path)
+    >>> tvsModelRead.validationMetrics
+    [0.5, ...
     >>> evaluator.evaluate(tvsModel.transform(dataset))
     0.833...
@@ -809,11 +828,14 @@ def _from_java(cls, java_stage):
         """
 
         # Load information from java_stage to the instance.
+        sc = SparkContext._active_spark_context
         bestModel = JavaParams._from_java(java_stage.bestModel())
+        validationMetrics = _java2py(sc, java_stage.validationMetrics())
         estimator, epms, evaluator = super(TrainValidationSplitModel,
                                            cls)._from_java_impl(java_stage)
         # Create a new instance of this stage.
-        py_stage = cls(bestModel=bestModel).setEstimator(estimator)
+        py_stage = cls(bestModel=bestModel,
+                       validationMetrics=validationMetrics).setEstimator(estimator)
         py_stage = py_stage.setEstimatorParamMaps(epms).setEvaluator(evaluator)
 
         if java_stage.hasSubModels():
@@ -830,12 +852,11 @@ def _to_java(self):
         """
 
         sc = SparkContext._active_spark_context
-        # TODO: persst validation metrics as well
         _java_obj = JavaParams._new_java_obj(
             "org.apache.spark.ml.tuning.TrainValidationSplitModel",
             self.uid,
             self.bestModel._to_java(),
-            _py2java(sc, []))
+            _py2java(sc, self.validationMetrics))
         estimator, epms, evaluator = super(TrainValidationSplitModel, self)._to_java_impl()
 
         _java_obj.set("evaluator", evaluator)
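Both fixes lean on PySpark's internal Py4J converters, `_py2java` and `_java2py`, imported in the first hunk above. A minimal sketch of the round-trip they perform -- internal API, shown for illustration only, assuming an active `SparkContext`:

```
from pyspark import SparkContext
from pyspark.ml.common import _py2java, _java2py

sc = SparkContext._active_spark_context
java_metrics = _py2java(sc, [0.5, 0.8333])  # Python list -> list object on the JVM
py_metrics = _java2py(sc, java_metrics)     # JVM object -> Python list
print(py_metrics)                           # [0.5, 0.8333]
```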
