Orca spark estimator #3124

Merged: 22 commits, Dec 15, 2020
@@ -61,7 +61,7 @@ def forward(self, x):
model = SimpleModel()

estimator = Estimator.from_torch(model=model, loss=nn.BCELoss(),
-                                 optimizer=Adam(), backend="bigdl")
+                                 optimizer=Adam())

inputs = torch.Tensor([[1, 2], [1, 3], [3, 2], [5, 6], [8, 9], [1, 9]])
targets = torch.Tensor([[0], [0], [0], [1], [1], [1]])
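With this change, Estimator.from_torch no longer takes an explicit backend argument here, so the bigdl backend appears to become the default. A minimal end-to-end sketch of the updated snippet follows; the Estimator import path, the DataLoader wrapping, and the fit arguments are assumptions patterned on the lenet_mnist example further down, not code from this diff:

# Sketch only: import paths and fit arguments are assumed.
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from zoo.orca.learn.pytorch.estimator import Estimator  # assumed import path

model = nn.Sequential(nn.Linear(2, 1), nn.Sigmoid())  # stand-in for SimpleModel
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

inputs = torch.Tensor([[1, 2], [1, 3], [3, 2], [5, 6], [8, 9], [1, 9]])
targets = torch.Tensor([[0], [0], [0], [1], [1], [1]])
loader = DataLoader(TensorDataset(inputs, targets), batch_size=2)

# No backend="bigdl" needed any more; it appears to be the default.
estimator = Estimator.from_torch(model=model, loss=nn.BCELoss(), optimizer=optimizer)
estimator.fit(data=loader, epochs=2)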
@@ -22,7 +22,6 @@
from zoo.orca.learn.trigger import SeveralIteration
from zoo.orca.learn.tf.estimator import Estimator
from zoo.common.nncontext import *
-from zoo.orca.learn.tf.estimator import Estimator
from zoo.orca.learn.tf.utils import convert_predict_to_dataframe


2 changes: 1 addition & 1 deletion pyzoo/test/zoo/orca/learn/spark/test_estimator_openvino.py

@@ -21,7 +21,7 @@
import numpy as np
from zoo import init_nncontext
from zoo.orca.data import SparkXShards
-from zoo.orca.learn.openvino.estimator import Estimator
+from zoo.orca.learn.openvino import Estimator
from bigdl.dataset.base import maybe_download

property_path = os.path.join(os.path.split(__file__)[0],
3 changes: 1 addition & 2 deletions pyzoo/zoo/examples/orca/learn/pytorch/mnist/lenet_mnist.py

@@ -101,8 +101,7 @@ def main():
criterion = nn.NLLLoss()

adam = torch.optim.Adam(model.parameters(), args.lr)
-    zoo_estimator = Estimator.from_torch(model=model, optimizer=adam, loss=criterion,
-                                         backend="bigdl")
+    zoo_estimator = Estimator.from_torch(model=model, optimizer=adam, loss=criterion)
zoo_estimator.fit(data=train_loader, epochs=args.epochs, validation_data=test_loader,
validation_methods=[Accuracy()], checkpoint_trigger=EveryEpoch())
zoo_estimator.evaluate(data=test_loader, validation_methods=[Accuracy()])
43 changes: 43 additions & 0 deletions pyzoo/zoo/orca/learn/base_estimator.py

@@ -0,0 +1,43 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from abc import ABC, abstractmethod


class BaseEstimator(ABC):
@abstractmethod
def fit(self, data, epochs, **kwargs):
pass

@abstractmethod
def predict(self, data, **kwargs):
pass

@abstractmethod
def evaluate(self, data, **kwargs):
pass

@abstractmethod
def get_model(self):
pass

@abstractmethod
def save(self, model_path):
pass

@abstractmethod
def load(self, checkpoint, **kwargs):
pass
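BaseEstimator pins down the contract that every Orca estimator backend is expected to satisfy. A hypothetical minimal subclass, just to illustrate how the abstract methods fit together (none of these names exist in the PR):

from zoo.orca.learn.base_estimator import BaseEstimator

class EchoEstimator(BaseEstimator):
    """Toy backend: 'trains' nothing and echoes its input back."""
    def __init__(self):
        self.model = None

    def fit(self, data, epochs, **kwargs):
        self.model = "trained for {} epochs".format(epochs)
        return self

    def predict(self, data, **kwargs):
        return data  # echo the input straight back

    def evaluate(self, data, **kwargs):
        return {"loss": 0.0}

    def get_model(self):
        return self.model

    def save(self, model_path):
        pass  # nothing to persist in this toy

    def load(self, checkpoint, **kwargs):
        return self

Because every method is decorated with @abstractmethod, instantiating a subclass that omits any of them raises TypeError at construction time, which is the point of the base class.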
102 changes: 30 additions & 72 deletions pyzoo/zoo/orca/learn/bigdl/estimator.py

@@ -15,61 +15,14 @@
#
from zoo.pipeline.nnframes import NNEstimator, NNModel
from zoo.pipeline.estimator import Estimator as SparkEstimator
+from zoo.orca.learn.spark_estimator import Estimator as OrcaSparkEstimator
from zoo.orca.data import SparkXShards
from bigdl.optim.optimizer import MaxEpoch
from zoo.feature.common import FeatureSet
from pyspark.sql.dataframe import DataFrame


-class Estimator(object):
-    def fit(self, data, epochs, **kwargs):
-        pass
-
-    def predict(self, data, **kwargs):
-        pass
-
-    def evaluate(self, data, **kwargs):
-        pass
-
-    def get_model(self):
-        pass
-
-    def save(self, model_path):
-        pass
-
-    def load(self, checkpoint):
-        pass
-
-    def set_tensorboard(self, log_dir, app_name):
-        """
-        Set summary information during the training process for visualization purposes.
-        Saved summary can be viewed via TensorBoard.
-        In order to take effect, it needs to be called before fit.
-
-        Training summary will be saved to 'log_dir/app_name/train'
-        and validation summary (if any) will be saved to 'log_dir/app_name/validation'.
-
-        # Arguments
-        :param log_dir: The base directory path to store training and validation logs.
-        :param app_name: The name of the application.
-        """
-        pass
-
-    def clear_gradient_clipping(self):
-        pass
-
-    def set_constant_gradient_clipping(self, min, max):
-        pass
-
-    def set_l2_norm_gradient_clipping(self, clip_norm):
-        pass
-
-    def get_train_summary(self, tag=None):
-        pass
-
-    def get_validation_summary(self, tag=None):
-        pass
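For reference, the set_tensorboard workflow described in the docstring above (the stub is deleted here and its contract presumably moves to the shared Orca spark_estimator base this PR introduces) looks like this from user code; a sketch with illustrative paths, assuming est is any Orca estimator that implements it:

# Must be called before fit to take effect.
est.set_tensorboard("/tmp/orca_logs", "my_app")
est.fit(data=train_data, epochs=5)

# Training summary is written under /tmp/orca_logs/my_app/train and
# validation summary (if any) under /tmp/orca_logs/my_app/validation;
# both can be opened with TensorBoard, or pulled programmatically:
summary = est.get_train_summary(tag="Loss")  # tag name is an assumption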

@staticmethod
def from_bigdl(*, model, loss=None, optimizer=None, feature_preprocessing=None,
label_preprocessing=None, model_dir=None):
@@ -107,7 +60,7 @@ def from_bigdl(*, model, loss=None, optimizer=None, feature_preprocessing=None,
label_preprocessing=label_preprocessing, model_dir=model_dir)


-class BigDLEstimatorWrapper(Estimator):
+class BigDLEstimatorWrapper(OrcaSparkEstimator):
def __init__(self, *, model, loss, optimizer=None, feature_preprocessing=None,
label_preprocessing=None, model_dir=None):
self.loss = loss
@@ -262,29 +215,7 @@ def load(self, checkpoint, optimizer=None, loss=None, feature_preprocessing=None
self.model_dir = model_dir

        if is_checkpoint:
-            from zoo.orca.learn.utils import find_latest_checkpoint
-            from zoo.pipeline.api.net import Net
-            from bigdl.nn.layer import Model, Container
-            from bigdl.optim.optimizer import OptimMethod
-            import os
-            path, prefix, version = find_latest_checkpoint(checkpoint, model_type="bigdl")
-            if path is None:
-                raise ValueError("Cannot find BigDL checkpoint, please check your checkpoint path.")
-            try:
-                self.model = Model.load(os.path.join(path, "model.{}".format(version)))
-                assert isinstance(self.model, Container), \
-                    "The loaded model should be a Container, please check your checkpoint type."
-                self.optimizer = OptimMethod.load(os.path.join(path,
-                                                               "{}.{}".format(prefix, version)))
-            except Exception:
-                raise ValueError("Cannot load BigDL checkpoint, please check your checkpoint path "
-                                 "and checkpoint type.")
-            self.estimator = SparkEstimator(self.model, self.optimizer, self.model_dir)
-            self.nn_estimator = NNEstimator(self.model, self.loss, self.feature_preprocessing,
-                                            self.label_preprocessing)
-            if self.optimizer is not None:
-                self.nn_estimator.setOptimMethod(self.optimizer)
-            self.nn_model = NNModel(self.model, feature_preprocessing=self.feature_preprocessing)
+            self.load_latest_orca_checkpoint(checkpoint)
else:
from zoo.pipeline.api.net import Net
self.model = Net.load_bigdl(checkpoint + ".bigdl", checkpoint + ".bin")
@@ -299,6 +230,33 @@ def load(self, checkpoint, optimizer=None, loss=None, feature_preprocessing=None
self.nn_model = NNModel(self.model, feature_preprocessing=self.feature_preprocessing)
return self

+    def load_orca_checkpoint(self, path, version, prefix=None):
+        from bigdl.nn.layer import Model, Container
+        from bigdl.optim.optimizer import OptimMethod
+        import os
+        try:
+            self.model = Model.load(os.path.join(path, "model.{}".format(version)))
+            assert isinstance(self.model, Container), \
+                "The loaded model should be a Container, please check your checkpoint type."
+            self.optimizer = OptimMethod.load(os.path.join(path,
+                                                           "{}.{}".format(prefix, version)))
+        except Exception:
+            raise ValueError("Cannot load BigDL checkpoint, please check your checkpoint path "
+                             "and checkpoint type.")
+        self.estimator = SparkEstimator(self.model, self.optimizer, self.model_dir)
+        self.nn_estimator = NNEstimator(self.model, self.loss, self.feature_preprocessing,
+                                        self.label_preprocessing)
+        if self.optimizer is not None:
+            self.nn_estimator.setOptimMethod(self.optimizer)
+        self.nn_model = NNModel(self.model, feature_preprocessing=self.feature_preprocessing)
+
+    def load_latest_orca_checkpoint(self, path):
+        from zoo.orca.learn.utils import find_latest_checkpoint
+        path, prefix, version = find_latest_checkpoint(path, model_type="bigdl")
+        if path is None:
+            raise ValueError("Cannot find BigDL checkpoint, please check your checkpoint path.")
+        self.load_orca_checkpoint(path=path, version=version, prefix=prefix)
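Together these two methods split checkpoint restoring into an explicit-version path and a find-the-newest path. A usage sketch (the directory, version number, and prefix value are illustrative, following the model.<version> / <prefix>.<version> file naming the code above expects):

# Resume from whatever checkpoint fit wrote last under this directory:
est.load_latest_orca_checkpoint("/tmp/bigdl_ckpts")

# Or pin an exact snapshot once its location is known, e.g. files
# model.101 and optimMethod-Sequential.101 in the same directory:
est.load_orca_checkpoint("/tmp/bigdl_ckpts/some-run", version=101,
                         prefix="optimMethod-Sequential")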

def clear_gradient_clipping(self):
self.nn_estimator.clearGradientClipping()
self.estimator.clear_gradient_clipping()
45 changes: 26 additions & 19 deletions pyzoo/zoo/orca/learn/openvino/estimator.py

@@ -15,6 +15,7 @@
#
from zoo.pipeline.inference import InferenceModel
from zoo.orca.data import SparkXShards
+from zoo.orca.learn.spark_estimator import Estimator as SparkEstimator
from zoo import get_node_and_core_number
from zoo.util import nest
from zoo.common.nncontext import init_nncontext
@@ -23,24 +24,6 @@


class Estimator(object):
-    def fit(self, data, epochs, **kwargs):
-        pass
-
-    def predict(self, data, **kwargs):
-        pass
-
-    def evaluate(self, data, **kwargs):
-        pass
-
-    def get_model(self):
-        pass
-
-    def save(self, model_path):
-        pass
-
-    def load(self, model_path, **kwargs):
-        pass

@staticmethod
def from_openvino(*, model_path, batch_size=0):
"""
Expand All @@ -52,7 +35,7 @@ def from_openvino(*, model_path, batch_size=0):
return OpenvinoEstimatorWrapper(model_path=model_path, batch_size=batch_size)
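A usage sketch for the OpenVINO estimator; the .bin weights file is derived from the .xml path by the load code below, while the ndarray input type and shape are assumptions (the test file above feeds it SparkXShards):

import numpy as np
from zoo.orca.learn.openvino import Estimator

est = Estimator.from_openvino(model_path="/path/to/model.xml", batch_size=4)
batch = np.random.rand(8, 3, 224, 224).astype(np.float32)  # shape is illustrative
predictions = est.predict(batch)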


-class OpenvinoEstimatorWrapper(Estimator):
+class OpenvinoEstimatorWrapper(SparkEstimator):
def __init__(self,
*,
model_path,
@@ -183,3 +166,27 @@ def load(self, model_path, batch_size=0):
self.model.load_openvino(model_path=model_path,
weight_path=model_path[:model_path.rindex(".")] + ".bin",
batch_size=batch_size)

+    def set_tensorboard(self, log_dir, app_name):
+        raise NotImplementedError
+
+    def clear_gradient_clipping(self):
+        raise NotImplementedError
+
+    def set_constant_gradient_clipping(self, min, max):
+        raise NotImplementedError
+
+    def set_l2_norm_gradient_clipping(self, clip_norm):
+        raise NotImplementedError
+
+    def get_train_summary(self, tag=None):
+        raise NotImplementedError
+
+    def get_validation_summary(self, tag=None):
+        raise NotImplementedError
+
+    def load_orca_checkpoint(self, path, version):
+        raise NotImplementedError
+
+    def load_latest_orca_checkpoint(self, path):
+        raise NotImplementedError
33 changes: 24 additions & 9 deletions pyzoo/zoo/orca/learn/pytorch/estimator.py

@@ -15,11 +15,10 @@
#
from zoo.pipeline.estimator.estimator import Estimator as SparkEstimator
from zoo.orca.learn.pytorch.training_operator import TrainingOperator
+from zoo.orca.learn.spark_estimator import Estimator as OrcaSparkEstimator
from zoo.orca.data import SparkXShards
from bigdl.optim.optimizer import MaxEpoch
from zoo.feature.common import FeatureSet

import torch
from torch.optim.optimizer import Optimizer as TorchOptimizer
from torch.utils.data import DataLoader

Expand Down Expand Up @@ -195,7 +194,7 @@ def shutdown(self, force=False):
return self.estimator.shutdown(force=force)


-class PytorchSparkEstimatorWrapper(Estimator):
+class PytorchSparkEstimatorWrapper(OrcaSparkEstimator):
def __init__(self, model, loss, optimizer, model_dir=None, bigdl_type="float"):
from zoo.pipeline.api.torch import TorchModel, TorchLoss, TorchOptim
self.loss = loss
@@ -208,6 +207,8 @@ def __init__(self, model, loss, optimizer, model_dir=None, bigdl_type="float"):
optimizer = SGD()
elif isinstance(optimizer, TorchOptimizer):
optimizer = TorchOptim.from_pytorch(optimizer)
Review comment (Contributor): Need to make sure the optimizer is either a TorchOptimizer or an orca.learn.optimizers.Optimizer.
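One possible shape for the check this comment asks for; a hypothetical helper, not code from this PR (the Orca optimizer import path is an assumption):

from torch.optim.optimizer import Optimizer as TorchOptimizer

def check_optimizer(optimizer):
    # Accept only the two types the reviewer names; fail fast on anything else.
    from zoo.orca.learn.optimizers import Optimizer as OrcaOptimizer  # assumed path
    if optimizer is not None and not isinstance(optimizer, (TorchOptimizer, OrcaOptimizer)):
        raise TypeError("optimizer should be a torch.optim Optimizer or an "
                        "orca.learn.optimizers.Optimizer, got {}"
                        .format(type(optimizer).__name__))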

+        self.log_dir = None
+        self.app_name = None
Review comment on lines +215 to +216 (Contributor): Is there anywhere to set these two inputs? They seem to always be None, so set_tensorboard will never be triggered... @cyita
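One possible resolution of this comment would be a setter on the wrapper itself, so the two fields stop being permanently None; a sketch, not code from this PR:

# Hypothetical method on PytorchSparkEstimatorWrapper:
def set_tensorboard(self, log_dir, app_name):
    # Stash the values; the check added to fit() below forwards them
    # to the underlying SparkEstimator before training starts.
    self.log_dir = log_dir
    self.app_name = app_name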

self.model_dir = model_dir
self.model = TorchModel.from_pytorch(model)
self.estimator = SparkEstimator(self.model, optimizer, model_dir, bigdl_type=bigdl_type)
@@ -223,6 +224,9 @@ def fit(self, data, epochs=1, batch_size=32, validation_data=None, validation_me
validation_methods = Metrics.convert_metrics_list(validation_methods)
checkpoint_trigger = Trigger.convert_trigger(checkpoint_trigger)

+        if self.log_dir is not None and self.app_name is not None:
+            self.estimator.set_tensorboad(self.log_dir, self.app_name)

if isinstance(data, SparkXShards):
train_rdd = data.rdd.flatMap(to_sample)
train_feature_set = FeatureSet.sample_rdd(train_rdd)
@@ -274,20 +278,25 @@ def evaluate(self, data, validation_methods=None, batch_size=32):
def get_model(self):
return self.model.to_pytorch()

-    def save(self, checkpoint):
+    def save(self, model_path):
pass

def load(self, checkpoint, loss=None):
from zoo.orca.learn.utils import find_latest_checkpoint
-        from bigdl.nn.layer import Model
-        from bigdl.optim.optimizer import OptimMethod
-        import os
if loss is not None:
from zoo.pipeline.api.torch import TorchLoss
self.loss = TorchLoss.from_pytorch(loss)
path, prefix, version = find_latest_checkpoint(checkpoint, model_type="pytorch")
if path is None:
raise ValueError("Cannot find PyTorch checkpoint, please check your checkpoint path.")
+        self.load_orca_checkpoint(path, version=version, prefix=prefix)

+    def load_orca_checkpoint(self, path, version, prefix=None):
+        import os
+        from bigdl.nn.layer import Model
+        from bigdl.optim.optimizer import OptimMethod
+        assert prefix is not None, "You should provide optimMethod prefix, " \
+                                   "for example 'optimMethod-TorchModelf53bddcc'"
try:
self.model = Model.load(os.path.join(path, "model.{}".format(version)))
optimizer = OptimMethod.load(os.path.join(path, "{}.{}".format(prefix, version)))
@@ -296,8 +305,14 @@ def load(self, checkpoint, loss=None):
"and checkpoint type.")
self.estimator = SparkEstimator(self.model, optimizer, self.model_dir)

-    def shutdown(self, force=False):
-        pass
+    def load_latest_orca_checkpoint(self, path):
+        self.load(checkpoint=path)
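Taken together, the PyTorch wrapper's load path now works like this from user code; a sketch where the directory and version are illustrative and the prefix format follows the assert message above:

# Resume the newest checkpoint under a directory (resolves path/version/prefix
# via find_latest_checkpoint, then delegates to load_orca_checkpoint):
est.load_latest_orca_checkpoint("/tmp/torch_ckpts")

# Or load an explicit snapshot; unlike the BigDL wrapper, prefix is mandatory here:
est.load_orca_checkpoint("/tmp/torch_ckpts/run-1", version=500,
                         prefix="optimMethod-TorchModelf53bddcc")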

+    def get_train_summary(self, tag=None):
+        return self.estimator.get_train_summary(tag=tag)
+
+    def get_validation_summary(self, tag=None):
+        return self.estimator.get_validation_summary(tag=tag)

def clear_gradient_clipping(self):
"""