Skip to content

Commit

Permalink
SPARK-3278 changes after PR comments apache#3519
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Jan 21, 2015
1 parent ce0e30c commit fad4bf9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.spark.mllib.regression

import org.apache.spark.api.java.{JavaRDD, JavaPairRDD}
import java.io.Serializable

import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.rdd.RDD

/**
Expand Down Expand Up @@ -46,8 +48,8 @@ class IsotonicRegressionModel (
* @param testData features to be labeled
* @return predicted labels
*/
def predict(testData: JavaRDD[java.lang.Double]): JavaRDD[java.lang.Double] =
testData.rdd.map(_.doubleValue()).map(predict).map(new java.lang.Double(_))
def predict(testData: JavaRDD[java.lang.Double]): JavaDoubleRDD =
JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))

/**
* Predict a single label
Expand All @@ -61,23 +63,12 @@ class IsotonicRegressionModel (
}

/**
* Base representing algorithm for isotonic regression
* Isotonic regression
* Currently implemented using oarallel pool adjacent violators algorithm for monotone regression
*/
trait IsotonicRegressionAlgorithm
class IsotonicRegression
extends Serializable {

/**
* Creates isotonic regression model with given parameters
*
* @param predictions labels estimated using isotonic regression algorithm.
* Used for predictions on new data points.
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
*/
protected def createModel(
predictions: Seq[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel

/**
* Run algorithm to obtain isotonic regression model
*
Expand All @@ -86,25 +77,22 @@ trait IsotonicRegressionAlgorithm
* @return isotonic regression model
*/
def run(
input: RDD[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel
}

/**
* Parallel pool adjacent violators algorithm for monotone regression
*/
class PoolAdjacentViolators private [mllib]
extends IsotonicRegressionAlgorithm {

override def run(
input: RDD[(Double, Double, Double)],
isotonic: Boolean = true): IsotonicRegressionModel = {
createModel(
parallelPoolAdjacentViolators(input, isotonic),
isotonic)
}

override protected def createModel(
/**
* Creates isotonic regression model with given parameters
*
* @param predictions labels estimated using isotonic regression algorithm.
* Used for predictions on new data points.
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
*/
protected def createModel(
predictions: Seq[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel = {
new IsotonicRegressionModel(predictions, isotonic)
Expand Down Expand Up @@ -132,31 +120,27 @@ class PoolAdjacentViolators private [mllib]
val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
val weight = poolSubArray.map(_._3).sum

for(i <- start to end) {
var i = start
while (i <= end) {
in(i) = (weightedSum / weight, in(i)._2, in(i)._3)
i = i + 1
}
}

val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y
val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y

def monotonicityConstraint(isotonic: Boolean) =
if(isotonic) isotonicConstraint else antitonicConstraint

val monotonicityConstraintHolds = monotonicityConstraint(isotonic)
val monotonicityConstraintHolds: (Double, Double) => Boolean =
(x, y) => if (isotonic) x <= y else x >= y

var i = 0

while(i < in.length) {
while (i < in.length) {
var j = i

// Find monotonicity violating sequence, if any
while(j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
while (j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
j = j + 1
}

// If monotonicity was not violated, move to next data point
if(i == j) {
if (i == j) {
i = i + 1
} else {
// Otherwise pool the violating sequence
Expand Down Expand Up @@ -212,7 +196,7 @@ object IsotonicRegression {
def train(
input: RDD[(Double, Double, Double)],
isotonic: Boolean = true): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, isotonic)
new IsotonicRegression().run(input, isotonic)
}

/**
Expand All @@ -227,7 +211,7 @@ object IsotonicRegression {
def train(
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
isotonic: Boolean): IsotonicRegressionModel = {
new PoolAdjacentViolators()
new IsotonicRegression()
.run(
input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), x._3.doubleValue())),
isotonic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@

package org.apache.spark.mllib.regression;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.util.IsotonicDataGenerator;
import java.io.Serializable;
import java.util.List;

import scala.Tuple3;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple3;

import java.io.Serializable;
import java.util.List;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.util.IsotonicDataGenerator;

public class JavaIsotonicRegressionSuite implements Serializable {
private transient JavaSparkContext sc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.scalatest.{Matchers, FunSuite}
import scala.util.Random

import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.apache.spark.mllib.util.IsotonicDataGenerator._

class IsotonicRegressionSuite
Expand All @@ -32,26 +31,34 @@ class IsotonicRegressionSuite
Math.round(d * 100).toDouble / 100

test("increasing isotonic regression") {
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
val trainRDD = sc.parallelize(
generateIsotonicInput(
1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
model.predictions should be(
generateIsotonicInput(
1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
}

test("increasing isotonic regression using api") {
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
val trainRDD = sc.parallelize(
generateIsotonicInput(
1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()

val model = IsotonicRegression.train(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
model.predictions should be(
generateIsotonicInput(
1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
}

test("isotonic regression with size 0") {
val trainRDD = sc.parallelize(List[(Double, Double, Double)]()).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(List())
Expand All @@ -60,7 +67,7 @@ class IsotonicRegressionSuite
test("isotonic regression with size 1") {
val trainRDD = sc.parallelize(generateIsotonicInput(1)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1))
Expand All @@ -69,7 +76,7 @@ class IsotonicRegressionSuite
test("isotonic regression strictly increasing sequence") {
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
Expand All @@ -78,7 +85,7 @@ class IsotonicRegressionSuite
test("isotonic regression strictly decreasing sequence") {
val trainRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3))
Expand All @@ -87,7 +94,7 @@ class IsotonicRegressionSuite
test("isotonic regression with last element violating monotonicity") {
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3))
Expand All @@ -96,7 +103,7 @@ class IsotonicRegressionSuite
test("isotonic regression with first element violating monotonicity") {
val trainRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5))
Expand All @@ -105,7 +112,7 @@ class IsotonicRegressionSuite
test("isotonic regression with negative labels") {
val trainRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0))
Expand All @@ -114,45 +121,48 @@ class IsotonicRegressionSuite
test("isotonic regression with unordered input") {
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
}

test("weighted isotonic regression") {
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()
val trainRDD = sc.parallelize(
generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
model.predictions should be(
generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
}

test("weighted isotonic regression with weights lower than 1") {
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()
val trainRDD = sc.parallelize(
generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions.map(p => p.copy(_1 = round(p._1))) should be
(generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
model.predictions.map(p => p.copy(_1 = round(p._1))) should be(
generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
}

test("weighted isotonic regression with negative weights") {
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions.map(p => p.copy(_1 = round(p._1))) should be
(generateWeightedIsotonicInput(Seq(1, 10/6, 10/6, 10/6, 10/6), Seq(-1, 1, -3, 1, -5)))
model.predictions should be(
generateWeightedIsotonicInput(Seq(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6), Seq(-1, 1, -3, 1, -5)))
}

test("weighted isotonic regression with zero weights") {
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0)))
Expand All @@ -161,7 +171,7 @@ class IsotonicRegressionSuite
test("isotonic regression prediction") {
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predict(0) should be(1)
Expand All @@ -172,18 +182,18 @@ class IsotonicRegressionSuite

test("isotonic regression RDD prediction") {
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
val testRDD = sc.parallelize(List(0d, 2d, 3d, 10d)).cache()
val testRDD = sc.parallelize(List(0.0, 2.0, 3.0, 10.0)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, true)

model.predict(testRDD).collect() should be(Array(1, 2, 10d/3, 10d/3))
model.predict(testRDD).collect() should be(Array(1, 2, 10.0/3, 10.0/3))
}

test("antitonic regression prediction") {
val trainRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()

val alg = new PoolAdjacentViolators
val alg = new IsotonicRegression
val model = alg.run(trainRDD, false)

model.predict(0) should be(7)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.mllib.util

import org.apache.spark.annotation.DeveloperApi
import scala.collection.JavaConversions._
import java.lang.{Double => JDouble}

import scala.collection.JavaConversions._

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Generate test data for Isotonic regresision.
Expand Down

0 comments on commit fad4bf9

Please sign in to comment.