From 2f84fc29f3c09ac46c82cbc5ccfb51ef8f0433f5 Mon Sep 17 00:00:00 2001
From: Yang Wang
Date: Mon, 23 Nov 2020 20:15:16 +0800
Subject: [PATCH] support spark dataframe in tf2 estimator (#3113)

* support spark dataframe in tf2 estimator

* fix style
---
 .../bigdl/orca/learn/tf2/tf_ray_estimator.py  | 93 ++++++++++++++++++-
 .../learn/ray/tf/test_tf_ray_estimator.py     | 28 ++++++
 2 files changed, 117 insertions(+), 4 deletions(-)

diff --git a/python/orca/src/bigdl/orca/learn/tf2/tf_ray_estimator.py b/python/orca/src/bigdl/orca/learn/tf2/tf_ray_estimator.py
index 902d7fa8ecc..071214eeb98 100644
--- a/python/orca/src/bigdl/orca/learn/tf2/tf_ray_estimator.py
+++ b/python/orca/src/bigdl/orca/learn/tf2/tf_ray_estimator.py
@@ -25,6 +25,7 @@
 from zoo.orca.learn.tf2.tf_runner import TFRunner
 from zoo.ray import RayContext
+from zoo.tfpark.tf_dataset import convert_row_to_numpy
 
 logger = logging.getLogger(__name__)
 
@@ -56,6 +57,42 @@ def process_spark_xshards(spark_xshards, num_workers):
     return max_length, ray_xshards
 
 
+def arrays2dict(iter, feature_cols, label_cols):
+
+    feature_lists = [[] for col in feature_cols]
+    if label_cols is not None:
+        label_lists = [[] for col in label_cols]
+    else:
+        label_lists = None
+
+    for row in iter:
+        # feature
+        if not isinstance(row[0], list):
+            features = [row[0]]
+        else:
+            features = row[0]
+
+        for i, arr in enumerate(features):
+            feature_lists[i].append(arr)
+
+        # label
+        if label_cols is not None:
+            if not isinstance(row[1], list):
+                labels = [row[1]]
+            else:
+                labels = row[1]
+
+            for i, arr in enumerate(labels):
+                label_lists[i].append(arr)
+
+    feature_arrs = [np.stack(l) for l in feature_lists]
+    if label_lists is not None:
+        label_arrs = [np.stack(l) for l in label_lists]
+        return [{"x": feature_arrs, "y": label_arrs}]
+
+    return [{"x": feature_arrs}]
+
+
 class Estimator:
     def __init__(self,
                  model_creator,
@@ -156,7 +193,8 @@ def from_keras(cls, model_creator,
     def fit(self, data_creator, epochs=1, verbose=1,
             callbacks=None, validation_data_creator=None, class_weight=None,
             steps_per_epoch=None, validation_steps=None, validation_freq=1,
-            data_config=None):
+            data_config=None, feature_cols=None,
+            label_cols=None,):
         """Runs a training epoch."""
         params = dict(
             epochs=epochs,
@@ -170,6 +208,22 @@ def fit(self, data_creator, epochs=1, verbose=1,
         )
 
         from zoo.orca.data import SparkXShards
+        from pyspark.sql import DataFrame
+        if isinstance(data_creator, DataFrame):
+            assert feature_cols is not None,\
+                "feature_col must be provided if data_creator is a spark dataframe"
+            assert label_cols is not None,\
+                "label_cols must be provided if data_creator is a spark dataframe"
+            schema = data_creator.schema
+            numpy_rdd = data_creator.rdd.map(lambda row: convert_row_to_numpy(row,
+                                                                              schema,
+                                                                              feature_cols,
+                                                                              label_cols))
+            shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
+                                                                      feature_cols,
+                                                                      label_cols))
+            data_creator = SparkXShards(shard_rdd)
+
         if isinstance(data_creator, SparkXShards):
             max_length, ray_xshards = process_spark_xshards(data_creator,
                                                             self.num_workers)
@@ -208,7 +262,8 @@ def zip_func(worker, this_shards_ref, that_shards_ref):
         return stats
 
     def evaluate(self, data_creator, verbose=1, sample_weight=None,
-                 steps=None, callbacks=None, data_config=None):
+                 steps=None, callbacks=None, data_config=None,
+                 feature_cols=None, label_cols=None):
         """Evaluates the model on the validation data set."""
         logger.info("Starting validation step.")
         params = dict(
@@ -219,11 +274,27 @@ def evaluate(self, data_creator, verbose=1, sample_weight=None,
             data_config=data_config,
         )
         from zoo.orca.data import SparkXShards
+        from pyspark.sql import DataFrame
+
+        if isinstance(data_creator, DataFrame):
+            assert feature_cols is not None,\
+                "feature_col must be provided if data_creator is a spark dataframe"
+            assert label_cols is not None,\
+                "label_cols must be provided if data_creator is a spark dataframe"
+            schema = data_creator.schema
+            numpy_rdd = data_creator.rdd.map(lambda row: convert_row_to_numpy(row,
+                                                                              schema,
+                                                                              feature_cols,
+                                                                              label_cols))
+            shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
+                                                                      feature_cols,
+                                                                      label_cols))
+            data_creator = SparkXShards(shard_rdd)
+
         if isinstance(data_creator, SparkXShards):
             data = data_creator
             if data.num_partitions() != self.num_workers:
                 data = data.repartition(self.num_workers)
-
             max_length = data.rdd.map(data_length).max()
             ray_xshards = RayXShards.from_spark_xshards(data)
 
@@ -247,7 +318,8 @@ def transform_func(worker, shards_ref):
         return stats
 
     def predict(self, data_creator, batch_size=None, verbose=1,
-                steps=None, callbacks=None, data_config=None):
+                steps=None, callbacks=None, data_config=None,
+                feature_cols=None):
         """Evaluates the model on the validation data set."""
         logger.info("Starting predict step.")
         params = dict(
@@ -258,6 +330,19 @@ def predict(self, data_creator, batch_size=None, verbose=1,
             data_config=data_config,
         )
         from zoo.orca.data import SparkXShards
+        from pyspark.sql import DataFrame
+        if isinstance(data_creator, DataFrame):
+            assert feature_cols is not None,\
+                "feature_col must be provided if data_creator is a spark dataframe"
+            schema = data_creator.schema
+            numpy_rdd = data_creator.rdd.map(lambda row: convert_row_to_numpy(row,
+                                                                              schema,
+                                                                              feature_cols,
+                                                                              None))
+            shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
+                                                                      feature_cols,
+                                                                      None))
+            data_creator = SparkXShards(shard_rdd)
 
         if isinstance(data_creator, SparkXShards):
             ray_xshards = RayXShards.from_spark_xshards(data_creator)
diff --git a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py
index 8e340833bc8..6cce0f8849e 100644
--- a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py
+++ b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_ray_estimator.py
@@ -18,6 +18,8 @@
 import numpy as np
 import pytest
 import tensorflow as tf
+
+from zoo import init_nncontext
 from zoo.orca.data import XShards
 import zoo.orca.data.pandas
 
@@ -324,6 +326,32 @@ def test_sparkxshards(self):
         trainer.fit(train_data_shard, epochs=1, steps_per_epoch=25)
         trainer.evaluate(train_data_shard, steps=25)
 
+    def test_dataframe(self):
+
+        sc = init_nncontext()
+        rdd = sc.range(0, 10)
+        from pyspark.sql import SparkSession
+        spark = SparkSession(sc)
+        from pyspark.ml.linalg import DenseVector
+        df = rdd.map(lambda x: (DenseVector(np.random.randn(1,).astype(np.float)),
+                                int(np.random.randint(0, 1, size=())))).toDF(["feature", "label"])
+
+        config = {
+            "batch_size": 4,
+            "lr": 0.8
+        }
+        trainer = Estimator(
+            model_creator=model_creator,
+            verbose=True,
+            config=config,
+            workers_per_node=2)
+
+        trainer.fit(df, epochs=1, steps_per_epoch=25,
+                    feature_cols=["feature"],
+                    label_cols=["label"])
+        trainer.evaluate(df, steps=25, feature_cols=["feature"], label_cols=["label"])
+        trainer.predict(df, feature_cols=["feature"]).collect()
+
     def test_sparkxshards_with_inbalanced_data(self):
         train_data_shard = XShards.partition({"x": np.random.randn(100, 1),
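
Usage sketch (not part of the patch): a minimal end-to-end example of the new DataFrame path, adapted from the test_dataframe test added above. The model_creator below is an illustrative stand-in for the one already defined in test_tf_ray_estimator.py, and the Estimator import path and the randint(0, 2) label range are assumptions, not part of the change.

    import numpy as np
    import tensorflow as tf
    from pyspark.ml.linalg import DenseVector
    from pyspark.sql import SparkSession
    from zoo import init_nncontext
    from zoo.orca.learn.tf2.tf_ray_estimator import Estimator  # assumed import path

    sc = init_nncontext()
    spark = SparkSession(sc)

    def model_creator(config):
        # Illustrative single-input Keras model matching the one-dimensional "feature" column.
        model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
        model.compile(optimizer=tf.keras.optimizers.SGD(config["lr"]),
                      loss="mse", metrics=["mse"])
        return model

    # Build a tiny DataFrame with a vector "feature" column and an integer "label" column.
    df = sc.range(0, 10).map(
        lambda x: (DenseVector(np.random.randn(1,)),
                   int(np.random.randint(0, 2)))).toDF(["feature", "label"])

    trainer = Estimator(model_creator=model_creator,
                        config={"batch_size": 4, "lr": 0.8},
                        workers_per_node=2)

    # DataFrames are now accepted directly; feature_cols/label_cols select the columns to feed the model.
    trainer.fit(df, epochs=1, steps_per_epoch=25,
                feature_cols=["feature"], label_cols=["label"])
    trainer.evaluate(df, steps=25, feature_cols=["feature"], label_cols=["label"])
    predictions = trainer.predict(df, feature_cols=["feature"]).collect()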