From 2ef158b1e015d40810c9d385f226d363f26f2118 Mon Sep 17 00:00:00 2001
From: jenniew
Date: Mon, 3 Aug 2020 19:19:06 -0700
Subject: [PATCH 1/4] add tensorboard support

---
 .../learn/spark/test_estimator_for_spark.py   |  73 ++++++++++++
 .../spark/test_estimator_keras_for_spark.py   |  51 ++++++++
 pyzoo/zoo/orca/learn/tf/estimator.py          | 110 +++++++++++++++---
 pyzoo/zoo/pipeline/estimator/estimator.py     |  67 ++++++++++-
 pyzoo/zoo/tfpark/tf_optimizer.py              |   6 +-
 .../pipeline/api/keras/models/Topology.scala  |  15 +++
 .../zoo/pipeline/estimator/Estimator.scala    |  28 +++++
 .../estimator/python/PythonEstimator.scala    |  23 ++++
 8 files changed, 354 insertions(+), 19 deletions(-)

diff --git a/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py b/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py
index fd2069d1848..7225400705b 100644
--- a/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py
+++ b/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py
@@ -421,6 +421,79 @@ def test_checkpoint_remote(self):
             load_tf_checkpoint_from_remote(sess, os.path.join(temp, "simple.ckpt"), saver)
         shutil.rmtree(temp)

+    def test_estimator_graph_tensorboard(self):
+        tf.reset_default_graph()
+
+        model = SimpleModel()
+
+        file_path = os.path.join(resource_path, "orca/learn/ncf.csv")
+        data_shard = zoo.orca.data.pandas.read_csv(file_path)
+
+
+        def transform(df):
+            result = {
+                "x": (df['user'].to_numpy(), df['item'].to_numpy()),
+                "y": df['label'].to_numpy()
+            }
+            return result
+
+        data_shard = data_shard.transform_shard(transform)
+
+        temp = tempfile.mkdtemp()
+        # only set model dir, summary generated under model dir
+        model_dir = os.path.join(temp, "test_model")
+
+        est = Estimator.from_graph(
+            inputs=[model.user, model.item],
+            labels=[model.label],
+            loss=model.loss,
+            optimizer=tf.train.AdamOptimizer(),
+            metrics={"loss": model.loss},
+            model_dir=model_dir
+        )
+        est.fit(data=data_shard,
+                batch_size=8,
+                epochs=5,
+                validation_data=data_shard)
+
+        train_tp = est.get_train_summary("Throughput")
+        val_scores = est.get_validation_summary("loss")
+        assert len(train_tp) > 0
+        assert len(val_scores) > 0
+
+        # set tensorboard dir to different directory
+        est.set_tensorboard("model", "test")
+
+        est.fit(data=data_shard,
+                batch_size=8,
+                epochs=5,
+                validation_data=data_shard)
+
+        train_tp = est.get_train_summary("Throughput")
+        val_scores = est.get_validation_summary("loss")
+        assert len(train_tp) > 0
+        assert len(val_scores) > 0
+
+        # no model dir, no tensorboard dir, no summary saved
+        est2 = Estimator.from_graph(
+            inputs=[model.user, model.item],
+            labels=[model.label],
+            loss=model.loss,
+            optimizer=tf.train.AdamOptimizer(),
+            metrics={"loss": model.loss}
+        )
+
+        est2.fit(data=data_shard,
+                batch_size=8,
+                epochs=5,
+                validation_data=data_shard)
+
+        train_tp = est2.get_train_summary("Throughput")
+        val_scores = est2.get_validation_summary("loss")
+        assert train_tp is None
+        assert val_scores is None
+
+        shutil.rmtree(temp)


 if __name__ == "__main__":
     import pytest
diff --git a/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py b/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
index 731b5af74b1..a3f29a85a35 100644
--- a/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
+++ b/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
@@ -272,6 +272,57 @@ def test_estimator_keras_dataframe_no_fit(self):
         predictions = prediction_df.collect()
         assert len(predictions) == 10

+    def test_estimator_keras_tensorboard(self):
+        import zoo.orca.data.pandas
+
+        model = self.create_model()
+        file_path = os.path.join(self.resource_path, "orca/learn/ncf.csv")
+        data_shard = zoo.orca.data.pandas.read_csv(file_path)
+
+        def transform(df):
+            result = {
+                "x": (df['user'].to_numpy().reshape([-1, 1]),
+                      df['item'].to_numpy().reshape([-1, 1])),
+                "y": df['label'].to_numpy()
+            }
+            return result
+
+        data_shard = data_shard.transform_shard(transform)
+
+        temp = tempfile.mkdtemp()
+        model_dir = os.path.join(temp, "test_model")
+
+        est = Estimator.from_keras(keras_model=model, model_dir=model_dir)
+
+        assert est.get_train_summary("Loss") is None
+        assert est.get_validation_summary("Top1Accuracy") is None
+
+        est.fit(data=data_shard,
+                batch_size=8,
+                epochs=10,
+                validation_data=data_shard)
+
+        train_loss = est.get_train_summary("Loss")
+        assert len(train_loss) > 0
+        val_scores = est.get_validation_summary("Top1Accuracy")
+        assert len(val_scores) > 0
+
+        # no model dir
+        est = Estimator.from_keras(keras_model=model)
+        log_dir = os.path.join(temp, "log")
+        est.set_tensorboard(log_dir, "test")
+
+        est.fit(data=data_shard,
+                batch_size=8,
+                epochs=10,
+                validation_data=data_shard)
+
+        train_loss = est.get_train_summary("Loss")
+        val_scores = est.get_validation_summary("Loss")
+        assert len(train_loss) > 0
+        assert len(val_scores) > 0
+        shutil.rmtree(temp)
+

 if __name__ == "__main__":
     import pytest
diff --git a/pyzoo/zoo/orca/learn/tf/estimator.py b/pyzoo/zoo/orca/learn/tf/estimator.py
index a7f3a9f977e..a8232b56b70 100644
--- a/pyzoo/zoo/orca/learn/tf/estimator.py
+++ b/pyzoo/zoo/orca/learn/tf/estimator.py
@@ -13,17 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import tensorflow as tf
-
-from pyspark.sql.dataframe import DataFrame
 from bigdl.optim.optimizer import MaxEpoch
-from zoo.tfpark.utils import evaluate_metrics
-from zoo.tfpark import TFOptimizer, TFNet, ZooOptimizer
+from zoo.orca.learn.tf.utils import *
 from zoo.tfpark import KerasModel
+from zoo.tfpark import TFOptimizer, TFNet, ZooOptimizer
+from zoo.tfpark.tf_optimizer import StatelessMetric
+from zoo.tfpark.utils import evaluate_metrics
 from zoo.util import nest
-from zoo.orca.learn.tf.utils import *


 class Estimator(object):
@@ -47,6 +45,76 @@ def load_latest_checkpoint(self, path):
             raise Exception("Cannot find checkpoint")
         self.load(ckpt_path, version)

+    def set_tensorboard(self, log_dir, app_name):
+        """
+        Set summary information during the training process for visualization purposes.
+        Saved summary can be viewed via TensorBoard.
+        In order to take effect, it needs to be called before fit.
+
+        Training summary will be saved to 'log_dir/app_name/train'
+        and validation summary (if any) will be saved to 'log_dir/app_name/validation'.
+
+        # Arguments
+        :param log_dir: The base directory path to store training and validation logs.
+        :param app_name: The name of the application.
+ """ + self.log_dir = log_dir + self.app_name = app_name + + def get_train_summary(self, tag=None): + """ + Get the scalar from model train summary + Return 2-D array like object which could be converted + by nd.array() + # Arguments + tag: The string variable represents the scalar wanted + """ + # exception handle + if self.tf_optimizer: + return self.tf_optimizer.estimator.get_train_summary(tag) + + return None + + def get_validation_summary(self, tag=None): + """ + Get the scalar from model validation summary + Return 2-D array like object which could be converted + by np.array() + + Note: The metric and tag may not be consistent + Please look up following form to pass tag parameter + Left side is your metric during compile + Right side is the tag you should pass + 'Accuracy' | 'Top1Accuracy' + 'BinaryAccuracy' | 'Top1Accuracy' + 'CategoricalAccuracy' | 'Top1Accuracy' + 'SparseCategoricalAccuracy' | 'Top1Accuracy' + 'AUC' | 'AucScore' + 'HitRatio' | 'HitRate@k' (k is Top-k) + 'Loss' | 'Loss' + 'MAE' | 'MAE' + 'NDCG' | 'NDCG' + 'TFValidationMethod' | '${name + " " + valMethod.toString()}' + 'Top5Accuracy' | 'Top5Accuracy' + 'TreeNNAccuracy' | 'TreeNNAccuracy()' + 'MeanAveragePrecision' | 'MAP@k' (k is Top-k) (BigDL) + 'MeanAveragePrecision' | 'PascalMeanAveragePrecision' (Zoo) + 'StatelessMetric' | '${name}' + # Arguments + tag: The string variable represents the scalar wanted + """ + if self.tf_optimizer: + for val_method in self.tf_optimizer.tf_model.val_methods: + if isinstance(val_method, StatelessMetric): + if tag == val_method.name: + return self.tf_optimizer.estimator.get_validation_summary(tag) + else: + if tag == str(val_method.val_method): + return self.tf_optimizer.estimator.\ + get_validation_summary("{} {}".format(val_method.name, tag)) + continue + return None + @staticmethod def from_graph(*, inputs, outputs=None, labels=None, loss=None, optimizer=None, @@ -73,7 +141,6 @@ def from_keras(keras_model, model_dir=None, backend="spark"): class TFOptimizerWrapper(Estimator): - def __init__(self, *, inputs, outputs, labels, loss, optimizer, clip_norm, clip_value, metrics, @@ -120,6 +187,9 @@ def __init__(self, *, inputs, outputs, labels, loss, self.sess = sess self.model_dir = model_dir self.load_checkpoint = False + self.tf_optimizer = None + self.log_dir = None + self.app_name = None def fit(self, data, epochs=1, @@ -158,7 +228,7 @@ def fit(self, data, else: tensor_with_value = None - optimizer = TFOptimizer.from_train_op( + self.tf_optimizer = TFOptimizer.from_train_op( train_op=self.train_op, loss=self.loss, inputs=self.inputs, @@ -171,9 +241,12 @@ def fit(self, data, model_dir=self.model_dir) if self.load_checkpoint: - optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version) + self.tf_optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version) - optimizer.optimize(end_trigger=MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger) + if self.log_dir and self.app_name: + self.tf_optimizer.estimator.set_tensorboad(self.log_dir, self.app_name) + + self.tf_optimizer.optimize(end_trigger=MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger) return self def predict(self, data, batch_size=4, @@ -236,10 +309,12 @@ def evaluate(self, data, batch_size=32, class TFKerasWrapper(Estimator): - def __init__(self, keras_model, model_dir): self.model = KerasModel(keras_model, model_dir) self.load_checkpoint = False + self.tf_optimizer = None + self.log_dir = None + self.app_name = None def fit(self, data, epochs=1, @@ -265,14 +340,17 @@ def fit(self, data, 
                              sequential_order=False, shuffle=True
                              )
-        optimizer = TFOptimizer.from_keras(self.model.model, dataset,
-                                           model_dir=self.model.model_dir,
-                                           session_config=session_config)
+        self.tf_optimizer = TFOptimizer.from_keras(self.model.model, dataset,
+                                                   model_dir=self.model.model_dir,
+                                                   session_config=session_config)

         if self.load_checkpoint:
-            optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version)
+            self.tf_optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version)
+
+        if self.log_dir and self.app_name:
+            self.tf_optimizer.estimator.set_tensorboard(self.log_dir, self.app_name)

-        optimizer.optimize(MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger)
+        self.tf_optimizer.optimize(MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger)
         return self

diff --git a/pyzoo/zoo/pipeline/estimator/estimator.py b/pyzoo/zoo/pipeline/estimator/estimator.py
index f24051d1e8d..26153014c4b 100644
--- a/pyzoo/zoo/pipeline/estimator/estimator.py
+++ b/pyzoo/zoo/pipeline/estimator/estimator.py
@@ -15,6 +15,7 @@
 #
 from bigdl.util.common import JavaValue
+
 from zoo.common.utils import callZooFunc

@@ -58,6 +59,71 @@ def set_l2_norm_gradient_clipping(self, clip_norm):
         """
         callZooFunc(self.bigdl_type, "setGradientClippingByL2Norm", self.value, clip_norm)

+    def set_tensorboard(self, log_dir, app_name):
+        """
+        Set summary information during the training process for visualization purposes.
+        Saved summary can be viewed via TensorBoard.
+        In order to take effect, it needs to be called before fit.
+
+        Training summary will be saved to 'log_dir/app_name/train'
+        and validation summary (if any) will be saved to 'log_dir/app_name/validation'.
+
+        # Arguments
+        :param log_dir: The base directory path to store training and validation logs.
+        :param app_name: The name of the application.
+ """ + callZooFunc(self.bigdl_type, "estimatorSetTensorBoard", + self.value, + log_dir, + app_name) + + def get_train_summary(self, tag=None): + """ + Get the scalar from model train summary + Return 2-D array like object which could be converted + by nd.array() + # Arguments + tag: The string variable represents the scalar wanted + """ + # exception handle + if tag != "Loss" and tag != "LearningRate" and tag != "Throughput": + raise TypeError('Only "Loss", "LearningRate", "Throughput"' + + 'are supported in train summary') + + return callZooFunc("float", "estimatorGetScalarFromSummary", + self.value, tag, "Train") + + def get_validation_summary(self, tag=None): + """ + Get the scalar from model validation summary + Return 2-D array like object which could be converted + by np.array() + + Note: The metric and tag may not be consistent + Please look up following form to pass tag parameter + Left side is your metric during compile + Right side is the tag you should pass + 'Accuracy' | 'Top1Accuracy' + 'BinaryAccuracy' | 'Top1Accuracy' + 'CategoricalAccuracy' | 'Top1Accuracy' + 'SparseCategoricalAccuracy' | 'Top1Accuracy' + 'AUC' | 'AucScore' + 'HitRatio' | 'HitRate@k' (k is Top-k) + 'Loss' | 'Loss' + 'MAE' | 'MAE' + 'NDCG' | 'NDCG' + 'TFValidationMethod' | '${name + " " + valMethod.toString()}' + 'Top5Accuracy' | 'Top5Accuracy' + 'TreeNNAccuracy' | 'TreeNNAccuracy()' + 'MeanAveragePrecision' | 'MAP@k' (k is Top-k) (BigDL) + 'MeanAveragePrecision' | 'PascalMeanAveragePrecision' (Zoo) + 'StatelessMetric' | '${name}' + # Arguments + tag: The string variable represents the scalar wanted + """ + return callZooFunc("float", "estimatorGetScalarFromSummary", + self.value, tag, "Validation") + def train(self, train_set, criterion, end_trigger=None, checkpoint_trigger=None, validation_set=None, validation_method=None, batch_size=32): """ @@ -82,7 +148,6 @@ def train(self, train_set, criterion, end_trigger=None, checkpoint_trigger=None, def train_minibatch(self, train_set, criterion, end_trigger=None, checkpoint_trigger=None, validation_set=None, validation_method=None): - """ Train model with provided trainSet and criterion. The training will end until the endTrigger is triggered. 
diff --git a/pyzoo/zoo/tfpark/tf_optimizer.py b/pyzoo/zoo/tfpark/tf_optimizer.py
index 83100cb135b..78efbcc96df 100644
--- a/pyzoo/zoo/tfpark/tf_optimizer.py
+++ b/pyzoo/zoo/tfpark/tf_optimizer.py
@@ -44,6 +44,8 @@ def __init__(self):

 class TFValidationMethod(JavaValue):
     def __init__(self, val_method, name, output_indices, label_indices):
+        self.name = name
+        self.val_method = val_method
         JavaValue.__init__(self, None, "float", val_method, name, output_indices, label_indices)

@@ -570,7 +572,7 @@ def from_loss(cls, loss, optim_method, session=None, inputs=None, dataset=None,
             metrics = {}
             for i, method in enumerate(val_methods):
-                metrics['bigdl_metirc_' + str(i)] = BigDLMetric(method, val_outputs, val_labels)
+                metrics['bigdl_metric_' + str(i)] = BigDLMetric(method, val_outputs, val_labels)

         return TFOptimizer._from_grads(loss, sess, inputs, labels, grads, variables, dataset,
                                        optim_method, clip_norm, clip_value,
@@ -691,7 +693,7 @@ def from_keras(cls, keras_model, dataset,
             val_methods = to_list(bigdl_val_methods)
             metrics = {}
             for i, method in enumerate(val_methods):
-                metrics['bigdl_metirc_' + str(i)] = BigDLMetric(method, val_outputs, val_labels)
+                metrics['bigdl_metric_' + str(i)] = BigDLMetric(method, val_outputs, val_labels)

         return cls.from_train_op(train_op, loss, inputs=model_inputs, labels=model_targets,
                                  metrics=metrics, updates=updates, sess=sess, dataset=dataset,
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/models/Topology.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/models/Topology.scala
index bd49a8a41c9..9461410fee9 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/models/Topology.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/models/Topology.scala
@@ -1349,6 +1349,21 @@ private[zoo] class InternalDistriOptimizer[T: ClassTag] (
     this
   }

+  def getTrainSummary(tag: String): Array[(Long, Float, Double)] = {
+    if (this.trainSummary.isDefined) {
+      this.trainSummary.get.readScalar(tag)
+    } else {
+      null
+    }
+  }
+
+  def getValidationSummary(tag: String): Array[(Long, Float, Double)] = {
+    if (this.validationSummary.isDefined) {
+      this.validationSummary.get.readScalar(tag)
+    } else {
+      null
+    }
+  }

   override def train(
     trainSet: FeatureSet[MiniBatch[T]],
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/Estimator.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/Estimator.scala
index 3a0beb8f88f..e40e8ed705d 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/Estimator.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/Estimator.scala
@@ -19,6 +19,7 @@ import com.intel.analytics.bigdl.{Criterion, Module}
 import com.intel.analytics.bigdl.dataset.MiniBatch
 import com.intel.analytics.bigdl.optim._
 import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
+import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary}
 import com.intel.analytics.zoo.feature.{DiskFeatureSet, DistributedFeatureSet, FeatureSet}
 import com.intel.analytics.zoo.pipeline.api.keras.models.InternalDistriOptimizer
 import org.apache.log4j.Logger
@@ -72,6 +73,10 @@ class Estimator[T: ClassTag] private[zoo](
   protected val gradientClipping: ArrayBuffer[GradientClipping] =
     new ArrayBuffer[GradientClipping]()

+  protected var logDir: String = null
+
+  protected var appName: String = null
+
   /**
    * Clear gradient clipping parameters. In this case, gradient clipping will not be applied.
   * In order to take effect, it needs to be called before fit.
@@ -100,6 +105,22 @@ class Estimator[T: ClassTag] private[zoo](
   def setGradientClippingByL2Norm(clipNorm: Double): Unit = {
     this.gradientClipping.append(L2NormClipping(clipNorm))
   }
+
+  def setTensorBoard(logDir: String, appName: String): Unit = {
+    this.logDir = logDir
+    this.appName = appName
+  }
+
+  def getTrainSummary(tag: String): Array[(Long, Float, Double)] = {
+    this.internalEstimator.asInstanceOf[InternalDistriOptimizer[T]].getTrainSummary(tag)
+  }
+
+  def getValidationSummary(tag: String): Array[(Long, Float, Double)] = {
+    this.internalEstimator.asInstanceOf[InternalDistriOptimizer[T]].getValidationSummary(tag)
+  }
+
+
+
   /**
    * Train model with provided trainSet and criterion.
    * The training will end until the endTrigger is triggered.
@@ -128,6 +149,13 @@ class Estimator[T: ClassTag] private[zoo](
           .setCheckpointDir(modelDir)
           .setOptimMethods(optimMethods)
          .setNumOfSlice(d.numOfSlice)
+        if ((logDir != null) && (appName != null)) {
+          val trainSummary = TrainSummary(logDir, appName)
+          val valSummary = ValidationSummary(logDir, appName)
+          internalEstimator.asInstanceOf[Optimizer[_, _]]
+            .setTrainSummary(trainSummary)
+            .setValidationSummary(valSummary)
+        }
       }
       case _ => throw new IllegalArgumentException("Unsupported FeatureSet type.")
     }
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/python/PythonEstimator.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/python/PythonEstimator.scala
index 8b90c33c766..8a3bcc4db85 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/python/PythonEstimator.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/estimator/python/PythonEstimator.scala
@@ -146,4 +146,27 @@ class PythonEstimator[T: ClassTag](implicit ev: TensorNumeric[T]) extends Python
   def setGradientClippingByL2Norm(estimator: Estimator[T], clipNorm: Double): Unit = {
     estimator.setGradientClippingByL2Norm(clipNorm)
   }
+
+  def estimatorSetTensorBoard(
+      estimator: Estimator[T],
+      logDir: String,
+      appName: String): Unit = {
+    estimator.setTensorBoard(logDir, appName)
+  }
+
+  def estimatorGetScalarFromSummary(estimator: Estimator[T], tag: String,
+                                    target: String): JList[JList[Any]] = {
+    require(target == "Train" || target == "Validation",
+      "Invalid target, must be Train or Validation.")
+    val scalarArray = if (target == "Train") estimator.getTrainSummary(tag)
+    else estimator.getValidationSummary(tag)
+
+    if (scalarArray != null) {
+      scalarArray.toList.map { tuple =>
+        List(tuple._1, tuple._2, tuple._3).asJava.asInstanceOf[JList[Any]]
+      }.asJava
+    } else {
+      null
+    }
+  }
 }

From a071dc9d0dc12f92e8f7145d40976d87681e29e9 Mon Sep 17 00:00:00 2001
From: jenniew
Date: Wed, 5 Aug 2020 17:41:43 -0700
Subject: [PATCH 2/4] fix style

---
 .../test/zoo/orca/learn/spark/test_estimator_for_spark.py  | 7 +++----
 .../zoo/orca/learn/spark/test_estimator_keras_for_spark.py | 2 +-
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py b/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py
index 7225400705b..387e1a22756 100644
--- a/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py
+++ b/pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py
@@ -429,7 +429,6 @@ def test_estimator_graph_tensorboard(self):
         file_path = os.path.join(resource_path, "orca/learn/ncf.csv")
         data_shard = zoo.orca.data.pandas.read_csv(file_path)

-
         def transform(df):
             result = {
                 "x": (df['user'].to_numpy(), df['item'].to_numpy()),
@@ -484,9 +483,9 @@ def transform(df):
         )

         est2.fit(data=data_shard,
-                batch_size=8,
-                epochs=5,
-                validation_data=data_shard)
+                 batch_size=8,
+                 epochs=5,
+                 validation_data=data_shard)

         train_tp = est2.get_train_summary("Throughput")
         val_scores = est2.get_validation_summary("loss")
diff --git a/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py b/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
index a3f29a85a35..ad64afa8e95 100644
--- a/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
+++ b/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
@@ -311,7 +311,7 @@ def transform(df):
         est = Estimator.from_keras(keras_model=model)
         log_dir = os.path.join(temp, "log")
         est.set_tensorboard(log_dir, "test")
-        
+
         est.fit(data=data_shard,
                 batch_size=8,
                 epochs=10,

From fb3c68ed8924e5f09dd2ece2987cb71af379a292 Mon Sep 17 00:00:00 2001
From: jenniew
Date: Wed, 5 Aug 2020 20:04:05 -0700
Subject: [PATCH 3/4] fix style2

---
 pyzoo/zoo/orca/learn/tf/estimator.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pyzoo/zoo/orca/learn/tf/estimator.py b/pyzoo/zoo/orca/learn/tf/estimator.py
index a8232b56b70..83744a0c111 100644
--- a/pyzoo/zoo/orca/learn/tf/estimator.py
+++ b/pyzoo/zoo/orca/learn/tf/estimator.py
@@ -246,7 +246,8 @@ def fit(self, data,
         if self.log_dir and self.app_name:
             self.tf_optimizer.estimator.set_tensorboard(self.log_dir, self.app_name)

-        self.tf_optimizer.optimize(end_trigger=MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger)
+        self.tf_optimizer.optimize(end_trigger=MaxEpoch(epochs),
+                                   checkpoint_trigger=checkpoint_trigger)
         return self

     def predict(self, data, batch_size=4,

From 0ced4ba9cee72ba19bfe129cc5ec056ce4793ce9 Mon Sep 17 00:00:00 2001
From: jenniew
Date: Thu, 6 Aug 2020 13:14:11 -0700
Subject: [PATCH 4/4] fix test

---
 .../zoo/orca/learn/spark/test_estimator_keras_for_spark.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py b/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
index ad64afa8e95..39d94cc6c3d 100644
--- a/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
+++ b/pyzoo/test/zoo/orca/learn/spark/test_estimator_keras_for_spark.py
@@ -181,7 +181,7 @@ def test_estimator_keras_xshards_checkpoint(self):
         import zoo.orca.data.pandas
         import tensorflow.keras.backend as K

-        K.clear_session()
+        # K.clear_session()
         tf.reset_default_graph()
         model = self.create_model()
         file_path = os.path.join(self.resource_path, "orca/learn/ncf.csv")
@@ -210,7 +210,7 @@ def transform(df):
         eval_result = est.evaluate(data_shard)
         print(eval_result)

-        K.get_session().close()
+        # K.get_session().close()
         tf.reset_default_graph()

         model = self.create_model()
@@ -307,6 +307,9 @@ def transform(df):
         val_scores = est.get_validation_summary("Top1Accuracy")
         assert len(val_scores) > 0

+        # import tensorflow.keras.backend as K
+        # K.get_session().close()
+
         # no model dir
         est = Estimator.from_keras(keras_model=model)
         log_dir = os.path.join(temp, "log")
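
The API introduced by this series is exercised end to end in the tests above. A minimal usage sketch of the intended workflow, assuming a graph model like the tests' SimpleModel and an NCF-style CSV (both stand-ins taken from the test fixtures, not part of the patch itself):

    import tensorflow as tf
    import zoo.orca.data.pandas
    from zoo.orca.learn.tf.estimator import Estimator

    model = SimpleModel()  # placeholder: any graph exposing input/label placeholders and a loss
    data_shard = zoo.orca.data.pandas.read_csv("ncf.csv")  # placeholder path

    def transform(df):
        # fit expects shards of {"x": ..., "y": ...} dicts
        return {"x": (df['user'].to_numpy(), df['item'].to_numpy()),
                "y": df['label'].to_numpy()}

    data_shard = data_shard.transform_shard(transform)

    est = Estimator.from_graph(inputs=[model.user, model.item],
                               labels=[model.label],
                               loss=model.loss,
                               optimizer=tf.train.AdamOptimizer(),
                               metrics={"loss": model.loss})

    # Must be called before fit; summaries are written to
    # logs/demo/train and logs/demo/validation.
    est.set_tensorboard("logs", "demo")

    est.fit(data=data_shard, batch_size=8, epochs=5, validation_data=data_shard)

    # Per the Scala-side signature, each record is a (step, value, timestamp)
    # triple; both getters return None before fit has run.
    train_tp = est.get_train_summary("Throughput")
    val_loss = est.get_validation_summary("loss")

The saved logs can then be visualized with tensorboard --logdir=logs/demo.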