[SPARK-19826][ML][PYTHON]add spark.ml Python API for PIC #21119

Closed
wants to merge 7 commits into from

@@ -97,13 +97,15 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
def getNeighborsCol: String = $(neighborsCol)

/**
- * Param for the name of the input column for neighbors in the adjacency list representation.
+ * Param for the name of the input column for non-negative weights (similarities) of edges
+ * between the vertex in `idCol` and each neighbor in `neighborsCol`.
Member commented: Good catch!
* Default: "similarities"
* @group param
*/
@Since("2.4.0")
val similaritiesCol = new Param[String](this, "similaritiesCol",
"Name of the input column for neighbors in the adjacency list representation.",
"Name of the input column for non-negative weights (similarities) of edges between the " +
"vertex in `idCol` and each neighbor in `neighborsCol`.",
(value: String) => value.nonEmpty)

setDefault(similaritiesCol, "similarities")
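
To make the renamed description concrete: each adjacency-list row pairs the neighbors of one vertex with the weights of the corresponding edges, as parallel arrays. An illustrative row under the default column names (plain Python, not part of this PR):

# Vertex 2 has edges to vertices 0 and 1 with similarities 0.7 and 0.5;
# "neighbors" and "similarities" are parallel arrays keyed by "id".
row = {"id": 2, "neighbors": [0, 1], "similarities": [0.7, 0.5]}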
209 changes: 204 additions & 5 deletions python/pyspark/ml/clustering.py
@@ -19,14 +19,14 @@

from pyspark import since, keyword_only
from pyspark.ml.util import *
- from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+ from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaTransformer, JavaWrapper
from pyspark.ml.param.shared import *
from pyspark.ml.common import inherit_doc

__all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
'KMeans', 'KMeansModel',
'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
- 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
+ 'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']


class ClusteringSummary(JavaWrapper):
@@ -836,7 +836,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter

Terminology:

- "term" = "word": an el
- "term" = "word": an element of the vocabulary
- "token": instance of a term appearing in a document
- "topic": multinomial distribution over terms representing some concept
- "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +938,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
- topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+ topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
"""
super(LDA, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +967,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
- topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+ topicDistributionCol="topicDistribution", keepLastCheckpoint=True)

Sets params for LDA.
"""
@@ -1156,6 +1156,205 @@ def getKeepLastCheckpoint(self):
return self.getOrDefault(self.keepLastCheckpoint)


@inherit_doc
class PowerIterationClustering(JavaTransformer, HasMaxIter, HasPredictionCol,
JavaMLReadable, JavaMLWritable):
"""
.. note:: Experimental

Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
`Lin and Cohen <http://www.icml2010.org/papers/387.pdf>`_. From the abstract:
PIC finds a very low-dimensional embedding of a dataset using truncated power
iteration on a normalized pair-wise similarity matrix of the data.

PIC takes an affinity matrix between items (or vertices) as input. An affinity matrix
is a symmetric matrix whose entries are non-negative similarities between items.
PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row
includes:

- :py:attr:`idCol`: vertex ID
- :py:attr:`neighborsCol`: neighbors of vertex in :py:attr:`idCol`
- :py:attr:`similaritiesCol`: non-negative weights (similarities) of edges between the
vertex in :py:attr:`idCol` and each neighbor in :py:attr:`neighborsCol`

PIC returns a cluster assignment for each input vertex. It appends a new column
:py:attr:`predictionCol` containing the cluster assignment in ``[0, k)`` for
each row (vertex).

.. note::

- :py:class:`PowerIterationClustering` is a transformer with an expensive :py:meth:`transform`
operation. Transform runs the iterative PIC algorithm to cluster the whole input dataset.
- Input validation: This validates that similarities are non-negative but does NOT validate
that the input matrix is symmetric.

.. seealso:: `Wikipedia on Spectral clustering \
<http://en.wikipedia.org/wiki/Spectral_clustering>`_

>>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
>>> similarities = [(1, [0], [0.5]), (2, [0, 1], [0.7, 0.5]), \
(3, [0, 1, 2], [0.9, 0.7, 0.5]), \
(4, [0, 1, 2, 3], [1.1, 0.9, 0.7, 0.5]), \
(5, [0, 1, 2, 3, 4], [1.3, 1.1, 0.9, 0.7, 0.5])]
>>> rdd = sc.parallelize(similarities, 2)
>>> schema = StructType([StructField("id", LongType(), False), \
StructField("neighbors", ArrayType(LongType(), False), True), \
StructField("similarities", ArrayType(DoubleType(), False), True)])
>>> df = spark.createDataFrame(rdd, schema)
>>> pic = PowerIterationClustering()
>>> result = pic.setK(2).setMaxIter(10).transform(df)
>>> predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, result.prediction)
... .collect()]), key=lambda x: x[0])
>>> predictions[0]
(1, 1)
>>> predictions[1]
(2, 1)
>>> predictions[2]
(3, 0)
>>> predictions[3]
(4, 0)
>>> predictions[4]
(5, 0)
>>> pic_path = temp_path + "/pic"
>>> pic.save(pic_path)
>>> pic2 = PowerIterationClustering.load(pic_path)
>>> pic2.getK()
2
>>> pic2.getMaxIter()
10
>>> pic3 = PowerIterationClustering(k=4, initMode="degree")
>>> pic3.getIdCol()
'id'
>>> pic3.getK()
4
>>> pic3.getMaxIter()
20
>>> pic3.getInitMode()
'degree'

.. versionadded:: 2.4.0
"""

k = Param(Params._dummy(), "k",
"The number of clusters to create. Must be > 1.",
typeConverter=TypeConverters.toInt)
initMode = Param(Params._dummy(), "initMode",
"The initialization algorithm. This can be either " +
"'random' to use a random vector as vertex properties, or 'degree' to use " +
"a normalized sum of similarities with other vertices. Supported options: " +
"'random' and 'degree'.",
typeConverter=TypeConverters.toString)
idCol = Param(Params._dummy(), "idCol",
"Name of the input column for vertex IDs.",
typeConverter=TypeConverters.toString)
neighborsCol = Param(Params._dummy(), "neighborsCol",
"Name of the input column for neighbors in the adjacency list " +
"representation.",
typeConverter=TypeConverters.toString)
similaritiesCol = Param(Params._dummy(), "similaritiesCol",
"Name of the input column for non-negative weights (similarities) " +
"of edges between the vertex in `idCol` and each neighbor in " +
"`neighborsCol`",
typeConverter=TypeConverters.toString)

@keyword_only
def __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
"""
__init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
"""
super(PowerIterationClustering, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
self._setDefault(k=2, maxIter=20, initMode="random", idCol="id", neighborsCol="neighbors",
similaritiesCol="similarities")
kwargs = self._input_kwargs
self.setParams(**kwargs)

@keyword_only
@since("2.4.0")
def setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
"""
setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
Sets params for PowerIterationClustering.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)

@since("2.4.0")
def setK(self, value):
"""
Sets the value of :py:attr:`k`.
"""
return self._set(k=value)

@since("2.4.0")
def getK(self):
"""
Gets the value of :py:attr:`k`.
"""
return self.getOrDefault(self.k)

@since("2.4.0")
def setInitMode(self, value):
"""
Sets the value of :py:attr:`initMode`.
"""
return self._set(initMode=value)

@since("2.4.0")
def getInitMode(self):
"""
Gets the value of :py:attr:`initMode`.
"""
return self.getOrDefault(self.initMode)

@since("2.4.0")
def setIdCol(self, value):
"""
Sets the value of :py:attr:`idCol`.
"""
return self._set(idCol=value)

@since("2.4.0")
def getIdCol(self):
"""
Gets the value of :py:attr:`idCol`.
"""
return self.getOrDefault(self.idCol)

@since("2.4.0")
def setNeighborsCol(self, value):
"""
Sets the value of :py:attr:`neighborsCol`.
"""
return self._set(neighborsCol=value)

@since("2.4.0")
def getNeighborsCol(self):
"""
Gets the value of :py:attr:`neighborsCol`.
"""
return self.getOrDefault(self.neighborsCol)

@since("2.4.0")
def setSimilaritiesCol(self, value):
"""
Sets the value of :py:attr:`similaritiesCol`.
"""
return self._set(similaritiesCol=value)

@since("2.4.0")
def getSimilaritiesCol(self):
"""
Gets the value of :py:attr:`similaritiesCol`.
"""
return self.getOrDefault(self.similaritiesCol)


if __name__ == "__main__":
import doctest
import pyspark.ml.clustering
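The class note above says similarities are validated as non-negative but the implied matrix is never checked for symmetry, and both the doctest and the test below list each undirected edge exactly once, under the higher vertex id. A hypothetical helper, not part of this PR, sketching how an arbitrary (src, dst, weight) edge list could be normalized into that layout before calling createDataFrame:

def to_adjacency_rows(edges):
    """Collapse (src, dst, weight) tuples into PIC's adjacency-list layout:
    one row per vertex, each undirected edge kept once under the higher id,
    duplicate edges merged by taking the larger weight."""
    merged = {}
    for src, dst, w in edges:
        if src == dst:
            continue  # skip self-loops; PIC edges connect distinct vertices
        key = (min(src, dst), max(src, dst))
        merged[key] = max(w, merged.get(key, 0.0))
    rows = {}
    for (lo, hi), w in merged.items():
        rows.setdefault(hi, []).append((lo, w))
    out = []
    for i, nbrs in sorted(rows.items()):
        nbrs.sort()
        out.append((i, [n for n, _ in nbrs], [w for _, w in nbrs]))
    return out

The returned tuples can then be fed to spark.createDataFrame with the id/neighbors/similarities schema shown in the doctest.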
47 changes: 47 additions & 0 deletions python/pyspark/ml/tests.py
@@ -1873,6 +1873,53 @@ def test_kmeans_cosine_distance(self):
self.assertTrue(result[4].prediction == result[5].prediction)


class PowerIterationClusteringTest(SparkSessionTestCase):

def test_power_iteration_clustering(self):
from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
from pyspark.ml.clustering import PowerIterationClustering
import math

def genCircle(r, n):
points = []
for i in range(0, n):
theta = 2.0 * math.pi * i / n
points.append((r * math.cos(theta), r * math.sin(theta)))
return points

def sim(x, y):
dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
return math.exp(-dist / 2.0)

r1 = 1.0
n1 = 10
r2 = 4.0
n2 = 40
n = n1 + n2
points = genCircle(r1, n1) + genCircle(r2, n2)
similarities = []
for i in range(1, n):
neighbor = []
weight = []
for j in range(i):
neighbor.append(j)
weight.append(sim(points[i], points[j]))
similarities.append([i, neighbor, weight])
rdd = self.sc.parallelize(similarities, 2)
schema = StructType([StructField("id", LongType(), False),
StructField("neighbors", ArrayType(LongType(), False), True),
StructField("similarities", ArrayType(DoubleType(), False), True)])
df = self.spark.createDataFrame(rdd, schema)
pic = PowerIterationClustering()
result = pic.setK(2).setMaxIter(40).transform(df)
predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id,
result.prediction).collect()]), key=lambda x: x[0])
for i in range(0, 9):
self.assertEqual(predictions[i], (i + 1, 1))
for i in range(9, 49):
self.assertEqual(predictions[i], (i + 1, 0))


class OneVsRestTests(SparkSessionTestCase):

def test_copy(self):
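
A sanity check on the test's geometry: sim is a Gaussian (RBF) kernel on squared Euclidean distance, so pairs on the small r=1 circle get far heavier edges than cross-circle pairs, which is what lets PIC separate the two rings. Reproduced standalone for illustration (same kernel as the test, values rounded):

import math

def sim(x, y):
    # Squared Euclidean distance, then Gaussian kernel exp(-d / 2).
    dist = (x[0] - y[0]) ** 2 + (x[1] - y[1]) ** 2
    return math.exp(-dist / 2.0)

a, b, c = (1.0, 0.0), (-1.0, 0.0), (4.0, 0.0)
print(round(sim(a, b), 3))  # 0.135 -> within the r=1 circle
print(round(sim(a, c), 3))  # 0.011 -> across circles, much weaker edge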