Skip to content

Commit

Permalink
make coalesce test deterministic in RDDSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Apr 11, 2014
1 parent 7b4203a commit 59bc16f
Showing 1 changed file with 33 additions and 28 deletions.
61 changes: 33 additions & 28 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -274,37 +274,42 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("coalesced RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
val rnd = scala.util.Random
val partitions = 10000
val numMachines = 50
val machines = mutable.ListBuffer[String]()
(1 to numMachines).foreach(machines += "m"+_)

val blocks = (1 to partitions).map(i =>
{ (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )

val data2 = sc.makeRDD(blocks)
val coalesced2 = data2.coalesce(numMachines*2)

// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
.foldLeft(1.0)((perc, loc) => math.min(perc,loc))
assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")

// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
.foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)

val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
val coalesced3 = data3.coalesce(numMachines*2)
val minLocality2 = coalesced3.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
.foldLeft(1.0)((perc, loc) => math.min(perc,loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
(minLocality2*100.0).toInt + "%")
(1 to numMachines).foreach(machines += "m" + _)
val rnd = scala.util.Random
for (seed <- 1 to 5) {
rnd.setSeed(seed)

val blocks = (1 to partitions).map { i =>
(i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList)
}

val data2 = sc.makeRDD(blocks)
val coalesced2 = data2.coalesce(numMachines * 2)

// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
.foldLeft(1.0)((perc, loc) => math.min(perc, loc))
assert(minLocality >= 0.90, "Expected 90% locality but got " +
(minLocality * 100.0).toInt + "%")

// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
.foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)

val data3 = sc.makeRDD(blocks).map(i => i * 2) // derived RDD to test *current* pref locs
val coalesced3 = data3.coalesce(numMachines * 2)
val minLocality2 = coalesced3.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
.foldLeft(1.0)((perc, loc) => math.min(perc, loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
(minLocality2 * 100.0).toInt + "%")
}
}

test("zipped RDDs") {
Expand Down

0 comments on commit 59bc16f

Please sign in to comment.