[SPARK-20607][CORE] Add new unit tests to ShuffleSuite
## What changes were proposed in this pull request?

This PR makes two changes:
1. Adds a new unit test verifying that, before the shuffle stage has been executed, the shuffle has not generated its data file and index file, and that the shuffle output exists once the stage runs (a condensed sketch follows this list).
2. Modifies the '[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file' unit test: the parallelism is 1 rather than 2, and the shuffle index file is now also checked and deleted.
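
For context, here is a condensed sketch of the check the new test performs, assuming an initialized local SparkContext named `sc` and the sort-based shuffle manager. Note that `sc.env` is package-private to `org.apache.spark`, so code like this compiles only inside that package, as the suite does. The block-id triple (0, 0, 0) is shuffleId/mapId/reduceId, valid here because the job has a single shuffle and a single map task:

```scala
import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)

// Sort-based shuffle writes one data file and one index file per map task;
// DiskBlockManager resolves a block id to its location on local disk.
val dataFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))

assert(!dataFile.exists() && !indexFile.exists()) // nothing on disk before the job runs
rdd.count()                                       // running the job executes the shuffle map stage
assert(dataFile.exists() && indexFile.exists())   // data and index files exist afterwards
```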

## How was this patch tested?
The new and modified unit tests in ShuffleSuite.

Author: caoxuewen <[email protected]>

Closes #17868 from heary-cao/ShuffleSuite.
heary-cao authored and srowen committed May 19, 2017
1 parent 3f2cd51 commit f398640
Showing 1 changed file with 28 additions and 2 deletions: core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD}
 import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.ShuffleWriter
-import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
+import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
 import org.apache.spark.util.{MutablePair, Utils}
 
 abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
@@ -277,19 +277,45 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
     // Delete one of the local shuffle blocks.
     val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
     val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
-    assert(hashFile.exists() || sortFile.exists())
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
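+    // Depending on the shuffle manager, the map output is either a single hash-based
+    // shuffle block file, or a sort-based data file together with its index file.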
+    assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))
 
     if (hashFile.exists()) {
       hashFile.delete()
     }
     if (sortFile.exists()) {
       sortFile.delete()
     }
+    if (indexFile.exists()) {
+      indexFile.delete()
+    }
 
     // This count should retry the execution of the previous stage and rerun shuffle.
     rdd.count()
   }

test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)

// Cannot find one of the local shuffle blocks.
val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists())

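+    // Running the job executes the shuffle map stage, which writes the shuffle output files.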
+    rdd.count()
+
+    // After the job, the shuffle output can be found: either the hash-based shuffle
+    // block file, or the sort-based data file together with its index file.
+    val hashExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleBlockId(0, 0, 0))
+    val sortExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashExistsFile.exists() || (sortExistsFile.exists() && indexExistsFile.exists()))
+  }
+
test("metrics for shuffle without aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val numRecords = 10000
