
Commit

Add more data types in convert DataFrame to numpy (#5680)
* add more datatype

* update unit test

* update ut in test_estimator_pyspark_backend

* update ut in test_estimator_pyspark_backend

* update ut in test_estimator_ray_backend

* update ut in test_estimator_ray_backend

* add array decimal type

* update array data type

Co-authored-by: Zhou <[email protected]>
PatrickkZ authored Sep 13, 2022
1 parent 4285bdd commit 1c03a40
Showing 3 changed files with 186 additions and 57 deletions.
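After this change, convert_for_cols in utils.py casts scalar and array DataFrame columns to explicit numpy dtypes according to their Spark SQL type: float64 for DoubleType and DecimalType, datetime64[ns] for TimestampType, int32/int64 for IntegerType/LongType, with float32 for FloatType as before (see the first hunk below). The snippet that follows is only a minimal standalone sketch of that mapping for reference; to_numpy_dtype is a hypothetical helper name, not a function added by this commit, and it treats array element types the same as scalars, whereas the actual diff does not map TimestampType for array columns.

# Minimal sketch (hypothetical helper, not BigDL's API): the Spark SQL type ->
# numpy dtype mapping applied per column after this change.
import numpy as np
import pyspark.sql.types as df_types

_DTYPES = [
    (df_types.FloatType, np.float32),
    (df_types.DoubleType, np.float64),
    (df_types.TimestampType, "datetime64[ns]"),
    (df_types.IntegerType, np.int32),
    (df_types.LongType, np.int64),
    (df_types.DecimalType, np.float64),
]


def to_numpy_dtype(spark_type):
    """Return the numpy dtype a column of this Spark type is cast to, or None."""
    if isinstance(spark_type, df_types.ArrayType):
        # Array columns are converted using their element type.
        spark_type = spark_type.elementType
    for spark_cls, np_dtype in _DTYPES:
        if isinstance(spark_type, spark_cls):
            return np_dtype
    return None  # anything else keeps np.array's own inference


print(to_numpy_dtype(df_types.DecimalType(10, 2)))              # <class 'numpy.float64'>
print(to_numpy_dtype(df_types.ArrayType(df_types.LongType())))  # <class 'numpy.int64'>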
16 changes: 15 additions & 1 deletion python/dllib/src/bigdl/dllib/utils/utils.py
@@ -210,19 +210,33 @@ def convert_for_cols(row, cols):
         if _is_scalar_type(feature_type, accept_str_col):
             if isinstance(feature_type, df_types.FloatType):
                 result.append(np.array(row[name]).astype(np.float32))
             elif isinstance(feature_type, df_types.DoubleType):
                 result.append(np.array(row[name]).astype(np.float64))
+            elif isinstance(feature_type, df_types.TimestampType):
+                result.append(np.array(row[name]).astype('datetime64[ns]'))
             elif isinstance(feature_type, df_types.IntegerType):
                 result.append(np.array(row[name]).astype(np.int32))
             elif isinstance(feature_type, df_types.LongType):
                 result.append(np.array(row[name]).astype(np.int64))
+            elif isinstance(feature_type, df_types.DecimalType):
+                result.append(np.array(row[name]).astype(np.float64))
             else:
                 result.append(np.array(row[name]))
         elif isinstance(feature_type, df_types.ArrayType):
             if accept_str_col and isinstance(feature_type.elementType, df_types.StringType):
                 result.append(np.array(row[name]).astype(np.str))
-            else:
+            elif isinstance(feature_type.elementType, df_types.FloatType):
                 result.append(np.array(row[name]).astype(np.float32))
+            elif isinstance(feature_type.elementType, df_types.DoubleType):
+                result.append(np.array(row[name]).astype(np.float64))
+            elif isinstance(feature_type.elementType, df_types.IntegerType):
+                result.append(np.array(row[name]).astype(np.int32))
+            elif isinstance(feature_type.elementType, df_types.LongType):
+                result.append(np.array(row[name]).astype(np.int64))
+            elif isinstance(feature_type.elementType, df_types.DecimalType):
+                result.append(np.array(row[name]).astype(np.float64))
+            else:
+                result.append(np.array(row[name]))
         elif isinstance(row[name], DenseVector):
             result.append(row[name].values.astype(np.float32))
         else:
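The remaining hunks update the estimator tests to build their DataFrames with an explicit schema via spark.createDataFrame(data, schema) instead of rdd.map(...).toDF([...]), so the feature and label columns arrive as ArrayType(FloatType()) rather than whatever Spark would infer (DoubleType/LongType arrays), and the labels are generated as floats instead of ints. Below is a minimal sketch of that construction pattern, assuming a local SparkSession and using np.float32 in place of the deprecated np.float alias that appears in the test code; column names and sizes are illustrative.

# Sketch of the DataFrame construction pattern used in the updated tests.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType, StructField, StructType

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# Explicit schema: both columns are float arrays, so nothing is left to
# Spark's type inference (which would produce DoubleType/LongType arrays).
schema = StructType([
    StructField("feature", ArrayType(FloatType()), True),
    StructField("label", ArrayType(FloatType()), True)
])

data = sc.range(0, 100).map(
    lambda x: (np.random.randn(50).astype(np.float32).tolist(),
               [float(np.random.randint(0, 2))]))
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()  # feature and label both show as array<float>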
@@ -22,6 +22,9 @@
 import torch
 import torch.nn as nn

+from pyspark.sql import SparkSession
+from pyspark.sql.types import FloatType, ArrayType, StructType, StructField
+
 from bigdl.orca import OrcaContext
 from bigdl.orca.data.pandas import read_csv
 from bigdl.orca.learn.metrics import Accuracy
@@ -138,7 +141,7 @@ def on_train_end(self, logs=None):
         assert self.model

     def on_epoch_end(self, epoch, logs=None):
-        assert "train_loss" in logs
+        assert "train_loss" in logs
         assert "val_loss" in logs
         assert self.model

@@ -233,10 +236,17 @@ def test_spark_xshards(self):
     def test_dataframe_train_eval(self):

         sc = init_nncontext()
+        spark = SparkSession.builder.getOrCreate()
         rdd = sc.range(0, 100)
-        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
-                                [int(np.random.randint(0, 2, size=()))])
-                     ).toDF(["feature", "label"])
+        data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
+                                  [float(np.random.randint(0, 2, size=()))])
+                       )
+        schema = StructType([
+            StructField("feature", ArrayType(FloatType()), True),
+            StructField("label", ArrayType(FloatType()), True)
+        ])
+
+        df = spark.createDataFrame(data=data, schema=schema)

         estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir)
         estimator.fit(df, batch_size=4, epochs=2,
@@ -252,10 +262,16 @@ def test_dataframe_shard_size_train_eval(self):
         from bigdl.orca import OrcaContext
         OrcaContext._shard_size = 30
         sc = init_nncontext()
+        spark = SparkSession.builder.getOrCreate()
         rdd = sc.range(0, 100)
-        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
-                                [int(np.random.randint(0, 2, size=()))])
-                     ).toDF(["feature", "label"])
+        data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
+                                  [float(np.random.randint(0, 2, size=()))])
+                       )
+        schema = StructType([
+            StructField("feature", ArrayType(FloatType()), True),
+            StructField("label", ArrayType(FloatType()), True)
+        ])
+        df = spark.createDataFrame(data=data, schema=schema)

         estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir)
         estimator.fit(df, batch_size=4, epochs=2,
@@ -267,10 +283,17 @@ def test_dataframe_shard_size_train_eval(self):

     def test_partition_num_less_than_workers(self):
         sc = init_nncontext()
+        spark = SparkSession.builder.getOrCreate()
         rdd = sc.range(200, numSlices=1)
-        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
-                                [int(np.random.randint(0, 2, size=()))])
-                     ).toDF(["feature", "label"])
+        data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
+                                  [float(np.random.randint(0, 2, size=()))])
+                       )
+        schema = StructType([
+            StructField("feature", ArrayType(FloatType()), True),
+            StructField("label", ArrayType(FloatType()), True)
+        ])
+
+        df = spark.createDataFrame(data=data, schema=schema)

         estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir)
         assert df.rdd.getNumPartitions() < estimator.num_workers
@@ -303,7 +326,7 @@ def test_dataframe_predict(self):
     def test_xshards_predict_save_load(self):

         sc = init_nncontext()
-        rdd = sc.range(0, 110).map(lambda x: np.array([x]*50))
+        rdd = sc.range(0, 110).map(lambda x: np.array([x] * 50))
         shards = rdd.mapPartitions(lambda iter: chunks(iter, 5)).map(lambda x: {"x": np.stack(x)})
         shards = SparkXShards(shards)

@@ -352,10 +375,17 @@ def test_multiple_inputs_model(self):
         rdd = sc.parallelize(range(100))

         from pyspark.sql import SparkSession
-        spark = SparkSession(sc)
-        df = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25,
-                                [int(np.random.randint(0, 2, size=()))])
-                     ).toDF(["f1", "f2", "label"])
+        spark = SparkSession.builder.getOrCreate()
+        data = rdd.map(lambda x: ([float(x)] * 25, [float(x)] * 25,
+                                  [float(np.random.randint(0, 2, size=()))])
+                       )
+        schema = StructType([
+            StructField("f1", ArrayType(FloatType()), True),
+            StructField("f2", ArrayType(FloatType()), True),
+            StructField("label", ArrayType(FloatType()), True)
+        ])
+
+        df = spark.createDataFrame(data=data, schema=schema)

         estimator = get_estimator(workers_per_node=2,
                                   model_fn=lambda config: MultiInputNet(),
@@ -373,6 +403,7 @@ def test_multiple_inputs_model(self):

     def test_data_parallel_sgd_correctness(self):
         sc = init_nncontext()
+        spark = SparkSession.builder.getOrCreate()
         rdd = sc.range(0, 100).repartition(2)

         # partition 0: [(0, 0), (0, 0)]
@@ -390,8 +421,12 @@
         # partition 1 loss: 0.25
         # avg_grad = avg([0, 0, 1, 1]) = 0.5
         # weight = 0.5 - 0.5 * avg_grad = 0.25
-        df = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2]
-                                        ).toDF(["feature", "label"])
+        data = rdd.mapPartitionsWithIndex(lambda idx, iter: [([float(idx)], [0.0]) for _ in iter][:2])
+        schema = StructType([
+            StructField("feature", ArrayType(FloatType()), True),
+            StructField("label", ArrayType(FloatType()), True)
+        ])
+        df = spark.createDataFrame(data=data, schema=schema)

         def get_optimizer(model, config):
             return torch.optim.SGD(model.parameters(), lr=0.5)
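As an aside, the expected values in the comments of this hunk (avg_grad = 0.5, final weight 0.25) are consistent with a single-parameter linear model y_hat = w * x trained with MSE loss from an initial weight of 0.5 and the lr=0.5 SGD optimizer defined in get_optimizer just above. The model definition is outside the shown diff, so the following reconstruction of the arithmetic is an assumption, not part of the commit.

# Hypothetical reconstruction of the arithmetic in the comments above,
# assuming y_hat = w * x, MSE loss, initial weight w = 0.5, lr = 0.5.
w, lr = 0.5, 0.5
samples = [(0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (1.0, 0.0)]  # (feature, label)
grads = [2 * (w * x - y) * x for x, y in samples]           # -> [0, 0, 1, 1]
avg_grad = sum(grads) / len(grads)                          # 0.5
print(avg_grad, w - lr * avg_grad)                          # 0.5 0.25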
@@ -418,11 +453,17 @@ def get_optimizer(model, config):
     def test_checkpoint_callback(self):
         from bigdl.orca.learn.pytorch.callbacks.model_checkpoint import ModelCheckpoint
         sc = OrcaContext.get_spark_context()
+        spark = SparkSession.builder.getOrCreate()
         rdd = sc.range(0, 100)
         epochs = 2
-        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
-                                [int(np.random.randint(0, 2, size=()))])
-                     ).toDF(["feature", "label"])
+        data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
+                                  [float(np.random.randint(0, 2, size=()))])
+                       )
+        schema = StructType([
+            StructField("feature", ArrayType(FloatType()), True),
+            StructField("label", ArrayType(FloatType()), True)
+        ])
+        df = spark.createDataFrame(data=data, schema=schema)
         df = df.cache()

         estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir,
@@ -446,7 +487,7 @@ def test_checkpoint_callback(self):
         latest_checkpoint_path = Estimator.latest_checkpoint(self.model_dir)
         assert os.path.isfile(latest_checkpoint_path)
         estimator.shutdown()
-        new_estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir,
+        new_estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir,
                                       log_level=logging.DEBUG)
         new_estimator.load_checkpoint(latest_checkpoint_path)
         eval_after = new_estimator.evaluate(df, batch_size=4,
@@ -459,11 +500,18 @@

     def test_manual_ckpt(self):
         sc = OrcaContext.get_spark_context()
+        spark = SparkSession.builder.getOrCreate()
         rdd = sc.range(0, 100)
         epochs = 2
-        df = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
-                                [int(np.random.randint(0, 2, size=()))])
-                     ).toDF(["feature", "label"])
+        data = rdd.map(lambda x: (np.random.randn(50).astype(np.float).tolist(),
+                                  [float(np.random.randint(0, 2, size=()))])
+                       )
+        schema = StructType([
+            StructField("feature", ArrayType(FloatType()), True),
+            StructField("label", ArrayType(FloatType()), True)
+        ])
+
+        df = spark.createDataFrame(data=data, schema=schema)
         df = df.cache()

         estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir,
@@ -494,7 +542,7 @@ def test_manual_ckpt(self):
     def test_custom_callback(self):
         estimator = get_estimator(workers_per_node=2, model_dir=self.model_dir)
         callbacks = [CustomCallback()]
-        estimator.fit(train_data_loader, epochs=4, batch_size=128,
+        estimator.fit(train_data_loader, epochs=4, batch_size=128,
                       validation_data=val_data_loader, callbacks=callbacks)


