Commit

passed compile
mengxr committed Mar 26, 2014
1 parent 834ada2 commit 1859701
Showing 11 changed files with 165 additions and 119 deletions.
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -114,74 +114,125 @@ class PythonMLLibAPI extends Serializable {
java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
-LabeledPoint(x(0), x.slice(1, x.length))
+LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val initialWeights = deserializeDoubleVector(initialWeightsBA)
val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
-ret.add(serializeDoubleVector(model.weights))
+ret.add(serializeDoubleVector(model.weights.toArray))
ret.add(model.intercept: java.lang.Double)
ret
}
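
This helper is the pattern for the whole commit: the Python gateway still ships plain Array[Double] values, so the Scala side now wraps each array in an MLlib Vector on the way in (Vectors.dense) and unwraps with toArray on the way out. A minimal sketch of that round trip; the helper names are made up, only Vectors.dense and Vector.toArray come from the diff:

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    // Hypothetical boundary helpers: array in, Vector out, and back.
    def decode(raw: Array[Double]): Vector = Vectors.dense(raw)
    def encode(v: Vector): Array[Double] = v.toArray

    val raw = Array(1.0, 0.0, 3.0)
    val v = decode(raw)                  // dense MLlib vector
    assert(encode(v).sameElements(raw))  // lossless round trip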

/**
* Java stub for Python mllib LinearRegressionWithSGD.train()
*/
-def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-    numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+def trainLinearRegressionModelWithSGD(
+    dataBytesJRDD: JavaRDD[Array[Byte]],
+    numIterations: Int,
+    stepSize: Double,
+    miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-trainRegressionModel((data, initialWeights) =>
-  LinearRegressionWithSGD.train(data, numIterations, stepSize,
-    miniBatchFraction, initialWeights),
-  dataBytesJRDD, initialWeightsBA)
+trainRegressionModel(
+  (data, initialWeights) =>
+    LinearRegressionWithSGD.train(
+      data,
+      numIterations,
+      stepSize,
+      miniBatchFraction,
+      Vectors.dense(initialWeights)),
+  dataBytesJRDD,
+  initialWeightsBA)
}

/**
* Java stub for Python mllib LassoWithSGD.train()
*/
-def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-    stepSize: Double, regParam: Double, miniBatchFraction: Double,
+def trainLassoModelWithSGD(
+    dataBytesJRDD: JavaRDD[Array[Byte]],
+    numIterations: Int,
+    stepSize: Double,
+    regParam: Double,
+    miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-trainRegressionModel((data, initialWeights) =>
-  LassoWithSGD.train(data, numIterations, stepSize, regParam,
-    miniBatchFraction, initialWeights),
-  dataBytesJRDD, initialWeightsBA)
+trainRegressionModel(
+  (data, initialWeights) =>
+    LassoWithSGD.train(
+      data,
+      numIterations,
+      stepSize,
+      regParam,
+      miniBatchFraction,
+      Vectors.dense(initialWeights)),
+  dataBytesJRDD,
+  initialWeightsBA)
}

/**
* Java stub for Python mllib RidgeRegressionWithSGD.train()
*/
-def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-    stepSize: Double, regParam: Double, miniBatchFraction: Double,
+def trainRidgeModelWithSGD(
+    dataBytesJRDD: JavaRDD[Array[Byte]],
+    numIterations: Int,
+    stepSize: Double,
+    regParam: Double,
+    miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-trainRegressionModel((data, initialWeights) =>
-  RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
-    miniBatchFraction, initialWeights),
-  dataBytesJRDD, initialWeightsBA)
+trainRegressionModel(
+  (data, initialWeights) =>
+    RidgeRegressionWithSGD.train(
+      data,
+      numIterations,
+      stepSize,
+      regParam,
+      miniBatchFraction,
+      Vectors.dense(initialWeights)),
+  dataBytesJRDD,
+  initialWeightsBA)
}

/**
* Java stub for Python mllib SVMWithSGD.train()
*/
-def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-    stepSize: Double, regParam: Double, miniBatchFraction: Double,
+def trainSVMModelWithSGD(
+    dataBytesJRDD: JavaRDD[Array[Byte]],
+    numIterations: Int,
+    stepSize: Double,
+    regParam: Double,
+    miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-trainRegressionModel((data, initialWeights) =>
-  SVMWithSGD.train(data, numIterations, stepSize, regParam,
-    miniBatchFraction, initialWeights),
-  dataBytesJRDD, initialWeightsBA)
+trainRegressionModel(
+  (data, initialWeights) =>
+    SVMWithSGD.train(
+      data,
+      numIterations,
+      stepSize,
+      regParam,
+      miniBatchFraction,
+      Vectors.dense(initialWeights)),
+  dataBytesJRDD,
+  initialWeightsBA)
}

/**
* Java stub for Python mllib LogisticRegressionWithSGD.train()
*/
-def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-    numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+def trainLogisticRegressionModelWithSGD(
+    dataBytesJRDD: JavaRDD[Array[Byte]],
+    numIterations: Int,
+    stepSize: Double,
+    miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-trainRegressionModel((data, initialWeights) =>
-  LogisticRegressionWithSGD.train(data, numIterations, stepSize,
-    miniBatchFraction, initialWeights),
-  dataBytesJRDD, initialWeightsBA)
+trainRegressionModel(
+  (data, initialWeights) =>
+    LogisticRegressionWithSGD.train(
+      data,
+      numIterations,
+      stepSize,
+      miniBatchFraction,
+      Vectors.dense(initialWeights)),
+  dataBytesJRDD,
+  initialWeightsBA)
}
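
All five stubs now share one layout: a (data, initialWeights) closure handed to trainRegressionModel, which owns the deserialization and result packaging. A rough sketch of that higher-order shape, with RDDs replaced by Seq and a made-up TrainedModel so it runs standalone:

    // Illustrative stand-in for a fitted linear model.
    case class TrainedModel(weights: Array[Double], intercept: Double)

    // The shared shape: the caller supplies only the training closure.
    def trainWithDecoding(
        trainFunc: (Seq[Array[Double]], Array[Double]) => TrainedModel,
        rawData: Seq[Array[Double]],
        rawWeights: Array[Double]): (Array[Double], Double) = {
      val model = trainFunc(rawData, rawWeights)  // run the algorithm
      (model.weights, model.intercept)            // package the result
    }

    // Usage: plug in any "algorithm" without touching the wrapper.
    val (w, b) = trainWithDecoding((_, w0) => TrainedModel(w0, 0.0),
      Seq(Array(1.0, 2.0)), Array(0.0, 0.0))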

/**
@@ -192,7 +243,7 @@ class PythonMLLibAPI extends Serializable {
{
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
-LabeledPoint(x(0), x.slice(1, x.length))
+LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.classification

import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector

trait ClassificationModel extends Serializable {
/**
@@ -26,13 +27,13 @@ trait ClassificationModel extends Serializable {
* @param testData RDD representing data points to be predicted
* @return RDD[Int] where each entry contains the corresponding prediction
*/
-def predict(testData: RDD[Array[Double]]): RDD[Double]
+def predict(testData: RDD[Vector]): RDD[Double]

/**
* Predict values for a single data point using the model trained.
*
* @param testData array representing a single data point
* @return Int prediction from the trained model
*/
-def predict(testData: Array[Double]): Double
+def predict(testData: Vector): Double
}
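
A concrete classifier now implements both overloads against Vector rather than Array[Double]. A toy implementation, just to show the trait's new shape (ThresholdModel is invented):

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // Invented model: predicts 1.0 when the first feature is positive.
    class ThresholdModel extends ClassificationModel {
      override def predict(testData: RDD[Vector]): RDD[Double] =
        testData.map(predict)

      override def predict(testData: Vector): Double =
        if (testData(0) > 0.0) 1.0 else 0.0
    }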
mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -25,8 +25,7 @@
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.DataValidators

-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.Vector

/**
* Classification model trained using Logistic Regression.
@@ -35,14 +34,14 @@
* @param intercept Intercept computed for this model.
*/
class LogisticRegressionModel(
-  override val weights: Array[Double],
+  override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
with ClassificationModel with Serializable {

-override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
intercept: Double) = {
-val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
+val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
round(1.0/ (1.0 + math.exp(margin * -1)))
}
}
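
The new predictPoint swaps the jblas matrix product for a breeze dot product and then applies the logistic function. The same arithmetic in isolation, with invented numbers:

    import breeze.linalg.DenseVector

    val weights = DenseVector(0.5, -0.25)
    val features = DenseVector(2.0, 4.0)
    val intercept = 0.1
    val margin = weights.dot(features) + intercept  // 1.0 - 1.0 + 0.1 = 0.1
    val p = 1.0 / (1.0 + math.exp(-margin))         // logistic probability, about 0.525
    val label = math.round(p)                       // threshold at 0.5 -> 1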
@@ -73,7 +72,7 @@ class LogisticRegressionWithSGD private (
*/
def this() = this(1.0, 100, 0.0, 1.0)

-def createModel(weights: Array[Double], intercept: Double) = {
+def createModel(weights: Vector, intercept: Double) = {
new LogisticRegressionModel(weights, intercept)
}
}
@@ -105,11 +104,9 @@ object LogisticRegressionWithSGD {
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input, initialWeights)
+      initialWeights: Vector): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input, initialWeights)
}

/**
@@ -128,11 +125,9 @@ object LogisticRegressionWithSGD {
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
-      miniBatchFraction: Double)
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input)
+      miniBatchFraction: Double): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input)
}

/**
@@ -150,9 +145,7 @@ object LogisticRegressionWithSGD {
def train(
input: RDD[LabeledPoint],
numIterations: Int,
-      stepSize: Double)
-    : LogisticRegressionModel =
-  {
+      stepSize: Double): LogisticRegressionModel = {
train(input, numIterations, stepSize, 1.0)
}

@@ -168,9 +161,7 @@ object LogisticRegressionWithSGD {
*/
def train(
input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LogisticRegressionModel =
-  {
+      numIterations: Int): LogisticRegressionModel = {
train(input, numIterations, 1.0, 1.0)
}

@@ -183,7 +174,7 @@ object LogisticRegressionWithSGD {
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Weights: " + model.weights)
println("Intercept: " + model.intercept)

sc.stop()
mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -25,6 +25,7 @@
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.mllib.linalg.Vector

/**
* Model for Naive Bayes Classifiers.
@@ -39,9 +40,11 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
private val _theta = new DoubleMatrix(theta)

-def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)

-def predict(testData: Array[Double]): Double = {
+override def predict(testData: Vector): Double = predict(testData.toArray)
+
+private def predict(testData: Array[Double]): Double = {
val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
val result = _pi.add(_theta.mmul(dataMatrix))
result.argmax()
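
Internally, prediction is unchanged: the log-prior vector pi plus the log-likelihood matrix theta times the feature vector, then argmax over classes. A worked two-class sketch with invented probabilities:

    import org.jblas.DoubleMatrix

    val pi = new DoubleMatrix(Array(math.log(0.6), math.log(0.4)))  // class log-priors, 2 x 1
    val theta = new DoubleMatrix(Array(
      Array(math.log(0.7), math.log(0.3)),                          // class 0 feature log-probs
      Array(math.log(0.2), math.log(0.8))))                         // class 1 feature log-probs
    val x = new DoubleMatrix(Array(1.0, 3.0))                       // feature counts, 2 x 1
    val logPosterior = pi.add(theta.mmul(x))                        // 2 x 1 class scores
    val predicted = logPosterior.argmax()                           // index of the best class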
@@ -70,17 +73,26 @@ class NaiveBayes private (var lambda: Double)
/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
-   * @param data RDD of (label, array of features) pairs.
+   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
def run(data: RDD[LabeledPoint]) = {
+    runRaw(data.map(v => (v.label, v.features.toArray)))
+  }
+
+  /**
+   * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
+   *
+   * @param data RDD of (label, array of features) pairs.
+   */
+  private def runRaw(data: RDD[(Double, Array[Double])]) = {
// Aggregates all sample points to driver side to get sample count and summed feature vector
// for each label. The shape of `zeroCombiner` & `aggregated` is:
//
// label: Int -> (count: Int, featuresSum: DoubleMatrix)
val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
point match {
-      case LabeledPoint(label, features) =>
+      case (label, features) =>
val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
val fs = new DoubleMatrix(features.length, 1, features: _*)
combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
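
The per-label statistics are collected with RDD.aggregate: each partition folds its (label, features) pairs into a mutable map, and the partial maps are then merged. The same seqOp logic on a plain Scala collection, to make the combiner shape concrete (numbers invented):

    import scala.collection.mutable

    val data = Seq((0.0, Array(1.0, 2.0)), (1.0, Array(3.0, 4.0)), (0.0, Array(5.0, 6.0)))

    // label -> (sample count, per-feature sums), mirroring zeroCombiner above
    val aggregated = data.foldLeft(mutable.Map.empty[Int, (Int, Array[Double])]) {
      case (combiner, (label, features)) =>
        val (count, sums) =
          combiner.getOrElse(label.toInt, (0, Array.fill(features.length)(0.0)))
        combiner += label.toInt ->
          (count + 1, sums.zip(features).map { case (a, b) => a + b })
    }
    // aggregated(0): count 2, sums Array(6.0, 8.0); aggregated(1): count 1, sums Array(3.0, 4.0)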