SPARK-1462: Examples of ML algorithms are using deprecated APIs
This will also fix SPARK-1464: Update MLLib Examples to Use Breeze.

Author: Sandeep <[email protected]>

Closes apache#416 from techaddict/1462 and squashes the following commits:

a43638e [Sandeep] Some Style Changes
3ce69c3 [Sandeep] Fix Ordering and Naming of Imports in Examples
6c7e543 [Sandeep] SPARK-1462: Examples of ML algorithms are using deprecated APIs
techaddict authored and pdeyhim committed Jun 25, 2014
1 parent 6c9dcb7 commit 664f38d
Showing 20 changed files with 100 additions and 66 deletions.
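The pattern behind most of these diffs is the same: the deprecated org.apache.spark.util.Vector is replaced by Breeze's breeze.linalg types, and imports are regrouped (java, scala, third-party, spark). A minimal standalone sketch of the before-and-after vector API; the object and value names here are illustrative, not taken from the patch:

import java.util.Random

import breeze.linalg.{DenseVector, squaredDistance}

object BreezeMigrationSketch {
  def main(args: Array[String]) {
    val rand = new Random(42)
    val D = 10

    // Deprecated API: Vector(D, _ => 2 * rand.nextDouble - 1)
    // Breeze equivalent: DenseVector.fill
    val w = DenseVector.fill(D){2 * rand.nextDouble - 1}
    val x = DenseVector.fill(D){rand.nextGaussian}

    // w dot x          -> w.dot(x)
    // w.squaredDist(x) -> squaredDistance(w, x)
    // scale * x        -> x * scale (the patch puts the vector on the left)
    val scale = 1 / (1 + math.exp(-(w.dot(x)))) - 1
    val gradient = x * scale
    println("dist = " + squaredDistance(w, x) + ", gradient = " + gradient)
  }
}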
@@ -18,15 +18,18 @@
package org.apache.spark.examples

import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map

import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

@@ -17,19 +17,21 @@

package org.apache.spark.examples

import org.apache.hadoop.mapreduce.Job
import java.nio.ByteBuffer
import java.util.SortedMap

import scala.collection.JavaConversions._

import org.apache.cassandra.db.IColumn
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
import org.apache.cassandra.thrift._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.nio.ByteBuffer
import java.util.SortedMap
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.utils.ByteBufferUtil
import scala.collection.JavaConversions._
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/*
* This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra
@@ -17,9 +17,10 @@

package org.apache.spark.examples

import java.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.util.Random

object GroupByTest {
def main(args: Array[String]) {
@@ -17,12 +17,13 @@

package org.apache.spark.examples

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD

object HBaseTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HBaseTest",
@@ -18,9 +18,10 @@
package org.apache.spark.examples

import scala.math.sqrt
import cern.jet.math._

import cern.colt.matrix._
import cern.colt.matrix.linalg._
import cern.jet.math._

/**
* Alternating least squares matrix factorization.
@@ -18,17 +18,18 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector

import breeze.linalg.{Vector, DenseVector}

object LocalFileLR {
val D = 10 // Number of dimensions
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
DataPoint(new DenseVector(nums.slice(1, D + 1)), nums(0))
}

def main(args: Array[String]) {
@@ -37,15 +38,15 @@ object LocalFileLR {
val ITERATIONS = args(1).toInt

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
var gradient = Vector.zeros(D)
var gradient = DenseVector.zeros[Double](D)
for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
}
w -= gradient
}
24 changes: 14 additions & 10 deletions examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -18,11 +18,14 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector
import org.apache.spark.SparkContext._

import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet

import breeze.linalg.{Vector, DenseVector, squaredDistance}

import org.apache.spark.SparkContext._

/**
* K-means clustering.
*/
@@ -36,19 +39,19 @@ object LocalKMeans {

def generateData = {
def generatePoint(i: Int) = {
Vector(D, _ => rand.nextDouble * R)
DenseVector.fill(D){rand.nextDouble * R}
}
Array.tabulate(N)(generatePoint)
}

def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity

for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get
val tempDist = p.squaredDist(vCurr)
val tempDist = squaredDistance(p, vCurr)
if (tempDist < closest) {
closest = tempDist
bestIndex = i
@@ -60,8 +63,8 @@ object LocalKMeans {

def main(args: Array[String]) {
val data = generateData
var points = new HashSet[Vector]
var kPoints = new HashMap[Int, Vector]
var points = new HashSet[Vector[Double]]
var kPoints = new HashMap[Int, Vector[Double]]
var tempDist = 1.0

while (points.size < K) {
@@ -81,16 +84,17 @@ object LocalKMeans {
var mappings = closest.groupBy[Int] (x => x._1)

var pointStats = mappings.map { pair =>
pair._2.reduceLeft [(Int, (Vector, Int))] {
pair._2.reduceLeft [(Int, (Vector[Double], Int))] {
case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
}
}

var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
var newPoints = pointStats.map {mapping =>
(mapping._1, mapping._2._1 * (1.0 / mapping._2._2))}

tempDist = 0.0
for (mapping <- newPoints) {
tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2)
}

for (newP <- newPoints) {
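For the squaredDistance call used above, a small self-contained check of the closest-center lookup that LocalKMeans now performs; the object name and toy centers are assumptions, not part of the patch:

import scala.collection.mutable.HashMap

import breeze.linalg.{Vector, DenseVector, squaredDistance}

object ClosestCenterSketch {
  // Same shape as LocalKMeans.closestPoint: centers keyed 1..K in a mutable HashMap
  def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity
    for (i <- 1 to centers.size) {
      val tempDist = squaredDistance(p, centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }
    bestIndex
  }

  def main(args: Array[String]) {
    val centers = HashMap[Int, Vector[Double]](
      1 -> DenseVector(0.0, 0.0),
      2 -> DenseVector(5.0, 5.0))
    println(closestPoint(DenseVector(4.0, 4.5), centers)) // expect 2
  }
}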
15 changes: 8 additions & 7 deletions examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -18,7 +18,8 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector

import breeze.linalg.{Vector, DenseVector}

/**
* Logistic regression based classification.
@@ -30,12 +31,12 @@ object LocalLR {
val ITERATIONS = 5
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def generateData = {
def generatePoint(i: Int) = {
val y = if(i % 2 == 0) -1 else 1
val x = Vector(D, _ => rand.nextGaussian + y * R)
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
Array.tabulate(N)(generatePoint)
@@ -45,15 +46,15 @@ object LocalLR {
val data = generateData

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
var gradient = Vector.zeros(D)
var gradient = DenseVector.zeros[Double](D)
for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
}
w -= gradient
}
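Putting the same calls together, a compact local training loop in the spirit of the patched LocalLR; the constants and generated data here are illustrative only:

import java.util.Random

import breeze.linalg.DenseVector

object LocalLRSketch {
  val D = 10
  val N = 1000
  val ITERATIONS = 5
  val rand = new Random(42)

  case class DataPoint(x: DenseVector[Double], y: Double)

  def main(args: Array[String]) {
    // Toy labeled data standing in for the generated or parsed points
    val points = Array.tabulate(N) { i =>
      val y = if (i % 2 == 0) -1.0 else 1.0
      DataPoint(DenseVector.fill(D){rand.nextGaussian + y * 0.7}, y)
    }

    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
    for (i <- 1 to ITERATIONS) {
      var gradient = DenseVector.zeros[Double](D)
      for (p <- points) {
        val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
        gradient += p.x * scale // vector times scalar, as in the patch
      }
      w -= gradient
    }
    println("Final w: " + w)
  }
}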
@@ -18,8 +18,9 @@
package org.apache.spark.examples

import scala.math.random

import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkContext._

object LocalPi {
def main(args: Array[String]) {
@@ -19,6 +19,7 @@ package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
* Executes a roll up-style query against Apache logs.
*/
@@ -17,8 +17,8 @@

package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

object MultiBroadcastTest {
def main(args: Array[String]) {
@@ -17,9 +17,10 @@

package org.apache.spark.examples

import java.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.util.Random

object SimpleSkewedGroupByTest {
def main(args: Array[String]) {
@@ -17,9 +17,10 @@

package org.apache.spark.examples

import java.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.util.Random

object SkewedGroupByTest {
def main(args: Array[String]) {
@@ -18,9 +18,11 @@
package org.apache.spark.examples

import scala.math.sqrt
import cern.jet.math._

import cern.colt.matrix._
import cern.colt.matrix.linalg._
import cern.jet.math._

import org.apache.spark._

/**
@@ -18,20 +18,24 @@
package org.apache.spark.examples

import java.util.Random

import scala.math.exp
import org.apache.spark.util.Vector

import breeze.linalg.{Vector, DenseVector}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo


/**
* Logistic regression based classification.
*/
object SparkHdfsLR {
val D = 10 // Number of dimensions
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def parsePoint(line: String): DataPoint = {
val tok = new java.util.StringTokenizer(line, " ")
@@ -41,7 +45,7 @@ object SparkHdfsLR {
while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1
}
DataPoint(new Vector(x), y)
DataPoint(new DenseVector(x), y)
}

def main(args: Array[String]) {
@@ -61,13 +65,13 @@ object SparkHdfsLR {
val ITERATIONS = args(2).toInt

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
val gradient = points.map { p =>
(1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}
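The patched SparkHdfsLR ships Breeze vectors through closures and sums the per-point gradients with reduce(_ + _). A rough sketch of one such step on generated data; the SparkContext setup and all names below are assumptions, since the real example reads its points from HDFS:

import java.util.Random

import scala.math.exp

import breeze.linalg.DenseVector

import org.apache.spark.SparkContext

object SparkLRStepSketch {
  case class DataPoint(x: DenseVector[Double], y: Double)

  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "SparkLRStepSketch")
    val D = 10
    val rand = new Random(42)

    // Toy data in place of the HDFS input used by SparkHdfsLR
    val points = sc.parallelize(1 to 1000).map { i =>
      val y = if (i % 2 == 0) -1.0 else 1.0
      DataPoint(DenseVector.fill(D){rand.nextGaussian + y}, y)
    }.cache()

    var w = DenseVector.fill(D){2 * rand.nextDouble - 1}

    // One gradient step: per-point Breeze vectors are combined with reduce(_ + _)
    val gradient = points.map { p =>
      p.x * ((1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y)
    }.reduce(_ + _)
    w -= gradient

    println("w after one step: " + w)
    sc.stop()
  }
}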