From 9d5622cf0ec9803d1231e48438b8eecfa03b030d Mon Sep 17 00:00:00 2001 From: sgwhat Date: Thu, 25 Aug 2022 17:01:18 +0800 Subject: [PATCH 01/10] refactor model_dir as an option --- .../src/bigdl/orca/learn/tf2/pyspark_estimator.py | 4 +++- .../orca/src/bigdl/orca/learn/tf2/spark_runner.py | 14 ++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py index 06885156cec..c2341e80b7d 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py +++ b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py @@ -225,8 +225,10 @@ def transform_func(iter, init_param, param): self.model_weights = state['weights'] finally: shutil.rmtree(temp_dir) + else: + self.model_weights = res[1] - return res[0] + return res[0][0] def evaluate(self, data, batch_size=32, num_steps=None, verbose=1, sample_weight=None, callbacks=None, data_config=None, diff --git a/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py b/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py index 2277e73cc0a..f5ea9821ec2 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py +++ b/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py @@ -271,9 +271,10 @@ def distributed_train_func(self, data_creator, config, epochs=1, verbose=1, runs a training epoch and updates the model parameters """ with self.strategy.scope(): - if exists(self._model_saved_path): - # for continous training - model = load_model(self._model_saved_path) + if self.model_dir is not None: + if exists(self._model_saved_path): + # for continous training + model = load_model(self._model_saved_path) else: model = self.model_creator(self.config) if self.model_weights: @@ -336,7 +337,6 @@ def step(self, data_creator, epochs=1, batch_size=32, verbose=1, validation_steps=validation_steps, validation_freq=validation_freq ) - weights = model.get_weights() if history is None: stats = {} else: @@ -345,14 +345,16 @@ def step(self, data_creator, epochs=1, batch_size=32, verbose=1, if self.model_dir is not None: save_model(model, self._model_saved_path, save_format="h5") model_state = { - "weights": weights, + "weights": model.get_weights(), "optimizer_weights": model.optimizer.get_weights() } save_pkl(model_state, os.path.join(self.model_dir, "state.pkl")) + else: + model_weights = model.get_weights() if self.need_to_log_to_driver: LogMonitor.stop_log_monitor(self.log_path, self.logger_thread, self.thread_stop) - return [stats] + return [stats], model_weights else: temp_dir = tempfile.mkdtemp() try: From 7d7acc4d5aa192698977bbaffb9cff9f76b71a6b Mon Sep 17 00:00:00 2001 From: sgwhat Date: Thu, 25 Aug 2022 17:10:57 +0800 Subject: [PATCH 02/10] modify ut to test non model_dir --- .../bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py index 557ac735358..73bf7e6fcca 100644 --- a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py +++ b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py @@ -80,8 +80,7 @@ def test_dataframe(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, feature_cols=["feature"], @@ -516,8 +515,7 @@ def test_save_load_model_h5(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, feature_cols=["feature"], @@ -566,8 +564,7 @@ def test_save_load_model_savemodel(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, feature_cols=["feature"], From 243fb4bbb757dd91587d1e1b4450398115ceba27 Mon Sep 17 00:00:00 2001 From: sgwhat Date: Thu, 25 Aug 2022 17:32:46 +0800 Subject: [PATCH 03/10] update coding format --- python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py index c2341e80b7d..3481a3535bd 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py +++ b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py @@ -213,6 +213,7 @@ def transform_func(iter, init_param, param): res = self.workerRDD.barrier().mapPartitions( lambda iter: transform_func(iter, init_params, params)).collect() + result = res[0] if self.model_dir: try: @@ -228,7 +229,7 @@ def transform_func(iter, init_param, param): else: self.model_weights = res[1] - return res[0][0] + return result[0] def evaluate(self, data, batch_size=32, num_steps=None, verbose=1, sample_weight=None, callbacks=None, data_config=None, From 6db9a40352f884747418a37909b19f0786c4f264 Mon Sep 17 00:00:00 2001 From: sgwhat Date: Thu, 25 Aug 2022 20:27:03 +0800 Subject: [PATCH 04/10] remove error --- python/orca/src/bigdl/orca/learn/tf2/estimator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/orca/src/bigdl/orca/learn/tf2/estimator.py b/python/orca/src/bigdl/orca/learn/tf2/estimator.py index 9d6ad451cbe..226aa9505f1 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/estimator.py +++ b/python/orca/src/bigdl/orca/learn/tf2/estimator.py @@ -72,9 +72,6 @@ def from_keras(*, if cpu_binding: invalidInputError(False, "cpu_binding should not be True when using spark backend") - if not model_dir: - invalidInputError(False, - "Please specify model directory when using spark backend") from bigdl.orca.learn.tf2.pyspark_estimator import SparkTFEstimator return SparkTFEstimator(model_creator=model_creator, config=config, verbose=verbose, From 40be45cc79c68c27ca66f7703583874286fce55a Mon Sep 17 00:00:00 2001 From: sgwhat Date: Fri, 26 Aug 2022 09:46:48 +0800 Subject: [PATCH 05/10] fix test error when setting model_dir --- .../src/bigdl/orca/learn/tf2/pyspark_estimator.py | 3 ++- python/orca/src/bigdl/orca/learn/tf2/spark_runner.py | 11 +++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py index 3481a3535bd..26a21baac33 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py +++ b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py @@ -213,9 +213,9 @@ def transform_func(iter, init_param, param): res = self.workerRDD.barrier().mapPartitions( lambda iter: transform_func(iter, init_params, params)).collect() - result = res[0] if self.model_dir: + result = res try: temp_dir = tempfile.mkdtemp() get_remote_file_to_local(os.path.join(self.model_dir, "state.pkl"), @@ -227,6 +227,7 @@ def transform_func(iter, init_param, param): finally: shutil.rmtree(temp_dir) else: + result = res[0] self.model_weights = res[1] return result[0] diff --git a/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py b/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py index f5ea9821ec2..a46a1686757 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py +++ b/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py @@ -271,10 +271,9 @@ def distributed_train_func(self, data_creator, config, epochs=1, verbose=1, runs a training epoch and updates the model parameters """ with self.strategy.scope(): - if self.model_dir is not None: - if exists(self._model_saved_path): - # for continous training - model = load_model(self._model_saved_path) + if self.model_dir is not None and exists(self._model_saved_path): + # for continous training + model = load_model(self._model_saved_path) else: model = self.model_creator(self.config) if self.model_weights: @@ -350,11 +349,11 @@ def step(self, data_creator, epochs=1, batch_size=32, verbose=1, } save_pkl(model_state, os.path.join(self.model_dir, "state.pkl")) else: - model_weights = model.get_weights() + weights = model.get_weights() if self.need_to_log_to_driver: LogMonitor.stop_log_monitor(self.log_path, self.logger_thread, self.thread_stop) - return [stats], model_weights + return [stats], weights else: temp_dir = tempfile.mkdtemp() try: From c7149e210d86e456251bd208cd255a489975ed8e Mon Sep 17 00:00:00 2001 From: sgwhat Date: Fri, 26 Aug 2022 09:59:24 +0800 Subject: [PATCH 06/10] update return results --- python/orca/src/bigdl/orca/learn/tf2/spark_runner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py b/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py index a46a1686757..040529a195b 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py +++ b/python/orca/src/bigdl/orca/learn/tf2/spark_runner.py @@ -353,7 +353,10 @@ def step(self, data_creator, epochs=1, batch_size=32, verbose=1, if self.need_to_log_to_driver: LogMonitor.stop_log_monitor(self.log_path, self.logger_thread, self.thread_stop) - return [stats], weights + if self.model_dir is not None: + return [stats] + else: + return [stats], weights else: temp_dir = tempfile.mkdtemp() try: From aab889b92322418951101ee830cb0664381a531e Mon Sep 17 00:00:00 2001 From: sgwhat Date: Fri, 26 Aug 2022 10:13:46 +0800 Subject: [PATCH 07/10] update to support --- python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py index 26a21baac33..7ed59210679 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py +++ b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py @@ -493,7 +493,7 @@ def save(self, saving to SavedModel. """ # get current model - if exists(self._model_saved_path): + if self.model_dir is not None and exists(self._model_saved_path): model = load_model(self._model_saved_path) else: model = self.get_model() From 7a72b0b7bc287da7770ed6f6bf1b52746be5285b Mon Sep 17 00:00:00 2001 From: sgwhat Date: Fri, 26 Aug 2022 11:03:40 +0800 Subject: [PATCH 08/10] add support in load api --- python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py index 7ed59210679..2503ca907e2 100644 --- a/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py +++ b/python/orca/src/bigdl/orca/learn/tf2/pyspark_estimator.py @@ -214,7 +214,7 @@ def transform_func(iter, init_param, param): res = self.workerRDD.barrier().mapPartitions( lambda iter: transform_func(iter, init_params, params)).collect() - if self.model_dir: + if self.model_dir is not None: result = res try: temp_dir = tempfile.mkdtemp() @@ -517,7 +517,8 @@ def load(self, filepath, custom_objects=None, compile=True): model = load_model(filepath, custom_objects=custom_objects, compile=compile) self.model_weights = model.get_weights() # update remote model - save_model(model, self._model_saved_path, save_format="h5", filemode=0o666) + if self.model_dir is not None: + save_model(model, self._model_saved_path, save_format="h5", filemode=0o666) def get_model(self): """ From 206b7861e43f031ddcd15867a6228dc5ece21afa Mon Sep 17 00:00:00 2001 From: sgwhat Date: Fri, 26 Aug 2022 14:17:31 +0800 Subject: [PATCH 09/10] update ut --- .../test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py index 73bf7e6fcca..37eca7f4297 100644 --- a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py +++ b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py @@ -259,8 +259,7 @@ def test_checkpoint_weights_h5(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") callbacks = [ tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(temp_dir, "ckpt_weights.h5"), @@ -564,7 +563,8 @@ def test_save_load_model_savemodel(self): verbose=True, config=config, workers_per_node=2, - backend="spark") + backend="spark", + model_dir=temp_dir) res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, feature_cols=["feature"], From 4899cd66a397bba5c820a412d4027f80752b638e Mon Sep 17 00:00:00 2001 From: sgwhat Date: Tue, 30 Aug 2022 17:13:53 +0800 Subject: [PATCH 10/10] move model_dir from ut --- .../learn/ray/tf/test_tf_spark_estimator.py | 108 ++++++++---------- 1 file changed, 45 insertions(+), 63 deletions(-) diff --git a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py index 37eca7f4297..3c37ebb4e10 100644 --- a/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py +++ b/python/orca/test/bigdl/orca/learn/ray/tf/test_tf_spark_estimator.py @@ -125,8 +125,7 @@ def test_dataframe_with_empty_partition(self): verbose=True, config=config, workers_per_node=3, - backend="spark", - model_dir=temp_dir) + backend="spark") res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, feature_cols=["feature"], @@ -177,8 +176,7 @@ def model_creator(config): verbose=True, config=config, workers_per_node=1, - backend="spark", - model_dir=temp_dir) + backend="spark") res = trainer.fit(data=xshards, epochs=5, batch_size=4, steps_per_epoch=25, feature_cols=["user", "item"], label_cols=["label"]) @@ -213,8 +211,7 @@ def test_checkpoint_weights(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") callbacks = [ tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(temp_dir, "ckpt_{epoch}"), @@ -301,35 +298,29 @@ def test_dataframe_shard_size(self): "lr": 0.2 } - try: - temp_dir = tempfile.mkdtemp() - - trainer = Estimator.from_keras( - model_creator=model_creator, - verbose=True, - config=config, - workers_per_node=2, - backend="spark", - model_dir=temp_dir) - - res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, - validation_data=val_df, - validation_steps=2, - feature_cols=["feature"], - label_cols=["label"]) - - res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, - feature_cols=["feature"], - label_cols=["label"]) - - res = trainer.evaluate(val_df, batch_size=4, num_steps=25, feature_cols=["feature"], - label_cols=["label"]) - print("validation result: ", res) - - res = trainer.predict(df, feature_cols=["feature"]).collect() - print("predict result: ", res) - finally: - shutil.rmtree(temp_dir) + trainer = Estimator.from_keras( + model_creator=model_creator, + verbose=True, + config=config, + workers_per_node=2, + backend="spark") + + res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, + validation_data=val_df, + validation_steps=2, + feature_cols=["feature"], + label_cols=["label"]) + + res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, + feature_cols=["feature"], + label_cols=["label"]) + + res = trainer.evaluate(val_df, batch_size=4, num_steps=25, feature_cols=["feature"], + label_cols=["label"]) + print("validation result: ", res) + + res = trainer.predict(df, feature_cols=["feature"]).collect() + print("predict result: ", res) OrcaContext._shard_size = None def test_dataframe_different_train_val(self): @@ -349,31 +340,25 @@ def test_dataframe_different_train_val(self): "lr": 0.2 } - try: - temp_dir = tempfile.mkdtemp() - - trainer = Estimator.from_keras( - model_creator=model_creator, - verbose=True, - config=config, - workers_per_node=2, - backend="spark", - model_dir=temp_dir) + trainer = Estimator.from_keras( + model_creator=model_creator, + verbose=True, + config=config, + workers_per_node=2, + backend="spark") - res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, - validation_data=val_df, - validation_steps=2, - feature_cols=["feature"], - label_cols=["label"]) + res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, + validation_data=val_df, + validation_steps=2, + feature_cols=["feature"], + label_cols=["label"]) - res = trainer.evaluate(val_df, batch_size=4, num_steps=25, feature_cols=["feature"], - label_cols=["label"]) - print("validation result: ", res) + res = trainer.evaluate(val_df, batch_size=4, num_steps=25, feature_cols=["feature"], + label_cols=["label"]) + print("validation result: ", res) - res = trainer.predict(df, feature_cols=["feature"]).collect() - print("predict result: ", res) - finally: - shutil.rmtree(temp_dir) + res = trainer.predict(df, feature_cols=["feature"]).collect() + print("predict result: ", res) def test_tensorboard(self): sc = OrcaContext.get_spark_context() @@ -396,8 +381,7 @@ def test_tensorboard(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=os.path.join(temp_dir, "train_log"), @@ -457,8 +441,7 @@ def test_checkpoint_model(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") callbacks = [ tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(temp_dir, "ckpt_{epoch}"), @@ -563,8 +546,7 @@ def test_save_load_model_savemodel(self): verbose=True, config=config, workers_per_node=2, - backend="spark", - model_dir=temp_dir) + backend="spark") res = trainer.fit(df, epochs=5, batch_size=4, steps_per_epoch=25, feature_cols=["feature"],