Skip to content

Commit

Permalink
Rebased with master
Browse files Browse the repository at this point in the history
  • Loading branch information
jkbradley committed May 11, 2015
1 parent 82fee9d commit 7431272
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 37 deletions.
7 changes: 3 additions & 4 deletions mllib/src/main/scala/org/apache/spark/ml/param/params.scala
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,15 @@ trait Params extends Identifiable with Serializable {
def copy(extra: ParamMap): Params = {
val that = this.getClass.newInstance()
copyValues(that, extra)
that
}

/**
* Extracts the embedded default param values and user-supplied values, and then merges them with
* extra values from input into a flat param map, where the latter value is used if there exist
* conflicts, i.e., with ordering: default param values < user-supplied values < extraParamMap.
* conflicts, i.e., with ordering: default param values < user-supplied values < extra.
*/
final def extractParamMap(extraParamMap: ParamMap): ParamMap = {
defaultParamMap ++ paramMap ++ extraParamMap
final def extractParamMap(extra: ParamMap): ParamMap = {
defaultParamMap ++ paramMap ++ extra
}

/**
Expand Down
72 changes: 64 additions & 8 deletions python/pyspark/ml/param/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from abc import ABCMeta

import copy

from pyspark.ml.util import Identifiable


Expand Down Expand Up @@ -56,15 +58,20 @@ class Params(Identifiable):
#: internal param map for default values
defaultParamMap = {}

#: value returned by :py:func:`params`
_params = None

@property
def params(self):
"""
Returns all params ordered by name. The default implementation
uses :py:func:`dir` to get all attributes of type
:py:class:`Param`.
"""
return list(filter(lambda attr: isinstance(attr, Param),
[getattr(self, x) for x in dir(self) if x != "params"]))
if self._params is None:
self._params = list(filter(lambda attr: isinstance(attr, Param),
[getattr(self, x) for x in dir(self) if x != "params"]))
return self._params

def _explain(self, param):
"""
Expand Down Expand Up @@ -116,10 +123,18 @@ def hasDefault(self, param):

def isDefined(self, param):
"""
Checks whether a param is explicitly set by user or has a default value.
Checks whether a param is explicitly set by user or has
a default value.
"""
return self.isSet(param) or self.hasDefault(param)

def hasParam(self, paramName):
"""
Tests whether this instance contains a param with a given
(string) name.
"""
return self.params.count(paramName) != 0

def getOrDefault(self, param):
"""
Gets the value of a param in the user-supplied param map or its
Expand All @@ -135,21 +150,38 @@ def getOrDefault(self, param):
else:
raise KeyError("Cannot recognize %r as a param." % param)

def extractParamMap(self, extraParamMap={}):
def extractParamMap(self, extra={}):
"""
Extracts the embedded default param values and user-supplied
values, and then merges them with extra values from input into
a flat param map, where the latter value is used if there exist
conflicts, i.e., with ordering: default param values <
user-supplied values < extraParamMap.
:param extraParamMap: extra param values
user-supplied values < extra.
:param extra: extra param values
:return: merged param map
"""
paramMap = self.defaultParamMap.copy()
paramMap.update(self.paramMap)
paramMap.update(extraParamMap)
paramMap.update(extra)
return paramMap

def copy(self, extra={}):
"""
Creates a copy of this instance with a randomly generated uid
and some extra params. The default implementation creates a
shallow copy using :py:func:`copy.copy`, creates a deep copy of
the embedded paramMap, and then copies the embedded and extra
parameters over and returns the new instance.
Subclasses should override this method if the default approach
is not sufficient.
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
that = copy.copy(self)
that.uid = that._generateUID()
that.paramMap = copy.deepcopy(self.paramMap)
return self._copyValues(that, extra)

def _shouldOwn(self, param):
"""
Validates that the input param belongs to this Params instance.
Expand All @@ -175,7 +207,8 @@ def _resolveParam(self, param):
@staticmethod
def _dummy():
"""
Returns a dummy Params instance used as a placeholder to generate docs.
Returns a dummy Params instance used as a placeholder to
generate docs.
"""
dummy = Params()
dummy.uid = "undefined"
Expand All @@ -196,3 +229,26 @@ def _setDefault(self, **kwargs):
for param, value in kwargs.items():
self.defaultParamMap[getattr(self, param)] = value
return self

def _copyValues(self, to, extra={}):
"""
Copies param values from this instance to another instance for
params shared by them.
:param to: the target instance
:param extra: extra params to be copied
:return: the target instance with param values copied
"""
paramMap = self.extractParamMap(extra)
for p in self.params:
if paramMap.has_key(p) and to.hasParam(p.name):
to._set((p.name, paramMap[p]))
return to

@staticmethod
def _copyParamMap(paramMap, to):
"""
Create a copy of the given ParamMap, but with parameter
:param paramMap:
:param to:
:return:
"""
63 changes: 51 additions & 12 deletions python/pyspark/ml/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ def fit(self, dataset, params={}):
:param dataset: input dataset, which is an instance of
:py:class:`pyspark.sql.DataFrame`
:param params: an optional param map that overwrites embedded
:param params: an optional param map that overrides embedded
params
:returns: fitted model
"""
# NOTE: Implementing classes should begin with the following
# to handle params:
# if len(params) != 0:
# return self.copy(params).fit(dataset)
raise NotImplementedError()


Expand All @@ -63,10 +67,14 @@ def transform(self, dataset, params={}):
:param dataset: input dataset, which is an instance of
:py:class:`pyspark.sql.DataFrame`
:param params: an optional param map that overwrites embedded
:param params: an optional param map that overrides embedded
params
:returns: transformed dataset
"""
# NOTE: Implementing classes should begin with the following
# to handle params:
# if len(params) != 0:
# return self.copy(params).transform(dataset)
raise NotImplementedError()


Expand Down Expand Up @@ -136,8 +144,9 @@ def setParams(self, stages=[]):
return self._set(**kwargs)

def fit(self, dataset, params={}):
paramMap = self.extractParamMap(params)
stages = paramMap[self.stages]
if len(params) != 0:
return self.copy(params).fit(dataset)
stages = self.getStages()
for stage in stages:
if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
raise TypeError(
Expand All @@ -151,33 +160,59 @@ def fit(self, dataset, params={}):
if i <= indexOfLastEstimator:
if isinstance(stage, Transformer):
transformers.append(stage)
dataset = stage.transform(dataset, paramMap)
dataset = stage.transform(dataset)
else: # must be an Estimator
model = stage.fit(dataset, paramMap)
model = stage.fit(dataset)
transformers.append(model)
if i < indexOfLastEstimator:
dataset = model.transform(dataset, paramMap)
dataset = model.transform(dataset)
else:
transformers.append(stage)
return PipelineModel(transformers)

def copy(self, extra={}):
"""
Creates a copy of this instance with a randomly generated uid
and some extra params. This copies each of the component
stages, creates a deep copy of the embedded paramMap, and
copies the embedded and extra parameters over.
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
paramMap = self.extractParamMap(extra)
stages = map(lambda stage: stage.copy(extra), paramMap[self.stages])
return Pipeline().setStages(stages)


@inherit_doc
class PipelineModel(Model):
"""
Represents a compiled pipeline with transformers and fitted models.
"""

def __init__(self, transformers):
def __init__(self, stages):
super(PipelineModel, self).__init__()
self.transformers = transformers
self.stages = stages

def transform(self, dataset, params={}):
paramMap = self.extractParamMap(params)
for t in self.transformers:
dataset = t.transform(dataset, paramMap)
if len(params) != 0:
return self.copy(params).transform(dataset)
for t in self.stages:
dataset = t.transform(dataset)
return dataset

def copy(self, extra={}):
"""
Creates a copy of this instance with a randomly generated uid
and some extra params. This copies each of the component
stages, creates a deep copy of the embedded paramMap, and
copies the embedded and extra parameters over.
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
stages = map(lambda stage: stage.copy(extra), self.stages)
return PipelineModel(stages)


class Evaluator(Params):
"""
Expand All @@ -197,4 +232,8 @@ def evaluate(self, dataset, params={}):
params
:return: metric
"""
# NOTE: Implementing classes should begin with the following
# to handle params:
# if len(params) != 0:
# return self.copy(params).evaluate(dataset)
raise NotImplementedError()
2 changes: 2 additions & 0 deletions python/pyspark/ml/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def test_params(self):
params = testParams.params
self.assertEqual(params, [inputCol, maxIter])

self.assertTrue(testParams.hasParam(maxIter))
self.assertTrue(testParams.hasDefault(maxIter))
self.assertFalse(testParams.isSet(maxIter))
self.assertTrue(testParams.isDefined(maxIter))
Expand All @@ -147,6 +148,7 @@ def test_params(self):
self.assertTrue(testParams.isSet(maxIter))
self.assertEquals(testParams.getMaxIter(), 100)

self.assertTrue(testParams.hasParam(inputCol))
self.assertFalse(testParams.hasDefault(inputCol))
self.assertFalse(testParams.isSet(inputCol))
self.assertFalse(testParams.isDefined(inputCol))
Expand Down
23 changes: 23 additions & 0 deletions python/pyspark/ml/tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ def fit(self, dataset, params={}):
bestModel = est.fit(dataset, epm[bestIndex])
return CrossValidatorModel(bestModel)

def copy(self, extra={}):
"""
Creates a copy of this instance with a randomly generated uid
and some extra params. This copies the underlying estimator, creates a deep copy of the embedded paramMap, and
copies the embedded and extra parameters over.
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
paramMap = self.extractParamMap(extra)
stages = map(lambda stage: stage.copy(extra), paramMap[self.stages])
return CrossValidator().setStages(stages)


class CrossValidatorModel(Model):
"""
Expand All @@ -243,6 +255,17 @@ def __init__(self, bestModel):
def transform(self, dataset, params={}):
return self.bestModel.transform(dataset, params)

def copy(self, extra={}):
"""
Creates a copy of this instance with a randomly generated uid
and some extra params. This copies the underlying bestModel,
creates a deep copy of the embedded paramMap, and
copies the embedded and extra parameters over.
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
return CrossValidatorModel(self.bestModel.copy(extra))


if __name__ == "__main__":
import doctest
Expand Down
12 changes: 9 additions & 3 deletions python/pyspark/ml/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,15 @@ class Identifiable(object):
"""

def __init__(self):
#: A unique id for the object. The default implementation
#: concatenates the class name, "_", and 8 random hex chars.
self.uid = type(self).__name__ + "_" + uuid.uuid4().hex[:8]
#: A unique id for the object.
self.uid = self._generateUID()

def __repr__(self):
return self.uid

def _generateUID(self):
"""
Generate a unique id for the object. The default implementation
concatenates the class name, "_", and 8 random hex chars.
"""
return type(self).__name__ + "_" + uuid.uuid4().hex[:8]
Loading

0 comments on commit 7431272

Please sign in to comment.