Skip to content

Commit

Permalink
Add tensorboard support (intel-analytics#2667)
Browse files Browse the repository at this point in the history
* add tensorboard support

* fix style

* fix style2

* fix test
  • Loading branch information
jenniew authored and Wang, Yang committed Sep 26, 2021
1 parent 10095b2 commit f80f80c
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 21 deletions.
112 changes: 95 additions & 17 deletions python/orca/src/bigdl/orca/learn/tf/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import tensorflow as tf

from pyspark.sql.dataframe import DataFrame

from bigdl.optim.optimizer import MaxEpoch

from zoo.tfpark.utils import evaluate_metrics
from zoo.tfpark import TFOptimizer, TFNet, ZooOptimizer
from zoo.orca.learn.tf.utils import *
from zoo.tfpark import KerasModel
from zoo.tfpark import TFOptimizer, TFNet, ZooOptimizer
from zoo.tfpark.tf_optimizer import StatelessMetric
from zoo.tfpark.utils import evaluate_metrics
from zoo.util import nest
from zoo.orca.learn.tf.utils import *


class Estimator(object):
Expand All @@ -47,6 +45,75 @@ def load_latest_checkpoint(self, path):
raise Exception("Cannot find checkpoint")
self.load(ckpt_path, version)

def set_tensorboard(self, log_dir, app_name):
    """
    Enable summary collection during training for TensorBoard visualization.

    Must be called before fit in order to take effect. The training
    summary is written to 'log_dir/app_name/train' and the validation
    summary (if any) to 'log_dir/app_name/validation'.

    :param log_dir: The base directory path to store training and validation logs.
    :param app_name: The name of the application.
    """
    # Only record the destination here; fit() picks these up and
    # forwards them to the underlying optimizer's estimator.
    self.app_name = app_name
    self.log_dir = log_dir

def get_train_summary(self, tag=None):
    """
    Fetch the recorded scalar values from the model's train summary.

    Returns a 2-D array-like object which can be converted via
    np.array(), or None if no training has been run yet (i.e. no
    optimizer exists).

    :param tag: The string name of the scalar wanted (e.g. "Loss",
                "Throughput").
    """
    # No optimizer yet means fit() has not run, so there is no summary.
    if not self.tf_optimizer:
        return None
    return self.tf_optimizer.estimator.get_train_summary(tag)

def get_validation_summary(self, tag=None):
"""
Get the scalar from model validation summary
Return 2-D array like object which could be converted
by np.array()
Note: The metric and tag may not be consistent
Please look up following form to pass tag parameter
Left side is your metric during compile
Right side is the tag you should pass
'Accuracy' | 'Top1Accuracy'
'BinaryAccuracy' | 'Top1Accuracy'
'CategoricalAccuracy' | 'Top1Accuracy'
'SparseCategoricalAccuracy' | 'Top1Accuracy'
'AUC' | 'AucScore'
'HitRatio' | 'HitRate@k' (k is Top-k)
'Loss' | 'Loss'
'MAE' | 'MAE'
'NDCG' | 'NDCG'
'TFValidationMethod' | '${name + " " + valMethod.toString()}'
'Top5Accuracy' | 'Top5Accuracy'
'TreeNNAccuracy' | 'TreeNNAccuracy()'
'MeanAveragePrecision' | 'MAP@k' (k is Top-k) (BigDL)
'MeanAveragePrecision' | 'PascalMeanAveragePrecision' (Zoo)
'StatelessMetric' | '${name}'
# Arguments
tag: The string variable represents the scalar wanted
"""
if self.tf_optimizer:
for val_method in self.tf_optimizer.tf_model.val_methods:
if isinstance(val_method, StatelessMetric):
if tag == val_method.name:
return self.tf_optimizer.estimator.get_validation_summary(tag)
else:
if tag == str(val_method.val_method):
return self.tf_optimizer.estimator.\
get_validation_summary("{} {}".format(val_method.name, tag))
continue
return None

@staticmethod
def from_graph(*, inputs, outputs=None,
labels=None, loss=None, optimizer=None,
Expand All @@ -73,7 +140,6 @@ def from_keras(keras_model, metrics=None, model_dir=None, backend="spark"):


class TFOptimizerWrapper(Estimator):

def __init__(self, *, inputs, outputs, labels, loss,
optimizer, clip_norm, clip_value,
metrics,
Expand Down Expand Up @@ -120,6 +186,9 @@ def __init__(self, *, inputs, outputs, labels, loss,
self.sess = sess
self.model_dir = model_dir
self.load_checkpoint = False
self.tf_optimizer = None
self.log_dir = None
self.app_name = None

def fit(self, data,
epochs=1,
Expand Down Expand Up @@ -158,7 +227,7 @@ def fit(self, data,
else:
tensor_with_value = None

optimizer = TFOptimizer.from_train_op(
self.tf_optimizer = TFOptimizer.from_train_op(
train_op=self.train_op,
loss=self.loss,
inputs=self.inputs,
Expand All @@ -171,9 +240,13 @@ def fit(self, data,
model_dir=self.model_dir)

if self.load_checkpoint:
optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version)
self.tf_optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version)

optimizer.optimize(end_trigger=MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger)
if self.log_dir and self.app_name:
self.tf_optimizer.estimator.set_tensorboad(self.log_dir, self.app_name)

self.tf_optimizer.optimize(end_trigger=MaxEpoch(epochs),
checkpoint_trigger=checkpoint_trigger)
return self

def predict(self, data, batch_size=4,
Expand Down Expand Up @@ -238,11 +311,13 @@ def evaluate(self, data, batch_size=32,


class TFKerasWrapper(Estimator):

def __init__(self, keras_model, metrics, model_dir):
    """
    :param keras_model: the compiled tf.keras model to be trained.
    :param metrics: the metrics used for evaluation/validation.
    :param model_dir: directory where the model and its summaries are saved.
    """
    self.metrics = metrics
    self.model = KerasModel(keras_model, model_dir)
    self.load_checkpoint = False
    # TensorBoard-related state; populated by set_tensorboard() and fit().
    self.tf_optimizer = None
    self.log_dir = None
    self.app_name = None

def fit(self, data,
epochs=1,
Expand All @@ -268,15 +343,18 @@ def fit(self, data,
sequential_order=False, shuffle=True
)

optimizer = TFOptimizer.from_keras(self.model.model, dataset,
model_dir=self.model.model_dir,
session_config=session_config,
metrics=self.metrics)
self.tf_optimizer = TFOptimizer.from_keras(self.model.model, dataset,
model_dir=self.model.model_dir,
session_config=session_config,
metrics=self.metrics)

if self.load_checkpoint:
optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version)
self.tf_optimizer.load_checkpoint(self.checkpoint_path, self.checkpoint_version)

if self.log_dir and self.app_name:
self.tf_optimizer.estimator.set_tensorboad(self.log_dir, self.app_name)

optimizer.optimize(MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger)
self.tf_optimizer.optimize(MaxEpoch(epochs), checkpoint_trigger=checkpoint_trigger)

return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,78 @@ def test_checkpoint_remote(self):
load_tf_checkpoint_from_remote(sess, os.path.join(temp, "simple.ckpt"), saver)
shutil.rmtree(temp)

def test_estimator_graph_tensorboard(self):
    """Check summary collection for the graph-based Estimator in three
    configurations: model_dir only, explicit tensorboard dir, neither."""
    tf.reset_default_graph()

    model = SimpleModel()

    file_path = os.path.join(resource_path, "orca/learn/ncf.csv")
    data_shard = zoo.orca.data.pandas.read_csv(file_path)

    def to_feature_dict(df):
        return {
            "x": (df['user'].to_numpy(), df['item'].to_numpy()),
            "y": df['label'].to_numpy()
        }

    data_shard = data_shard.transform_shard(to_feature_dict)

    temp = tempfile.mkdtemp()
    # Only model dir is set: summaries are generated under the model dir.
    model_dir = os.path.join(temp, "test_model")

    est = Estimator.from_graph(
        inputs=[model.user, model.item],
        labels=[model.label],
        loss=model.loss,
        optimizer=tf.train.AdamOptimizer(),
        metrics={"loss": model.loss},
        model_dir=model_dir
    )
    est.fit(data=data_shard,
            batch_size=8,
            epochs=5,
            validation_data=data_shard)

    assert len(est.get_train_summary("Throughput")) > 0
    assert len(est.get_validation_summary("loss")) > 0

    # Redirect summaries to a separate tensorboard directory.
    est.set_tensorboard("model", "test")

    est.fit(data=data_shard,
            batch_size=8,
            epochs=5,
            validation_data=data_shard)

    assert len(est.get_train_summary("Throughput")) > 0
    assert len(est.get_validation_summary("loss")) > 0

    # No model dir and no tensorboard dir: no summary should be saved.
    est2 = Estimator.from_graph(
        inputs=[model.user, model.item],
        labels=[model.label],
        loss=model.loss,
        optimizer=tf.train.AdamOptimizer(),
        metrics={"loss": model.loss}
    )

    est2.fit(data=data_shard,
             batch_size=8,
             epochs=5,
             validation_data=data_shard)

    assert est2.get_train_summary("Throughput") is None
    assert est2.get_validation_summary("loss") is None

    shutil.rmtree(temp)

if __name__ == "__main__":
import pytest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ def test_estimator_keras_xshards_checkpoint(self):

tf.reset_default_graph()

import tensorflow.keras.backend as K
K.clear_session()
tf.reset_default_graph()
model = self.create_model()
file_path = os.path.join(self.resource_path, "orca/learn/ncf.csv")
data_shard = zoo.orca.data.pandas.read_csv(file_path)
Expand Down Expand Up @@ -219,8 +216,8 @@ def transform(df):
eval_result = est.evaluate(data_shard)
print(eval_result)

K.get_session().close()
tf.reset_default_graph()

model = self.create_model()

est = Estimator.from_keras(keras_model=model, model_dir=model_dir)
Expand Down Expand Up @@ -287,6 +284,64 @@ def test_estimator_keras_dataframe_no_fit(self):
predictions = prediction_df.collect()
assert len(predictions) == 10

def test_estimator_keras_tensorboard(self):
    """Check summary collection for the Keras-based Estimator, both with
    a model dir and with an explicit tensorboard log directory."""
    import zoo.orca.data.pandas

    tf.reset_default_graph()

    model = self.create_model()
    file_path = os.path.join(self.resource_path, "orca/learn/ncf.csv")
    data_shard = zoo.orca.data.pandas.read_csv(file_path)

    def to_feature_dict(df):
        return {
            "x": (df['user'].to_numpy().reshape([-1, 1]),
                  df['item'].to_numpy().reshape([-1, 1])),
            "y": df['label'].to_numpy()
        }

    data_shard = data_shard.transform_shard(to_feature_dict)

    temp = tempfile.mkdtemp()
    model_dir = os.path.join(temp, "test_model")

    est = Estimator.from_keras(keras_model=model, model_dir=model_dir)

    # Before fit there is no optimizer, hence no summaries at all.
    assert est.get_train_summary("Loss") is None
    assert est.get_validation_summary("Top1Accuracy") is None

    est.fit(data=data_shard,
            batch_size=8,
            epochs=10,
            validation_data=data_shard)

    assert len(est.get_train_summary("Loss")) > 0
    assert len(est.get_validation_summary("Top1Accuracy")) > 0

    tf.reset_default_graph()
    # Without a model dir, summaries go to the explicit tensorboard dir.
    model = self.create_model()
    est = Estimator.from_keras(keras_model=model)
    log_dir = os.path.join(temp, "log")
    est.set_tensorboard(log_dir, "test")

    est.fit(data=data_shard,
            batch_size=8,
            epochs=10,
            validation_data=data_shard)

    assert os.path.exists(os.path.join(log_dir, "test/train"))
    assert os.path.exists(os.path.join(log_dir, "test/validation"))

    assert len(est.get_train_summary("Loss")) > 0
    assert len(est.get_validation_summary("Loss")) > 0
    shutil.rmtree(temp)

def test_convert_predict_list_of_array(self):

tf.reset_default_graph()
Expand Down

0 comments on commit f80f80c

Please sign in to comment.