support spark dataframe in tf2 estimator (intel-analytics#3113)
* support spark dataframe in tf2 estimator

* fix style
yangw1234 authored and GavinGu07 committed Nov 27, 2020
1 parent 07d62a0 commit 444819a
Showing 3 changed files with 177 additions and 41 deletions.
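In practical terms, the commit lets Estimator.fit, evaluate, and predict accept a Spark DataFrame together with feature_cols/label_cols arguments. A minimal usage sketch, closely following the new test below; the Estimator import path and the model_creator function are assumptions, since neither appears in the shown hunks:

import numpy as np
from pyspark.ml.linalg import DenseVector
from pyspark.sql import SparkSession
from zoo import init_nncontext
# Import path assumed from the edited file's location; the test's own import is outside the shown hunks.
from zoo.orca.learn.tf2.tf_ray_estimator import Estimator

sc = init_nncontext()
spark = SparkSession(sc)

# Toy DataFrame with a vector feature column and an integer label column,
# mirroring the test added in this commit.
df = sc.range(0, 10).map(
    lambda x: (DenseVector(np.random.randn(1).astype(np.float64)),
               int(np.random.randint(0, 2)))).toDF(["feature", "label"])

# model_creator is assumed to build and compile a tf.keras model, as in the test module.
est = Estimator(model_creator=model_creator,
                config={"batch_size": 4, "lr": 0.8},
                workers_per_node=2)

est.fit(df, epochs=1, steps_per_epoch=25,
        feature_cols=["feature"], label_cols=["label"])
est.evaluate(df, steps=25, feature_cols=["feature"], label_cols=["label"])
predictions = est.predict(df, feature_cols=["feature"]).collect()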
28 changes: 28 additions & 0 deletions pyzoo/test/zoo/orca/learn/ray/tf/test_tf_ray_estimator.py
@@ -18,6 +18,8 @@
import numpy as np
import pytest
import tensorflow as tf

from zoo import init_nncontext
from zoo.orca.data import XShards

import zoo.orca.data.pandas
@@ -324,6 +326,32 @@ def test_sparkxshards(self):
trainer.fit(train_data_shard, epochs=1, steps_per_epoch=25)
trainer.evaluate(train_data_shard, steps=25)

def test_dataframe(self):

sc = init_nncontext()
rdd = sc.range(0, 10)
from pyspark.sql import SparkSession
spark = SparkSession(sc)
from pyspark.ml.linalg import DenseVector
df = rdd.map(lambda x: (DenseVector(np.random.randn(1,).astype(np.float)),
int(np.random.randint(0, 1, size=())))).toDF(["feature", "label"])

config = {
"batch_size": 4,
"lr": 0.8
}
trainer = Estimator(
model_creator=model_creator,
verbose=True,
config=config,
workers_per_node=2)

trainer.fit(df, epochs=1, steps_per_epoch=25,
feature_cols=["feature"],
label_cols=["label"])
trainer.evaluate(df, steps=25, feature_cols=["feature"], label_cols=["label"])
trainer.predict(df, feature_cols=["feature"]).collect()

def test_sparkxshards_with_inbalanced_data(self):

train_data_shard = XShards.partition({"x": np.random.randn(100, 1),
93 changes: 89 additions & 4 deletions pyzoo/zoo/orca/learn/tf2/tf_ray_estimator.py
@@ -25,6 +25,7 @@

from zoo.orca.learn.tf2.tf_runner import TFRunner
from zoo.ray import RayContext
from zoo.tfpark.tf_dataset import convert_row_to_numpy

logger = logging.getLogger(__name__)

@@ -56,6 +57,42 @@ def process_spark_xshards(spark_xshards, num_workers):
return max_length, ray_xshards


def arrays2dict(iter, feature_cols, label_cols):

feature_lists = [[] for col in feature_cols]
if label_cols is not None:
label_lists = [[] for col in label_cols]
else:
label_lists = None

for row in iter:
# feature
if not isinstance(row[0], list):
features = [row[0]]
else:
features = row[0]

for i, arr in enumerate(features):
feature_lists[i].append(arr)

# label
if label_cols is not None:
if not isinstance(row[1], list):
labels = [row[1]]
else:
labels = row[1]

for i, arr in enumerate(labels):
label_lists[i].append(arr)

feature_arrs = [np.stack(l) for l in feature_lists]
if label_lists is not None:
label_arrs = [np.stack(l) for l in label_lists]
return [{"x": feature_arrs, "y": label_arrs}]

return [{"x": feature_arrs}]


class Estimator:
def __init__(self,
model_creator,
@@ -156,7 +193,8 @@ def from_keras(cls, model_creator,
def fit(self, data_creator, epochs=1, verbose=1,
callbacks=None, validation_data_creator=None, class_weight=None,
steps_per_epoch=None, validation_steps=None, validation_freq=1,
data_config=None):
data_config=None, feature_cols=None,
label_cols=None,):
"""Runs a training epoch."""
params = dict(
epochs=epochs,
@@ -170,6 +208,22 @@ def fit(self, data_creator, epochs=1, verbose=1,
)

from zoo.orca.data import SparkXShards
from pyspark.sql import DataFrame
if isinstance(data_creator, DataFrame):
assert feature_cols is not None,\
"feature_col must be provided if data_creator is a spark dataframe"
assert label_cols is not None,\
"label_cols must be provided if data_creator is a spark dataframe"
schema = data_creator.schema
numpy_rdd = data_creator.rdd.map(lambda row: convert_row_to_numpy(row,
schema,
feature_cols,
label_cols))
shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
feature_cols,
label_cols))
data_creator = SparkXShards(shard_rdd)

if isinstance(data_creator, SparkXShards):
max_length, ray_xshards = process_spark_xshards(data_creator, self.num_workers)

@@ -208,7 +262,8 @@ def zip_func(worker, this_shards_ref, that_shards_ref):
return stats

def evaluate(self, data_creator, verbose=1, sample_weight=None,
steps=None, callbacks=None, data_config=None):
steps=None, callbacks=None, data_config=None,
feature_cols=None, label_cols=None):
"""Evaluates the model on the validation data set."""
logger.info("Starting validation step.")
params = dict(
@@ -219,11 +274,27 @@ def evaluate(self, data_creator, verbose=1, sample_weight=None,
data_config=data_config,
)
from zoo.orca.data import SparkXShards
from pyspark.sql import DataFrame

if isinstance(data_creator, DataFrame):
assert feature_cols is not None,\
"feature_col must be provided if data_creator is a spark dataframe"
assert label_cols is not None,\
"label_cols must be provided if data_creator is a spark dataframe"
schema = data_creator.schema
numpy_rdd = data_creator.rdd.map(lambda row: convert_row_to_numpy(row,
schema,
feature_cols,
label_cols))
shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
feature_cols,
label_cols))
data_creator = SparkXShards(shard_rdd)

if isinstance(data_creator, SparkXShards):
data = data_creator
if data.num_partitions() != self.num_workers:
data = data.repartition(self.num_workers)
max_length = data.rdd.map(data_length).max()

ray_xshards = RayXShards.from_spark_xshards(data)

@@ -247,7 +318,8 @@ def transform_func(worker, shards_ref):
return stats

def predict(self, data_creator, batch_size=None, verbose=1,
steps=None, callbacks=None, data_config=None):
steps=None, callbacks=None, data_config=None,
feature_cols=None):
"""Evaluates the model on the validation data set."""
logger.info("Starting predict step.")
params = dict(
@@ -258,6 +330,19 @@ def predict(self, data_creator, batch_size=None, verbose=1,
data_config=data_config,
)
from zoo.orca.data import SparkXShards
from pyspark.sql import DataFrame
if isinstance(data_creator, DataFrame):
assert feature_cols is not None,\
"feature_col must be provided if data_creator is a spark dataframe"
schema = data_creator.schema
numpy_rdd = data_creator.rdd.map(lambda row: convert_row_to_numpy(row,
schema,
feature_cols,
None))
shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
feature_cols,
None))
data_creator = SparkXShards(shard_rdd)
if isinstance(data_creator, SparkXShards):
ray_xshards = RayXShards.from_spark_xshards(data_creator)

97 changes: 60 additions & 37 deletions pyzoo/zoo/tfpark/tf_dataset.py
@@ -663,6 +663,21 @@ def from_dataframe(df, feature_cols, labels_cols=None, batch_size=-1,
sequential_order, shuffle)


def _tf_get_types(dataset):
import tensorflow as tf
return tf.compat.v1.data.get_output_types(dataset)


def _tf_get_shapes(dataset):
import tensorflow as tf
return tf.compat.v1.data.get_output_shapes(dataset)


def _tf_make_iterator(dataset):
import tensorflow as tf
return tf.compat.v1.data.make_initializable_iterator(dataset)


class TFDataDataset(TFDataset):
def get_num_partitions(self):
# only called in inference case
@@ -735,17 +750,17 @@ def __init__(self, tf_data_dataset, batch_size,
from tensorflow.python.keras.engine import training_utils
training_utils.verify_dataset_shuffled(tf_data_dataset)

flatten_shapes = nest.flatten(tf.compat.v1.data.get_output_shapes(tf_data_dataset))
flatten_shapes = nest.flatten(_tf_get_shapes(tf_data_dataset))
if batch_outside:
flatten_shapes = [shape[1:] for shape in flatten_shapes]

flatten_types = nest.flatten(tf.compat.v1.data.get_output_types(tf_data_dataset))
flatten_types = nest.flatten(_tf_get_types(tf_data_dataset))

flatten_tensor_structure = [TensorMeta(dtype=flatten_types[i],
shape=list(flatten_shapes[i]),
name="zoo_input_{}".format(i))
for i in range(len(flatten_shapes))]
structure = tf.compat.v1.data.get_output_types(tf_data_dataset)
structure = _tf_get_types(tf_data_dataset)
if isinstance(structure, tf.DType):
structure = (structure,)
tensor_structure = nest.pack_sequence_as(structure,
@@ -807,11 +822,10 @@ def __init__(self, tf_data_dataset, batch_size,

self.shard_index = shard_index
self.train_dataset = tf_data_dataset
self.train_iterator = tf.compat.v1.data.make_initializable_iterator(self.train_dataset)
self.train_iterator = _tf_make_iterator(self.train_dataset)
self.train_next_ops = nest.flatten(self.train_iterator.get_next())
self.output_types = [t.as_datatype_enum
for t in nest.flatten(
tf.compat.v1.data.get_output_types(self.train_dataset))]
for t in nest.flatten(_tf_get_types(self.train_dataset))]

self.validation_dataset = validation_dataset
self.validation_iterator = None
@@ -820,7 +834,7 @@ def __init__(self, tf_data_dataset, batch_size,
self._train_init_op_name = self.train_iterator.initializer.name
self._train_output_names = [op.name for op in self.train_next_ops]
if validation_dataset is not None:
self.validation_iterator = tf.compat.v1.data.make_initializable_iterator(
self.validation_iterator = _tf_make_iterator(
self.validation_dataset)
self.validation_next_ops = nest.flatten(self.validation_iterator.get_next())
self._val_init_op_name = self.validation_iterator.initializer.name
@@ -1169,6 +1183,34 @@ def from_ndarrays(tensors, batch_size=-1, batch_per_thread=-1,
val_rdd, sequential_order=sequential_order, shuffle=shuffle)


def convert_row_to_numpy(row, schema, feature_cols, labels_cols):
def convert_for_cols(row, cols):
import pyspark.sql.types as df_types
result = []
for name in cols:
feature_type = schema[name].dataType
if DataFrameDataset.is_scalar_type(feature_type):
result.append(np.array(row[name]))
elif isinstance(feature_type, df_types.ArrayType):
result.append(np.array(row[name]))
elif isinstance(row[name], DenseVector):
result.append(row[name].values)
else:
assert isinstance(row[name], SparseVector), \
"unsupported field {}, data {}".format(schema[name], row[name])
result.append(row[name].toArray())
if len(result) == 1:
return result[0]
return result

features = convert_for_cols(row, feature_cols)
if labels_cols:
labels = convert_for_cols(row, labels_cols)
return (features, labels)
else:
return (features,)
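
convert_row_to_numpy normalizes each supported column type to a plain numpy value before the rows are re-batched. A hedged illustration of the per-branch conversions (toy values only, independent of the class below):

import numpy as np
from pyspark.ml.linalg import DenseVector, SparseVector

scalar_col = np.array(3)                          # scalar types -> 0-d array
array_col = np.array([1.0, 2.0, 3.0])             # ArrayType -> 1-d array
dense_col = DenseVector([0.5, 1.5]).values        # DenseVector -> its underlying numpy values
sparse_col = SparseVector(4, {1: 7.0}).toArray()  # SparseVector -> dense numpy array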


class DataFrameDataset(TFNdarrayDataset):
@staticmethod
def df_datatype_to_tf(dtype):
@@ -1243,36 +1285,17 @@ def __init__(self, df, feature_cols, labels_cols=None, batch_size=-1,
else:
tensor_structure = (feature_meta,)

def convert(row):

def convert_for_cols(row, cols):
import pyspark.sql.types as df_types
result = []
for name in cols:
feature_type = schema[name].dataType
if DataFrameDataset.is_scalar_type(feature_type):
result.append(np.array(row[name]))
elif isinstance(feature_type, df_types.ArrayType):
result.append(np.array(row[name]))
elif isinstance(row[name], DenseVector):
result.append(row[name].values)
else:
assert isinstance(row[name], SparseVector), \
"unsupported field {}, data {}".format(schema[name], row[name])
result.append(row[name].toArray())
if len(result) == 1:
return result[0]
return result

features = convert_for_cols(row, feature_cols)
if labels_cols:
labels = convert_for_cols(row, labels_cols)
return (features, labels)
else:
return (features,)

rdd = selected_df.rdd.map(lambda row: convert(row))
val_rdd = validation_df.rdd.map(lambda row: convert(row)) if validation_df else None
rdd = selected_df.rdd.map(lambda row: convert_row_to_numpy(row,
schema,
feature_cols,
labels_cols))
if validation_df is not None:
val_rdd = validation_df.rdd.map(lambda row: convert_row_to_numpy(row,
schema,
feature_cols,
labels_cols))
else:
val_rdd = None

super(DataFrameDataset, self).__init__(rdd, tensor_structure, batch_size,
batch_per_thread, hard_code_batch_size,
