SPARK-3278 added initial version of Isotonic regression algorithm including proposed API
zapletal-martin committed Nov 19, 2014
1 parent c6e0c2a commit 3de71d0
Showing 2 changed files with 388 additions and 0 deletions.
@@ -0,0 +1,268 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

sealed trait MonotonicityConstraint {
  def holds(current: LabeledPoint, next: LabeledPoint): Boolean
}

case object Isotonic extends MonotonicityConstraint {
  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
    current.label <= next.label
  }
}

case object Antitonic extends MonotonicityConstraint {
  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean = {
    current.label >= next.label
  }
}
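
// Illustration (not from the original commit): Isotonic.holds(LabeledPoint(1.0, v),
// LabeledPoint(2.0, v)) is true because labels may not decrease, while Antitonic.holds
// would instead require the labels to be non-increasing.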

/**
 * Regression model for isotonic regression.
 *
 * @param predictions Sequence of predicted points, ordered by feature value, that defines
 *                    the fitted monotonic sequence.
 * @param monotonicityConstraint Whether the fitted sequence is Isotonic (non-decreasing)
 *                               or Antitonic (non-increasing).
 */
class IsotonicRegressionModel(
    val predictions: Seq[LabeledPoint],
    val monotonicityConstraint: MonotonicityConstraint)
  extends RegressionModel {

  override def predict(testData: RDD[Vector]): RDD[Double] =
    testData.map(predict)

  // Take the prediction whose feature is the largest one not exceeding the test feature,
  // or the prediction with the lowest feature if no such element exists.
  override def predict(testData: Vector): Double =
    (predictions.head +:
      predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label
}
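
// Illustration (not from the original commit): with predictions at (feature, label) pairs
// (1, 1.0), (2, 2.0), (3, 5.0), predict(Vectors.dense(2.5)) returns 2.0, the label of the
// largest feature not exceeding 2.5, while predict(Vectors.dense(0)) falls back to the
// first prediction's label, 1.0.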

trait IsotonicRegressionAlgorithm extends Serializable {

  protected def createModel(
      weights: Seq[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  def run(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  def run(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
}

class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {

  override def run(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel =
    createModel(parallelPoolAdjacentViolators(input, monotonicityConstraint), monotonicityConstraint)

  override def run(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = ???

  override protected def createModel(
      weights: Seq[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel =
    new IsotonicRegressionModel(weights, monotonicityConstraint)

  // Performs the pool adjacent violators algorithm (PAVA).
  // Processes the data in a single pass; violators introduced into previously processed
  // data by pooling are fixed immediately.
  // Optimizes by discovering whole monotonicity-violating sequences at once.
  // Mutates the input array in situ.
  private def poolAdjacentViolators(
      in: Array[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {

    // Pools the sub-array within the given bounds, assigning the weighted average
    // value to all of its elements.
    def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
      val poolSubArray = in.slice(start, end + 1)

      val weightedSum = poolSubArray.map(_.label).sum
      val weight = poolSubArray.length

      for (i <- start to end) {
        in(i) = LabeledPoint(weightedSum / weight, in(i).features)
      }
    }
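
    // Illustrative example (not from the original commit): pooling labels 1.0, 3.0, 2.0
    // over indices 1 to 2 averages the last two labels, yielding 1.0, 2.5, 2.5 with the
    // features left untouched.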

    var i = 0

    while (i < in.length) {
      var j = i

      // Find a monotonicity-violating sequence, if any.
      while (j < in.length - 1 && !monotonicityConstraint.holds(in(j), in(j + 1))) {
        j = j + 1
      }

      // If monotonicity was not violated, move to the next data point.
      if (i == j) {
        i = i + 1
      } else {
        // Otherwise pool the violating sequence, and check whether pooling caused a
        // monotonicity violation in previously processed points.
        while (i >= 0 && !monotonicityConstraint.holds(in(i), in(i + 1))) {
          pool(in, i, j)
          i = i - 1
        }

        i = j
      }
    }

    in
  }
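
  // Example trace (illustrative, not from the original commit): for labels 1, 3, 2 under
  // the Isotonic constraint, the violating pair (3, 2) is pooled into (2.5, 2.5); since
  // 1 <= 2.5 holds, no backtracking is needed and the result is 1, 2.5, 2.5.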

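  // Sorts the data globally by feature value, runs PAVA independently on each partition,
  // collects the partial results to the driver, and runs one final sequential PAVA pass
  // to fix any violations across partition boundaries.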
  private def parallelPoolAdjacentViolators(
      testData: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = {
    poolAdjacentViolators(
      testData
        .sortBy(_.features.toArray.head)
        .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
        .collect(),
      monotonicityConstraint)
  }
}

/**
* Top-level methods for calling IsotonicRegression.
*/
object IsotonicRegression {

  /**
   * Train an isotonic regression model given an RDD of (label, features) pairs, fitting a
   * monotonic sequence to the labels with the pool adjacent violators algorithm. This
   * variant, which takes initial weights, is not yet implemented.
   *
   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
   *              matrix A as well as the corresponding right hand side label y.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *                       the number of features in the data.
   * @param monotonicityConstraint Isotonic (non-decreasing) or Antitonic (non-increasing) fit.
   */
  def train(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
    new PoolAdjacentViolators().run(input, initialWeights, monotonicityConstraint)
  }

  /**
   * Train an isotonic regression model given an RDD of (label, features) pairs, fitting a
   * monotonic sequence to the labels with the pool adjacent violators algorithm.
   *
   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
   *              matrix A as well as the corresponding right hand side label y.
   * @param monotonicityConstraint Isotonic (non-decreasing) or Antitonic (non-increasing) fit.
   */
  def train(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
    new PoolAdjacentViolators().run(input, monotonicityConstraint)
  }
}
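
/*
 * Usage sketch (illustrative, not part of the original commit), assuming an active
 * SparkContext `sc` and the Vectors import:
 *
 * val data = sc.parallelize(Seq(
 *   LabeledPoint(1.0, Vectors.dense(1.0)),
 *   LabeledPoint(3.0, Vectors.dense(2.0)),
 *   LabeledPoint(2.0, Vectors.dense(3.0))))
 * val model = IsotonicRegression.train(data, Isotonic)
 * model.predict(Vectors.dense(2.5)) // 2.5: the violating labels 3.0 and 2.0 were pooled
 */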

/*def functionalOption(
    in: Array[LabeledPoint],
    monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {

  def pool2(in: Array[LabeledPoint]): Array[LabeledPoint] =
    in.map(p => LabeledPoint(in.map(_.label).sum / in.length, p.features))

  def iterate(
      checked: Array[LabeledPoint],
      remaining: Array[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
    if (remaining.size < 2) {
      checked ++ remaining
    } else {
      val newRemaining =
        if (remaining.size == 2) Array[LabeledPoint]() else remaining.slice(2, remaining.length)
      if (!monotonicityConstraint.holds(remaining.head, remaining.tail.head)) {
        iterate(checked ++ pool2(remaining.slice(0, 2)), newRemaining, monotonicityConstraint)
      } else {
        iterate(checked ++ remaining.slice(0, 2), newRemaining, monotonicityConstraint)
      }
    }
  }

  iterate(Array(), in, monotonicityConstraint)
}

functionalOption(in, monotonicityConstraint)*/

/*def option1(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint) = {
  def findMonotonicityViolators(
      in: Array[LabeledPoint],
      start: Int,
      monotonicityConstraint: MonotonicityConstraint): Unit = {
    var j = start
    while (j >= 1 && !monotonicityConstraint.holds(in(j - 1), in(j))) {
      pool(in, j - 1, start + 1)
      j = j - 1
    }
  }

  for (i <- 0 to in.length - 1) {
    findMonotonicityViolators(in, i, monotonicityConstraint)
  }

  in
}*/

/*
def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
  val subArraySum = in.slice(start, end).map(_.label).sum
  val subArrayLength = math.abs(start - end)

  for (i <- start to end - 1) {
    in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
  }
}*/



/*
OPTION 2

def pool(in: Array[LabeledPoint], range: Range): Unit = {
  val subArray = in.slice(range.start, range.end + 1)

  val subArraySum = subArray.map(_.label).sum
  val subArrayLength = subArray.length

  for (i <- range.start to range.end) {
    in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
  }
}

def poolExtendedViolators(
    in: Array[LabeledPoint],
    range: Range,
    monotonicityConstraint: MonotonicityConstraint): Unit = {
  var extendedRange = Range(range.start, range.end)

  while (extendedRange.start >= 0 &&
    !monotonicityConstraint.holds(in(extendedRange.start), in(extendedRange.start + 1))) {
    pool(in, Range(extendedRange.start, extendedRange.end))
    extendedRange = Range(extendedRange.start - 1, extendedRange.end)
  }
}

def findViolatingSequence(
    in: Array[LabeledPoint],
    start: Int,
    monotonicityConstraint: MonotonicityConstraint): Option[Range] = {
  var j = start
  while (j < in.length - 1 && !monotonicityConstraint.holds(in(start), in(j + 1))) {
    j = j + 1
  }

  if (j == start) {
    None
  } else {
    Some(Range(start, j))
  }
}

var i = 0
while (i < in.length) {
  findViolatingSequence(in, i, monotonicityConstraint).fold[Unit]({
    i = i + 1
  })(r => {
    poolExtendedViolators(in, r, monotonicityConstraint)
    i = r.end
  })
}

in
*/
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.scalatest.{Matchers, FunSuite}

import scala.util.Random

class IsotonicRegressionSuite
  extends FunSuite
  with MLlibTestSparkContext
  with Matchers {

  def generateDataPoints(labels: Double*): Seq[LabeledPoint] =
    labels.zip(1 to labels.size).map(point => LabeledPoint(point._1, Vectors.dense(point._2)))
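
  // Illustration: generateDataPoints(1, 2) yields LabeledPoint(1.0, [1.0]) and
  // LabeledPoint(2.0, [2.0]); each point's single feature is its 1-based position.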


test("increasing isotonic regression") {
val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()

val alg = new PoolAdjacentViolators
val model = alg.run(testRDD, Isotonic)

model.predictions should be(generateDataPoints(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
}

test("isotonic regression with size 0") {
val testRDD = sc.parallelize(List[LabeledPoint]()).cache()

val alg = new PoolAdjacentViolators
val model = alg.run(testRDD, Isotonic)

model.predictions should be(List())
}

test("isotonic regression with size 1") {
val testRDD = sc.parallelize(generateDataPoints(1)).cache()

val alg = new PoolAdjacentViolators
val model = alg.run(testRDD, Isotonic)

model.predictions should be(generateDataPoints(1))
}

test("isotonic regression strictly increasing sequence") {
val testRDD = sc.parallelize(generateDataPoints(1, 2, 3, 4, 5)).cache()

val alg = new PoolAdjacentViolators
val model = alg.run(testRDD, Isotonic)

model.predictions should be(generateDataPoints(1, 2, 3, 4, 5))
}

test("isotonic regression strictly decreasing sequence") {
val testRDD = sc.parallelize(generateDataPoints(5, 4, 3, 2, 1)).cache()

val alg = new PoolAdjacentViolators
val model = alg.run(testRDD, Isotonic)

model.predictions should be(generateDataPoints(3, 3, 3, 3, 3))
}

test("isotonic regression prediction") {
val testRDD = sc.parallelize(generateDataPoints(1, 2, 7, 1, 2)).cache()

val alg = new PoolAdjacentViolators
val model = alg.run(testRDD, Isotonic)

model.predict(Vectors.dense(0)) should be(1)
model.predict(Vectors.dense(2)) should be(2)
model.predict(Vectors.dense(3)) should be(10d/3)
model.predict(Vectors.dense(10)) should be(10d/3)
}

test("antitonic regression prediction") {
val testRDD = sc.parallelize(generateDataPoints(7, 5, 3, 5, 1)).cache()

val alg = new PoolAdjacentViolators
val model = alg.run(testRDD, Antitonic)

model.predict(Vectors.dense(0)) should be(7)
model.predict(Vectors.dense(2)) should be(5)
model.predict(Vectors.dense(3)) should be(4)
model.predict(Vectors.dense(10)) should be(1)
}
}

class IsotonicRegressionClusterSuite extends FunSuite with LocalClusterSparkContext {

  test("task size should be small in both training and prediction") {
    val m = 4
    val n = 200000
    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
      val random = new Random(idx)
      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
    }.cache()

    // If we serialize data directly in the task closure, the size of the serialized task
    // would be greater than 1MB and hence Spark would throw an error.
    val model = IsotonicRegression.train(points, Isotonic)
    val predictions = model.predict(points.map(_.features))
  }
}
