From 664f38d09062443cf502c3f82508db1292e931fc Mon Sep 17 00:00:00 2001
From: Sandeep
Date: Wed, 16 Apr 2014 18:23:07 -0700
Subject: [PATCH] SPARK-1462: Examples of ML algorithms are using deprecated APIs

This will also fix SPARK-1464: Update MLLib Examples to Use Breeze.

Author: Sandeep

Closes #416 from techaddict/1462 and squashes the following commits:

a43638e [Sandeep] Some Style Changes
3ce69c3 [Sandeep] Fix Ordering and Naming of Imports in Examples
6c7e543 [Sandeep] SPARK-1462: Examples of ML algorithms are using deprecated APIs
---
 .../spark/examples/CassandraCQLTest.scala | 3 +++
 .../apache/spark/examples/CassandraTest.scala | 16 +++++++------
 .../apache/spark/examples/GroupByTest.scala | 3 ++-
 .../org/apache/spark/examples/HBaseTest.scala | 7 +++---
 .../org/apache/spark/examples/LocalALS.scala | 3 ++-
 .../apache/spark/examples/LocalFileLR.scala | 15 ++++++------
 .../apache/spark/examples/LocalKMeans.scala | 24 +++++++++++--------
 .../org/apache/spark/examples/LocalLR.scala | 15 ++++++------
 .../org/apache/spark/examples/LocalPi.scala | 3 ++-
 .../org/apache/spark/examples/LogQuery.scala | 1 +
 .../spark/examples/MultiBroadcastTest.scala | 2 +-
 .../examples/SimpleSkewedGroupByTest.scala | 3 ++-
 .../spark/examples/SkewedGroupByTest.scala | 3 ++-
 .../org/apache/spark/examples/SparkALS.scala | 4 +++-
 .../apache/spark/examples/SparkHdfsLR.scala | 14 +++++++----
 .../apache/spark/examples/SparkKMeans.scala | 17 +++++++------
 .../org/apache/spark/examples/SparkLR.scala | 13 ++++++----
 .../apache/spark/examples/SparkPageRank.scala | 1 -
 .../org/apache/spark/examples/SparkTC.scala | 5 ++--
 .../spark/examples/SparkTachyonHdfsLR.scala | 14 +++++++----
 20 files changed, 100 insertions(+), 66 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index 1f8d7cb5995b8..4e787240e912d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -18,15 +18,18 @@ package org.apache.spark.examples
 import java.nio.ByteBuffer
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ListBuffer
 import scala.collection.immutable.Map
+
 import org.apache.cassandra.hadoop.ConfigHelper
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
 import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
 import org.apache.cassandra.utils.ByteBufferUtil
 import org.apache.hadoop.mapreduce.Job
+
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 3e3a3b2d50abe..ed5d2f9e46f29 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -17,19 +17,21 @@ package org.apache.spark.examples
-import org.apache.hadoop.mapreduce.Job
+import java.nio.ByteBuffer
+import java.util.SortedMap
+
+import scala.collection.JavaConversions._
+
+import org.apache.cassandra.db.IColumn
 import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
 import org.apache.cassandra.hadoop.ConfigHelper
 import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
 import org.apache.cassandra.thrift._
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import java.nio.ByteBuffer
-import java.util.SortedMap
-import org.apache.cassandra.db.IColumn
 import org.apache.cassandra.utils.ByteBufferUtil
-import scala.collection.JavaConversions._
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
 /*
  * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 29114c6dabcdb..2b7ecdc991325 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -17,9 +17,10 @@ package org.apache.spark.examples
+import java.util.Random
+
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import java.util.Random
 object GroupByTest {
   def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
index 700121d16dd60..cbf78e8e9eba1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -17,12 +17,13 @@ package org.apache.spark.examples
-import org.apache.spark._
-import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+import org.apache.spark._
+import org.apache.spark.rdd.NewHadoopRDD
+
 object HBaseTest {
   def main(args: Array[String]) {
     val sc = new SparkContext(args(0), "HBaseTest",
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index 37ad4bd0999bd..658f73d96a86a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -18,9 +18,10 @@ package org.apache.spark.examples
 import scala.math.sqrt
-import cern.jet.math._
+
 import cern.colt.matrix._
 import cern.colt.matrix.linalg._
+import cern.jet.math._
 /**
  * Alternating least squares matrix factorization.
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index 737c4441398cd..0ef3001ca4ccd 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -18,17 +18,18 @@ package org.apache.spark.examples
 import java.util.Random
-import org.apache.spark.util.Vector
+
+import breeze.linalg.{Vector, DenseVector}
 object LocalFileLR {
   val D = 10 // Numer of dimensions
   val rand = new Random(42)
-  case class DataPoint(x: Vector, y: Double)
+  case class DataPoint(x: Vector[Double], y: Double)
   def parsePoint(line: String): DataPoint = {
     val nums = line.split(' ').map(_.toDouble)
-    DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
+    DataPoint(new DenseVector(nums.slice(1, D + 1)), nums(0))
   }
   def main(args: Array[String]) {
@@ -37,15 +38,15 @@ object LocalFileLR {
     val ITERATIONS = args(1).toInt
     // Initialize w to a random value
-    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
     println("Initial w: " + w)
     for (i <- 1 to ITERATIONS) {
       println("On iteration " + i)
-      var gradient = Vector.zeros(D)
+      var gradient = DenseVector.zeros[Double](D)
       for (p <- points) {
-        val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
-        gradient += scale * p.x
+        val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
+        gradient += p.x * scale
       }
       w -= gradient
     }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index 3895675b3b003..e33a1b336d163 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -18,11 +18,14 @@ package org.apache.spark.examples
 import java.util.Random
-import org.apache.spark.util.Vector
-import org.apache.spark.SparkContext._
+
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
+import breeze.linalg.{Vector, DenseVector, squaredDistance}
+
+import org.apache.spark.SparkContext._
+
 /**
  * K-means clustering.
  */
@@ -36,19 +39,19 @@ object LocalKMeans {
   def generateData = {
     def generatePoint(i: Int) = {
-      Vector(D, _ => rand.nextDouble * R)
+      DenseVector.fill(D){rand.nextDouble * R}
     }
     Array.tabulate(N)(generatePoint)
   }
-  def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+  def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = {
     var index = 0
     var bestIndex = 0
     var closest = Double.PositiveInfinity
     for (i <- 1 to centers.size) {
       val vCurr = centers.get(i).get
-      val tempDist = p.squaredDist(vCurr)
+      val tempDist = squaredDistance(p, vCurr)
       if (tempDist < closest) {
         closest = tempDist
         bestIndex = i
@@ -60,8 +63,8 @@ object LocalKMeans {
   def main(args: Array[String]) {
     val data = generateData
-    var points = new HashSet[Vector]
-    var kPoints = new HashMap[Int, Vector]
+    var points = new HashSet[Vector[Double]]
+    var kPoints = new HashMap[Int, Vector[Double]]
     var tempDist = 1.0
     while (points.size < K) {
@@ -81,16 +84,17 @@ object LocalKMeans {
       var mappings = closest.groupBy[Int] (x => x._1)
       var pointStats = mappings.map { pair =>
-        pair._2.reduceLeft [(Int, (Vector, Int))] {
+        pair._2.reduceLeft [(Int, (Vector[Double], Int))] {
           case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
         }
       }
-      var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
+      var newPoints = pointStats.map {mapping =>
+        (mapping._1, mapping._2._1 * (1.0 / mapping._2._2))}
       tempDist = 0.0
       for (mapping <- newPoints) {
-        tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+        tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2)
       }
       for (newP <- newPoints) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
index cd4e9f1af0e2c..385b48089d572 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -18,7 +18,8 @@ package org.apache.spark.examples
 import java.util.Random
-import org.apache.spark.util.Vector
+
+import breeze.linalg.{Vector, DenseVector}
 /**
  * Logistic regression based classification.
@@ -30,12 +31,12 @@ object LocalLR {
   val ITERATIONS = 5
   val rand = new Random(42)
-  case class DataPoint(x: Vector, y: Double)
+  case class DataPoint(x: Vector[Double], y: Double)
   def generateData = {
     def generatePoint(i: Int) = {
       val y = if(i % 2 == 0) -1 else 1
-      val x = Vector(D, _ => rand.nextGaussian + y * R)
+      val x = DenseVector.fill(D){rand.nextGaussian + y * R}
       DataPoint(x, y)
     }
     Array.tabulate(N)(generatePoint)
@@ -45,15 +46,15 @@ object LocalLR {
     val data = generateData
     // Initialize w to a random value
-    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
     println("Initial w: " + w)
     for (i <- 1 to ITERATIONS) {
       println("On iteration " + i)
-      var gradient = Vector.zeros(D)
+      var gradient = DenseVector.zeros[Double](D)
       for (p <- data) {
-        val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
-        gradient += scale * p.x
+        val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
+        gradient += p.x * scale
       }
       w -= gradient
     }
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala
index bb7f22ec8df42..ee6b3ee34aeb2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala
@@ -18,8 +18,9 @@ package org.apache.spark.examples
 import scala.math.random
+
 import org.apache.spark._
-import SparkContext._
+import org.apache.spark.SparkContext._
 object LocalPi {
   def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index fcaba6bb4fb85..35758fa003d94 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -19,6 +19,7 @@ package org.apache.spark.examples
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
+
 /**
  * Executes a roll up-style query against Apache logs.
  */
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
index 97321ab8f41db..58f26f1e24052 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
@@ -17,8 +17,8 @@ package org.apache.spark.examples
-import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
 object MultiBroadcastTest {
   def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
index d05eedd31caa0..557a0c1841339 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
@@ -17,9 +17,10 @@ package org.apache.spark.examples
+import java.util.Random
+
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import java.util.Random
 object SimpleSkewedGroupByTest {
   def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index fd9f043247d18..05a74725b875b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -17,9 +17,10 @@ package org.apache.spark.examples
+import java.util.Random
+
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import java.util.Random
 object SkewedGroupByTest {
   def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index 68f151a2c47fe..191c82fd913ee 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -18,9 +18,11 @@ package org.apache.spark.examples
 import scala.math.sqrt
-import cern.jet.math._
+
 import cern.colt.matrix._
 import cern.colt.matrix.linalg._
+import cern.jet.math._
+
 import org.apache.spark._
 /**
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index d8de8745c15d9..fd63ba3dbce7d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -18,12 +18,16 @@ package org.apache.spark.examples
 import java.util.Random
+
 import scala.math.exp
-import org.apache.spark.util.Vector
+
+import breeze.linalg.{Vector, DenseVector}
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo
+
 /**
  * Logistic regression based classification.
  */
@@ -31,7 +35,7 @@ object SparkHdfsLR {
   val D = 10 // Numer of dimensions
   val rand = new Random(42)
-  case class DataPoint(x: Vector, y: Double)
+  case class DataPoint(x: Vector[Double], y: Double)
   def parsePoint(line: String): DataPoint = {
     val tok = new java.util.StringTokenizer(line, " ")
@@ -41,7 +45,7 @@
     while (i < D) {
       x(i) = tok.nextToken.toDouble; i += 1
     }
-    DataPoint(new Vector(x), y)
+    DataPoint(new DenseVector(x), y)
   }
   def main(args: Array[String]) {
@@ -61,13 +65,13 @@
     val ITERATIONS = args(2).toInt
     // Initialize w to a random value
-    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
     println("Initial w: " + w)
     for (i <- 1 to ITERATIONS) {
       println("On iteration " + i)
       val gradient = points.map { p =>
-        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+        p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
       }.reduce(_ + _)
       w -= gradient
     }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index 1a8b21618e23a..8aa31d7e6a2c2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -18,8 +18,10 @@ package org.apache.spark.examples
 import java.util.Random
+
+import breeze.linalg.{Vector, DenseVector, squaredDistance}
+
 import org.apache.spark.SparkContext
-import org.apache.spark.util.Vector
 import org.apache.spark.SparkContext._
 /**
@@ -29,17 +31,17 @@ object SparkKMeans {
   val R = 1000 // Scaling factor
   val rand = new Random(42)
-  def parseVector(line: String): Vector = {
-    new Vector(line.split(' ').map(_.toDouble))
+  def parseVector(line: String): Vector[Double] = {
+    DenseVector(line.split(' ').map(_.toDouble))
   }
-  def closestPoint(p: Vector, centers: Array[Vector]): Int = {
+  def closestPoint(p: Vector[Double], centers: Array[Vector[Double]]): Int = {
     var index = 0
     var bestIndex = 0
     var closest = Double.PositiveInfinity
     for (i <- 0 until centers.length) {
-      val tempDist = p.squaredDist(centers(i))
+      val tempDist = squaredDistance(p, centers(i))
       if (tempDist < closest) {
         closest = tempDist
         bestIndex = i
@@ -69,11 +71,12 @@ object SparkKMeans {
     val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
-    val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
+    val newPoints = pointStats.map {pair =>
+      (pair._1, pair._2._1 * (1.0 / pair._2._2))}.collectAsMap()
     tempDist = 0.0
     for (i <- 0 until K) {
-      tempDist += kPoints(i).squaredDist(newPoints(i))
+      tempDist += squaredDistance(kPoints(i), newPoints(i))
     }
     for (newP <- newPoints) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index 3a2699d4d996b..d70ce603bb71d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -18,8 +18,11 @@ package org.apache.spark.examples
 import java.util.Random
+
 import scala.math.exp
-import org.apache.spark.util.Vector
+
+import breeze.linalg.{Vector, DenseVector}
+
 import org.apache.spark._
 /**
@@ -32,12 +35,12 @@ object SparkLR {
   val ITERATIONS = 5
   val rand = new Random(42)
-  case class DataPoint(x: Vector, y: Double)
+  case class DataPoint(x: Vector[Double], y: Double)
   def generateData = {
     def generatePoint(i: Int) = {
       val y = if(i % 2 == 0) -1 else 1
-      val x = Vector(D, _ => rand.nextGaussian + y * R)
+      val x = DenseVector.fill(D){rand.nextGaussian + y * R}
       DataPoint(x, y)
     }
     Array.tabulate(N)(generatePoint)
@@ -54,13 +57,13 @@
     val points = sc.parallelize(generateData, numSlices).cache()
     // Initialize w to a random value
-    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
     println("Initial w: " + w)
     for (i <- 1 to ITERATIONS) {
       println("On iteration " + i)
       val gradient = points.map { p =>
-        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+        p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
       }.reduce(_ + _)
       w -= gradient
     }
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
index 45b6e10f3ea9e..60e4a11a21f69 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
@@ -20,7 +20,6 @@ package org.apache.spark.examples
 import org.apache.spark.SparkContext._
 import org.apache.spark.SparkContext
-
 /**
  * Computes the PageRank of URLs from an input file. Input file should
  * be in format of:
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index eb47cf027cb10..65bd61abda6cd 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -17,11 +17,12 @@ package org.apache.spark.examples
-import org.apache.spark._
-import SparkContext._
 import scala.util.Random
 import scala.collection.mutable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
 /**
  * Transitive closure on a graph.
  */
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
index 5698d4746495d..4f558929add51 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -18,13 +18,17 @@ package org.apache.spark.examples
 import java.util.Random
+
 import scala.math.exp
-import org.apache.spark.util.Vector
+
+import breeze.linalg.{Vector, DenseVector}
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.storage.StorageLevel
+
 /**
  * Logistic regression based classification.
  * This example uses Tachyon to persist rdds during computation.
@@ -33,7 +37,7 @@ object SparkTachyonHdfsLR {
   val D = 10 // Numer of dimensions
   val rand = new Random(42)
-  case class DataPoint(x: Vector, y: Double)
+  case class DataPoint(x: Vector[Double], y: Double)
   def parsePoint(line: String): DataPoint = {
     val tok = new java.util.StringTokenizer(line, " ")
@@ -43,7 +47,7 @@
     while (i < D) {
       x(i) = tok.nextToken.toDouble; i += 1
     }
-    DataPoint(new Vector(x), y)
+    DataPoint(new DenseVector(x), y)
   }
   def main(args: Array[String]) {
@@ -63,13 +67,13 @@
     val ITERATIONS = args(2).toInt
     // Initialize w to a random value
-    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
     println("Initial w: " + w)
     for (i <- 1 to ITERATIONS) {
       println("On iteration " + i)
       val gradient = points.map { p =>
-        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
+        p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
       }.reduce(_ + _)
       w -= gradient
     }
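
Across the LR and KMeans examples the hunks above apply one recurring migration off the deprecated org.apache.spark.util.Vector onto breeze.linalg: `w dot x` becomes `w.dot(x)`, `a.squaredDist(b)` becomes `squaredDistance(a, b)`, scalar products keep the vector on the left (`p.x * scale`), and random vectors are built with `DenseVector.fill(D){...}`. A minimal, self-contained sketch of that pattern, assuming only that Breeze is on the classpath as it is for the examples module; the object name and sample values below are illustrative and not part of the patch:

import java.util.Random

import breeze.linalg.{DenseVector, squaredDistance}

object BreezeMigrationSketch {
  val D = 10
  val rand = new Random(42)

  def main(args: Array[String]) {
    // Old API: var w = Vector(D, _ => 2 * rand.nextDouble - 1)
    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
    val x = DenseVector.fill(D){rand.nextGaussian}
    val y = 1.0

    // Old API: (1 / (1 + exp(-y * (w dot x))) - 1) * y * x
    // Breeze spelling used in the patch: w.dot(x), with the vector kept on the left of the scalar.
    val scale = (1 / (1 + math.exp(-y * (w.dot(x)))) - 1) * y
    w -= x * scale

    // Old API: w.squaredDist(x)
    println(squaredDistance(w, x))
  }
}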