Support spark dataframe as input dataset and spark models as estimators (microsoft#934)

* add basic support for Spark dataframes

add support for the SynapseML LightGBM model

update to pyspark>=3.2.0 to leverage the pandas-on-Spark API

* clean code, add TODOs

* add sample_train_data for pyspark.pandas dataframe, fix bugs

* improve some functions, fix bugs

* fix dict change size during iteration

* update model predict

* update LightGBM model, update test

* update SynapseML LightGBM params

* update synapseML and tests

* update TODOs

* Added support for roc_auc for spark models

* Added support for the score method of spark estimators

* Added test for automl score with spark estimators

* Added CV support for pyspark.pandas dataframes

* Update test, fix bugs

* Added tests

* Updated docs, tests, added a notebook

* Fix bugs in non-spark env

* Fix bugs and improve tests

* Fix uninstall pyspark

* Fix test errors

* Fix java.lang.OutOfMemoryError: Java heap space

* Fix test_performance

* Update test_sparkml to test_0sparkml to use the expected spark conf

* Remove unnecessary widgets in notebook

* Fix iloc java.lang.StackOverflowError

* fix pre-commit

* Added params check for spark dataframes

* Refactor code for train_test_split to a function

* Update train_test_split_pyspark

* Refactor if-else, remove unnecessary code

* Remove y from predict, remove mem control from n_iter compute

* Update workflow

* Improve _split_pyspark

* Fix test failure of too short training time

* Fix typos, improve docstrings

* Fix index errors of pandas_on_spark, add spark loss metric

* Fix typo of ndcgAtK

* Update NDCG metrics and tests

* Remove unneeded logger

* Use cache and count to ensure consistent indexes

* refactor for merging main

* fix refactoring errors

* Updated SparkLightGBMEstimator and cache

* Updated config2params

* Remove unused import

* Fix unknown parameters

* Update default_estimator_list

* Add unit tests for spark metrics
thinkall authored Mar 25, 2023
1 parent f33cb42 commit 720e356
Showing 24 changed files with 3,017 additions and 235 deletions.
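Taken together, the changes let AutoML consume a pyspark.pandas (pandas-on-Spark) dataframe and train the new Spark estimator. A minimal usage sketch based on the estimator name and helper introduced in this diff (lgbm_spark, to_pandas_on_spark); the toy data and the exact helper signature are assumptions, not part of the commit:

import pandas as pd
import flaml
from flaml.automl.spark.utils import to_pandas_on_spark  # new module in this commit

# Hypothetical toy data; any labeled pandas DataFrame works here.
pdf = pd.DataFrame({"x1": range(100), "x2": range(100), "y": [i % 2 for i in range(100)]})
psdf = to_pandas_on_spark(pdf)  # convert to a pandas-on-Spark dataframe

automl = flaml.AutoML()
automl.fit(
    dataframe=psdf,
    label="y",
    task="classification",
    time_budget=30,
    estimator_list=["lgbm_spark"],  # the SynapseML LightGBM estimator registered below
)
print(automl.predict(psdf))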
19 changes: 10 additions & 9 deletions .github/workflows/python-package.yml
@@ -25,7 +25,6 @@ jobs:
matrix:
os: [ubuntu-latest, macos-latest, windows-2019]
python-version: ["3.7", "3.8", "3.9", "3.10"]

steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
@@ -45,21 +44,18 @@ jobs:
export CFLAGS="$CFLAGS -I/usr/local/opt/libomp/include"
export CXXFLAGS="$CXXFLAGS -I/usr/local/opt/libomp/include"
export LDFLAGS="$LDFLAGS -Wl,-rpath,/usr/local/opt/libomp/lib -L/usr/local/opt/libomp/lib -lomp"
- name: On Linux, install Spark stand-alone cluster and PySpark
if: matrix.os == 'ubuntu-latest'
- name: On Linux + python 3.8, install pyspark 3.2.3
if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.8'
run: |
sudo apt-get update && sudo apt-get install -y --allow-downgrades --allow-change-held-packages --no-install-recommends ca-certificates-java ca-certificates openjdk-17-jdk-headless && sudo apt-get clean && sudo rm -rf /var/lib/apt/lists/*
wget --progress=dot:giga "https://www.apache.org/dyn/closer.lua/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz?action=download" -O - | tar -xzC /tmp; archive=$(basename "spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz") bash -c "sudo mv -v /tmp/\${archive/%.tgz/} /spark"
pip install --no-cache-dir pyspark>=3.0
export SPARK_HOME=/spark
export PYTHONPATH=/spark/python/lib/py4j-0.10.9.5-src.zip:/spark/python
export PATH=$PATH:$SPARK_HOME/bin
python -m pip install --upgrade pip wheel
pip install pyspark==3.2.3
- name: Install packages and dependencies
run: |
python -m pip install --upgrade pip wheel
pip install -e .
python -c "import flaml"
pip install -e .[test]
pip list | grep "pyspark"
- name: If linux, install ray 2
if: matrix.os == 'ubuntu-latest'
run: |
@@ -76,6 +72,11 @@ jobs:
if: matrix.python-version != '3.10'
run: |
pip install -e .[vw]
- name: Uninstall pyspark on python 3.9
if: matrix.python-version == '3.9'
run: |
# Uninstall pyspark to test env without pyspark
pip uninstall -y pyspark
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
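The pin to pyspark 3.2.3 matches the commit note above: the pandas-on-Spark API (pyspark.pandas) only ships with pyspark>=3.2.0. A quick local sanity check, analogous to the pip list | grep "pyspark" step in the workflow:

import pyspark
import pyspark.pandas as ps  # only importable on pyspark>=3.2.0

print(pyspark.__version__)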
3 changes: 3 additions & 0 deletions .gitignore
@@ -159,3 +159,6 @@ automl.pkl

test/nlp/testtmp.py
test/nlp/testtmpfl.py

flaml/tune/spark/mylearner.py
*.pkl
57 changes: 50 additions & 7 deletions flaml/automl/automl.py
@@ -7,7 +7,6 @@
import os
import sys
from typing import Callable, List, Union, Optional
import inspect
from functools import partial
import numpy as np
from sklearn.base import BaseEstimator
@@ -17,7 +16,6 @@

from flaml.automl.state import SearchState, AutoMLState
from flaml.automl.ml import (
compute_estimator,
train_estimator,
get_estimator_class,
)
@@ -31,7 +29,6 @@
N_SPLITS,
SAMPLE_MULTIPLY_FACTOR,
)
from flaml.automl.data import concat

# TODO check to see when we can remove these
from flaml.automl.task.task import CLASSIFICATION, TS_FORECAST, Task
@@ -43,6 +40,34 @@
from flaml.version import __version__ as flaml_version
from flaml.tune.spark.utils import check_spark, get_broadcast_data

try:
from flaml.automl.spark.utils import (
train_test_split_pyspark,
unique_pandas_on_spark,
len_labels,
unique_value_first_index,
)
except ImportError:
train_test_split_pyspark = None
unique_pandas_on_spark = None
from flaml.automl.utils import (
len_labels,
unique_value_first_index,
)
try:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark.pandas as ps
from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries
from pyspark.pandas.config import set_option, reset_option
except ImportError:
ps = None

class psDataFrame:
pass

class psSeries:
pass


try:
import mlflow
@@ -511,7 +536,12 @@ def time_to_find_best_model(self) -> float:
"""Time taken to find best model in seconds."""
return self.__dict__.get("_time_taken_best_iter")

def score(self, X: pd.DataFrame, y: pd.Series, **kwargs):
def score(
self,
X: Union[pd.DataFrame, psDataFrame],
y: Union[pd.Series, psSeries],
**kwargs,
):
estimator = getattr(self, "_trained_estimator", None)
if estimator is None:
logger.warning(
@@ -525,13 +555,14 @@ def score(self, X: pd.DataFrame, y: pd.Series, **kwargs):

def predict(
self,
X: Union[np.array, pd.DataFrame, List[str], List[List[str]]],
X: Union[np.array, pd.DataFrame, List[str], List[List[str]], psDataFrame],
**pred_kwargs,
):
"""Predict label from features.
Args:
X: A numpy array of featurized instances, shape n * m,
X: A numpy array or pandas dataframe or pyspark.pandas dataframe
of featurized instances, shape n * m,
or for time series forecast tasks:
a pandas dataframe with the first column containing
timestamp values (datetime type) or an integer n for
@@ -1859,7 +1890,19 @@ def is_to_reverse_metric(metric, task):
error_metric = "customized metric"
logger.info(f"Minimizing error metric: {error_metric}")

estimator_list = task.default_estimator_list(estimator_list)
is_spark_dataframe = isinstance(X_train, psDataFrame) or isinstance(
dataframe, psDataFrame
)
estimator_list = task.default_estimator_list(estimator_list, is_spark_dataframe)

if is_spark_dataframe and self._use_spark:
# For spark dataframe, use_spark must be False because spark models are trained in parallel themselves
self._use_spark = False
logger.warning(
"Spark dataframes support only spark.ml type models, which will be trained "
"with spark themselves, no need to start spark trials in flaml. "
"`use_spark` is set to False."
)

# When no search budget is specified
if no_budget:
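The try/except block added near the top of automl.py (and repeated in data.py and ml.py below) is a soft-dependency idiom: when pyspark is missing, empty placeholder classes are defined so that later isinstance(X, (psDataFrame, psSeries)) checks still work instead of raising NameError. A standalone sketch of the pattern, with an illustrative helper name (is_spark_input) that is not part of the commit:

import os

try:
    # Set before importing pyspark.pandas, as in the diff above.
    os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
    import pyspark.pandas as ps
    from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries
except ImportError:
    ps = None

    class psDataFrame:  # placeholder so isinstance checks never fail
        pass

    class psSeries:
        pass


def is_spark_input(X) -> bool:
    # Safe to call whether or not pyspark is installed.
    return isinstance(X, (psDataFrame, psSeries))

This is also why the fit path above resets use_spark to False for Spark dataframes: spark.ml models parallelize training themselves, so FLAML does not need to launch Spark trials on top of them.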
32 changes: 32 additions & 0 deletions flaml/automl/data.py
@@ -12,6 +12,22 @@
from datetime import datetime
from typing import TYPE_CHECKING, Union

import os

try:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark.pandas as ps
from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries
except ImportError:
ps = None

class psDataFrame:
pass

class psSeries:
pass


if TYPE_CHECKING:
from flaml.automl.task import Task

@@ -198,6 +214,15 @@ def get_output_from_log(filename, time_budget):

def concat(X1, X2):
"""concatenate two matrices vertically."""
if type(X1) != type(X2):
if isinstance(X2, (psDataFrame, psSeries)):
X1 = ps.from_pandas(pd.DataFrame(X1))
elif isinstance(X1, (psDataFrame, psSeries)):
X2 = ps.from_pandas(pd.DataFrame(X2))
else:
X1 = pd.DataFrame(X1)
X2 = pd.DataFrame(X2)

if isinstance(X1, (DataFrame, Series)):
df = pd.concat([X1, X2], sort=False)
df.reset_index(drop=True, inplace=True)
@@ -206,6 +231,13 @@ def concat(X1, X2):
if len(cat_columns):
df[cat_columns] = df[cat_columns].astype("category")
return df
if isinstance(X1, (psDataFrame, psSeries)):
df = ps.concat([X1, X2], ignore_index=True)
if isinstance(X1, psDataFrame):
cat_columns = X1.select_dtypes(include="category").columns.values.tolist()
if len(cat_columns):
df[cat_columns] = df[cat_columns].astype("category")
return df
if issparse(X1):
return vstack((X1, X2))
else:
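With the mixed-type branch above, concat now promotes whichever side is plain pandas/numpy to pandas-on-Spark and then uses ps.concat, so the result stays distributed. A small usage sketch (toy frames; assumes pyspark is installed):

import pandas as pd
import pyspark.pandas as ps
from flaml.automl.data import concat

pdf = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
psdf = ps.from_pandas(pd.DataFrame({"a": [3, 4], "b": ["z", "w"]}))

out = concat(pdf, psdf)  # pdf is converted with ps.from_pandas; ps.concat stacks the two vertically
print(type(out))         # a pyspark.pandas DataFrame
print(out.to_pandas())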
51 changes: 39 additions & 12 deletions flaml/automl/ml.py
@@ -2,6 +2,7 @@
# * Copyright (c) FLAML authors. All rights reserved.
# * Licensed under the MIT License. See LICENSE file in the
# * project root for license information.
import os
import time
import numpy as np
import pandas as pd
@@ -19,12 +20,6 @@
mean_absolute_percentage_error,
ndcg_score,
)
from sklearn.model_selection import (
RepeatedStratifiedKFold,
GroupKFold,
TimeSeriesSplit,
StratifiedGroupKFold,
)
from flaml.automl.model import (
XGBoostSklearnEstimator,
XGBoost_TS,
@@ -46,14 +41,33 @@
TransformersEstimator,
TemporalFusionTransformerEstimator,
TransformersEstimatorModelSelection,
SparkLGBMEstimator,
)
from flaml.automl.data import group_counts
from flaml.automl.task.task import TS_FORECAST, Task
from flaml.automl.model import BaseEstimator

import logging
try:
from flaml.automl.spark.utils import len_labels
except ImportError:
from flaml.automl.utils import len_labels
try:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
from pyspark.sql.functions import col
import pyspark.pandas as ps
from pyspark.pandas import DataFrame as psDataFrame, Series as psSeries
from flaml.automl.spark.utils import to_pandas_on_spark, iloc_pandas_on_spark
from flaml.automl.spark.metrics import spark_metric_loss_score
except ImportError:
ps = None

class psDataFrame:
pass

class psSeries:
pass


logger = logging.getLogger(__name__)
EstimatorSubclass = TypeVar("EstimatorSubclass", bound=BaseEstimator)

sklearn_metric_name_set = {
@@ -124,6 +138,8 @@ def get_estimator_class(task: str, estimator_name: str) -> EstimatorSubclass:
estimator_class = RF_TS if task in TS_FORECAST else RandomForestEstimator
elif "lgbm" == estimator_name:
estimator_class = LGBM_TS if task in TS_FORECAST else LGBMEstimator
elif "lgbm_spark" == estimator_name:
estimator_class = SparkLGBMEstimator
elif "lrl1" == estimator_name:
estimator_class = LRL1Classifier
elif "lrl2" == estimator_name:
@@ -163,7 +179,15 @@ def metric_loss_score(
groups=None,
):
# y_processed_predict and y_processed_true are processed id labels if the original were the token labels
if is_in_sklearn_metric_name_set(metric_name):
if isinstance(y_processed_predict, (psDataFrame, psSeries)):
return spark_metric_loss_score(
metric_name,
y_processed_predict,
y_processed_true,
sample_weight,
groups,
)
elif is_in_sklearn_metric_name_set(metric_name):
return sklearn_metric_loss_score(
metric_name,
y_processed_predict,
@@ -359,7 +383,10 @@ def get_y_pred(estimator, X, eval_metric, task: Task):
def get_y_pred(estimator, X, eval_metric, task: Task):
if eval_metric in ["roc_auc", "ap", "roc_auc_weighted"] and task.is_binary():
y_pred_classes = estimator.predict_proba(X)
y_pred = y_pred_classes[:, 1] if y_pred_classes.ndim > 1 else y_pred_classes
if isinstance(y_pred_classes, (psSeries, psDataFrame)):
y_pred = y_pred_classes
else:
y_pred = y_pred_classes[:, 1] if y_pred_classes.ndim > 1 else y_pred_classes
elif eval_metric in [
"log_loss",
"roc_auc",
@@ -525,7 +552,7 @@ def compute_estimator(
fit_kwargs: Optional[dict] = None,
free_mem_ratio=0,
):
if not fit_kwargs:
if fit_kwargs is None:
fit_kwargs = {}

estimator_class = estimator_class or get_estimator_class(task, estimator_name)
@@ -605,7 +632,7 @@ def train_estimator(
task=task,
n_jobs=n_jobs,
)
if not fit_kwargs:
if fit_kwargs is None:
fit_kwargs = {}

if isinstance(estimator, TransformersEstimator):
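One small but deliberate change in compute_estimator and train_estimator above is `if not fit_kwargs:` becoming `if fit_kwargs is None:`, presumably so that an explicitly passed empty dict is kept rather than replaced (updates made to it inside the function then remain visible to the caller). An illustration of the difference, using hypothetical helper names:

def replace_when_falsy(fit_kwargs=None):
    if not fit_kwargs:  # also triggers for {}, silently dropping the caller's dict
        fit_kwargs = {}
    fit_kwargs["groups"] = "set inside"
    return fit_kwargs


def replace_when_none(fit_kwargs=None):
    if fit_kwargs is None:  # only fills in a genuinely missing argument
        fit_kwargs = {}
    fit_kwargs["groups"] = "set inside"
    return fit_kwargs


caller_kwargs = {}
replace_when_falsy(caller_kwargs)
print(caller_kwargs)  # {} -- the update landed on a different dict
replace_when_none(caller_kwargs)
print(caller_kwargs)  # {'groups': 'set inside'}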
(Diffs for the remaining changed files are not shown.)
