Merge pull request #184 from OscarDPan/feat/batch_inference
Feat/batch inference and return training history
danielenricocahall authored Apr 13, 2021
2 parents e1b52b3 + 2d37fe3 commit 3e5a44a
Showing 5 changed files with 126 additions and 27 deletions.
14 changes: 14 additions & 0 deletions elephas/ml/params.py
@@ -243,3 +243,17 @@ def set_custom_objects(self, custom_objects):

def get_custom_objects(self):
return self.getOrDefault(self.custom_objects)


class HasInferenceBatchSize(Params):
def __init__(self):
super(HasInferenceBatchSize, self).__init__()
self.inference_batch_size = Param(self, "inference_batch_size", "Batch inference could help limit memory consumption in executors")
self._setDefault(inference_batch_size=None)

def set_inference_batch_size(self, batch_size):
self._paramMap[self.inference_batch_size] = batch_size
return self

def get_inference_batch_size(self):
return self.getOrDefault(self.inference_batch_size)
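
The mixin follows the usual PySpark Params pattern: the value lives in the param map and getOrDefault falls back to the declared default of None. A minimal usage sketch (the fitted_model variable is illustrative; it stands for the ElephasTransformer defined in ml_model.py below):

# Hypothetical usage: enable chunked prediction on a fitted ElephasTransformer.
fitted_model.set_inference_batch_size(256)            # predict 256 rows at a time per partition
assert fitted_model.get_inference_batch_size() == 256
# Leaving the param unset keeps the default (None), i.e. each partition is
# materialized and predicted in a single model.predict call.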
57 changes: 46 additions & 11 deletions elephas/ml_model.py
@@ -1,17 +1,15 @@
import copy
import json
import warnings
from functools import partial

import numpy as np
import copy
import h5py
import json

from pyspark.ml.param.shared import HasOutputCol, HasFeaturesCol, HasLabelCol
import numpy as np
from pyspark import keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import HasOutputCol, HasFeaturesCol, HasLabelCol
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType, StructField, ArrayType

from tensorflow.keras.models import model_from_yaml
from tensorflow.keras.optimizers import get as get_optimizer

@@ -107,7 +105,8 @@ def _fit(self, df: DataFrame):
keras_model_config=spark_model.master_network.to_yaml(),
weights=model_weights,
custom_objects=self.get_custom_objects(),
model_type=LossModelTypeMapper().get_model_type(loss))
model_type=LossModelTypeMapper().get_model_type(loss),
history=spark_model.training_histories)

def setFeaturesCol(self, value):
warnings.warn("setFeaturesCol is deprecated in Spark 3.0.x+ - please supply featuresCol in the constructor i.e;"
@@ -131,7 +130,8 @@ def load_ml_estimator(file_name: str) -> ElephasEstimator:
return ElephasEstimator(**config)


class ElephasTransformer(Model, HasKerasModelConfig, HasLabelCol, HasOutputCol, HasFeaturesCol, HasCustomObjects):
class ElephasTransformer(Model, HasKerasModelConfig, HasLabelCol, HasOutputCol, HasFeaturesCol, HasCustomObjects,
HasInferenceBatchSize):
"""SparkML Transformer implementation. Contains a trained model,
with which new feature data can be transformed into labels.
"""
@@ -145,8 +145,13 @@ def __init__(self, **kwargs):
if "model_type" in kwargs.keys():
# Extract loss from parameters
self.model_type = kwargs.pop('model_type')
self._history = kwargs.pop("history", [])
self.set_params(**kwargs)

@property
def history(self):
return self._history

@keyword_only
def set_params(self, **kwargs):
"""Set all provided parameters, otherwise set defaults
@@ -183,19 +188,49 @@ def _transform(self, df):
rdd = df.rdd
weights = rdd.ctx.broadcast(self.weights)

def batched_prediction(data, inference_batch_size, features_col, predict_function) -> np.ndarray:
# Do prediction in batches instead of materializing the whole partition at once
batch = []
preds = []
for row in data:
if len(batch) < inference_batch_size:
batch.append(from_vector(row[features_col]))
else:
batch_np = np.array(batch)
pred = predict_function(batch_np)
preds.append(pred)
batch = [from_vector(row[features_col])]
if len(batch) > 0:
batch_np = np.array(batch)
pred = predict_function(batch_np)
preds.append(pred)

if preds:
res = np.vstack(preds)
return res
else:
return np.array([])

def extract_features_and_predict(model_yaml: str,
custom_objects: dict,
features_col: str,
data):
data,
inference_batch_size: int = None):
model = model_from_yaml(model_yaml, custom_objects)
model.set_weights(weights.value)
return model.predict(np.stack([from_vector(x[features_col]) for x in data]))
if inference_batch_size is not None and inference_batch_size > 0:
return batched_prediction(data, inference_batch_size, features_col, model.predict)
else:
return model.predict(np.array([from_vector(x[features_col]) for x in data]))

predictions = rdd.mapPartitions(
partial(extract_features_and_predict,
self.get_keras_model_config(),
self.get_custom_objects(),
self.getFeaturesCol()))
self.getFeaturesCol(),
inference_batch_size=self.get_inference_batch_size()
)
)
if self.model_type == ModelType.REGRESSION:
predictions = predictions.map(lambda x: tuple([float(x)]))
output_col_field = StructField(output_col, DoubleType(), True)
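
The batching logic in batched_prediction is plain Python over an iterator, so it can be illustrated outside Spark. A self-contained sketch of the same accumulate-then-flush pattern, with a stub in place of model.predict (all names here are illustrative, not part of the diff):

import numpy as np

def batched_prediction_sketch(rows, batch_size, predict_fn):
    """Group rows into fixed-size chunks, predict per chunk, stack the results."""
    batch, preds = [], []
    for row in rows:
        if len(batch) < batch_size:
            batch.append(row)
        else:
            preds.append(predict_fn(np.array(batch)))  # flush a full batch
            batch = [row]                               # current row opens the next batch
    if batch:                                           # flush the remainder
        preds.append(predict_fn(np.array(batch)))
    return np.vstack(preds) if preds else np.array([])

# Stub predict function shaped like a model returning one value per input row.
out = batched_prediction_sketch(range(10), 4, lambda b: b.reshape(-1, 1))
assert out.shape == (10, 1)

This mirrors why the partition is never materialized in full: at most batch_size rows, plus the accumulated predictions, are held in memory at once.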
14 changes: 10 additions & 4 deletions elephas/spark_model.py
@@ -39,7 +39,7 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_serv
:param batch_size: batch size used for training and inference
:param port: port used in case of 'http' parameter server mode
"""

self._training_histories = []
self._master_network = model
if not hasattr(model, "loss"):
raise Exception(
@@ -97,6 +97,10 @@ def save(self, file_name):
f.flush()
f.close()

@property
def training_histories(self):
return self._training_histories

@property
def master_network(self):
return self._master_network
@@ -178,10 +182,12 @@ def _fit(self, rdd: RDD, **kwargs):
elif self.mode == 'synchronous':
worker = SparkWorker(yaml, parameters, train_config,
optimizer, loss, metrics, custom)
gradients = rdd.mapPartitions(worker.train).collect()
training_outcomes = rdd.mapPartitions(worker.train).collect()
new_parameters = self._master_network.get_weights()
number_of_sub_models = len(gradients)
for grad in gradients: # Accumulate simple average gradients one by one
number_of_sub_models = len(training_outcomes)
for training_outcome in training_outcomes:
grad, history = training_outcome
self.training_histories.append(history)
weighted_grad = divide_by(grad, number_of_sub_models)
new_parameters = subtract_params(new_parameters, weighted_grad)
print('>>> Synchronous training complete.')
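
On the driver, the synchronous path now keeps one history per collected training outcome, exposed through the training_histories property. A hedged sketch of reading them after a fit (the SparkModel construction, the RDD of (features, label) pairs, and the keyword arguments are the usual elephas workflow, assumed rather than shown in this diff):

# Assumed setup: spark_model = SparkModel(keras_model, mode='synchronous', ...)
# and rdd is an RDD of (features, label) numpy pairs.
spark_model.fit(rdd, epochs=5, batch_size=32, verbose=0, validation_split=0.1)

for i, history in enumerate(spark_model.training_histories):
    if history is not None:  # a worker with too few samples yields None (see worker.py below)
        print(f"partition {i}: final loss {history['loss'][-1]:.4f}")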
12 changes: 6 additions & 6 deletions elephas/worker.py
@@ -26,6 +26,7 @@ def __init__(self, yaml, parameters, train_config, master_optimizer,
def train(self, data_iterator):
"""Train a keras model on a worker
"""
history = None
optimizer = get_optimizer(self.master_optimizer)
self.model = model_from_yaml(self.yaml, self.custom_objects)
self.model.compile(optimizer=optimizer,
@@ -36,17 +37,16 @@ def train(self, data_iterator):
x_train = np.asarray([x for x, y in feature_iterator])
y_train = np.asarray([y for x, y in label_iterator])

self.model.compile(optimizer=get_optimizer(self.master_optimizer),
loss=self.master_loss,
metrics=self.master_metrics)

weights_before_training = self.model.get_weights()
if x_train.shape[0] > self.train_config.get('batch_size'):
self.model.fit(x_train, y_train, **self.train_config)
history = self.model.fit(x_train, y_train, **self.train_config)
weights_after_training = self.model.get_weights()
deltas = subtract_params(
weights_before_training, weights_after_training)
yield deltas
if history:
yield [deltas, history.history]
else:
yield [deltas, None]


class AsynchronousSparkWorker(object):
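
Each synchronous worker now yields a [deltas, history] pair instead of bare deltas, where deltas are the element-wise differences between weights before and after local training, and history is the Keras History.history dict (or None when the partition was too small to train). A minimal NumPy sketch of the delta and averaging arithmetic, using stand-ins for subtract_params and divide_by from elephas.utils (their exact implementations are assumed, not shown here):

import numpy as np

def subtract_params_sketch(before, after):
    # Element-wise difference per weight tensor: what a worker reports as its "gradient".
    return [b - a for b, a in zip(before, after)]

def divide_by_sketch(params, n):
    # Scale each tensor so that summing contributions over n workers gives a simple average.
    return [p / n for p in params]

before = [np.ones((2, 2)), np.zeros(3)]
after = [np.full((2, 2), 0.5), np.array([0.1, -0.1, 0.0])]
deltas = subtract_params_sketch(before, after)    # what `yield [deltas, ...]` carries
averaged = divide_by_sketch(deltas, 4)            # as if 4 sub-models contributed equally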
56 changes: 50 additions & 6 deletions tests/test_ml_model.py
@@ -1,16 +1,15 @@
import numpy as np
import pytest
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics, RegressionMetrics
from pyspark.sql.types import DoubleType
from tensorflow.keras import optimizers
from tensorflow.keras.activations import relu
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.activations import relu

from elephas.ml_model import ElephasEstimator, load_ml_estimator, ElephasTransformer, load_ml_transformer
from elephas.ml.adapter import to_data_frame

from pyspark.mllib.evaluation import MulticlassMetrics, RegressionMetrics
from pyspark.ml import Pipeline

from elephas.ml_model import ElephasEstimator, load_ml_estimator, ElephasTransformer, load_ml_transformer
from elephas.utils.model_utils import ModelType, argmax


@@ -232,6 +231,7 @@ def test_set_cols(spark_context, regression_model, boston_housing_dataset):
def test_custom_objects(spark_context, boston_housing_dataset):
def custom_activation(x):
return 2 * relu(x)

model = Sequential()
model.add(Dense(64, input_shape=(13,)))
model.add(Dense(64, activation=custom_activation))
@@ -295,3 +295,47 @@ def test_predict_classes_probability(spark_context, classification_model, mnist_
# and therefore 10 probabilities
assert len(results.take(1)[0].prediction) == 10


def test_batch_predict_classes_probability(spark_context, classification_model, mnist_data):
batch_size = 64
nb_classes = 10
epochs = 1

x_train, y_train, x_test, y_test = mnist_data
x_train = x_train[:1000]
y_train = y_train[:1000]
df = to_data_frame(spark_context, x_train, y_train, categorical=True)
test_df = to_data_frame(spark_context, x_test, y_test, categorical=True)

sgd = optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
sgd_conf = optimizers.serialize(sgd)

# Initialize Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(classification_model.to_yaml())
estimator.set_optimizer_config(sgd_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])
estimator.set_epochs(epochs)
estimator.set_batch_size(batch_size)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)

# Fitting a model returns a Transformer
fitted_pipeline = estimator.fit(df)

results = fitted_pipeline.transform(test_df)

# Set the inference batch size and transform again, this time on the previous results DataFrame (test_df plus the first prediction column)
inference_batch_size = int(len(y_test) / 10)
fitted_pipeline.set_params(inference_batch_size=inference_batch_size)
fitted_pipeline.set_params(outputCol="prediction_via_batch_inference")
results_with_batch_prediction = fitted_pipeline.transform(results)
# we should have an array of 10 elements in the prediction column, since we have 10 classes
# and therefore 10 probabilities
results_np = results_with_batch_prediction.take(1)[0]
assert len(results_np.prediction) == 10
assert len(results_np.prediction_via_batch_inference) == 10
assert np.array_equal(results_np.prediction, results_np.prediction_via_batch_inference)
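
The returned-history half of the PR is visible on the fitted transformer through the new history property. A short hedged check in the same test context (fitted_pipeline is the transformer produced by estimator.fit(df) above; the exact metric keys depend on how Keras names them):

histories = fitted_pipeline.history      # one entry per synchronous worker partition
assert isinstance(histories, list)
for h in histories:
    if h is not None:                    # a worker with too few samples reports None
        assert 'loss' in h               # Keras history dict: metric name -> per-epoch values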
