From a20e2f4aa19b6a5105eef3df18c94cacbb0d90f9 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Thu, 23 Jul 2015 23:56:48 -0700 Subject: [PATCH 01/11] Add decay to StreamingLinearAlgorithm through StreamingDecay trait. Revise test "parameter accuracy" in StreamingLinearRegressionSuite to account for decay. --- .../mllib/regression/StreamingDecay.scala | 35 +++++++++++++++++++ .../regression/StreamingLinearAlgorithm.scala | 22 ++++++++++-- .../StreamingLinearRegressionSuite.scala | 1 + 3 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala new file mode 100644 index 0000000000000..002d708c8cd10 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -0,0 +1,35 @@ +package org.apache.spark.mllib.regression + +import org.apache.spark.Logging + +trait StreamingDecay[T <: StreamingDecay[T]] extends Logging{ + var decayFactor: Double = 1 + var timeUnit: String = StreamingDecay.BATCHES + + /** Set the decay factor directly (for forgetful algorithms). */ + def setDecayFactor(a: Double): T = { + this.decayFactor = a + this.asInstanceOf[T] + } + + /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */ + def setHalfLife(halfLife: Double, timeUnit: String): T = { + if (timeUnit != StreamingDecay.BATCHES && timeUnit != StreamingDecay.POINTS) { + throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit) + } + this.decayFactor = math.exp(math.log(0.5) / halfLife) + logInfo("Setting decay factor to: %g ".format (this.decayFactor)) + this.timeUnit = timeUnit + this.asInstanceOf[T] + } + + def getDiscount(numNewDataPoints: Long): Double = timeUnit match { + case StreamingDecay.BATCHES => decayFactor + case StreamingDecay.POINTS => math.pow(decayFactor, numNewDataPoints) + } +} + +object StreamingDecay { + final val BATCHES = "batches" + final val POINTS = "points" +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 73948b2d9851a..5d214c1235f30 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.Logging import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.api.java.JavaSparkContext.fakeClassTag -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{BLAS, Vector} import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream} import org.apache.spark.streaming.dstream.DStream @@ -59,11 +59,14 @@ import org.apache.spark.streaming.dstream.DStream @DeveloperApi abstract class StreamingLinearAlgorithm[ M <: GeneralizedLinearModel, - A <: GeneralizedLinearAlgorithm[M]] extends Logging { + A <: GeneralizedLinearAlgorithm[M]] + extends StreamingDecay[StreamingLinearAlgorithm[M,A]] with Logging { /** The model to be updated and used for prediction. */ protected var model: Option[M] + protected var previousDataWeight: Double = 0 + /** The algorithm to use for updating. 
*/ protected val algorithm: A @@ -91,7 +94,20 @@ abstract class StreamingLinearAlgorithm[ } data.foreachRDD { (rdd, time) => if (!rdd.isEmpty) { - model = Some(algorithm.run(rdd, model.get.weights)) + val newModel = algorithm.run(rdd, model.get.weights) + + val numNewDataPoints = rdd.count() + val discount = getDiscount(numNewDataPoints) + + val updatedDataWeight = previousDataWeight * discount + numNewDataPoints + val lambda = numNewDataPoints / math.max(updatedDataWeight, 1e-16) + + BLAS.scal(lambda, newModel.weights) + BLAS.axpy(1-lambda, model.get.weights, newModel.weights) + + previousDataWeight = updatedDataWeight + model = Some(newModel) + logInfo(s"Model updated at time ${time.toString}") val display = model.get.weights.size match { case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...") diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 34c07ed170816..e6790671941b4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -63,6 +63,7 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { .setStepSize(0.2) .setNumIterations(25) .setConvergenceTol(0.0001) + .setDecayFactor(0.1) // generate sequence of simulated data val numBatches = 10 From d43c3a8ebdacba3cd1b2ff8701aad80e66e31874 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Fri, 24 Jul 2015 22:06:00 -0700 Subject: [PATCH 02/11] Fix fluent setter API Split StreamingDecay into two traits. Update StreamingLogisticRegressionWithSGD. Update test suites. --- .../StreamingLogisticRegressionWithSGD.scala | 3 ++- .../spark/mllib/regression/StreamingDecay.scala | 11 ++++++++--- .../mllib/regression/StreamingLinearAlgorithm.scala | 2 +- .../regression/StreamingLinearRegressionWithSGD.scala | 1 + .../StreamingLogisticRegressionSuite.scala | 2 ++ .../regression/StreamingLinearRegressionSuite.scala | 2 +- 6 files changed, 15 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala index 47bff5ebdde47..155eb339be878 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.classification import org.apache.spark.annotation.Since import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.regression.StreamingLinearAlgorithm +import org.apache.spark.mllib.regression.{StreamingDecaySetter, StreamingLinearAlgorithm} /** * Train or predict a logistic regression model on streaming data. 
Training uses @@ -49,6 +49,7 @@ class StreamingLogisticRegressionWithSGD private[mllib] ( private var miniBatchFraction: Double, private var regParam: Double) extends StreamingLinearAlgorithm[LogisticRegressionModel, LogisticRegressionWithSGD] + with StreamingDecaySetter[StreamingLogisticRegressionWithSGD] with Serializable { /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index 002d708c8cd10..a9852c7a66520 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -2,14 +2,19 @@ package org.apache.spark.mllib.regression import org.apache.spark.Logging -trait StreamingDecay[T <: StreamingDecay[T]] extends Logging{ +trait StreamingDecay { + def getDiscount(numNewDataPoints: Long): Double +} + +trait StreamingDecaySetter[T <: StreamingDecaySetter[T]] extends Logging { + self: T => var decayFactor: Double = 1 var timeUnit: String = StreamingDecay.BATCHES /** Set the decay factor directly (for forgetful algorithms). */ def setDecayFactor(a: Double): T = { this.decayFactor = a - this.asInstanceOf[T] + this } /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */ @@ -20,7 +25,7 @@ trait StreamingDecay[T <: StreamingDecay[T]] extends Logging{ this.decayFactor = math.exp(math.log(0.5) / halfLife) logInfo("Setting decay factor to: %g ".format (this.decayFactor)) this.timeUnit = timeUnit - this.asInstanceOf[T] + this } def getDiscount(numNewDataPoints: Long): Double = timeUnit match { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 5d214c1235f30..94b40c6affce9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -60,7 +60,7 @@ import org.apache.spark.streaming.dstream.DStream abstract class StreamingLinearAlgorithm[ M <: GeneralizedLinearModel, A <: GeneralizedLinearAlgorithm[M]] - extends StreamingDecay[StreamingLinearAlgorithm[M,A]] with Logging { + extends StreamingDecay with Logging { /** The model to be updated and used for prediction. 
*/ protected var model: Option[M] diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index fe2a46b9eecc7..0b08e5d63531d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -45,6 +45,7 @@ class StreamingLinearRegressionWithSGD private[mllib] ( private var numIterations: Int, private var miniBatchFraction: Double) extends StreamingLinearAlgorithm[LinearRegressionModel, LinearRegressionWithSGD] + with StreamingDecaySetter[StreamingLinearRegressionWithSGD] with Serializable { /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index d7b291d5a6330..c461be2edf6f6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -48,6 +48,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase // create model val model = new StreamingLogisticRegressionWithSGD() + .setDecayFactor(0.01) .setInitialWeights(Vectors.dense(0.0)) .setStepSize(0.2) .setNumIterations(25) @@ -144,6 +145,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase test("training and prediction") { // create model initialized with zero weights val model = new StreamingLogisticRegressionWithSGD() + .setDecayFactor(0.01) .setInitialWeights(Vectors.dense(-0.1)) .setStepSize(0.01) .setNumIterations(10) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index e6790671941b4..e0eacf5d33eac 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -59,11 +59,11 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { test("parameter accuracy") { // create model val model = new StreamingLinearRegressionWithSGD() + .setDecayFactor(0.1) .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.2) .setNumIterations(25) .setConvergenceTol(0.0001) - .setDecayFactor(0.1) // generate sequence of simulated data val numBatches = 10 From 05343289b5713e8a23e2b071e72bc17b2b0d06a0 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Thu, 6 Aug 2015 21:46:13 -0700 Subject: [PATCH 03/11] Add unit tests. Also make StreamingDecaySetter to be private[mllib]. 
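For context while reviewing the new tests: the decay factor they exercise is tied to the
half life by decayFactor = exp(ln(0.5) / halfLife), as implemented in setHalfLife. A
self-contained sketch of that relationship (illustrative only, not part of the patch):

    // decayFactor = exp(ln(0.5) / halfLife), so after `halfLife` time units the
    // accumulated discount is decayFactor^halfLife = 0.5.
    def decayFactorFor(halfLife: Double): Double =
      math.exp(math.log(0.5) / halfLife)

    decayFactorFor(10.0)                  // ~0.933: the per-time-unit discount
    math.pow(decayFactorFor(10.0), 10.0)  // 0.5: old data's weight halves after one half life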
--- .../mllib/regression/StreamingDecay.scala | 21 +++++- .../StreamingLogisticRegressionSuite.scala | 67 ++++++++++++++++++- .../StreamingLinearRegressionSuite.scala | 65 +++++++++++++++++- 3 files changed, 149 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index a9852c7a66520..101ca78c63777 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.regression import org.apache.spark.Logging @@ -6,9 +23,9 @@ trait StreamingDecay { def getDiscount(numNewDataPoints: Long): Double } -trait StreamingDecaySetter[T <: StreamingDecaySetter[T]] extends Logging { +private[mllib] trait StreamingDecaySetter[T <: StreamingDecaySetter[T]] extends Logging { self: T => - var decayFactor: Double = 1 + var decayFactor: Double = 0 var timeUnit: String = StreamingDecay.BATCHES /** Set the decay factor directly (for forgetful algorithms). 
*/ diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index c461be2edf6f6..600ee93bdbf30 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -48,7 +48,6 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase // create model val model = new StreamingLogisticRegressionWithSGD() - .setDecayFactor(0.01) .setInitialWeights(Vectors.dense(0.0)) .setStepSize(0.2) .setNumIterations(25) @@ -186,4 +185,70 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase ) val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) } + + test("parameter accuracy with full memory (decayFactor = 1)") { + + val nPoints = 100 + + // create model + val model = new StreamingLogisticRegressionWithSGD() + .setDecayFactor(1) + .setInitialWeights(Vectors.dense(0.0)) + .setStepSize(0.5) + .setNumIterations(50) + + // generate sequence of simulated data + val numBatches = 20 + // the first few RDD's are generated under the model A + val inputA = (0 until (numBatches - 1)).map { i => + LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 42 * (i + 1)) + } + // the last RDD is generated under the model B + val inputB = LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1)) + val input = inputA :+ inputB + + // apply model training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with full memory, the final parameter estimates should be close to model A + assert(model.latestModel().weights(0) ~== 0.5 relTol 0.5) + + } + + test("parameter accuracy with no memory (decayFactor = 0)") { + + val nPoints = 100 + + // create model + val model = new StreamingLogisticRegressionWithSGD() + .setDecayFactor(0) + .setInitialWeights(Vectors.dense(0.0)) + .setStepSize(0.5) + .setNumIterations(50) + + // generate sequence of simulated data + val numBatches = 20 + // the first few RDD's are generated under the model A + val inputA = (0 until (numBatches - 1)).map { i => + LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 42 * (i + 1)) + } + // the last RDD is generated under the model B + val inputB = LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1)) + val input = inputA :+ inputB + + // apply model training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with no memory, the final parameter estimates should be close to model B + assert(model.latestModel().weights(0) ~== 1.5 relTol 0.5) + + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index e0eacf5d33eac..7aa8adbb44cc4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -59,7 +59,6 @@ class 
StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { test("parameter accuracy") { // create model val model = new StreamingLinearRegressionWithSGD() - .setDecayFactor(0.1) .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.2) .setNumIterations(25) @@ -195,4 +194,68 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { ) val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) } + + test("parameter accuracy with full memory (decayFactor = 1)") { + // create model + val model = new StreamingLinearRegressionWithSGD() + .setDecayFactor(1) + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.5) + .setNumIterations(50) + .setConvergenceTol(0.0001) + + // generate sequence of simulated data + val numBatches = 10 + // the first few RDD's are generated under the model A + val inputA = (0 until (numBatches-1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1)) + } + // the last RDD is generated under the model B + val inputB = LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1)) + val input = inputA :+ inputB + + // apply model training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with full memory, the final parameter estimates should be close to model A + assertEqual(model.latestModel().intercept, 0.0, 1.0) + assertEqual(model.latestModel().weights(0), 10.0, 1.0) + assertEqual(model.latestModel().weights(1), 10.0, 1.0) + } + + test("parameter accuracy with no memory (decayFactor = 0)") { + // create model + val model = new StreamingLinearRegressionWithSGD() + .setDecayFactor(0) + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.5) + .setNumIterations(50) + .setConvergenceTol(0.0001) + + // generate sequence of simulated data + val numBatches = 10 + // the first few RDD's are generated under the model A + val inputA = (0 until (numBatches - 1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1)) + } + // the last RDD is generated under the model B + val inputB = LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1)) + val input = inputA :+ inputB + + // apply model training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with no memory, the final parameter estimates should be close to model B + assertEqual(model.latestModel().intercept, 0.0, 1.0) + assertEqual(model.latestModel().weights(0), 5.0, 1.0) + assertEqual(model.latestModel().weights(1), 3.0, 1.0) + } } From 999bebac403a70a3bc9d9177bebb7619b6087b06 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Thu, 6 Aug 2015 21:54:55 -0700 Subject: [PATCH 04/11] minor fixes --- .../org/apache/spark/mllib/regression/StreamingDecay.scala | 2 +- .../mllib/regression/StreamingLinearRegressionWithSGD.scala | 1 + .../mllib/classification/StreamingLogisticRegressionSuite.scala | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index 101ca78c63777..af0be263a219e 100644 --- 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -54,4 +54,4 @@ private[mllib] trait StreamingDecaySetter[T <: StreamingDecaySetter[T]] extends object StreamingDecay { final val BATCHES = "batches" final val POINTS = "points" -} \ No newline at end of file +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index 0b08e5d63531d..933ee6d21d4eb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -106,4 +106,5 @@ class StreamingLinearRegressionWithSGD private[mllib] ( this.algorithm.optimizer.setConvergenceTol(tolerance) this } + } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index 600ee93bdbf30..d379d193a910b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -144,7 +144,6 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase test("training and prediction") { // create model initialized with zero weights val model = new StreamingLogisticRegressionWithSGD() - .setDecayFactor(0.01) .setInitialWeights(Vectors.dense(-0.1)) .setStepSize(0.01) .setNumIterations(10) From 98a8a5b5f42c0357367877a323d8076165bf8628 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Thu, 6 Aug 2015 22:26:04 -0700 Subject: [PATCH 05/11] fix Scala style --- .../org/apache/spark/mllib/regression/StreamingDecay.scala | 2 +- .../classification/StreamingLogisticRegressionSuite.scala | 6 ++++-- .../mllib/regression/StreamingLinearRegressionSuite.scala | 6 ++++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index af0be263a219e..65f53a43193b0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -44,7 +44,7 @@ private[mllib] trait StreamingDecaySetter[T <: StreamingDecaySetter[T]] extends this.timeUnit = timeUnit this } - + def getDiscount(numNewDataPoints: Long): Double = timeUnit match { case StreamingDecay.BATCHES => decayFactor case StreamingDecay.POINTS => math.pow(decayFactor, numNewDataPoints) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index d379d193a910b..d2e078c1fa946 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -203,7 +203,8 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 42 * (i + 1)) } // the last RDD is generated under the model B 
- val inputB = LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1))
+ val inputB =
+ LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1))
 val input = inputA :+ inputB

 // apply model training to input stream
@@ -236,7 +237,8 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
 LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 42 * (i + 1))
 }
 // the last RDD is generated under the model B
- val inputB = LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1))
+ val inputB =
+ LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1))
 val input = inputA :+ inputB

 // apply model training to input stream
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index 7aa8adbb44cc4..ccbb2bca0cd3e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -211,7 +211,8 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
 LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1))
 }
 // the last RDD is generated under the model B
- val inputB = LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1))
+ val inputB =
+ LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1))
 val input = inputA :+ inputB

 // apply model training to input stream
@@ -243,7 +244,8 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
 LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1))
 }
 // the last RDD is generated under the model B
- val inputB = LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1))
+ val inputB =
+ LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1))
 val input = inputA :+ inputB

 // apply model training to input stream

From 7915a12574e9a3f9714995b1b79f922acf346c39 Mon Sep 17 00:00:00 2001
From: Meihua Wu
Date: Wed, 26 Aug 2015 23:30:32 -0700
Subject: [PATCH 06/11] incorporating comments

Add ScalaDoc for public API.
Add ScalaDoc to describe the forgetful algorithm in StreamingLinearAlgorithm.
Remove F-polymorphism in StreamingDecaySetter[T].
decayFactor and timeUnit in StreamingDecaySetter[T] are now private.
Remove division by zero in trainOn of StreamingLinearAlgorithm;
provide comments to explain why.
Improve test cases of StreamingLogisticRegressionSuite to use rel tol=0.1.
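The removed guard is safe to drop because the update only runs on a non-empty RDD. A
scalar sketch of the blend performed in trainOn (names mirror the patch but are
illustrative; the real code applies the same arithmetic to weight vectors via BLAS):

    // numNewDataPoints >= 1 on a non-empty RDD, and previousDataWeight and
    // discount are non-negative, so updatedDataWeight >= 1: no division by zero.
    def forgetfulUpdate(
        oldEstimate: Double,
        newEstimate: Double,
        previousDataWeight: Double,
        discount: Double,
        numNewDataPoints: Long): (Double, Double) = {
      val updatedDataWeight = previousDataWeight * discount + numNewDataPoints
      val lambda = numNewDataPoints / updatedDataWeight
      // blend the new batch estimate with the old one, and carry the weight forward
      (lambda * newEstimate + (1 - lambda) * oldEstimate, updatedDataWeight)
    }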
--- .../StreamingLogisticRegressionWithSGD.scala | 5 ++ .../mllib/regression/StreamingDecay.scala | 73 +++++++++++++++++-- .../regression/StreamingLinearAlgorithm.scala | 21 +++++- .../StreamingLinearRegressionWithSGD.scala | 5 ++ .../StreamingLogisticRegressionSuite.scala | 24 +++--- 5 files changed, 108 insertions(+), 20 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala index 155eb339be878..ad29d89d334ec 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala @@ -31,6 +31,11 @@ import org.apache.spark.mllib.regression.{StreamingDecaySetter, StreamingLinearA * of features must be constant. An initial weight * vector must be provided. * + * This class inherits the forgetful algorithm from StreamingLinearAlgorithm + * to handle evolution of data source. Users can specify the degree of forgetfulness + * by the decay factor or the half-life. Refer to StreamingLinearAlgorithm for + * more details. + * * Use a builder pattern to construct a streaming logistic regression * analysis in an application, like: * diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index 65f53a43193b0..2b9f7376ebae8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -18,23 +18,63 @@ package org.apache.spark.mllib.regression import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +/** + * :: Experimental :: + * Supplies an interface for the discount value in + * the forgetful update rule in StreamingLinearAlgorithm. + * Actual implementation is provided in StreamingDecaySetter[T]. + */ +@Experimental trait StreamingDecay { + /** + * Derive the discount factor. + * + * @param numNewDataPoints number of data points for the RDD arriving at time t. + * @return Discount factor + */ def getDiscount(numNewDataPoints: Long): Double } -private[mllib] trait StreamingDecaySetter[T <: StreamingDecaySetter[T]] extends Logging { +/** + * :: Experimental :: + * StreamingDecaySetter provides the concrete implementation + * of getDiscount in StreamingDecay and setters for decay factor + * and half-life. + */ +@Experimental +private[mllib] trait StreamingDecaySetter[T] extends Logging { self: T => - var decayFactor: Double = 0 - var timeUnit: String = StreamingDecay.BATCHES + private var decayFactor: Double = 0 + private var timeUnit: String = StreamingDecay.BATCHES - /** Set the decay factor directly (for forgetful algorithms). */ - def setDecayFactor(a: Double): T = { - this.decayFactor = a + /** + * Set the decay factor for the forgetful algorithms. + * The decay factor should be between 0 and 1, inclusive. + * decayFactor = 0: only the data from the most recent RDD will be used. + * decayFactor = 1: all data since the beginning of the DStream will be used. + * decayFactor is default to zero. + * + * @param decayFactor the decay factor + */ + def setDecayFactor(decayFactor: Double): T = { + this.decayFactor = decayFactor this } - /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. 
*/
+
+ /**
+ * Set the half life and time unit ("batches" or "points") for the forgetful algorithm.
+ * The half life along with the time unit provides an alternative way to specify decay factor.
+ * The decay factor is calculated such that, for data acquired at time t,
+ * its contribution by time t + halfLife will have dropped to 0.5.
+ * The unit of time can be specified either as batches or points;
+ * see StreamingDecay companion object.
+ *
+ * @param halfLife the half life
+ * @param timeUnit the time unit
+ */
 def setHalfLife(halfLife: Double, timeUnit: String): T = {
 if (timeUnit != StreamingDecay.BATCHES && timeUnit != StreamingDecay.POINTS) {
 throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
 }
@@ -45,13 +85,32 @@ private[mllib] trait StreamingDecaySetter[T <: StreamingDecaySetter[T]] extends
 this
 }

+ /**
+ * Derive the discount factor.
+ *
+ * @param numNewDataPoints number of data points for the RDD arriving at time t.
+ * @return Discount factor
+ */
 def getDiscount(numNewDataPoints: Long): Double = timeUnit match {
 case StreamingDecay.BATCHES => decayFactor
 case StreamingDecay.POINTS => math.pow(decayFactor, numNewDataPoints)
 }
 }

+/**
+ * :: Experimental ::
+ * Provides the String constants for allowed time unit in the forgetful algorithm.
+ */
+@Experimental
 object StreamingDecay {
+ /**
+ * Each RDD in the DStream will be treated as 1 time unit.
+ *
+ */
 final val BATCHES = "batches"
+ /**
+ * Each data point will be treated as 1 time unit.
+ *
+ */
 final val POINTS = "points"
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
index 94b40c6affce9..8d793ab4c7fa0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -39,6 +39,23 @@ import org.apache.spark.streaming.dstream.DStream
 * Only weights will be updated, not an intercept. If the model needs
 * an intercept, it should be manually appended to the input data.
 *
+ * StreamingLinearAlgorithm uses the forgetful algorithm
+ * to dynamically adjust for evolution of the data source. For each batch of data,
+ * we update the model estimates by:
+ *
+ * $$ \theta_{t+1} = \frac{\theta_t n_t \alpha + \beta_t m_t}{n_t \alpha + m_t} $$
+ * $$ n_{t+1} = n_t \alpha + m_t $$
+ *
+ * where $\theta_t$ is the model estimate before the data arriving at time t;
+ * $n_t$ is the cumulative contribution of data arriving before time t;
+ * $\beta_t$ is the estimate using the data arriving at time t alone;
+ * $m_t$ is the number of data points arriving at time t;
+ * $\alpha$ is the discount factor: with $\alpha=0$ only the data from the
+ * most recent RDD will be used, and with $\alpha=1$ all data since the beginning
+ * of the DStream will be used with equal contributions.
+ *
+ * This updating rule is analogous to an exponentially-weighted moving average.
+ *
 * For example usage, see `StreamingLinearRegressionWithSGD`.
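+ * Returning to the update rule above, a single step with illustrative numbers
+ * (not drawn from the patch): $\theta_t = 10$, $n_t = 100$, $\alpha = 0.5$, and
+ * a new batch of $m_t = 100$ points whose stand-alone estimate is $\beta_t = 4$
+ * yields
+ *
+ * $$ \theta_{t+1} = \frac{10 \cdot 100 \cdot 0.5 + 4 \cdot 100}{100 \cdot 0.5 + 100} = \frac{900}{150} = 6 $$
+ *
+ * with $n_{t+1} = 150$: the new batch pulls the estimate two thirds of the way
+ * toward $\beta_t$.
+ *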
* * NOTE: In some use cases, the order in which trainOn and predictOn @@ -100,7 +117,9 @@ abstract class StreamingLinearAlgorithm[ val discount = getDiscount(numNewDataPoints) val updatedDataWeight = previousDataWeight * discount + numNewDataPoints - val lambda = numNewDataPoints / math.max(updatedDataWeight, 1e-16) + // updatedDataWeight >= 1 because rdd is not empty; + // no need to check division by zero in below + val lambda = numNewDataPoints / updatedDataWeight BLAS.scal(lambda, newModel.weights) BLAS.axpy(1-lambda, model.get.weights, newModel.weights) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index 933ee6d21d4eb..fd0dd841bda61 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -30,6 +30,11 @@ import org.apache.spark.mllib.linalg.Vector * of features must be constant. An initial weight * vector must be provided. * + * This class inherits the forgetful algorithm from StreamingLinearAlgorithm + * to handle evolution of data source. Users can specify the degree of forgetfulness + * by the decay factor or the half-life. Refer to StreamingLinearAlgorithm for + * more details. + * * Use a builder pattern to construct a streaming linear regression * analysis in an application, like: * diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index d2e078c1fa946..00a24d376286c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -187,24 +187,24 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase test("parameter accuracy with full memory (decayFactor = 1)") { - val nPoints = 100 + val nPoints = 600 // create model val model = new StreamingLogisticRegressionWithSGD() .setDecayFactor(1) .setInitialWeights(Vectors.dense(0.0)) - .setStepSize(0.5) - .setNumIterations(50) + .setStepSize(1) + .setNumIterations(100) // generate sequence of simulated data val numBatches = 20 // the first few RDD's are generated under the model A val inputA = (0 until (numBatches - 1)).map { i => - LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 42 * (i + 1)) + LogisticRegressionSuite.generateLogisticInput(0.0, 0.1, nPoints, 33 * (i + 1)) } // the last RDD is generated under the model B val inputB = - LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1)) + LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 33 * (numBatches + 1)) val input = inputA :+ inputB // apply model training to input stream @@ -215,30 +215,30 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase runStreams(ssc, numBatches, numBatches) // with full memory, the final parameter estimates should be close to model A - assert(model.latestModel().weights(0) ~== 0.5 relTol 0.5) + assert(model.latestModel().weights(0) ~== 0.1 relTol 0.1) } test("parameter accuracy with no memory (decayFactor = 0)") { - val nPoints = 100 + val nPoints = 600 // create model val model = new StreamingLogisticRegressionWithSGD() 
.setDecayFactor(0) .setInitialWeights(Vectors.dense(0.0)) - .setStepSize(0.5) - .setNumIterations(50) + .setStepSize(1) + .setNumIterations(100) // generate sequence of simulated data val numBatches = 20 // the first few RDD's are generated under the model A val inputA = (0 until (numBatches - 1)).map { i => - LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 42 * (i + 1)) + LogisticRegressionSuite.generateLogisticInput(0.0, 0.1, nPoints, 33 * (i + 1)) } // the last RDD is generated under the model B val inputB = - LogisticRegressionSuite.generateLogisticInput(0.0, 1.5, nPoints, 42 * (numBatches + 1)) + LogisticRegressionSuite.generateLogisticInput(0.0, 0.5, nPoints, 33 * (numBatches + 1)) val input = inputA :+ inputB // apply model training to input stream @@ -249,7 +249,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase runStreams(ssc, numBatches, numBatches) // with no memory, the final parameter estimates should be close to model B - assert(model.latestModel().weights(0) ~== 1.5 relTol 0.5) + assert(model.latestModel().weights(0) ~== 0.5 relTol 0.1) } } From 16227ab2a5c2472a374dc3cda2f5739f448b50a5 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Mon, 31 Aug 2015 22:10:31 -0700 Subject: [PATCH 07/11] incorporating further comments Refactor StreamingDecay Use case object for TimeUnit Clean up ScalaDoc --- .../StreamingLogisticRegressionWithSGD.scala | 18 ++++-- .../mllib/regression/StreamingDecay.scala | 55 +++++++------------ .../StreamingLinearRegressionWithSGD.scala | 17 +++++- 3 files changed, 47 insertions(+), 43 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala index ad29d89d334ec..fe1378923bbd7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala @@ -19,7 +19,8 @@ package org.apache.spark.mllib.classification import org.apache.spark.annotation.Since import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.regression.{StreamingDecaySetter, StreamingLinearAlgorithm} +import org.apache.spark.mllib.regression.StreamingDecay.TimeUnit +import org.apache.spark.mllib.regression.StreamingLinearAlgorithm /** * Train or predict a logistic regression model on streaming data. Training uses @@ -31,9 +32,9 @@ import org.apache.spark.mllib.regression.{StreamingDecaySetter, StreamingLinearA * of features must be constant. An initial weight * vector must be provided. * - * This class inherits the forgetful algorithm from StreamingLinearAlgorithm + * This class inherits the forgetful algorithm from [[StreamingLinearAlgorithm]] * to handle evolution of data source. Users can specify the degree of forgetfulness - * by the decay factor or the half-life. Refer to StreamingLinearAlgorithm for + * by the decay factor or the half-life. Refer to [[StreamingLinearAlgorithm]] for * more details. 
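+ *
+ * The degree of forgetting can be given either way; a hypothetical sketch under
+ * the API at this point in the series (the stream identifier is assumed, not
+ * part of this patch):
+ *
+ * {{{
+ *   val model = new StreamingLogisticRegressionWithSGD()
+ *     .setInitialWeights(Vectors.dense(0.0))
+ *     .setDecayFactor(0.5) // or: .setHalfLife(3.0, StreamingDecay.BATCHES)
+ *   model.trainOn(labeledStream) // labeledStream: DStream[LabeledPoint]
+ * }}}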
* * Use a builder pattern to construct a streaming logistic regression @@ -54,7 +55,6 @@ class StreamingLogisticRegressionWithSGD private[mllib] ( private var miniBatchFraction: Double, private var regParam: Double) extends StreamingLinearAlgorithm[LogisticRegressionModel, LogisticRegressionWithSGD] - with StreamingDecaySetter[StreamingLogisticRegressionWithSGD] with Serializable { /** @@ -105,4 +105,14 @@ class StreamingLogisticRegressionWithSGD private[mllib] ( this.model = Some(algorithm.createModel(initialWeights, 0.0)) this } + + override def setDecayFactor(decayFactor: Double): this.type = { + super.setDecayFactor(decayFactor) + this + } + + override def setHalfLife(halfLife: Double, timeUnit: TimeUnit): this.type = { + super.setHalfLife(halfLife, timeUnit) + this + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index 2b9f7376ebae8..a254a90ab606c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -19,35 +19,21 @@ package org.apache.spark.mllib.regression import org.apache.spark.Logging import org.apache.spark.annotation.Experimental +import org.apache.spark.mllib.regression.StreamingDecay.{TimeUnit, BATCHES, POINTS} /** * :: Experimental :: - * Supplies an interface for the discount value in - * the forgetful update rule in StreamingLinearAlgorithm. - * Actual implementation is provided in StreamingDecaySetter[T]. + * Supply the discount value for the + * forgetful update rule in [[StreamingLinearAlgorithm]]; + * The degree of forgetfulness can be specified by the decay factor + * or the half life. + * */ @Experimental -trait StreamingDecay { - /** - * Derive the discount factor. - * - * @param numNewDataPoints number of data points for the RDD arriving at time t. - * @return Discount factor - */ - def getDiscount(numNewDataPoints: Long): Double -} +private[mllib] trait StreamingDecay extends Logging{ -/** - * :: Experimental :: - * StreamingDecaySetter provides the concrete implementation - * of getDiscount in StreamingDecay and setters for decay factor - * and half-life. - */ -@Experimental -private[mllib] trait StreamingDecaySetter[T] extends Logging { - self: T => - private var decayFactor: Double = 0 - private var timeUnit: String = StreamingDecay.BATCHES + private[this] var decayFactor: Double = 0 + private[this] var timeUnit: TimeUnit = BATCHES /** * Set the decay factor for the forgetful algorithms. @@ -58,7 +44,7 @@ private[mllib] trait StreamingDecaySetter[T] extends Logging { * * @param decayFactor the decay factor */ - def setDecayFactor(decayFactor: Double): T = { + def setDecayFactor(decayFactor: Double): this.type = { this.decayFactor = decayFactor this } @@ -68,17 +54,13 @@ private[mllib] trait StreamingDecaySetter[T] extends Logging { * Set the half life and time unit ("batches" or "points") for the forgetful algorithm. * The half life along with the time unit provides an alternative way to specify decay factor. * The decay factor is calculated such that, for data acquired at time t, - * its contribution by time t + halfLife will have dropped to 0.5. - * The unit of time can be specified either as batches or points; - * see StreamingDecay companion object. + * its contribution by time t + halfLife will have dropped by 0.5. + * The unit of time can be specified either as batches or points. 
* * @param halfLife the half life * @param timeUnit the time unit */ - def setHalfLife(halfLife: Double, timeUnit: String): T = { - if (timeUnit != StreamingDecay.BATCHES && timeUnit != StreamingDecay.POINTS) { - throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit) - } + def setHalfLife(halfLife: Double, timeUnit: TimeUnit): this.type = { this.decayFactor = math.exp(math.log(0.5) / halfLife) logInfo("Setting decay factor to: %g ".format (this.decayFactor)) this.timeUnit = timeUnit @@ -91,9 +73,9 @@ private[mllib] trait StreamingDecaySetter[T] extends Logging { * @param numNewDataPoints number of data points for the RDD arriving at time t. * @return Discount factor */ - def getDiscount(numNewDataPoints: Long): Double = timeUnit match { - case StreamingDecay.BATCHES => decayFactor - case StreamingDecay.POINTS => math.pow(decayFactor, numNewDataPoints) + private[mllib] def getDiscount(numNewDataPoints: Long): Double = timeUnit match { + case BATCHES => decayFactor + case POINTS => math.pow(decayFactor, numNewDataPoints) } } @@ -103,14 +85,15 @@ private[mllib] trait StreamingDecaySetter[T] extends Logging { */ @Experimental object StreamingDecay { + sealed trait TimeUnit /** * Each RDD in the DStream will be treated as 1 time unit. * */ - final val BATCHES = "batches" + case object BATCHES extends TimeUnit /** * Each data point will be treated as 1 time unit. * */ - final val POINTS = "points" + case object POINTS extends TimeUnit } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index fd0dd841bda61..0d9ab765f6978 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.annotation.Since import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.regression.StreamingDecay.TimeUnit /** * Train or predict a linear regression model on streaming data. Training uses @@ -30,9 +31,9 @@ import org.apache.spark.mllib.linalg.Vector * of features must be constant. An initial weight * vector must be provided. * - * This class inherits the forgetful algorithm from StreamingLinearAlgorithm + * This class inherits the forgetful algorithm from [[StreamingLinearAlgorithm]] * to handle evolution of data source. Users can specify the degree of forgetfulness - * by the decay factor or the half-life. Refer to StreamingLinearAlgorithm for + * by the decay factor or the half-life. Refer to [[StreamingLinearAlgorithm]] for * more details. 
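+ *
+ * The choice of time unit changes how much one incoming RDD discounts the
+ * past. As an illustration of getDiscount (values assumed, not part of this
+ * patch), with decayFactor = 0.9 and an RDD of 50 points:
+ *
+ * {{{
+ *   val discountBatches = 0.9                 // BATCHES: the RDD is one time unit
+ *   val discountPoints = math.pow(0.9, 50)    // POINTS: each point is one time unit (~0.005)
+ * }}}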
* * Use a builder pattern to construct a streaming linear regression @@ -50,7 +51,6 @@ class StreamingLinearRegressionWithSGD private[mllib] ( private var numIterations: Int, private var miniBatchFraction: Double) extends StreamingLinearAlgorithm[LinearRegressionModel, LinearRegressionWithSGD] - with StreamingDecaySetter[StreamingLinearRegressionWithSGD] with Serializable { /** @@ -112,4 +112,15 @@ class StreamingLinearRegressionWithSGD private[mllib] ( this } + override def setDecayFactor(decayFactor: Double): this.type = { + super.setDecayFactor(decayFactor) + this + } + + override def setHalfLife(halfLife: Double, timeUnit: TimeUnit): this.type = { + super.setHalfLife(halfLife, timeUnit) + this + } + + } From 8605004821beffda7d3a132fd907c7ba2ef589a0 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Wed, 2 Sep 2015 08:51:32 -0700 Subject: [PATCH 08/11] Add tests for half-life and TimeUnit. --- .../mllib/regression/StreamingDecay.scala | 2 +- .../StreamingLinearRegressionSuite.scala | 136 ++++++++++++++++++ 2 files changed, 137 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index a254a90ab606c..152c66c4f6609 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -85,7 +85,7 @@ private[mllib] trait StreamingDecay extends Logging{ */ @Experimental object StreamingDecay { - sealed trait TimeUnit + private[mllib] sealed trait TimeUnit /** * Each RDD in the DStream will be treated as 1 time unit. * diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index ccbb2bca0cd3e..a0a2f34d8fa6e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.regression +import org.apache.spark.mllib.regression.StreamingDecay.{BATCHES, POINTS} + import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkFunSuite @@ -260,4 +262,138 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { assertEqual(model.latestModel().weights(0), 5.0, 1.0) assertEqual(model.latestModel().weights(1), 3.0, 1.0) } + + test("parameter accuracy with long half life and POINTS as TimeUnit") { + // create model + val model = new StreamingLinearRegressionWithSGD() + .setHalfLife(5000, POINTS) + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.5) + .setNumIterations(50) + .setConvergenceTol(0.0001) + + // generate sequence of simulated data + val numBatches = 10 + // the first few RDD's are generated under the model A + val inputA = (0 until (numBatches-1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1)) + } + // the last RDD is generated under the model B + val inputB = + LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1)) + val input = inputA :+ inputB + + // apply model training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with long half life, the final 
parameter estimates should be close to model A + assertEqual(model.latestModel().intercept, 0.0, 1.0) + assertEqual(model.latestModel().weights(0), 10.0, 1.0) + assertEqual(model.latestModel().weights(1), 10.0, 1.0) + } + + test("parameter accuracy with long half life and BATCHES as TimeUnit") { + // create model + val model = new StreamingLinearRegressionWithSGD() + .setHalfLife(20, BATCHES) + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.5) + .setNumIterations(50) + .setConvergenceTol(0.0001) + + // generate sequence of simulated data + val numBatches = 10 + // the first few RDD's are generated under the model A + val inputA = (0 until (numBatches-1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1)) + } + // the last RDD is generated under the model B + val inputB = + LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (numBatches + 1)) + val input = inputA :+ inputB + + // apply model training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with long half life, the final parameter estimates should be close to model A + assertEqual(model.latestModel().intercept, 0.0, 1.0) + assertEqual(model.latestModel().weights(0), 10.0, 1.0) + assertEqual(model.latestModel().weights(1), 10.0, 1.0) + } + + test("parameter accuracy with short half life and POINTS as TimeUnit") { + // create model + val model = new StreamingLinearRegressionWithSGD() + .setHalfLife(50, POINTS) + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.5) + .setNumIterations(50) + .setConvergenceTol(0.0001) + + // generate sequence of simulated data + val numBatches = 10 + // the first half of the RDD's are generated under the model A + val inputA = (0 until (numBatches / 2 - 1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1)) + } + // the second half of the RDD's are generated under the model B + val inputB = (0 until (numBatches / 2 - 1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (i + 1)) + } + val input = inputA ++ inputB + + // apply model training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with short half life, the final parameter estimates should be close to model B + assertEqual(model.latestModel().intercept, 0.0, 1.0) + assertEqual(model.latestModel().weights(0), 5.0, 1.0) + assertEqual(model.latestModel().weights(1), 3.0, 1.0) + } + + test("parameter accuracy with short half life and BATCHES as TimeUnit") { + // create model + val model = new StreamingLinearRegressionWithSGD() + .setHalfLife(1, BATCHES) + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.5) + .setNumIterations(50) + .setConvergenceTol(0.0001) + + // generate sequence of simulated data + val numBatches = 10 + // the first half of the RDD's are generated under the model A + val inputA = (0 until (numBatches / 2 - 1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 200, 42 * (i + 1)) + } + // the second half of the RDD's are generated under the model B + val inputB = (0 until (numBatches / 2 - 1)).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(5.0, 3.0), 200, 42 * (i + 1)) + } + val input = inputA ++ inputB + + // apply model 
training to input stream + ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // with short half life, the final parameter estimates should be close to model B + assertEqual(model.latestModel().intercept, 0.0, 1.0) + assertEqual(model.latestModel().weights(0), 5.0, 1.0) + assertEqual(model.latestModel().weights(1), 3.0, 1.0) + } } From 686fd2c8cc5194ba106eefb8d5e9fd6acaff43be Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Fri, 4 Sep 2015 15:09:46 -0700 Subject: [PATCH 09/11] Refactor: timeUnit has its own setter. Add @Since. Clean up ScalaDoc. --- .../StreamingLogisticRegressionWithSGD.scala | 10 ++-- .../mllib/regression/StreamingDecay.scala | 42 ++++++++++----- .../StreamingLinearRegressionWithSGD.scala | 9 ++-- .../StreamingLinearRegressionSuite.scala | 54 ++++++++++--------- 4 files changed, 70 insertions(+), 45 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala index fe1378923bbd7..7e65e52af81ff 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala @@ -108,11 +108,13 @@ class StreamingLogisticRegressionWithSGD private[mllib] ( override def setDecayFactor(decayFactor: Double): this.type = { super.setDecayFactor(decayFactor) - this } - override def setHalfLife(halfLife: Double, timeUnit: TimeUnit): this.type = { - super.setHalfLife(halfLife, timeUnit) - this + override def setHalfLife(halfLife: Double): this.type = { + super.setHalfLife(halfLife) + } + + override def setTimeUnit(timeUnit: TimeUnit): this.type = { + super.setTimeUnit(timeUnit) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index 152c66c4f6609..10118e5f07721 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.Logging -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{Since, Experimental} import org.apache.spark.mllib.regression.StreamingDecay.{TimeUnit, BATCHES, POINTS} /** @@ -31,38 +31,53 @@ import org.apache.spark.mllib.regression.StreamingDecay.{TimeUnit, BATCHES, POIN */ @Experimental private[mllib] trait StreamingDecay extends Logging{ - private[this] var decayFactor: Double = 0 private[this] var timeUnit: TimeUnit = BATCHES /** * Set the decay factor for the forgetful algorithms. - * The decay factor should be between 0 and 1, inclusive. - * decayFactor = 0: only the data from the most recent RDD will be used. - * decayFactor = 1: all data since the beginning of the DStream will be used. - * decayFactor is default to zero. + * The decay factor specifies the decay of + * the contribution of data from time t-1 to time t. + * Valid decayFactor ranges from 0 to 1, inclusive. + * decayFactor = 0: previous data have no contribution to the model at the next time unit. + * decayFactor = 1: previous data have equal contribution to the model as the data arriving + * at the next time unit. 
+ * decayFactor defaults to 0. * * @param decayFactor the decay factor */ + @Since("1.6.0") def setDecayFactor(decayFactor: Double): this.type = { this.decayFactor = decayFactor this } - /** - * Set the half life and time unit ("batches" or "points") for the forgetful algorithm. - * The half life along with the time unit provides an alternative way to specify decay factor. + /** + * Set the half life for the forgetful algorithm. + * The half life provides an alternative way to specify the decay factor. * The decay factor is calculated such that, for data acquired at time t, * its contribution by time t + halfLife will have dropped by 0.5. - * The unit of time can be specified either as batches or points. + * The half life must be greater than 0. * * @param halfLife the half life - * @param timeUnit the time unit */ - def setHalfLife(halfLife: Double, timeUnit: TimeUnit): this.type = { + @Since("1.6.0") + def setHalfLife(halfLife: Double): this.type = { this.decayFactor = math.exp(math.log(0.5) / halfLife) logInfo("Setting decay factor to: %g ".format (this.decayFactor)) + this + } + + /** + * Set the time unit for the forgetful algorithm. + * BATCHES: Each RDD in the DStream will be treated as 1 time unit. + * POINTS: Each data point will be treated as 1 time unit. + * timeUnit defaults to BATCHES. + * + * @param timeUnit the time unit + */ + @Since("1.6.0") + def setTimeUnit(timeUnit: TimeUnit): this.type = { this.timeUnit = timeUnit this } @@ -84,16 +99,19 @@ private[mllib] trait StreamingDecay extends Logging{ * Provides the String constants for allowed time unit in the forgetful algorithm. */ @Experimental +@Since("1.6.0") object StreamingDecay { private[mllib] sealed trait TimeUnit /** * Each RDD in the DStream will be treated as 1 time unit. * */ + @Since("1.6.0") case object BATCHES extends TimeUnit /** * Each data point will be treated as 1 time unit.
* */ + @Since("1.6.0") case object POINTS extends TimeUnit } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index 0d9ab765f6978..c5bfcde8a5de3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -114,13 +114,14 @@ class StreamingLinearRegressionWithSGD private[mllib] ( override def setDecayFactor(decayFactor: Double): this.type = { super.setDecayFactor(decayFactor) - this } - override def setHalfLife(halfLife: Double, timeUnit: TimeUnit): this.type = { - super.setHalfLife(halfLife, timeUnit) - this + override def setHalfLife(halfLife: Double): this.type = { + super.setHalfLife(halfLife) } + override def setTimeUnit(timeUnit: TimeUnit): this.type = { + super.setTimeUnit(timeUnit) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index a0a2f34d8fa6e..0bc8a4e6f2938 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -17,15 +17,15 @@ package org.apache.spark.mllib.regression -import org.apache.spark.mllib.regression.StreamingDecay.{BATCHES, POINTS} - import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkFunSuite import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.StreamingDecay.{BATCHES, POINTS} import org.apache.spark.mllib.util.LinearDataGenerator -import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { @@ -225,9 +225,9 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { runStreams(ssc, numBatches, numBatches) // with full memory, the final parameter estimates should be close to model A - assertEqual(model.latestModel().intercept, 0.0, 1.0) - assertEqual(model.latestModel().weights(0), 10.0, 1.0) - assertEqual(model.latestModel().weights(1), 10.0, 1.0) + assert(model.latestModel().intercept ~== 0.0 absTol 1.0) + assert(model.latestModel().weights(0) ~== 10.0 absTol 1.0) + assert(model.latestModel().weights(1) ~== 10.0 absTol 1.0) } test("parameter accuracy with no memory (decayFactor = 0)") { @@ -258,15 +258,16 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { runStreams(ssc, numBatches, numBatches) // with no memory, the final parameter estimates should be close to model B - assertEqual(model.latestModel().intercept, 0.0, 1.0) - assertEqual(model.latestModel().weights(0), 5.0, 1.0) - assertEqual(model.latestModel().weights(1), 3.0, 1.0) + assert(model.latestModel().intercept ~== 0.0 absTol 1.0) + assert(model.latestModel().weights(0) ~== 5.0 absTol 1.0) + assert(model.latestModel().weights(1) ~== 3.0 absTol 1.0) } test("parameter accuracy with long half life and POINTS as TimeUnit") { // create model val model = new StreamingLinearRegressionWithSGD() - .setHalfLife(5000, POINTS) + .setHalfLife(5000) + 
.setTimeUnit(POINTS) .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.5) .setNumIterations(50) @@ -291,15 +292,16 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { runStreams(ssc, numBatches, numBatches) // with long half life, the final parameter estimates should be close to model A - assertEqual(model.latestModel().intercept, 0.0, 1.0) - assertEqual(model.latestModel().weights(0), 10.0, 1.0) - assertEqual(model.latestModel().weights(1), 10.0, 1.0) + assert(model.latestModel().intercept ~== 0.0 absTol 1.0) + assert(model.latestModel().weights(0) ~== 10.0 absTol 1.0) + assert(model.latestModel().weights(1) ~== 10.0 absTol 1.0) } test("parameter accuracy with long half life and BATCHES as TimeUnit") { // create model val model = new StreamingLinearRegressionWithSGD() - .setHalfLife(20, BATCHES) + .setHalfLife(20) + .setTimeUnit(BATCHES) .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.5) .setNumIterations(50) @@ -324,15 +326,16 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { runStreams(ssc, numBatches, numBatches) // with long half life, the final parameter estimates should be close to model A - assertEqual(model.latestModel().intercept, 0.0, 1.0) - assertEqual(model.latestModel().weights(0), 10.0, 1.0) - assertEqual(model.latestModel().weights(1), 10.0, 1.0) + assert(model.latestModel().intercept ~== 0.0 absTol 1.0) + assert(model.latestModel().weights(0) ~== 10.0 absTol 1.0) + assert(model.latestModel().weights(1) ~== 10.0 absTol 1.0) } test("parameter accuracy with short half life and POINTS as TimeUnit") { // create model val model = new StreamingLinearRegressionWithSGD() - .setHalfLife(50, POINTS) + .setHalfLife(50) + .setTimeUnit(POINTS) .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.5) .setNumIterations(50) @@ -358,15 +361,16 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { runStreams(ssc, numBatches, numBatches) // with short half life, the final parameter estimates should be close to model B - assertEqual(model.latestModel().intercept, 0.0, 1.0) - assertEqual(model.latestModel().weights(0), 5.0, 1.0) - assertEqual(model.latestModel().weights(1), 3.0, 1.0) + assert(model.latestModel().intercept ~== 0.0 absTol 1.0) + assert(model.latestModel().weights(0) ~== 5.0 absTol 1.0) + assert(model.latestModel().weights(1) ~== 3.0 absTol 1.0) } test("parameter accuracy with short half life and BATCHES as TimeUnit") { // create model val model = new StreamingLinearRegressionWithSGD() - .setHalfLife(1, BATCHES) + .setHalfLife(1) + .setTimeUnit(BATCHES) .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.5) .setNumIterations(50) @@ -392,8 +396,8 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { runStreams(ssc, numBatches, numBatches) // with short half life, the final parameter estimates should be close to model B - assertEqual(model.latestModel().intercept, 0.0, 1.0) - assertEqual(model.latestModel().weights(0), 5.0, 1.0) - assertEqual(model.latestModel().weights(1), 3.0, 1.0) + assert(model.latestModel().intercept ~== 0.0 absTol 1.0) + assert(model.latestModel().weights(0) ~== 5.0 absTol 1.0) + assert(model.latestModel().weights(1) ~== 3.0 absTol 1.0) } } From 3b42f96cc522503f89fd9378382561cfdd7fb727 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Sat, 5 Sep 2015 11:21:50 -0700 Subject: [PATCH 10/11] remove duplicate setters. clean up new lines and comments. 
--- .../StreamingLogisticRegressionWithSGD.scala | 12 ------------ .../spark/mllib/regression/StreamingDecay.scala | 11 ++--------- .../mllib/regression/StreamingLinearAlgorithm.scala | 1 + .../StreamingLinearRegressionWithSGD.scala | 13 ------------- 4 files changed, 3 insertions(+), 34 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala index 7e65e52af81ff..07cc6d66e17fc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala @@ -105,16 +105,4 @@ class StreamingLogisticRegressionWithSGD private[mllib] ( this.model = Some(algorithm.createModel(initialWeights, 0.0)) this } - - override def setDecayFactor(decayFactor: Double): this.type = { - super.setDecayFactor(decayFactor) - } - - override def setHalfLife(halfLife: Double): this.type = { - super.setHalfLife(halfLife) - } - - override def setTimeUnit(timeUnit: TimeUnit): this.type = { - super.setTimeUnit(timeUnit) - } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index 10118e5f07721..318418d1b7073 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -27,12 +27,11 @@ import org.apache.spark.mllib.regression.StreamingDecay.{TimeUnit, BATCHES, POIN * forgetful update rule in [[StreamingLinearAlgorithm]]; * The degree of forgetfulness can be specified by the decay factor * or the half life. - * */ @Experimental private[mllib] trait StreamingDecay extends Logging{ - private[this] var decayFactor: Double = 0 - private[this] var timeUnit: TimeUnit = BATCHES + private var decayFactor: Double = 0 + private var timeUnit: TimeUnit = BATCHES /** * Set the decay factor for the forgetful algorithms. @@ -43,7 +42,6 @@ private[mllib] trait StreamingDecay extends Logging{ * decayFactor = 1: previous data contribute to the model as much as the data arriving * at the next time unit. * decayFactor defaults to 0. - * * @param decayFactor the decay factor */ @Since("1.6.0") @@ -58,7 +56,6 @@ private[mllib] trait StreamingDecay extends Logging{ * The decay factor is calculated such that, for data acquired at time t, * its contribution by time t + halfLife will have dropped by 0.5. * The half life must be greater than 0. - * * @param halfLife the half life */ @Since("1.6.0") @@ -73,7 +70,6 @@ private[mllib] trait StreamingDecay extends Logging{ * BATCHES: Each RDD in the DStream will be treated as 1 time unit. * POINTS: Each data point will be treated as 1 time unit. * timeUnit defaults to BATCHES. - * * @param timeUnit the time unit */ @Since("1.6.0") @@ -84,7 +80,6 @@ private[mllib] trait StreamingDecay extends Logging{ /** * Derive the discount factor. - * * @param numNewDataPoints number of data points for the RDD arriving at time t. * @return Discount factor */ @@ -104,13 +99,11 @@ object StreamingDecay { private[mllib] sealed trait TimeUnit /** * Each RDD in the DStream will be treated as 1 time unit. - * */ @Since("1.6.0") case object BATCHES extends TimeUnit /** * Each data point will be treated as 1 time unit.
- * */ @Since("1.6.0") case object POINTS extends TimeUnit diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 8d793ab4c7fa0..57a1a72b9e5a8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -82,6 +82,7 @@ abstract class StreamingLinearAlgorithm[ /** The model to be updated and used for prediction. */ protected var model: Option[M] + /** The accumulated weight of the data that arrived before the current time unit. */ protected var previousDataWeight: Double = 0 /** The algorithm to use for updating. */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index c5bfcde8a5de3..f086484f10d3c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -111,17 +111,4 @@ class StreamingLinearRegressionWithSGD private[mllib] ( this.algorithm.optimizer.setConvergenceTol(tolerance) this } - - override def setDecayFactor(decayFactor: Double): this.type = { - super.setDecayFactor(decayFactor) - } - - override def setHalfLife(halfLife: Double): this.type = { - super.setHalfLife(halfLife) - } - - override def setTimeUnit(timeUnit: TimeUnit): this.type = { - super.setTimeUnit(timeUnit) - } - } From 0072400825c0c4a871467a92799d12752fe96402 Mon Sep 17 00:00:00 2001 From: Meihua Wu Date: Sun, 8 Nov 2015 19:57:19 -0800 Subject: [PATCH 11/11] Improve Java API compatibility.
--- .../StreamingLogisticRegressionWithSGD.scala | 16 +++++++++++++++- .../spark/mllib/regression/StreamingDecay.scala | 14 ++++++++------ .../StreamingLinearRegressionWithSGD.scala | 16 +++++++++++++++- .../JavaStreamingLogisticRegressionSuite.java | 5 ++++- .../JavaStreamingLinearRegressionSuite.java | 4 +++- 5 files changed, 45 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala index 07cc6d66e17fc..a13e4143e5c4f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionWithSGD.scala @@ -19,7 +19,6 @@ package org.apache.spark.mllib.classification import org.apache.spark.annotation.Since import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.regression.StreamingDecay.TimeUnit import org.apache.spark.mllib.regression.StreamingLinearAlgorithm /** @@ -105,4 +104,19 @@ class StreamingLogisticRegressionWithSGD private[mllib] ( this.model = Some(algorithm.createModel(initialWeights, 0.0)) this } + + override def setDecayFactor(decayFactor: Double): this.type = { + super.setDecayFactor(decayFactor) + this + } + + override def setHalfLife(halfLife: Double): this.type = { + super.setHalfLife(halfLife) + this + } + + override def setTimeUnit(timeUnit: String): this.type = { + super.setTimeUnit(timeUnit) + this + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala index 318418d1b7073..643b70c3552e2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingDecay.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.Logging import org.apache.spark.annotation.{Since, Experimental} -import org.apache.spark.mllib.regression.StreamingDecay.{TimeUnit, BATCHES, POINTS} +import org.apache.spark.mllib.regression.StreamingDecay.{BATCHES, POINTS} /** * :: Experimental :: @@ -31,7 +31,7 @@ import org.apache.spark.mllib.regression.StreamingDecay.{TimeUnit, BATCHES, POIN @Experimental private[mllib] trait StreamingDecay extends Logging{ private var decayFactor: Double = 0 - private var timeUnit: TimeUnit = BATCHES + private var timeUnit: String = StreamingDecay.BATCHES /** * Set the decay factor for the forgetful algorithms. @@ -73,7 +73,10 @@ private[mllib] trait StreamingDecay extends Logging{ * @param timeUnit the time unit */ @Since("1.6.0") - def setTimeUnit(timeUnit: TimeUnit): this.type = { + def setTimeUnit(timeUnit: String): this.type = { + if (timeUnit != StreamingDecay.BATCHES && timeUnit != StreamingDecay.POINTS) { + throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit) + } this.timeUnit = timeUnit this } @@ -96,15 +99,14 @@ private[mllib] trait StreamingDecay extends Logging{ @Experimental @Since("1.6.0") object StreamingDecay { - private[mllib] sealed trait TimeUnit /** * Each RDD in the DStream will be treated as 1 time unit. */ @Since("1.6.0") - case object BATCHES extends TimeUnit + final val BATCHES = "BATCHES" /** * Each data point will be treated as 1 time unit. 
*/ @Since("1.6.0") - case object POINTS extends TimeUnit + final val POINTS = "POINTS" } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index f086484f10d3c..67d66ab70b90c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -19,7 +19,6 @@ package org.apache.spark.mllib.regression import org.apache.spark.annotation.Since import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.regression.StreamingDecay.TimeUnit /** * Train or predict a linear regression model on streaming data. Training uses @@ -111,4 +110,19 @@ class StreamingLinearRegressionWithSGD private[mllib] ( this.algorithm.optimizer.setConvergenceTol(tolerance) this } + + override def setDecayFactor(decayFactor: Double): this.type = { + super.setDecayFactor(decayFactor) + this + } + + override def setHalfLife(halfLife: Double): this.type = { + super.setHalfLife(halfLife) + this + } + + override def setTimeUnit(timeUnit: String): this.type = { + super.setTimeUnit(timeUnit) + this + } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java index c9e5ee22f3273..351aa22b78804 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaStreamingLogisticRegressionSuite.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; +import org.apache.spark.mllib.regression.StreamingDecay; import scala.Tuple2; import org.junit.After; @@ -72,7 +73,9 @@ public void javaAPI() { attachTestInputStream(ssc, Arrays.asList(testBatch, testBatch), 2)); StreamingLogisticRegressionWithSGD slr = new StreamingLogisticRegressionWithSGD() .setNumIterations(2) - .setInitialWeights(Vectors.dense(0.0)); + .setInitialWeights(Vectors.dense(0.0)) + .setDecayFactor(0.5) + .setTimeUnit("POINTS"); slr.trainOn(training); JavaPairDStream prediction = slr.predictOnValues(test); attachTestOutputStream(prediction.count()); diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java index dbf6488d41085..03b24c7c2f9b3 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaStreamingLinearRegressionSuite.java @@ -71,7 +71,9 @@ public void javaAPI() { attachTestInputStream(ssc, Arrays.asList(testBatch, testBatch), 2)); StreamingLinearRegressionWithSGD slr = new StreamingLinearRegressionWithSGD() .setNumIterations(2) - .setInitialWeights(Vectors.dense(0.0)); + .setInitialWeights(Vectors.dense(0.0)) + .setDecayFactor(0.5) + .setTimeUnit("POINTS"); slr.trainOn(training); JavaPairDStream prediction = slr.predictOnValues(test); attachTestOutputStream(prediction.count());
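
A note on the decay arithmetic, for readers checking the series by hand: the sketch below is a standalone Scala approximation, not the MLlib code itself, and the object and method names are invented for illustration. Only the two formulas come from the patches: decayFactorFromHalfLife mirrors the math.exp(math.log(0.5) / halfLife) computation in setHalfLife, and discount follows the documented time-unit semantics (with BATCHES each RDD is one time unit, so the previous data weight is discounted once per batch; with POINTS each data point is one time unit, so the discount compounds once per point).

object DecaySketch {
  // Mirrors setHalfLife: d = exp(log(0.5) / halfLife), so d^halfLife = 0.5 and
  // the contribution of old data halves every `halfLife` time units.
  def decayFactorFromHalfLife(halfLife: Double): Double = {
    require(halfLife > 0, "half life must be positive")
    math.exp(math.log(0.5) / halfLife)
  }

  // Follows the documented BATCHES vs POINTS semantics; the string values
  // match the constants introduced in patch 11.
  def discount(decayFactor: Double, timeUnit: String, numNewDataPoints: Long): Double =
    timeUnit match {
      case "BATCHES" => decayFactor
      case "POINTS" => math.pow(decayFactor, numNewDataPoints)
      case other => throw new IllegalArgumentException("Invalid time unit for decay: " + other)
    }

  def main(args: Array[String]): Unit = {
    // The "short half life and BATCHES" test uses halfLife = 1, i.e. d = 0.5.
    // After the 5 batches drawn from model B, data from model A retain a
    // relative weight of about 0.5^5 = 3%, so the estimates land near model B.
    val d = decayFactorFromHalfLife(1.0)
    println(discount(d, "BATCHES", 200)) // 0.5 per batch, regardless of batch size
    println(math.pow(d, 5)) // 0.03125
  }
}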
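
For completeness, a hypothetical end-to-end use of the final fluent API (String-based time units, per patch 11), written against a Spark 1.6-era MLlib with this series applied. The trainingStream parameter is assumed to be built elsewhere; the setter values echo the ones exercised in the test suites above.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.dstream.DStream

object ForgetfulRegressionExample {
  def buildAndTrain(trainingStream: DStream[LabeledPoint]): StreamingLinearRegressionWithSGD = {
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0))
      .setStepSize(0.5)
      .setNumIterations(50)
      .setHalfLife(50)        // sets decayFactor = exp(log(0.5) / 50)
      .setTimeUnit("POINTS")  // each data point counts as one time unit
    // Older batches are progressively discounted as new data arrive.
    model.trainOn(trainingStream)
    model
  }
}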