From 15e9c338f5869c519ab13f6612fb89bc8c200bdb Mon Sep 17 00:00:00 2001
From: Bago Amirbekian
Date: Mon, 18 Dec 2017 10:52:16 -0800
Subject: [PATCH 1/6] Add fitMultiple method to Estimator.

---
 python/pyspark/ml/base.py   | 32 +++++++++++++++++++++++++++++---
 python/pyspark/ml/tuning.py | 33 ++++++++++++++++++---------------
 2 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index a6767cee9bf28..7ff9db6f3be44 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -18,15 +18,33 @@
 from abc import ABCMeta, abstractmethod
 
 import copy
+import threading
 
 from pyspark import since
-from pyspark.ml.param import Params
 from pyspark.ml.param.shared import *
 from pyspark.ml.common import inherit_doc
 from pyspark.sql.functions import udf
-from pyspark.sql.types import StructField, StructType, DoubleType
+from pyspark.sql.types import StructField, StructType
+
+
+class FitMutlipleIterator(object):
+    def __init__(self, fitSingleModel, numModel):
+        self.fitSingleModel = fitSingleModel
+        self.numModel = numModel
+        self.counter = 0
+        self.lock = threading.Lock()
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        with self.lock:
+            index = self.counter
+            if index >= self.numModel:
+                raise StopIteration("No models remaining.")
+            self.counter += 1
+        return index, self.fitSingleModel(index)
+
 
 @inherit_doc
 class Estimator(Params):
     """
@@ -47,6 +65,11 @@ def _fit(self, dataset):
         """
         raise NotImplementedError()
 
+    def fitMultiple(self, dataset, params):
+        def fitSingleModel(index):
+            return self.fit(dataset, params[index])
+        return FitMutlipleIterator(fitSingleModel, len(params))
+
     @since("1.3.0")
     def fit(self, dataset, params=None):
         """
@@ -61,7 +84,10 @@ def fit(self, dataset, params=None):
         if params is None:
             params = dict()
         if isinstance(params, (list, tuple)):
-            return [self.fit(dataset, paramMap) for paramMap in params]
+            models = [None] * len(params)
+            for index, model in self.fitMultiple(dataset, params):
+                models[index] = model
+            return models
         elif isinstance(params, dict):
             if params:
                 return self.copy(params)._fit(dataset)
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 47351133524e7..03c538f845d09 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -31,6 +31,17 @@
            'TrainValidationSplitModel']
 
 
+def parallelFitTasks(est, train, eva, validation, epm):
+    modelIter = est.fitMultiple(train, epm)
+
+    def singleTask():
+        index, model = modelIter.next()
+        metric = eva.evaluate(model.transform(validation, epm[index]))
+        return index, metric
+
+    return [singleTask] * len(epm)
+
+
 class ParamGridBuilder(object):
     r"""
     Builder for a param grid used in grid search-based model selection.
@@ -266,15 +277,9 @@ def _fit(self, dataset):
             validation = df.filter(condition).cache()
             train = df.filter(~condition).cache()
 
-            def singleTrain(paramMap):
-                model = est.fit(train, paramMap)
-                # TODO: duplicate evaluator to take extra params from input
-                metric = eva.evaluate(model.transform(validation, paramMap))
-                return metric
-
-            currentFoldMetrics = pool.map(singleTrain, epm)
-            for j in range(numModels):
-                metrics[j] += (currentFoldMetrics[j] / nFolds)
+            tasks = parallelFitTasks(est, train, eva, validation, epm)
+            for j, metric in pool.imap_unordered(lambda f: f(), tasks):
+                metrics[j] += (metric / nFolds)
             validation.unpersist()
             train.unpersist()
 
@@ -523,13 +528,11 @@ def _fit(self, dataset):
         validation = df.filter(condition).cache()
         train = df.filter(~condition).cache()
 
-        def singleTrain(paramMap):
-            model = est.fit(train, paramMap)
-            metric = eva.evaluate(model.transform(validation, paramMap))
-            return metric
-
+        tasks = parallelFitTasks(est, train, eva, validation, epm)
         pool = ThreadPool(processes=min(self.getParallelism(), numModels))
-        metrics = pool.map(singleTrain, epm)
+        metrics = [None] * numModels
+        for j, m in pool.imap_unordered(lambda f: f(), tasks):
+            metrics[j] = m
         train.unpersist()
         validation.unpersist()

From fdef9d5bfc75b26200f6ff3c78fee044bc64cb66 Mon Sep 17 00:00:00 2001
From: Bago Amirbekian
Date: Mon, 18 Dec 2017 16:45:42 -0800
Subject: [PATCH 2/6] A test, some docs, and python2/3 compatibility.

---
 python/pyspark/ml/base.py   | 20 +++++++++++++++++++-
 python/pyspark/ml/tests.py  | 15 +++++++++++++++
 python/pyspark/ml/tuning.py |  6 +++---
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index 7ff9db6f3be44..eb35808df53ca 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -28,6 +28,10 @@
 
 
 class FitMutlipleIterator(object):
+    """
+    Used by default implementation of Estimator.fitMultiple to produce models in a thread safe
+    iterator.
+    """
     def __init__(self, fitSingleModel, numModel):
         self.fitSingleModel = fitSingleModel
         self.numModel = numModel
@@ -37,7 +41,7 @@ def __init__(self, fitSingleModel, numModel):
     def __iter__(self):
         return self
 
-    def next(self):
+    def __next__(self):
         with self.lock:
             index = self.counter
             if index >= self.numModel:
@@ -45,6 +49,11 @@ def next(self):
             self.counter += 1
         return index, self.fitSingleModel(index)
 
+    def next(self):
+        """For python2 compatibility."""
+        return self.__next__()
+
 
 @inherit_doc
 class Estimator(Params):
@@ -66,6 +75,15 @@ def _fit(self, dataset):
         """
         raise NotImplementedError()
 
     def fitMultiple(self, dataset, params):
+        """
+        Fits a model to the input dataset for each param map in params.
+
+        :param dataset: input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`.
+        :param params: A list/tuple of param maps.
+        :return: A thread safe iterable which contains one model for each param map. Each
+            call to `next(modelIterator)` will return `(index, model)` where model was fit using
+            `params[index]`. Params maps may be fit in an order different than their order in params.
+ """ def fitSingleModel(index): return self.fit(dataset, params[index]) return FitMutlipleIterator(fitSingleModel, len(params)) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index be1521154f042..e2d218ec30d3d 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -2359,6 +2359,21 @@ def test_unary_transformer_transform(self): self.assertEqual(res.input + shiftVal, res.output) +class TestFit(unittest.TestCase): + + def testDefaultFitMultiple(self): + N = 4 + data = MockDataset() + estimator = MockEstimator() + params = [{estimator.fake: i} for i in range(N)] + modelIter = estimator.fitMultiple(data, params) + indexList = [] + for index, model in modelIter: + self.assertEqual(model.getFake(), index) + indexList.append(index) + self.assertEqual(sorted(indexList), list(range(N))) + + if __name__ == "__main__": from pyspark.ml.tests import * if xmlrunner: diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 03c538f845d09..f506cc4084287 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -35,7 +35,7 @@ def parallelFitTasks(est, train, eva, validation, epm): modelIter = est.fitMultiple(train, epm) def singleTask(): - index, model = modelIter.next() + index, model = next(modelIter) metric = eva.evaluate(model.transform(validation, epm[index])) return index, metric @@ -531,8 +531,8 @@ def _fit(self, dataset): tasks = parallelFitTasks(est, train, eva, validation, epm) pool = ThreadPool(processes=min(self.getParallelism(), numModels)) metrics = [None] * numModels - for j, m in pool.imap_unordered(lambda f: f(), tasks): - metrics[j] = m + for j, metric in pool.imap_unordered(lambda f: f(), tasks): + metrics[j] = metric train.unpersist() validation.unpersist() From 49c833214e234144f6606e4f8bd53e320a268150 Mon Sep 17 00:00:00 2001 From: Bago Amirbekian Date: Fri, 22 Dec 2017 14:00:52 -0800 Subject: [PATCH 3/6] Added version & experimental tags. --- python/pyspark/ml/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py index eb35808df53ca..f314397b61775 100644 --- a/python/pyspark/ml/base.py +++ b/python/pyspark/ml/base.py @@ -74,6 +74,7 @@ def _fit(self, dataset): """ raise NotImplementedError() + @since("2.3.0") def fitMultiple(self, dataset, params): """ Fits a model to the input dataset for each param map in params. @@ -83,6 +84,8 @@ def fitMultiple(self, dataset, params): :return: A thread safe iterable which contains one model for each param map. Each call to `next(modelIterator)` will return `(index, model)` where model was fit using `params[index]`. Params maps may be fit in an order different than their order in params. + + .. note:: Experimental """ def fitSingleModel(index): return self.fit(dataset, params[index]) From d73af1ffb409b35788be927fe56dd8aaf03b4f10 Mon Sep 17 00:00:00 2001 From: Bago Amirbekian Date: Wed, 27 Dec 2017 12:59:06 -0800 Subject: [PATCH 4/6] Style fix. --- python/pyspark/ml/base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py index f314397b61775..e91ab787e96bb 100644 --- a/python/pyspark/ml/base.py +++ b/python/pyspark/ml/base.py @@ -82,8 +82,9 @@ def fitMultiple(self, dataset, params): :param dataset: input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`. :param params: A list/tuple of param maps. :return: A thread safe iterable which contains one model for each param map. 
-            call to `next(modelIterator)` will return `(index, model)` where model was fit using
-            `params[index]`. Params maps may be fit in an order different than their order in params.
+            call to `next(modelIterator)` will return `(index, model)` where model was fit
+            using `params[index]`. Params maps may be fit in an order different than their
+            order in params.
 
         .. note:: Experimental
         """

From fe3d6bddc3e9e50febf706d7f22007b1e0d58de3 Mon Sep 17 00:00:00 2001
From: Bago Amirbekian
Date: Thu, 28 Dec 2017 14:01:12 -0800
Subject: [PATCH 5/6] PR feedback.

---
 python/pyspark/ml/base.py   | 30 +++++++++++++++++++++++-------
 python/pyspark/ml/tests.py  |  2 +-
 python/pyspark/ml/tuning.py | 17 ++++++++++++++---
 3 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index e91ab787e96bb..9f99a66c3416b 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -27,14 +27,26 @@ from pyspark.sql.types import StructField, StructType
 
 
-class FitMutlipleIterator(object):
+class FitMultipleIterator(object):
     """
     Used by default implementation of Estimator.fitMultiple to produce models in a thread safe
-    iterator.
+    iterator. This class handles the simple case of fitMultiple where each param map should be
+    fit independently.
+
+    :param fitSingleModel: Function: (int => Model) which fits an estimator to a dataset.
+        `fitSingleModel` may be called up to `numModels` times, with a unique index each time.
+        Each call to `fitSingleModel` with an index should return the Model associated with
+        that index.
+    :param numModels: Number of models this iterator should produce.
+
+    See Estimator.fitMultiple for more info.
     """
-    def __init__(self, fitSingleModel, numModel):
+    def __init__(self, fitSingleModel, numModels):
+        """
+
+        """
         self.fitSingleModel = fitSingleModel
-        self.numModel = numModel
+        self.numModel = numModels
         self.counter = 0
         self.lock = threading.Lock()
@@ -80,17 +92,21 @@ def _fit(self, dataset):
         raise NotImplementedError()
 
     @since("2.3.0")
     def fitMultiple(self, dataset, params):
         """
         Fits a model to the input dataset for each param map in params.
 
         :param dataset: input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`.
-        :param params: A list/tuple of param maps.
+        :param params: A Sequence of param maps.
         :return: A thread safe iterable which contains one model for each param map. Each
             call to `next(modelIterator)` will return `(index, model)` where model was fit
             using `params[index]`. Params maps may be fit in an order different than their
             order in params.
 
+        .. note:: DeveloperApi
         .. note:: Experimental
         """
+        estimator = self.copy()
+
         def fitSingleModel(index):
-            return self.fit(dataset, params[index])
+            return estimator.fit(dataset, params[index])
+
-        return FitMutlipleIterator(fitSingleModel, len(params))
+        return FitMultipleIterator(fitSingleModel, len(params))
 
     @since("1.3.0")
     def fit(self, dataset, params=None):
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index e2d218ec30d3d..7f8231a1cca56 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -2359,7 +2359,7 @@ def test_unary_transformer_transform(self):
         self.assertEqual(res.input + shiftVal, res.output)
 
 
-class TestFit(unittest.TestCase):
+class EstimatorTest(unittest.TestCase):
 
     def testDefaultFitMultiple(self):
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index f506cc4084287..6c0cad6cbaaa1 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -31,7 +31,18 @@
            'TrainValidationSplitModel']
 
 
-def parallelFitTasks(est, train, eva, validation, epm):
+def _parallelFitTasks(est, train, eva, validation, epm):
+    """
+    Creates a list of callables which can be called from different threads to fit and evaluate
+    an estimator in parallel. Each callable returns an `(index, metric)` pair.
+
+    :param est: Estimator, the estimator to be fit.
+    :param train: DataFrame, training data set, used for fitting.
+    :param eva: Evaluator, used to compute `metric`.
+    :param validation: DataFrame, validation data set, used for evaluation.
+    :param epm: Sequence of ParamMap, param maps to be used during fitting & evaluation.
+    :return: A list of callables, each returning an `(index, metric)` pair.
+    """
     modelIter = est.fitMultiple(train, epm)
 
     def singleTask():
@@ -277,7 +288,7 @@ def _fit(self, dataset):
         validation = df.filter(condition).cache()
         train = df.filter(~condition).cache()
 
-        tasks = parallelFitTasks(est, train, eva, validation, epm)
+        tasks = _parallelFitTasks(est, train, eva, validation, epm)
         for j, metric in pool.imap_unordered(lambda f: f(), tasks):
             metrics[j] += (metric / nFolds)
         validation.unpersist()
         train.unpersist()
@@ -528,7 +539,7 @@ def _fit(self, dataset):
         validation = df.filter(condition).cache()
         train = df.filter(~condition).cache()
 
-        tasks = parallelFitTasks(est, train, eva, validation, epm)
+        tasks = _parallelFitTasks(est, train, eva, validation, epm)
         pool = ThreadPool(processes=min(self.getParallelism(), numModels))
         metrics = [None] * numModels
         for j, metric in pool.imap_unordered(lambda f: f(), tasks):

From c44db979e83c0c2e49df84e9e3e0fd375748d7e3 Mon Sep 17 00:00:00 2001
From: Bago Amirbekian
Date: Fri, 29 Dec 2017 13:37:15 -0800
Subject: [PATCH 6/6] Update params argument to be paramMaps & Make
 _FitMultipleIterator private.

---
 python/pyspark/ml/base.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index 9f99a66c3416b..d4470b5bf2900 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -27,7 +27,7 @@ from pyspark.sql.types import StructField, StructType
 
 
-class FitMultipleIterator(object):
+class _FitMultipleIterator(object):
     """
     Used by default implementation of Estimator.fitMultiple to produce models in a thread safe
     iterator. This class handles the simple case of fitMultiple where each param map should be
@@ -87,16 +87,15 @@ def _fit(self, dataset):
         raise NotImplementedError()
 
     @since("2.3.0")
-    def fitMultiple(self, dataset, params):
+    def fitMultiple(self, dataset, paramMaps):
         """
-        Fits a model to the input dataset for each param map in params.
+        Fits a model to the input dataset for each param map in `paramMaps`.
 
         :param dataset: input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`.
-        :param params: A Sequence of param maps.
+        :param paramMaps: A Sequence of param maps.
         :return: A thread safe iterable which contains one model for each param map. Each
             call to `next(modelIterator)` will return `(index, model)` where model was fit
-            using `params[index]`. Params maps may be fit in an order different than their
-            order in params.
+            using `paramMaps[index]`. `index` values may not be sequential.
 
         .. note:: DeveloperApi
         .. note:: Experimental
         """
         estimator = self.copy()
 
         def fitSingleModel(index):
-            return estimator.fit(dataset, params[index])
+            return estimator.fit(dataset, paramMaps[index])
 
-        return FitMultipleIterator(fitSingleModel, len(params))
+        return _FitMultipleIterator(fitSingleModel, len(paramMaps))
 
     @since("1.3.0")
     def fit(self, dataset, params=None):
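
A usage sketch of the API these patches arrive at. This is not part of the
patch series: LogisticRegression, the maxIter values, and the `train`
DataFrame are stand-ins for any Estimator, any sequence of param maps, and
any training data.

    from multiprocessing.pool import ThreadPool

    from pyspark.ml.classification import LogisticRegression

    lr = LogisticRegression()
    paramMaps = [{lr.maxIter: n} for n in (10, 50, 100)]

    # fitMultiple returns a thread safe iterator: each next() call hands the
    # calling thread a fresh (index, model) pair, so a single iterator can be
    # drained by several threads at once.
    modelIter = lr.fitMultiple(train, paramMaps)

    models = [None] * len(paramMaps)
    pool = ThreadPool(processes=3)
    for index, model in pool.imap_unordered(lambda _: next(modelIter),
                                            range(len(paramMaps))):
        # index ties each model back to paramMaps[index], whatever order
        # the fits finish in.
        models[index] = model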
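The locking in __next__ is worth calling out: only the handout of the next
index happens under the lock, while fitSingleModel itself runs unlocked in the
calling thread, which is what lets the fits proceed in parallel. Below is a
self-contained sketch of the same dispatch pattern with no Spark dependency;
all names here are illustrative, not from the patches.

    import threading
    import time
    from multiprocessing.pool import ThreadPool

    class IndexedTaskIterator(object):
        """Thread safe iterator mirroring _FitMultipleIterator's dispatch."""
        def __init__(self, runTask, numTasks):
            self.runTask = runTask
            self.numTasks = numTasks
            self.counter = 0
            self.lock = threading.Lock()

        def __iter__(self):
            return self

        def __next__(self):
            with self.lock:  # only reserving the next index is serialized
                index = self.counter
                if index >= self.numTasks:
                    raise StopIteration("No tasks remaining.")
                self.counter += 1
            return index, self.runTask(index)  # the task itself runs unlocked

        next = __next__  # python2 compatibility

    def slowSquare(i):
        time.sleep(0.1 * (4 - i))  # higher indices finish sooner
        return i * i

    it = IndexedTaskIterator(slowSquare, 4)
    results = [None] * 4
    pool = ThreadPool(processes=4)
    for i, value in pool.imap_unordered(lambda _: next(it), range(4)):
        results[i] = value
    print(results)  # [0, 1, 4, 9] regardless of completion order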
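One reading of why fitMultiple is an overridable method on Estimator rather
than a fixed helper in tuning.py: a subclass can specialize it to share
param-independent work across the whole grid. A hypothetical sketch follows;
none of these names appear in the patches, and _fit is elided.

    from pyspark.ml.base import Estimator

    class SharedPrepEstimator(Estimator):
        def _fit(self, dataset):
            raise NotImplementedError()  # a real estimator would fit here

        def fitMultiple(self, dataset, paramMaps):
            # Do the work that is identical for every param map once, then
            # fall back to the default one-model-per-param-map behavior.
            prepared = dataset.cache()
            return super(SharedPrepEstimator, self).fitMultiple(prepared, paramMaps)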