From 1c03a40390e5c80abc7df7fbb29d496a401c692f Mon Sep 17 00:00:00 2001 From: Jian Zhou <41574757+PatrickkZ@users.noreply.github.com> Date: Tue, 13 Sep 2022 20:46:18 +0800 Subject: [PATCH] Add more data types in convert DataFrame to numpy (#5680) * add more datatype * update unit test * update ut in test_estimator_pyspark_backend * update ut in test_estimator_pyspark_backend * update ut in test_estimator_ray_backend * update ut in test_estimator_ray_backend * add array decimal type * update array data type Co-authored-by: Zhou --- python/dllib/src/bigdl/dllib/utils/utils.py | 16 ++- .../pytorch/test_estimator_pyspark_backend.py | 98 +++++++++---- .../ray/pytorch/test_estimator_ray_backend.py | 129 +++++++++++++----- 3 files changed, 186 insertions(+), 57 deletions(-) diff --git a/python/dllib/src/bigdl/dllib/utils/utils.py b/python/dllib/src/bigdl/dllib/utils/utils.py index 256949b2b4d..acce9f044e0 100644 --- a/python/dllib/src/bigdl/dllib/utils/utils.py +++ b/python/dllib/src/bigdl/dllib/utils/utils.py @@ -210,10 +210,14 @@ def convert_for_cols(row, cols): if _is_scalar_type(feature_type, accept_str_col): if isinstance(feature_type, df_types.FloatType): result.append(np.array(row[name]).astype(np.float32)) + elif isinstance(feature_type, df_types.DoubleType): + result.append(np.array(row[name]).astype(np.float64)) elif isinstance(feature_type, df_types.TimestampType): result.append(np.array(row[name]).astype('datetime64[ns]')) elif isinstance(feature_type, df_types.IntegerType): result.append(np.array(row[name]).astype(np.int32)) + elif isinstance(feature_type, df_types.LongType): + result.append(np.array(row[name]).astype(np.int64)) elif isinstance(feature_type, df_types.DecimalType): result.append(np.array(row[name]).astype(np.float64)) else: @@ -221,8 +225,18 @@ def convert_for_cols(row, cols): elif isinstance(feature_type, df_types.ArrayType): if accept_str_col and isinstance(feature_type.elementType, df_types.StringType): result.append(np.array(row[name]).astype(np.str)) - else: + elif isinstance(feature_type.elementType, df_types.FloatType): result.append(np.array(row[name]).astype(np.float32)) + elif isinstance(feature_type.elementType, df_types.DoubleType): + result.append(np.array(row[name]).astype(np.float64)) + elif isinstance(feature_type.elementType, df_types.IntegerType): + result.append(np.array(row[name]).astype(np.int32)) + elif isinstance(feature_type.elementType, df_types.LongType): + result.append(np.array(row[name]).astype(np.int64)) + elif isinstance(feature_type.elementType, df_types.DecimalType): + result.append(np.array(row[name]).astype(np.float64)) + else: + result.append(np.array(row[name])) elif isinstance(row[name], DenseVector): result.append(row[name].values.astype(np.float32)) else: diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py index 9bd8212b499..711c37b0631 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py @@ -22,6 +22,9 @@ import torch import torch.nn as nn +from pyspark.sql import SparkSession +from pyspark.sql.types import FloatType, ArrayType, StructType, StructField + from bigdl.orca import OrcaContext from bigdl.orca.data.pandas import read_csv from bigdl.orca.learn.metrics import Accuracy @@ -138,7 +141,7 @@ def on_train_end(self, logs=None): assert self.model def on_epoch_end(self, epoch, logs=None): - assert "train_loss" in logs + assert "train_loss" in logs assert "val_loss" in logs assert self.model @@ -233,10 +236,17 @@ def test_spark_xshards(self): def test_dataframe_train_eval(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) estimator.fit(df, batch_size=4, epochs=2, @@ -252,10 +262,16 @@ def test_dataframe_shard_size_train_eval(self): from bigdl.orca import OrcaContext OrcaContext._shard_size = 30 sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) estimator.fit(df, batch_size=4, epochs=2, @@ -267,10 +283,17 @@ def test_dataframe_shard_size_train_eval(self): def test_partition_num_less_than_workers(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(200, numSlices=1) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) assert df.rdd.getNumPartitions() < estimator.num_workers @@ -303,7 +326,7 @@ def test_dataframe_predict(self): def test_xshards_predict_save_load(self): sc = init_nncontext() - rdd = sc.range(0, 110).map(lambda x: np.array([x]*50)) + rdd = sc.range(0, 110).map(lambda x: np.array([x] * 50)) shards = rdd.mapPartitions(lambda iter: chunks(iter, 5)).map(lambda x: {"x": np.stack(x)}) shards = SparkXShards(shards) @@ -352,10 +375,17 @@ def test_multiple_inputs_model(self): rdd = sc.parallelize(range(100)) from pyspark.sql import SparkSession - spark = SparkSession(sc) - df = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["f1", "f2", "label"]) + spark = SparkSession.builder.getOrCreate() + data = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("f1", ArrayType(FloatType()), True), + StructField("f2", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: MultiInputNet(), @@ -373,6 +403,7 @@ def test_multiple_inputs_model(self): def test_data_parallel_sgd_correctness(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # partition 0: [(0, 0), (0, 0)] @@ -390,8 +421,12 @@ def test_data_parallel_sgd_correctness(self): # partition 1 loss: 0.25 # avg_grad = avg([0, 0, 1, 1]) = 0.5 # weight = 0.5 - 0.5 * avg_grad = 0.25 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) def get_optimizer(model, config): return torch.optim.SGD(model.parameters(), lr=0.5) @@ -418,11 +453,17 @@ def get_optimizer(model, config): def test_checkpoint_callback(self): from bigdl.orca.learn.pytorch.callbacks.model_checkpoint import ModelCheckpoint sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, @@ -446,7 +487,7 @@ def test_checkpoint_callback(self): latest_checkpoint_path = Estimator.latest_checkpoint(self.model_dir) assert os.path.isfile(latest_checkpoint_path) estimator.shutdown() - new_estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, + new_estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, log_level=logging.DEBUG) new_estimator.load_checkpoint(latest_checkpoint_path) eval_after = new_estimator.evaluate(df, batch_size=4, @@ -459,11 +500,18 @@ def test_checkpoint_callback(self): def test_manual_ckpt(self): sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, @@ -494,7 +542,7 @@ def test_manual_ckpt(self): def test_custom_callback(self): estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) callbacks = [CustomCallback()] - estimator.fit(train_data_loader, epochs=4, batch_size=128, + estimator.fit(train_data_loader, epochs=4, batch_size=128, validation_data=val_data_loader, callbacks=callbacks) diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py index b8956a6ea04..27b7686ad4f 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py @@ -24,6 +24,9 @@ import torch import torch.nn as nn +from pyspark.sql import SparkSession +from pyspark.sql.types import FloatType, ArrayType, StructType, StructField + from bigdl.orca import OrcaContext from bigdl.orca.data.pandas import read_csv from bigdl.orca.learn.metrics import Accuracy @@ -250,15 +253,23 @@ def test_spark_xshards(self): def test_dataframe_train_eval(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) val_rdd = sc.range(0, 40) - val_df = val_rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + val_data = val_rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + val_df = spark.createDataFrame(data=val_data, schema=schema) estimator = get_estimator(workers_per_node=2) estimator.fit(df, batch_size=4, epochs=2, @@ -273,10 +284,17 @@ def test_dataframe_shard_size_train_eval(self): from bigdl.orca import OrcaContext OrcaContext._shard_size = 30 sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2) estimator.fit(df, batch_size=4, epochs=2, @@ -288,10 +306,17 @@ def test_dataframe_shard_size_train_eval(self): def test_partition_num_less_than_workers(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(200, numSlices=1) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2) assert df.rdd.getNumPartitions() < estimator.num_workers @@ -358,11 +383,17 @@ def test_multiple_inputs_model(self): sc = init_nncontext() rdd = sc.parallelize(range(100)) - from pyspark.sql import SparkSession - spark = SparkSession(sc) - df = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["f1", "f2", "label"]) + spark = SparkSession.builder.getOrCreate() + data = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("f1", ArrayType(FloatType()), True), + StructField("f2", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: MultiInputNet()) @@ -379,12 +410,18 @@ def test_multiple_inputs_model(self): def test_unenven_data(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(3) # the data and model are constructed that loss on worker 0 is always 0.0 # and loss on worker 1 is always 1.0 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: LinearModel(), @@ -399,12 +436,18 @@ def test_unenven_data(self): def test_sync_stats(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # the data and model are constructed that loss on worker 0 is always 0.0 # and loss on worker 1 is always 1.0 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: LinearModel(), @@ -429,13 +472,19 @@ def test_sync_stats(self): def test_not_sync_stats(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # the data and model are constructed that loss on worker 0 is always 0.0 # and loss on worker 1 is always 1.0 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: LinearModel(), @@ -455,6 +504,7 @@ def test_not_sync_stats(self): def test_data_parallel_sgd_correctness(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # partition 0: [(0, 0), (0, 0)] @@ -472,8 +522,12 @@ def test_data_parallel_sgd_correctness(self): # partition 1 loss: 0.25 # avg_grad = avg([0, 0, 1, 1]) = 0.5 # weight = 0.5 - 0.5 * avg_grad = 0.25 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) def get_optimizer(model, config): return torch.optim.SGD(model.parameters(), lr=0.5) @@ -524,11 +578,17 @@ def get_optimizer(model, config): def test_checkpoint_callback(self): from bigdl.orca.learn.pytorch.callbacks.model_checkpoint import ModelCheckpoint sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2, log_level=logging.DEBUG) @@ -568,11 +628,18 @@ def test_checkpoint_callback(self): def test_manual_ckpt(self): sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2)