SPARK-1462: Examples of ML algorithms are using deprecated APIs #416

Closed · wants to merge 3 commits
@@ -18,15 +18,18 @@
package org.apache.spark.examples

import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map

import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

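For context, the reordered imports above appear to follow the grouping convention used across the Spark codebase: java.* first, then scala.*, then third-party libraries, then org.apache.spark.*, with blank lines between groups. Below is a minimal, hypothetical file (not part of this PR) showing that ordering; it assumes the examples module's dependencies, Breeze and Spark, are on the classpath.

package org.apache.spark.examples

import java.util.Random

import scala.collection.mutable.ListBuffer

import breeze.linalg.DenseVector

import org.apache.spark.SparkConf

// ImportOrderSketch is a made-up example whose only purpose is to show the
// import grouping: java.*, scala.*, third-party libraries, org.apache.spark.*.
object ImportOrderSketch {
  def main(args: Array[String]): Unit = {
    val rand = new Random(42)
    val v = DenseVector.fill(3)(rand.nextDouble())              // third-party (Breeze) usage
    val history = ListBuffer(v)                                 // scala.* usage
    val conf = new SparkConf().setAppName("ImportOrderSketch")  // org.apache.spark.* usage
    println(conf.get("spark.app.name") + ": " + history.head)
  }
}
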
@@ -17,19 +17,21 @@

package org.apache.spark.examples

import org.apache.hadoop.mapreduce.Job
import java.nio.ByteBuffer
import java.util.SortedMap

import scala.collection.JavaConversions._

import org.apache.cassandra.db.IColumn
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
import org.apache.cassandra.thrift._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.nio.ByteBuffer
import java.util.SortedMap
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.utils.ByteBufferUtil
import scala.collection.JavaConversions._
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/*
* This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra
@@ -17,9 +17,10 @@

package org.apache.spark.examples

import java.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.util.Random

object GroupByTest {
def main(args: Array[String]) {
@@ -17,12 +17,13 @@

package org.apache.spark.examples

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD

object HBaseTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HBaseTest",
@@ -18,9 +18,10 @@
package org.apache.spark.examples

import scala.math.sqrt
import cern.jet.math._

import cern.colt.matrix._
import cern.colt.matrix.linalg._
import cern.jet.math._

/**
* Alternating least squares matrix factorization.
@@ -18,17 +18,18 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector

import breeze.linalg.{Vector, DenseVector}

object LocalFileLR {
val D = 10 // Number of dimensions
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
DataPoint(new DenseVector(nums.slice(1, D + 1)), nums(0))
}

def main(args: Array[String]) {
@@ -37,15 +38,15 @@ object LocalFileLR {
val ITERATIONS = args(1).toInt

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}

Contributor: Should use rand.nextDouble() instead of rand.nextDouble, since the call changes the internal state of rand; by Scala convention, side-effecting methods keep their parentheses at the call site.

Contributor: This is not related to this PR, but it would be great if you update it in the next pass.

println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
var gradient = Vector.zeros(D)
var gradient = DenseVector.zeros[Double](D)
for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
}
w -= gradient
}
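To make the migration above concrete, here is a small, self-contained sketch (not part of the diff) of the Breeze calls the updated LocalFileLR relies on: DenseVector.fill, DenseVector.zeros, dot, and vector-times-scalar with the scalar on the right. It also writes rand.nextDouble() with parentheses, as the review comment above suggests. Only Breeze is assumed on the classpath, and the two data points are made up.

import java.util.Random

import breeze.linalg.DenseVector

object BreezeLRSketch {
  def main(args: Array[String]): Unit = {
    val D = 3
    val rand = new Random(42)

    // Two made-up labelled points: (features, label).
    val points = Seq(
      (DenseVector(1.0, 0.5, -0.2), 1.0),
      (DenseVector(-0.7, 0.1, 0.9), -1.0))

    // Random initial weights; nextDouble() keeps its parentheses because the call is side-effecting.
    var w = DenseVector.fill(D)(2 * rand.nextDouble() - 1)

    for (i <- 1 to 5) {
      var gradient = DenseVector.zeros[Double](D)
      for ((x, y) <- points) {
        // Logistic-loss gradient contribution; the scalar factor multiplies the vector from the right.
        val scale = (1 / (1 + math.exp(-y * w.dot(x))) - 1) * y
        gradient += x * scale
      }
      w -= gradient
    }
    println("Final w: " + w)
  }
}
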
24 changes: 14 additions & 10 deletions examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -18,11 +18,14 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector
import org.apache.spark.SparkContext._

import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet

import breeze.linalg.{Vector, DenseVector, squaredDistance}

import org.apache.spark.SparkContext._

/**
* K-means clustering.
*/
@@ -36,19 +39,19 @@ object LocalKMeans {

def generateData = {
def generatePoint(i: Int) = {
Vector(D, _ => rand.nextDouble * R)
DenseVector.fill(D){rand.nextDouble * R}

Contributor: Ditto. nextDouble().

}
Array.tabulate(N)(generatePoint)
}

def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity

for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get
val tempDist = p.squaredDist(vCurr)
val tempDist = squaredDistance(p, vCurr)
if (tempDist < closest) {
closest = tempDist
bestIndex = i
@@ -60,8 +63,8 @@

def main(args: Array[String]) {
val data = generateData
var points = new HashSet[Vector]
var kPoints = new HashMap[Int, Vector]
var points = new HashSet[Vector[Double]]

Contributor: It would be better if you import scala.collection.mutable and use mutable.HashSet and mutable.HashMap. Again, this is not related to this PR ...

var kPoints = new HashMap[Int, Vector[Double]]
var tempDist = 1.0

while (points.size < K) {
@@ -81,16 +84,17 @@
var mappings = closest.groupBy[Int] (x => x._1)

var pointStats = mappings.map { pair =>
pair._2.reduceLeft [(Int, (Vector, Int))] {
pair._2.reduceLeft [(Int, (Vector[Double], Int))] {
case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
}
}

var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
var newPoints = pointStats.map {mapping =>
(mapping._1, mapping._2._1 * (1.0 / mapping._2._2))}

tempDist = 0.0
for (mapping <- newPoints) {
tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2)
}

for (newP <- newPoints) {
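A short, self-contained sketch (not part of the diff) of two points raised in this file: breeze.linalg.squaredDistance, which takes over from the deprecated Vector.squaredDist, and the scala.collection.mutable import style the reviewer suggests (shown here with mutable.HashMap). The object name, data, and cluster count below are made up; only Breeze is assumed on the classpath.

import java.util.Random

import scala.collection.mutable

import breeze.linalg.{DenseVector, Vector, squaredDistance}

object KMeansSketch {
  def main(args: Array[String]): Unit = {
    val rand = new Random(42)

    // Made-up 2-D points and two initial centers, keyed 1..K as in the example above.
    val points = Array.fill(10)(DenseVector.fill(2)(rand.nextDouble()))
    val centers = mutable.HashMap[Int, Vector[Double]](1 -> points(0), 2 -> points(1))

    // Index of the closest center, using Breeze's squaredDistance instead of squaredDist.
    def closestPoint(p: Vector[Double]): Int =
      (1 to centers.size).minBy(i => squaredDistance(p, centers(i)))

    points.foreach(p => println(p + " -> center " + closestPoint(p)))
  }
}
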
15 changes: 8 additions & 7 deletions examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -18,7 +18,8 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector

import breeze.linalg.{Vector, DenseVector}

/**
* Logistic regression based classification.
@@ -30,12 +31,12 @@ object LocalLR {
val ITERATIONS = 5
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def generateData = {
def generatePoint(i: Int) = {
val y = if(i % 2 == 0) -1 else 1
val x = Vector(D, _ => rand.nextGaussian + y * R)
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
Array.tabulate(N)(generatePoint)
@@ -45,15 +46,15 @@
val data = generateData

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
var gradient = Vector.zeros(D)
var gradient = DenseVector.zeros[Double](D)
for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
}
w -= gradient
}
@@ -18,8 +18,9 @@
package org.apache.spark.examples

import scala.math.random

import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkContext._

object LocalPi {
def main(args: Array[String]) {
@@ -19,6 +19,7 @@ package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
* Executes a roll up-style query against Apache logs.
*/
@@ -17,8 +17,8 @@

package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

object MultiBroadcastTest {
def main(args: Array[String]) {
@@ -17,9 +17,10 @@

package org.apache.spark.examples

import java.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.util.Random

object SimpleSkewedGroupByTest {
def main(args: Array[String]) {
@@ -17,9 +17,10 @@

package org.apache.spark.examples

import java.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.util.Random

object SkewedGroupByTest {
def main(args: Array[String]) {
@@ -18,9 +18,11 @@
package org.apache.spark.examples

import scala.math.sqrt
import cern.jet.math._

import cern.colt.matrix._
import cern.colt.matrix.linalg._
import cern.jet.math._

import org.apache.spark._

/**
@@ -18,20 +18,24 @@
package org.apache.spark.examples

import java.util.Random

import scala.math.exp
import org.apache.spark.util.Vector

import breeze.linalg.{Vector, DenseVector}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo


/**
* Logistic regression based classification.
*/
object SparkHdfsLR {
val D = 10 // Number of dimensions
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def parsePoint(line: String): DataPoint = {
val tok = new java.util.StringTokenizer(line, " ")
@@ -41,7 +45,7 @@ object SparkHdfsLR {
while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1
}
DataPoint(new Vector(x), y)
DataPoint(new DenseVector(x), y)
}

def main(args: Array[String]) {
@@ -61,13 +65,13 @@
val ITERATIONS = args(2).toInt

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
val gradient = points.map { p =>
(1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}
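To show the map/reduce shape of the gradient step above without a cluster, here is a local sketch (not part of the diff, no SparkContext involved): each point maps to a DenseVector contribution and reduce(_ + _) sums them element-wise, with the scalar factor kept to the right of p.x as in the updated code. The data is made up and only Breeze is assumed on the classpath.

import scala.math.exp

import breeze.linalg.DenseVector

object HdfsLRSketch {
  case class DataPoint(x: DenseVector[Double], y: Double)

  def main(args: Array[String]): Unit = {
    // Made-up stand-ins for the points the RDD would hold.
    val points = Seq(
      DataPoint(DenseVector(0.3, -1.2, 0.8), 1.0),
      DataPoint(DenseVector(-0.5, 0.4, 1.1), -1.0))

    var w = DenseVector(0.1, -0.2, 0.05)

    for (i <- 1 to 5) {
      // map: per-point gradient contribution; reduce: element-wise sum of the vectors.
      val gradient = points.map { p =>
        p.x * ((1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y)
      }.reduce(_ + _)
      w -= gradient
    }
    println("Final w: " + w)
  }
}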