[SPARK-44374][PYTHON][ML] Add example code for distributed ML for spark connect

### What changes were proposed in this pull request?

Add example code for distributed ML for spark connect

### Why are the changes needed?

Example code for new APIs.
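
All of the doctest examples added below assume an active Spark session bound to the name `spark`. Under Spark Connect, such a session can be created roughly as follows (a minimal sketch only; the `sc://` URL and port are placeholders for your own Spark Connect endpoint):

    # Create the `spark` session the doctests below assume.
    # Replace the URL with the address of your Spark Connect server.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()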

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

Closes #41940 from WeichenXu123/ml-connect-example.

Authored-by: Weichen Xu <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>
WeichenXu123 committed Jul 12, 2023
1 parent 0d90f2a commit 054f94a
Showing 5 changed files with 170 additions and 1 deletion.
25 changes: 25 additions & 0 deletions python/pyspark/ml/connect/classification.py
@@ -152,6 +152,31 @@ class LogisticRegression(
Logistic regression estimator.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect.classification import LogisticRegression, LogisticRegressionModel
>>> lor = LogisticRegression(maxIter=20, learningRate=0.01)
>>> dataset = spark.createDataFrame([
... ([1.0, 2.0], 1),
... ([2.0, -1.0], 1),
... ([-3.0, -2.0], 0),
... ([-1.0, -2.0], 0),
... ], schema=['features', 'label'])
>>> lor_model = lor.fit(dataset)
>>> transformed_dataset = lor_model.transform(dataset)
>>> transformed_dataset.show()
+------------+-----+----------+--------------------+
| features|label|prediction| probability|
+------------+-----+----------+--------------------+
| [1.0, 2.0]| 1| 1|[0.02423273026943...|
| [2.0, -1.0]| 1| 1|[0.09334788471460...|
|[-3.0, -2.0]| 0| 0|[0.99808156490325...|
|[-1.0, -2.0]| 0| 0|[0.96210002899169...|
+------------+-----+----------+--------------------+
>>> lor_model.saveToLocal("/tmp/lor_model")
>>> LogisticRegressionModel.loadFromLocal("/tmp/lor_model")
LogisticRegression_...
"""

_input_kwargs: Dict[str, Any]
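
Note (a follow-up sketch, not part of the diff): the model saved with saveToLocal in the doctest above can be reloaded and used to score new rows, assuming the same spark session and the /tmp/lor_model path from the example:

    # Reload the locally saved model from the doctest above and score new rows.
    from pyspark.ml.connect.classification import LogisticRegressionModel

    loaded_model = LogisticRegressionModel.loadFromLocal("/tmp/lor_model")
    new_rows = spark.createDataFrame([([0.5, 1.5],), ([-2.0, -1.0],)], schema=["features"])
    loaded_model.transform(new_rows).show()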
38 changes: 38 additions & 0 deletions python/pyspark/ml/connect/evaluation.py
@@ -93,6 +93,18 @@ class RegressionEvaluator(_TorchMetricEvaluator, HasLabelCol, HasPredictionCol,
Supported metrics are 'rmse', 'mse' and 'r2'.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect.evaluation import RegressionEvaluator
>>> eva = RegressionEvaluator(metricName='mse')
>>> dataset = spark.createDataFrame(
... [(1.0, 2.0), (-1.0, -1.5)], schema=['label', 'prediction']
... )
>>> eva.evaluate(dataset)
0.625
>>> eva.isLargerBetter()
False
"""

@keyword_only
@@ -148,6 +160,19 @@ class BinaryClassificationEvaluator(
Supported metrics are 'areaUnderROC' and 'areaUnderPR'.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect.evaluation import BinaryClassificationEvaluator
>>> eva = BinaryClassificationEvaluator(metricName='areaUnderPR')
>>> dataset = spark.createDataFrame(
... [(1, 0.6), (0, 0.55), (0, 0.1), (1, 0.6), (1, 0.4)],
... schema=['label', 'probability']
... )
>>> eva.evaluate(dataset)
0.9166666865348816
>>> eva.isLargerBetter()
True
"""

@keyword_only
@@ -207,6 +232,19 @@ class MulticlassClassificationEvaluator(
The supported metric is 'accuracy'.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect.evaluation import MulticlassClassificationEvaluator
>>> eva = MulticlassClassificationEvaluator(metricName='accuracy')
>>> dataset = spark.createDataFrame(
... [(1, 1), (0, 0), (2, 2), (1, 0), (2, 1)],
... schema=['label', 'prediction']
... )
>>> eva.evaluate(dataset)
0.6000000238418579
>>> eva.isLargerBetter()
True
"""

def __init__(
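
Note (a quick hand check, not part of the diff): the metric values in these doctests can be verified directly. For the RegressionEvaluator example, mse is the mean squared error over the two (label, prediction) rows, which gives the 0.625 shown above:

    # mse = mean of squared (label - prediction) errors over the doctest's two rows:
    # ((1.0 - 2.0)**2 + (-1.0 - (-1.5))**2) / 2 = (1.0 + 0.25) / 2 = 0.625
    pairs = [(1.0, 2.0), (-1.0, -1.5)]  # (label, prediction)
    mse = sum((label - pred) ** 2 for label, pred in pairs) / len(pairs)
    print(mse)  # 0.625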
56 changes: 56 additions & 0 deletions python/pyspark/ml/connect/feature.py
@@ -33,6 +33,28 @@ class MaxAbsScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
Rescale each feature individually to the range [-1, 1] by dividing by the largest absolute
value in each feature. It does not shift/center the data, and thus does not destroy
any sparsity.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect.feature import MaxAbsScaler
>>> scaler = MaxAbsScaler(inputCol='features', outputCol='scaled_features')
>>> dataset = spark.createDataFrame([
... ([1.0, 2.0],),
... ([2.0, -1.0],),
... ([-3.0, -2.0],),
... ], schema=['features'])
>>> scaler_model = scaler.fit(dataset)
>>> transformed_dataset = scaler_model.transform(dataset)
>>> transformed_dataset.show(truncate=False)
+------------+--------------------------+
|features |scaled_features |
+------------+--------------------------+
|[1.0, 2.0] |[0.3333333333333333, 1.0] |
|[2.0, -1.0] |[0.6666666666666666, -0.5]|
|[-3.0, -2.0]|[-1.0, -1.0] |
+------------+--------------------------+
"""

_input_kwargs: Dict[str, Any]
@@ -62,6 +84,12 @@ def _fit(self, dataset: Union["pd.DataFrame", "DataFrame"]) -> "MaxAbsScalerMode


class MaxAbsScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreModelReadWrite):
"""
Model fitted by MaxAbsScaler.
.. versionadded:: 3.5.0
"""

def __init__(
self, max_abs_values: Optional["np.ndarray"] = None, n_samples_seen: Optional[int] = None
) -> None:
@@ -117,6 +145,28 @@ class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
"""
Standardizes features by removing the mean and scaling to unit variance using column summary
statistics on the samples in the training set.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect.feature import StandardScaler
>>> scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
>>> dataset = spark.createDataFrame([
... ([1.0, 2.0],),
... ([2.0, -1.0],),
... ([-3.0, -2.0],),
... ], schema=['features'])
>>> scaler_model = scaler.fit(dataset)
>>> transformed_dataset = scaler_model.transform(dataset)
>>> transformed_dataset.show(truncate=False)
+------------+------------------------------------------+
|features |scaled_features |
+------------+------------------------------------------+
|[1.0, 2.0] |[0.3779644730092272, 1.1208970766356101] |
|[2.0, -1.0] |[0.7559289460184544, -0.3202563076101743] |
|[-3.0, -2.0]|[-1.1338934190276817, -0.8006407690254358]|
+------------+------------------------------------------+
"""

_input_kwargs: Dict[str, Any]
@@ -144,6 +194,12 @@ def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "StandardScalerModel"


class StandardScalerModel(Model, HasInputCol, HasOutputCol, ParamsReadWrite, CoreModelReadWrite):
"""
Model fitted by StandardScaler.
.. versionadded:: 3.5.0
"""

def __init__(
self,
mean_values: Optional["np.ndarray"] = None,
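
Note (a quick hand check, not part of the diff): MaxAbsScaler divides each feature column by its largest absolute value (3.0 for the first column, 2.0 for the second), which reproduces the scaled_features shown in the doctest above; StandardScaler is analogous, subtracting each column's mean and dividing by its standard deviation.

    # Reproduce the MaxAbsScaler doctest output by hand.
    rows = [[1.0, 2.0], [2.0, -1.0], [-3.0, -2.0]]
    max_abs = [max(abs(row[i]) for row in rows) for i in range(2)]  # [3.0, 2.0]
    scaled = [[value / m for value, m in zip(row, max_abs)] for row in rows]
    print(scaled)  # [[0.333..., 1.0], [0.666..., -0.5], [-1.0, -1.0]]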
28 changes: 28 additions & 0 deletions python/pyspark/ml/connect/pipeline.py
@@ -100,6 +100,34 @@ class Pipeline(Estimator["PipelineModel"], _PipelineReadWrite):
identity transformer.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect import Pipeline, PipelineModel
>>> from pyspark.ml.connect.classification import LogisticRegression
>>> from pyspark.ml.connect.feature import StandardScaler
>>> scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
>>> lor = LogisticRegression(maxIter=20, learningRate=0.01)
>>> pipeline = Pipeline(stages=[scaler, lor])
>>> dataset = spark.createDataFrame([
... ([1.0, 2.0], 1),
... ([2.0, -1.0], 1),
... ([-3.0, -2.0], 0),
... ([-1.0, -2.0], 0),
... ], schema=['features', 'label'])
>>> pipeline_model = pipeline.fit(dataset)
>>> transformed_dataset = pipeline_model.transform(dataset)
>>> transformed_dataset.show()
+------------+-----+--------------------+----------+--------------------+
| features|label| scaled_features|prediction| probability|
+------------+-----+--------------------+----------+--------------------+
| [1.0, 2.0]| 1|[0.56373452100212...| 1|[0.02423273026943...|
| [2.0, -1.0]| 1|[1.01472213780381...| 1|[0.09334788471460...|
|[-3.0, -2.0]| 0|[-1.2402159462046...| 0|[0.99808156490325...|
|[-1.0, -2.0]| 0|[-0.3382407126012...| 0|[0.96210002899169...|
+------------+-----+--------------------+----------+--------------------+
>>> pipeline_model.saveToLocal("/tmp/pipeline")
>>> loaded_pipeline_model = PipelineModel.loadFromLocal("/tmp/pipeline")
"""

stages: Param[List[Params]] = Param(
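
Note (a follow-up sketch, not part of the diff): as with the single-model example above, the pipeline saved with saveToLocal can be reloaded and applied to new rows, assuming the /tmp/pipeline path from the doctest:

    # Reload the locally saved PipelineModel and run the scaler + logistic
    # regression chain on previously unseen rows.
    from pyspark.ml.connect import PipelineModel

    loaded_pipeline_model = PipelineModel.loadFromLocal("/tmp/pipeline")
    unseen = spark.createDataFrame([([0.0, 1.0],), ([-2.0, 0.5],)], schema=["features"])
    loaded_pipeline_model.transform(unseen).show()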
24 changes: 23 additions & 1 deletion python/pyspark/ml/connect/tuning.py
@@ -274,14 +274,36 @@ class CrossValidator(
_CrossValidatorReadWrite,
):
"""
K-fold cross validation performs model selection by splitting the dataset into a set of
non-overlapping, randomly partitioned folds, which are used as separate training and test
datasets. For example, with k=3 folds, k-fold cross validation generates 3 (training, test)
dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. Each fold
is used as the test set exactly once.
.. versionadded:: 3.5.0
Examples
--------
>>> from pyspark.ml.connect.tuning import CrossValidator
>>> from pyspark.ml.connect.classification import LogisticRegression
>>> from pyspark.ml.connect.evaluation import BinaryClassificationEvaluator
>>> from pyspark.ml.tuning import ParamGridBuilder
>>> from sklearn.datasets import load_breast_cancer
>>> lor = LogisticRegression(maxIter=20, learningRate=0.01)
>>> ev = BinaryClassificationEvaluator()
>>> grid = ParamGridBuilder().addGrid(lor.maxIter, [2, 20]).build()
>>> cv = CrossValidator(estimator=lor, evaluator=ev, estimatorParamMaps=grid)
>>> sk_dataset = load_breast_cancer()
>>> train_dataset = spark.createDataFrame(
... zip(sk_dataset.data.tolist(), [int(t) for t in sk_dataset.target]),
... schema="features: array<double>, label: long",
... )
>>> cv_model = cv.fit(train_dataset)
>>> transformed_dataset = cv_model.transform(train_dataset.limit(10))
>>> cv_model.avgMetrics
[0.5527792527167658, 0.8348714668615984]
>>> cv_model.stdMetrics
[0.04902833489813031, 0.05247132866444953]
"""

_input_kwargs: Dict[str, Any]
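
Note (a sketch, not part of the diff): each entry in avgMetrics and stdMetrics corresponds to one parameter map in the grid, in the order the grid was built (here maxIter=2, then maxIter=20), with the evaluator's metric averaged over the folds. Pairing them up makes the comparison explicit:

    # Pair each param map in the grid with its cross-validated metric.
    for params, avg, std in zip(grid, cv_model.avgMetrics, cv_model.stdMetrics):
        print({p.name: v for p, v in params.items()}, avg, std)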
