
Commit 86b9e34
one pass over APIs of GLMs, NaiveBayes, and ALS
mengxr committed Apr 7, 2014
1 parent f21d862 commit 86b9e34
Showing 23 changed files with 157 additions and 94 deletions.
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

@@ -28,7 +28,10 @@ import org.apache.spark.mllib.regression._
 import org.apache.spark.rdd.RDD
 
 /**
+ * <span class="badge badge-red" style="float: right;">DEVELOPER API</span>
+ *
  * The Java stubs necessary for the Python mllib bindings.
+ * Users should not call the methods defined in this class directly.
  */
 class PythonMLLibAPI extends Serializable {
   private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

@@ -55,7 +55,7 @@ class LogisticRegressionModel(
     this
   }
 
-  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
+  override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
       intercept: Double) = {
     val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
     val score = 1.0/ (1.0 + math.exp(-margin))
@@ -70,28 +70,28 @@ class LogisticRegressionModel(
  * Train a classification model for Logistic Regression using Stochastic Gradient Descent.
  * NOTE: Labels used in Logistic Regression should be {0, 1}
  */
-class LogisticRegressionWithSGD private (
-    var stepSize: Double,
-    var numIterations: Int,
-    var regParam: Double,
-    var miniBatchFraction: Double)
+class LogisticRegressionWithSGD (
+    private var stepSize: Double,
+    private var numIterations: Int,
+    private var regParam: Double,
+    private var miniBatchFraction: Double)
   extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
 
-  val gradient = new LogisticGradient()
-  val updater = new SimpleUpdater()
+  private val gradient = new LogisticGradient()
+  private val updater = new SimpleUpdater()
   override val optimizer = new GradientDescent(gradient, updater)
     .setStepSize(stepSize)
     .setNumIterations(numIterations)
     .setRegParam(regParam)
     .setMiniBatchFraction(miniBatchFraction)
-  override val validators = List(DataValidators.classificationLabels)
+  override protected val validators = List(DataValidators.binaryLabelValidator)
 
   /**
    * Construct a LogisticRegression object with default parameters
    */
   def this() = this(1.0, 100, 0.0, 1.0)
 
-  def createModel(weights: Vector, intercept: Double) = {
+  override protected def createModel(weights: Vector, intercept: Double) = {
     new LogisticRegressionModel(weights, intercept)
   }
 }
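For orientation, a minimal usage sketch of the class this hunk reshapes, assuming the companion object's train helper and the inherited run method from the surrounding MLlib code (neither appears in this diff):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object LogisticRegressionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "lr-example")
    // Labels must be in {0, 1}; the binaryLabelValidator above rejects anything else.
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
      LabeledPoint(0.0, Vectors.dense(-1.0, -2.0))))
    val model = LogisticRegressionWithSGD.train(training, 100)
    println(model.predict(Vectors.dense(0.5, 1.0))) // 0.0 or 1.0 with the default threshold
    sc.stop()
  }
}

With the constructor made public here, new LogisticRegressionWithSGD() followed by run(training) is an equivalent route when the defaults need tweaking.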
mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala

@@ -40,14 +40,17 @@ class NaiveBayesModel(
   private val brzPi = new BDV[Double](pi)
   private val brzTheta = new BDM[Double](theta.length, theta(0).length)
 
-  var i = 0
-  while (i < theta.length) {
-    var j = 0
-    while (j < theta(i).length) {
-      brzTheta(i, j) = theta(i)(j)
-      j += 1
-    }
-    i += 1
-  }
+  {
+    // Need to put an extra pair of braces to prevent Scala from treating `i` as a member.
+    var i = 0
+    while (i < theta.length) {
+      var j = 0
+      while (j < theta(i).length) {
+        brzTheta(i, j) = theta(i)(j)
+        j += 1
+      }
+      i += 1
+    }
+  }
 
   override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
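The braces trick exploits a basic fact about Scala class bodies: statements at the top level of the body run as constructor code, and a top-level var declared there compiles to a member field of the class. Wrapping the loop in an initializer block keeps i and j local. A standalone sketch of the difference (illustrative, not part of this commit):

class WithField {
  var i = 0 // top level in the class body: becomes a field of WithField
}

class WithLocal {
  {
    var i = 0 // inside an extra pair of braces: local to this init block only
  }
}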
@@ -65,7 +68,7 @@ class NaiveBayesModel(
  * document classification. By making every vector a 0-1 vector, it can also be used as
  * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
  */
-class NaiveBayes private (var lambda: Double) extends Serializable with Logging {
+class NaiveBayes (private var lambda: Double) extends Serializable with Logging {
 
   def this() = this(1.0)
 
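A short usage sketch for NaiveBayes, assuming the companion object's train helper from the surrounding file (not shown in this diff):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object NaiveBayesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "nb-example")
    // 0-1 feature vectors make this behave as Bernoulli NB, per the scaladoc above.
    val training = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 1.0))))
    val model = NaiveBayes.train(training, lambda = 1.0)
    println(model.predict(Vectors.dense(1.0, 0.0))) // expected: 0.0
    sc.stop()
  }
}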
mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala

@@ -55,7 +55,9 @@ class SVMModel(
     this
   }
 
-  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double) = {
     val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
     threshold match {
@@ -69,29 +71,28 @@ class SVMModel(
  * Train a Support Vector Machine (SVM) using Stochastic Gradient Descent.
  * NOTE: Labels used in SVM should be {0, 1}.
  */
-class SVMWithSGD private (
-    var stepSize: Double,
-    var numIterations: Int,
-    var regParam: Double,
-    var miniBatchFraction: Double)
+class SVMWithSGD(
+    private var stepSize: Double,
+    private var numIterations: Int,
+    private var regParam: Double,
+    private var miniBatchFraction: Double)
   extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
 
-  val gradient = new HingeGradient()
-  val updater = new SquaredL2Updater()
+  private val gradient = new HingeGradient()
+  private val updater = new SquaredL2Updater()
   override val optimizer = new GradientDescent(gradient, updater)
     .setStepSize(stepSize)
     .setNumIterations(numIterations)
     .setRegParam(regParam)
     .setMiniBatchFraction(miniBatchFraction)
-
-  override val validators = List(DataValidators.classificationLabels)
+  override protected val validators = List(DataValidators.binaryLabelValidator)
 
   /**
    * Construct a SVM object with default parameters
   */
   def this() = this(1.0, 100, 1.0, 1.0)
 
-  def createModel(weights: Vector, intercept: Double) = {
+  override protected def createModel(weights: Vector, intercept: Double) = {
     new SVMModel(weights, intercept)
   }
 }
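As with logistic regression above, a minimal usage sketch, assuming the companion object's train helper (not shown in this diff):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object SVMExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "svm-example")
    // Labels in {0, 1}, as the NOTE above requires.
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(2.0, 3.0)),
      LabeledPoint(0.0, Vectors.dense(-2.0, -3.0))))
    val model = SVMWithSGD.train(training, 100)
    // predictPoint above thresholds the raw margin, so this yields 0.0 or 1.0.
    println(model.predict(Vectors.dense(1.0, 1.5)))
    sc.stop()
  }
}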
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala

@@ -36,13 +36,13 @@ import org.apache.spark.util.random.XORShiftRandom
  * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
  * to it should be cached by the user.
  */
-class KMeans private (
-    var k: Int,
-    var maxIterations: Int,
-    var runs: Int,
-    var initializationMode: String,
-    var initializationSteps: Int,
-    var epsilon: Double) extends Serializable with Logging {
+class KMeans(
+    private var k: Int,
+    private var maxIterations: Int,
+    private var runs: Int,
+    private var initializationMode: String,
+    private var initializationSteps: Int,
+    private var epsilon: Double) extends Serializable with Logging {
   def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
 
   /** Set the number of clusters to create (k). Default: 2. */
@@ -71,6 +71,8 @@ class KMeans private (
   }
 
   /**
+   * <span class="badge" style="float: right; background-color: #257080;">EXPERIMENTAL</span>
+   *
    * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
    * this many times with random starting conditions (configured by the initialization mode), then
    * return the best clustering found over any run. Default: 1.
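The now-public constructor pairs with the setter chain this file already exposes. A sketch (setK, setMaxIterations, and run come from the surrounding file, not this diff):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

object KMeansExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "kmeans-example")
    // Cache the input: the scaladoc above warns that the algorithm makes multiple passes.
    val points = sc.parallelize(Seq(
      Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
      Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1))).cache()
    val model = new KMeans().setK(2).setMaxIterations(10).run(points)
    model.clusterCenters.foreach(println)
    sc.stop()
  }
}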
mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala (19 changes: 11 additions & 8 deletions)

@@ -64,11 +64,13 @@ trait Vector extends Serializable {
 
 /**
  * Factory methods for [[org.apache.spark.mllib.linalg.Vector]].
+ * We don't use the name `Vector` because Scala imports
+ * [[scala.collection.immutable.Vector]] by default.
  */
 object Vectors {
 
   /**
-   * Creates a dense vector.
+   * Creates a dense vector from its values.
    */
   @varargs
   def dense(firstValue: Double, otherValues: Double*): Vector =
@@ -158,20 +160,21 @@ class DenseVector(val values: Array[Double]) extends Vector {
 /**
  * A sparse vector represented by an index array and a value array.
  *
- * @param n size of the vector.
+ * @param size size of the vector.
  * @param indices index array, assumed to be strictly increasing.
  * @param values value array, must have the same length as the index array.
  */
-class SparseVector(val n: Int, val indices: Array[Int], val values: Array[Double]) extends Vector {
-
-  override def size: Int = n
+class SparseVector(
+    override val size: Int,
+    val indices: Array[Int],
+    val values: Array[Double]) extends Vector {
 
   override def toString: String = {
-    "(" + n + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
+    "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
   }
 
   override def toArray: Array[Double] = {
-    val data = new Array[Double](n)
+    val data = new Array[Double](size)
     var i = 0
     val nnz = indices.length
     while (i < nnz) {
@@ -181,5 +184,5 @@ class SparseVector(
     data
   }
 
-  private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, n)
+  private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size)
 }
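The rename from n to size also reads better at call sites. A small sketch using the factory object shown above (Vectors.sparse is assumed from the unshown part of this file):

import org.apache.spark.mllib.linalg.Vectors

object VectorsExample {
  def main(args: Array[String]): Unit = {
    // Dense vector from its values, via the varargs factory documented above.
    val dv = Vectors.dense(1.0, 0.0, 3.0)
    // The same vector in sparse form: size 3, nonzeros at indices 0 and 2.
    val sv = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    println(sv.size) // 3: the vector's size, not its number of nonzeros
    println(sv.toArray.sameElements(dv.toArray)) // true
  }
}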
mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala

@@ -22,6 +22,8 @@ import breeze.linalg.{axpy => brzAxpy}
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Class used to compute the gradient for a loss function, given a single data point.
  */
 abstract class Gradient extends Serializable {
@@ -51,6 +53,8 @@ abstract class Gradient extends Serializable {
 }
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Compute gradient and loss for a logistic loss function, as used in binary classification.
  * See also the documentation for the precise formulation.
  */
@@ -92,6 +96,8 @@ class LogisticGradient extends Gradient {
 }
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Compute gradient and loss for a Least-squared loss function, as used in linear regression.
  * This is correct for the averaged least squares loss function (mean squared error)
  * L = 1/n ||A weights-y||^2
@@ -124,6 +130,8 @@ class LeastSquaresGradient extends Gradient {
 }
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
  * See also the documentation for the precise formulation.
  * NOTE: This assumes that the labels are {0,1}
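As a standalone illustration of what one of these Gradient classes computes (this mirrors the logistic loss named above, not the class's actual code): for a label y in {0, 1} and margin m = w.x, the gradient of the logistic loss with respect to w is (sigmoid(m) - y) * x.

object LogisticGradientSketch {
  def logisticGradient(x: Array[Double], y: Double, w: Array[Double]): Array[Double] = {
    val margin = (w, x).zipped.map(_ * _).sum // m = w.x
    val multiplier = 1.0 / (1.0 + math.exp(-margin)) - y // sigmoid(m) - y
    x.map(_ * multiplier)
  }

  def main(args: Array[String]): Unit = {
    println(logisticGradient(Array(1.0, 2.0), 1.0, Array(0.0, 0.0)).mkString(","))
    // -0.5,-1.0: at w = 0 the model predicts 0.5, so the gradient is -0.5 * x
  }
}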
mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala

@@ -19,18 +19,20 @@ package org.apache.spark.mllib.optimization
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{Vector => BV, DenseVector => BDV}
+import breeze.linalg.{DenseVector => BDV}
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Class used to solve an optimization problem using Gradient Descent.
  * @param gradient Gradient function to be used.
  * @param updater Updater to be used to update weights after every iteration.
  */
-class GradientDescent(var gradient: Gradient, var updater: Updater)
+class GradientDescent(private var gradient: Gradient, private var updater: Updater)
   extends Optimizer with Logging
 {
   private var stepSize: Double = 1.0
@@ -107,7 +109,11 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
 
 }
 
-// Top-level method to run gradient descent.
+/**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
+ * Top-level method to run gradient descent.
+ */
 object GradientDescent extends Logging {
   /**
    * Run stochastic gradient descent (SGD) in parallel using mini batches.
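To make the knobs above concrete (stepSize, numIterations, miniBatchFraction), here is a local, single-machine sketch of mini-batch SGD on averaged least squares; the real class does this over an RDD with sampling and a pluggable Updater, so treat this only as the shape of the loop:

import scala.util.Random

object MiniBatchSGDSketch {
  // Step size decays as stepSize / sqrt(iter), as SimpleUpdater's scaladoc describes.
  def miniBatchSGD(
      data: Seq[(Double, Array[Double])], // (label, features)
      stepSize: Double,
      numIterations: Int,
      miniBatchFraction: Double): Array[Double] = {
    val rnd = new Random(42)
    val dim = data.head._2.length
    val w = Array.fill(dim)(0.0)
    for (iter <- 1 to numIterations) {
      val batch = data.filter(_ => rnd.nextDouble() < miniBatchFraction)
      if (batch.nonEmpty) {
        val grad = Array.fill(dim)(0.0)
        for ((y, x) <- batch) {
          val err = (w, x).zipped.map(_ * _).sum - y // residual of 1/2 (w.x - y)^2
          var j = 0
          while (j < dim) { grad(j) += err * x(j) / batch.size; j += 1 }
        }
        val step = stepSize / math.sqrt(iter)
        var j = 0
        while (j < dim) { w(j) -= step * grad(j); j += 1 }
      }
    }
    w
  }

  def main(args: Array[String]): Unit = {
    val data = Seq((1.0, Array(1.0)), (2.0, Array(2.0)), (3.0, Array(3.0)))
    println(miniBatchSGD(data, stepSize = 0.1, numIterations = 200, miniBatchFraction = 1.0)(0))
    // converges toward 1.0 for this y = x data
  }
}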
mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala

@@ -21,6 +21,8 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.spark.mllib.linalg.Vector
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Trait for optimization problem solvers.
  */
 trait Optimizer extends Serializable {
 
   /**
mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala

@@ -24,6 +24,8 @@ import breeze.linalg.{norm => brzNorm, axpy => brzAxpy, Vector => BV}
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Class used to perform steps (weight update) using Gradient Descent methods.
  *
  * For general minimization problems, or for regularized problems of the form
@@ -59,6 +61,8 @@ abstract class Updater extends Serializable {
 }
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * A simple updater for gradient descent *without* any regularization.
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
@@ -78,6 +82,8 @@ class SimpleUpdater extends Updater {
 }
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Updater for L1 regularized problems.
  * R(w) = ||w||_1
  * Uses a step-size decreasing with the square root of the number of iterations.
@@ -120,6 +126,8 @@ class L1Updater extends Updater {
 }
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+ *
  * Updater for L2 regularized problems.
  * R(w) = 1/2 ||w||^2
  * Uses a step-size decreasing with the square root of the number of iterations.
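The R(w) lines above pin down what each updater contributes beyond the plain gradient step. A sketch of both regularized updates on weight arrays; the L1 case is the soft-thresholding (proximal) step, which is how Spark's L1 updater is documented to behave, and shrinkage stands for regParam times the current step size:

object UpdaterSketch {
  // L2: the gradient of R(w) = 1/2 ||w||^2 is w itself, so the step also shrinks each weight.
  def l2Step(w: Array[Double], grad: Array[Double], step: Double, regParam: Double): Array[Double] =
    (w, grad).zipped.map((wi, gi) => wi - step * (gi + regParam * wi))

  // L1: take the plain gradient step, then soft-threshold toward zero.
  def l1Step(w: Array[Double], grad: Array[Double], step: Double, regParam: Double): Array[Double] = {
    val shrinkage = regParam * step
    (w, grad).zipped.map { (wi, gi) =>
      val stepped = wi - step * gi
      math.signum(stepped) * math.max(0.0, math.abs(stepped) - shrinkage)
    }
  }

  def main(args: Array[String]): Unit = {
    println(l1Step(Array(0.05, -1.0), Array(0.0, 0.0), step = 1.0, regParam = 0.1).mkString(","))
    // 0.0,-0.9: small weights are zeroed, large ones shrink by the threshold
  }
}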
