[SPARK-3961] [MLlib] [PySpark] Python API for mllib.feature
Added complete Python API for mllib.feature:

Normalizer
StandardScalerModel
StandardScaler
HashingTF
IDFModel
IDF

cc mengxr

Author: Davies Liu <[email protected]>

Closes #2819 from davies/feature and squashes the following commits:

4f48f48 [Davies Liu] add a note for HashingTF
67f6d21 [Davies Liu] address comments
b628693 [Davies Liu] rollback changes in Word2Vec
efb4f4f [Davies Liu] Merge branch 'master' into feature
806c7c2 [Davies Liu] address comments
3abb8c2 [Davies Liu] address comments
59781b9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into feature
a405ae7 [Davies Liu] fix tests
7a1891a [Davies Liu] fix tests
486795f [Davies Liu] update programming guide, HashTF -> HashingTF
8a50584 [Davies Liu] Python API for mllib.feature
Davies Liu authored and mengxr committed Oct 28, 2014
1 parent 46c6341 commit fae095b
Showing 6 changed files with 499 additions and 61 deletions.
85 changes: 85 additions & 0 deletions docs/mllib-feature-extraction.md
@@ -95,8 +95,49 @@ tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">

TF and IDF are implemented in [HashingTF](api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF)
and [IDF](api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF).
`HashingTF` takes an RDD of lists as input.
Each record could be an iterable of strings or other types.

{% highlight python %}
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF

sc = SparkContext()

# Load documents (one per line).
documents = sc.textFile("...").map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)
{% endhighlight %}

While applying `HashingTF` only needs a single pass over the data, applying `IDF` needs two passes:
first to compute the IDF vector and second to scale the term frequencies by IDF.

{% highlight python %}
from pyspark.mllib.feature import IDF

# ... continue from the previous example
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
{% endhighlight %}

MLlib's IDF implementation provides an option for ignoring terms that occur in fewer than a
minimum number of documents. In such cases, the IDF for these terms is set to 0. This feature
can be used by passing the `minDocFreq` value to the IDF constructor.

{% highlight python %}
# ... continue from the previous example
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)
{% endhighlight %}
</div>
</div>

@@ -223,6 +264,29 @@ val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
{% endhighlight %}
</div>

<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)

# data1 will have unit variance.
data1 = label.zip(scaler1.transform(features))

# Without converting the features into dense vectors, transformation with zero mean will raise
# an exception on sparse vectors.
# data2 will have unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
{% endhighlight %}
</div>
</div>

## Normalizer
@@ -267,4 +331,25 @@ val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
{% endhighlight %}
</div>

<div data-lang="python">
{% highlight python %}
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import Normalizer

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))

# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))

# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
{% endhighlight %}
</div>
</div>
@@ -31,8 +31,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.mllib.feature.Word2VecModel
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
@@ -291,6 +290,43 @@ class PythonMLLibAPI extends Serializable {
ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha)
}

/**
* Java stub for Normalizer.transform() on a single Vector
*/
def normalizeVector(p: Double, vector: Vector): Vector = {
new Normalizer(p).transform(vector)
}

/**
* Java stub for Normalizer.transform() on an RDD of Vectors
*/
def normalizeVector(p: Double, rdd: JavaRDD[Vector]): JavaRDD[Vector] = {
new Normalizer(p).transform(rdd)
}
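For illustration, a minimal sketch of how the Python-side `Normalizer` might dispatch to these two stubs, assuming the `callMLlibFunc` helper from `pyspark.mllib.common` and `_convert_to_vector` from `pyspark.mllib.linalg` (the Python wrapper itself is not shown in this hunk):

{% highlight python %}
from pyspark.context import SparkContext
from pyspark.mllib.common import callMLlibFunc
from pyspark.mllib.linalg import _convert_to_vector
from pyspark.rdd import RDD


class Normalizer(object):
    """Sketch of a Python wrapper over the normalizeVector stubs."""

    def __init__(self, p=2.0):
        assert p >= 1.0, "p should be greater than or equal to 1.0"
        self.p = float(p)

    def transform(self, vector):
        sc = SparkContext._active_spark_context
        assert sc is not None, "SparkContext should be initialized first"
        if isinstance(vector, RDD):
            # Routes to the JavaRDD[Vector] overload above.
            vector = vector.map(_convert_to_vector)
        else:
            # Routes to the single-Vector overload above.
            vector = _convert_to_vector(vector)
        return callMLlibFunc("normalizeVector", self.p, vector)
{% endhighlight %}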

/**
* Java stub for StandardScaler.fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
def fitStandardScaler(
withMean: Boolean,
withStd: Boolean,
data: JavaRDD[Vector]): StandardScalerModel = {
new StandardScaler(withMean, withStd).fit(data.rdd)
}

/**
* Java stub for IDF.fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
def fitIDF(minDocFreq: Int, dataset: JavaRDD[Vector]): IDFModel = {
new IDF(minDocFreq).fit(dataset)
}
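Both fit stubs share the same pattern: they return a handle to a JVM model, and the Python side wraps the handle rather than copying its contents. Below is a minimal sketch of the IDF wrapper, assuming the `JavaModelWrapper` and `callMLlibFunc` helpers from `pyspark.mllib.common` and `_convert_to_vector` from `pyspark.mllib.linalg`; a StandardScaler wrapper would look analogous:

{% highlight python %}
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector


class IDFModel(JavaModelWrapper):
    """Holds the Java IDFModel handle returned by fitIDF; the JVM
    object is released when this wrapper is garbage-collected."""

    def transform(self, dataset):
        # Forwards to IDFModel.transform on the JVM side via Py4J.
        return self.call("transform", dataset.map(_convert_to_vector))


class IDF(object):
    def __init__(self, minDocFreq=0):
        self.minDocFreq = minDocFreq

    def fit(self, dataset):
        # fitIDF returns a handle to the Java model, not its contents.
        jmodel = callMLlibFunc("fitIDF", self.minDocFreq,
                               dataset.map(_convert_to_vector))
        return IDFModel(jmodel)
{% endhighlight %}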

/**
* Java stub for Python mllib Word2Vec fit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
@@ -328,6 +364,15 @@ class PythonMLLibAPI extends Serializable {
model.transform(word)
}

/**
* Transforms an RDD of words to its vector representation
* @param rdd an RDD of words
* @return an RDD of vector representations of words
*/
def transform(rdd: JavaRDD[String]): JavaRDD[Vector] = {
rdd.rdd.map(model.transform)
}

def findSynonyms(word: String, num: Int): java.util.List[java.lang.Object] = {
val vec = transform(word)
findSynonyms(vec, num)
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.feature

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

@@ -48,4 +49,14 @@ trait VectorTransformer extends Serializable {
data.map(x => this.transform(x))
}

/**
* Applies transformation on a JavaRDD[Vector].
*
* @param data JavaRDD[Vector] to be transformed.
* @return transformed JavaRDD[Vector].
*/
def transform(data: JavaRDD[Vector]): JavaRDD[Vector] = {
transform(data.rdd)
}

}
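On the Python side, this trait is mirrored by an abstract base class that the concrete transformers (e.g. `Normalizer`, `StandardScalerModel`) implement; a minimal sketch:

{% highlight python %}
class VectorTransformer(object):
    """Base class for transformation of a vector or an RDD of vectors,
    mirroring the two Scala overloads above."""

    def transform(self, vector):
        """Applies transformation on a vector or an RDD of vectors."""
        raise NotImplementedError
{% endhighlight %}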
@@ -432,7 +432,7 @@ class Word2VecModel private[mllib] (
throw new IllegalStateException(s"$word not in vocabulary")
}
}

/**
* Find synonyms of a word
* @param word a word
@@ -443,7 +443,7 @@
val vector = transform(word)
findSynonyms(vector,num)
}

/**
* Find synonyms of the vector representation of a word
* @param vector vector representation of a word
