From d1bc92ebae7ac9e3764b19638a43e919c89b15b1 Mon Sep 17 00:00:00 2001 From: Zhou Date: Thu, 8 Sep 2022 13:45:46 +0800 Subject: [PATCH 1/8] add more datatype --- python/dllib/src/bigdl/dllib/utils/utils.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/python/dllib/src/bigdl/dllib/utils/utils.py b/python/dllib/src/bigdl/dllib/utils/utils.py index 256949b2b4d..f9028365902 100644 --- a/python/dllib/src/bigdl/dllib/utils/utils.py +++ b/python/dllib/src/bigdl/dllib/utils/utils.py @@ -210,10 +210,14 @@ def convert_for_cols(row, cols): if _is_scalar_type(feature_type, accept_str_col): if isinstance(feature_type, df_types.FloatType): result.append(np.array(row[name]).astype(np.float32)) + elif isinstance(feature_type, df_types.DoubleType): + result.append(np.array(row[name]).astype(np.float64)) elif isinstance(feature_type, df_types.TimestampType): result.append(np.array(row[name]).astype('datetime64[ns]')) elif isinstance(feature_type, df_types.IntegerType): result.append(np.array(row[name]).astype(np.int32)) + elif isinstance(feature_type, df_types.LongType): + result.append(np.array(row[name]).astype(np.int64)) elif isinstance(feature_type, df_types.DecimalType): result.append(np.array(row[name]).astype(np.float64)) else: @@ -221,6 +225,14 @@ def convert_for_cols(row, cols): elif isinstance(feature_type, df_types.ArrayType): if accept_str_col and isinstance(feature_type.elementType, df_types.StringType): result.append(np.array(row[name]).astype(np.str)) + elif isinstance(feature_type.elementType, df_types.FloatType): + result.append(np.array(row[name]).astype(np.float32)) + elif isinstance(feature_type.elementType, df_types.DoubleType): + result.append(np.array(row[name]).astype(np.float64)) + elif isinstance(feature_type.elementType, df_types.IntegerType): + result.append(np.array(row[name]).astype(np.int32)) + elif isinstance(feature_type.elementType, df_types.LongType): + result.append(np.array(row[name]).astype(np.int64)) else: result.append(np.array(row[name]).astype(np.float32)) elif isinstance(row[name], DenseVector): From 0539cdf72bdf8c2ebeab38c2cc86060753c248b9 Mon Sep 17 00:00:00 2001 From: Zhou Date: Thu, 8 Sep 2022 16:01:04 +0800 Subject: [PATCH 2/8] update unit test --- .../pytorch/test_estimator_pyspark_backend.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py index 9bd8212b499..709b530a3e9 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py @@ -22,6 +22,9 @@ import torch import torch.nn as nn +from pyspark.sql import SparkSession +from pyspark.sql.types import FloatType, ArrayType, StructType, StructField + from bigdl.orca import OrcaContext from bigdl.orca.data.pandas import read_csv from bigdl.orca.learn.metrics import Accuracy @@ -138,7 +141,7 @@ def on_train_end(self, logs=None): assert self.model def on_epoch_end(self, epoch, logs=None): - assert "train_loss" in logs + assert "train_loss" in logs assert "val_loss" in logs assert self.model @@ -303,7 +306,7 @@ def test_dataframe_predict(self): def test_xshards_predict_save_load(self): sc = init_nncontext() - rdd = sc.range(0, 110).map(lambda x: np.array([x]*50)) + rdd = sc.range(0, 110).map(lambda x: np.array([x] * 50)) shards = rdd.mapPartitions(lambda iter: chunks(iter, 5)).map(lambda x: {"x": np.stack(x)}) shards = SparkXShards(shards) @@ -373,6 +376,7 @@ def test_multiple_inputs_model(self): def test_data_parallel_sgd_correctness(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # partition 0: [(0, 0), (0, 0)] @@ -390,8 +394,12 @@ def test_data_parallel_sgd_correctness(self): # partition 1 loss: 0.25 # avg_grad = avg([0, 0, 1, 1]) = 0.5 # weight = 0.5 - 0.5 * avg_grad = 0.25 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) def get_optimizer(model, config): return torch.optim.SGD(model.parameters(), lr=0.5) @@ -446,7 +454,7 @@ def test_checkpoint_callback(self): latest_checkpoint_path = Estimator.latest_checkpoint(self.model_dir) assert os.path.isfile(latest_checkpoint_path) estimator.shutdown() - new_estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, + new_estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, log_level=logging.DEBUG) new_estimator.load_checkpoint(latest_checkpoint_path) eval_after = new_estimator.evaluate(df, batch_size=4, @@ -494,7 +502,7 @@ def test_manual_ckpt(self): def test_custom_callback(self): estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) callbacks = [CustomCallback()] - estimator.fit(train_data_loader, epochs=4, batch_size=128, + estimator.fit(train_data_loader, epochs=4, batch_size=128, validation_data=val_data_loader, callbacks=callbacks) From 63f658037a87dcd9af627b8de27844d5d4b3cac0 Mon Sep 17 00:00:00 2001 From: Zhou Date: Fri, 9 Sep 2022 10:14:47 +0800 Subject: [PATCH 3/8] update ut in test_estimator_pyspark_backend --- .../pytorch/test_estimator_pyspark_backend.py | 80 ++++++++++++++----- 1 file changed, 60 insertions(+), 20 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py index 709b530a3e9..320e1948ada 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py @@ -23,7 +23,7 @@ import torch.nn as nn from pyspark.sql import SparkSession -from pyspark.sql.types import FloatType, ArrayType, StructType, StructField +from pyspark.sql.types import FloatType, IntegerType, ArrayType, StructType, StructField from bigdl.orca import OrcaContext from bigdl.orca.data.pandas import read_csv @@ -236,10 +236,17 @@ def test_spark_xshards(self): def test_dataframe_train_eval(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [int(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(IntegerType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) estimator.fit(df, batch_size=4, epochs=2, @@ -255,10 +262,16 @@ def test_dataframe_shard_size_train_eval(self): from bigdl.orca import OrcaContext OrcaContext._shard_size = 30 sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [int(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(IntegerType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) estimator.fit(df, batch_size=4, epochs=2, @@ -270,10 +283,17 @@ def test_dataframe_shard_size_train_eval(self): def test_partition_num_less_than_workers(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(200, numSlices=1) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [int(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(IntegerType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir) assert df.rdd.getNumPartitions() < estimator.num_workers @@ -355,10 +375,17 @@ def test_multiple_inputs_model(self): rdd = sc.parallelize(range(100)) from pyspark.sql import SparkSession - spark = SparkSession(sc) - df = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["f1", "f2", "label"]) + spark = SparkSession.builder.getOrCreate() + data = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, + [int(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("f1", ArrayType(FloatType()), True), + StructField("f2", ArrayType(FloatType()), True), + StructField("label", ArrayType(IntegerType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: MultiInputNet(), @@ -426,11 +453,17 @@ def get_optimizer(model, config): def test_checkpoint_callback(self): from bigdl.orca.learn.pytorch.callbacks.model_checkpoint import ModelCheckpoint sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [int(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(IntegerType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, @@ -467,11 +500,18 @@ def test_checkpoint_callback(self): def test_manual_ckpt(self): sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [int(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(IntegerType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir, From 3d12df27ee6dcd0aa3762a5dcec8900ebd7e8162 Mon Sep 17 00:00:00 2001 From: Zhou Date: Fri, 9 Sep 2022 12:35:26 +0800 Subject: [PATCH 4/8] update ut in test_estimator_pyspark_backend --- .../pytorch/test_estimator_pyspark_backend.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py index 320e1948ada..0240f2de4e7 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py @@ -239,11 +239,11 @@ def test_dataframe_train_eval(self): spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) + [float(np.random.randint(0, 2, size=()))]) ) schema = StructType([ StructField("feature", ArrayType(FloatType()), True), - StructField("label", ArrayType(IntegerType()), True) + StructField("label", ArrayType(FloatType()), True) ]) df = spark.createDataFrame(data=data, schema=schema) @@ -265,11 +265,11 @@ def test_dataframe_shard_size_train_eval(self): spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) + [float(np.random.randint(0, 2, size=()))]) ) schema = StructType([ StructField("feature", ArrayType(FloatType()), True), - StructField("label", ArrayType(IntegerType()), True) + StructField("label", ArrayType(FloatType()), True) ]) df = spark.createDataFrame(data=data, schema=schema) @@ -286,11 +286,11 @@ def test_partition_num_less_than_workers(self): spark = SparkSession.builder.getOrCreate() rdd = sc.range(200, numSlices=1) data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) + [float(np.random.randint(0, 2, size=()))]) ) schema = StructType([ StructField("feature", ArrayType(FloatType()), True), - StructField("label", ArrayType(IntegerType()), True) + StructField("label", ArrayType(FloatType()), True) ]) df = spark.createDataFrame(data=data, schema=schema) @@ -377,12 +377,12 @@ def test_multiple_inputs_model(self): from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() data = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, - [int(np.random.randint(0, 2, size=()))]) + [float(np.random.randint(0, 2, size=()))]) ) schema = StructType([ StructField("f1", ArrayType(FloatType()), True), StructField("f2", ArrayType(FloatType()), True), - StructField("label", ArrayType(IntegerType()), True) + StructField("label", ArrayType(FloatType()), True) ]) df = spark.createDataFrame(data=data, schema=schema) @@ -457,11 +457,11 @@ def test_checkpoint_callback(self): rdd = sc.range(0, 100) epochs = 2 data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) + [float(np.random.randint(0, 2, size=()))]) ) schema = StructType([ StructField("feature", ArrayType(FloatType()), True), - StructField("label", ArrayType(IntegerType()), True) + StructField("label", ArrayType(FloatType()), True) ]) df = spark.createDataFrame(data=data, schema=schema) df = df.cache() @@ -504,11 +504,11 @@ def test_manual_ckpt(self): rdd = sc.range(0, 100) epochs = 2 data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) + [float(np.random.randint(0, 2, size=()))]) ) schema = StructType([ StructField("feature", ArrayType(FloatType()), True), - StructField("label", ArrayType(IntegerType()), True) + StructField("label", ArrayType(FloatType()), True) ]) df = spark.createDataFrame(data=data, schema=schema) From 37f5df94fd763bc3da1cce6c6bd29d6415f93cec Mon Sep 17 00:00:00 2001 From: Zhou Date: Fri, 9 Sep 2022 14:51:21 +0800 Subject: [PATCH 5/8] update ut in test_estimator_ray_backend --- .../pytorch/test_estimator_pyspark_backend.py | 2 +- .../ray/pytorch/test_estimator_ray_backend.py | 122 ++++++++++++++---- 2 files changed, 95 insertions(+), 29 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py index 0240f2de4e7..711c37b0631 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_pyspark_backend.py @@ -23,7 +23,7 @@ import torch.nn as nn from pyspark.sql import SparkSession -from pyspark.sql.types import FloatType, IntegerType, ArrayType, StructType, StructField +from pyspark.sql.types import FloatType, ArrayType, StructType, StructField from bigdl.orca import OrcaContext from bigdl.orca.data.pandas import read_csv diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py index b8956a6ea04..fd9be94dd99 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py @@ -24,6 +24,9 @@ import torch import torch.nn as nn +from pyspark.sql import SparkSession +from pyspark.sql.types import FloatType, ArrayType, StructType, StructField + from bigdl.orca import OrcaContext from bigdl.orca.data.pandas import read_csv from bigdl.orca.learn.metrics import Accuracy @@ -250,10 +253,17 @@ def test_spark_xshards(self): def test_dataframe_train_eval(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) val_rdd = sc.range(0, 40) val_df = val_rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), @@ -273,10 +283,17 @@ def test_dataframe_shard_size_train_eval(self): from bigdl.orca import OrcaContext OrcaContext._shard_size = 30 sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2) estimator.fit(df, batch_size=4, epochs=2, @@ -288,10 +305,17 @@ def test_dataframe_shard_size_train_eval(self): def test_partition_num_less_than_workers(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(200, numSlices=1) - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2) assert df.rdd.getNumPartitions() < estimator.num_workers @@ -358,11 +382,17 @@ def test_multiple_inputs_model(self): sc = init_nncontext() rdd = sc.parallelize(range(100)) - from pyspark.sql import SparkSession - spark = SparkSession(sc) - df = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["f1", "f2", "label"]) + spark = SparkSession.builder.getOrCreate() + data = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25, + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("f1", ArrayType(FloatType()), True), + StructField("f2", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: MultiInputNet()) @@ -379,12 +409,18 @@ def test_multiple_inputs_model(self): def test_unenven_data(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(3) # the data and model are constructed that loss on worker 0 is always 0.0 # and loss on worker 1 is always 1.0 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: LinearModel(), @@ -399,12 +435,18 @@ def test_unenven_data(self): def test_sync_stats(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # the data and model are constructed that loss on worker 0 is always 0.0 # and loss on worker 1 is always 1.0 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: LinearModel(), @@ -429,13 +471,19 @@ def test_sync_stats(self): def test_not_sync_stats(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # the data and model are constructed that loss on worker 0 is always 0.0 # and loss on worker 1 is always 1.0 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) estimator = get_estimator(workers_per_node=2, model_fn=lambda config: LinearModel(), @@ -455,6 +503,7 @@ def test_not_sync_stats(self): def test_data_parallel_sgd_correctness(self): sc = init_nncontext() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100).repartition(2) # partition 0: [(0, 0), (0, 0)] @@ -472,8 +521,12 @@ def test_data_parallel_sgd_correctness(self): # partition 1 loss: 0.25 # avg_grad = avg([0, 0, 1, 1]) = 0.5 # weight = 0.5 - 0.5 * avg_grad = 0.25 - df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2] - ).toDF(["feature", "label"]) + data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2]) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) def get_optimizer(model, config): return torch.optim.SGD(model.parameters(), lr=0.5) @@ -524,11 +577,17 @@ def get_optimizer(model, config): def test_checkpoint_callback(self): from bigdl.orca.learn.pytorch.callbacks.model_checkpoint import ModelCheckpoint sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2, log_level=logging.DEBUG) @@ -568,11 +627,18 @@ def test_checkpoint_callback(self): def test_manual_ckpt(self): sc = OrcaContext.get_spark_context() + spark = SparkSession.builder.getOrCreate() rdd = sc.range(0, 100) epochs = 2 - df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + schema = StructType([ + StructField("feature", ArrayType(FloatType()), True), + StructField("label", ArrayType(FloatType()), True) + ]) + + df = spark.createDataFrame(data=data, schema=schema) df = df.cache() estimator = get_estimator(workers_per_node=2) From 6d857f0b4c73964710d577b3664dfa4c40bda379 Mon Sep 17 00:00:00 2001 From: Zhou Date: Fri, 9 Sep 2022 16:53:19 +0800 Subject: [PATCH 6/8] update ut in test_estimator_ray_backend --- .../orca/learn/ray/pytorch/test_estimator_ray_backend.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py index fd9be94dd99..27b7686ad4f 100644 --- a/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py +++ b/python/orca/test/bigdl/orca/learn/ray/pytorch/test_estimator_ray_backend.py @@ -266,9 +266,10 @@ def test_dataframe_train_eval(self): df = spark.createDataFrame(data=data, schema=schema) val_rdd = sc.range(0, 40) - val_df = val_rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), - [int(np.random.randint(0, 2, size=()))]) - ).toDF(["feature", "label"]) + val_data = val_rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(), + [float(np.random.randint(0, 2, size=()))]) + ) + val_df = spark.createDataFrame(data=val_data, schema=schema) estimator = get_estimator(workers_per_node=2) estimator.fit(df, batch_size=4, epochs=2, From c2ba3ad689a7f2adf1d6630f2581b99fd753613e Mon Sep 17 00:00:00 2001 From: Zhou Date: Tue, 13 Sep 2022 14:44:05 +0800 Subject: [PATCH 7/8] add array decimal type --- python/dllib/src/bigdl/dllib/utils/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/dllib/src/bigdl/dllib/utils/utils.py b/python/dllib/src/bigdl/dllib/utils/utils.py index f9028365902..13a5fdd8633 100644 --- a/python/dllib/src/bigdl/dllib/utils/utils.py +++ b/python/dllib/src/bigdl/dllib/utils/utils.py @@ -233,6 +233,8 @@ def convert_for_cols(row, cols): result.append(np.array(row[name]).astype(np.int32)) elif isinstance(feature_type.elementType, df_types.LongType): result.append(np.array(row[name]).astype(np.int64)) + elif isinstance(feature_type.elementType, df_types.DecimalType): + result.append(np.array(row[name]).astype(np.float64)) else: result.append(np.array(row[name]).astype(np.float32)) elif isinstance(row[name], DenseVector): From 3113546d752765b9c29281deeeaac8bd6f9f661c Mon Sep 17 00:00:00 2001 From: Zhou Date: Tue, 13 Sep 2022 17:23:58 +0800 Subject: [PATCH 8/8] update array data type --- python/dllib/src/bigdl/dllib/utils/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dllib/src/bigdl/dllib/utils/utils.py b/python/dllib/src/bigdl/dllib/utils/utils.py index 13a5fdd8633..acce9f044e0 100644 --- a/python/dllib/src/bigdl/dllib/utils/utils.py +++ b/python/dllib/src/bigdl/dllib/utils/utils.py @@ -236,7 +236,7 @@ def convert_for_cols(row, cols): elif isinstance(feature_type.elementType, df_types.DecimalType): result.append(np.array(row[name]).astype(np.float64)) else: - result.append(np.array(row[name]).astype(np.float32)) + result.append(np.array(row[name])) elif isinstance(row[name], DenseVector): result.append(row[name].values.astype(np.float32)) else: