[SPARK-26180][CORE][TEST] Reuse withTempDir function to the SparkCore test case

## What changes were proposed in this pull request?

Currently, the common `withTempDir` function is used in Spark SQL test cases to wrap the `val dir = Utils.createTempDir()` / `Utils.deleteRecursively(dir)` pattern, but it is not available to the Spark Core test cases. This PR shares the `withTempDir` function between Spark SQL and Spark Core and uses it to clean up the Spark Core test cases.
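For context, a minimal sketch of what such a shared helper looks like (the trait name below is illustrative only; in Spark the helper ends up in the common test suite base, e.g. `SparkFunSuite`):

```scala
import java.io.File

import org.apache.spark.util.Utils

// Illustrative sketch only: the trait name is hypothetical; the actual helper
// shared by this PR lives in the common test suite base (e.g. SparkFunSuite).
trait TempDirHelperSketch {
  /**
   * Creates a temporary directory, passes it to the test body, and always
   * deletes it afterwards, even if the body throws.
   */
  protected def withTempDir(f: File => Unit): Unit = {
    val dir = Utils.createTempDir()
    try {
      f(dir)
    } finally {
      Utils.deleteRecursively(dir)
    }
  }
}
```

A test case can then replace the manual `createTempDir()` / `try` / `finally` / `deleteRecursively()` boilerplate with `withTempDir { dir => ... }`, which is the rewrite applied to the Spark Core suites in the diffs below.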

## How was this patch tested?

N/A

Closes #23151 from heary-cao/withCreateTempDir.

Authored-by: caoxuewen <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
heary-cao authored and HyukjinKwon committed Dec 1, 2018
1 parent 2f6e88f commit 327ac83
Showing 23 changed files with 858 additions and 887 deletions.
5 changes: 1 addition & 4 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -586,8 +586,7 @@ object CheckpointSuite {
 class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
 
   test("checkpoint compression") {
-    val checkpointDir = Utils.createTempDir()
-    try {
+    withTempDir { checkpointDir =>
       val conf = new SparkConf()
         .set("spark.checkpoint.compress", "true")
         .set("spark.ui.enabled", "false")
@@ -616,8 +615,6 @@ class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
 
       // Verify that the compressed content can be read back
       assert(rdd.collect().toSeq === (1 to 20))
-    } finally {
-      Utils.deleteRecursively(checkpointDir)
     }
   }
 }
97 changes: 49 additions & 48 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -207,54 +207,55 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
   }
 
   test("automatically cleanup normal checkpoint") {
-    val checkpointDir = Utils.createTempDir()
-    checkpointDir.delete()
-    var rdd = newPairRDD()
-    sc.setCheckpointDir(checkpointDir.toString)
-    rdd.checkpoint()
-    rdd.cache()
-    rdd.collect()
-    var rddId = rdd.id
-
-    // Confirm the checkpoint directory exists
-    assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
-    val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
-    val fs = path.getFileSystem(sc.hadoopConfiguration)
-    assert(fs.exists(path))
-
-    // the checkpoint is not cleaned by default (without the configuration set)
-    var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
-    rdd = null // Make RDD out of scope, ok if collected earlier
-    runGC()
-    postGCTester.assertCleanup()
-    assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
-    // Verify that checkpoints are NOT cleaned up if the config is not enabled
-    sc.stop()
-    val conf = new SparkConf()
-      .setMaster("local[2]")
-      .setAppName("cleanupCheckpoint")
-      .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
-    sc = new SparkContext(conf)
-    rdd = newPairRDD()
-    sc.setCheckpointDir(checkpointDir.toString)
-    rdd.checkpoint()
-    rdd.cache()
-    rdd.collect()
-    rddId = rdd.id
-
-    // Confirm the checkpoint directory exists
-    assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
-    // Reference rdd to defeat any early collection by the JVM
-    rdd.count()
-
-    // Test that GC causes checkpoint data cleanup after dereferencing the RDD
-    postGCTester = new CleanerTester(sc, Seq(rddId))
-    rdd = null // Make RDD out of scope
-    runGC()
-    postGCTester.assertCleanup()
-    assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+    withTempDir { checkpointDir =>
+      checkpointDir.delete()
+      var rdd = newPairRDD()
+      sc.setCheckpointDir(checkpointDir.toString)
+      rdd.checkpoint()
+      rdd.cache()
+      rdd.collect()
+      var rddId = rdd.id
+
+      // Confirm the checkpoint directory exists
+      assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
+      val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
+      val fs = path.getFileSystem(sc.hadoopConfiguration)
+      assert(fs.exists(path))
+
+      // the checkpoint is not cleaned by default (without the configuration set)
+      var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
+      rdd = null // Make RDD out of scope, ok if collected earlier
+      runGC()
+      postGCTester.assertCleanup()
+      assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+      // Verify that checkpoints are NOT cleaned up if the config is not enabled
+      sc.stop()
+      val conf = new SparkConf()
+        .setMaster("local[2]")
+        .setAppName("cleanupCheckpoint")
+        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
+      sc = new SparkContext(conf)
+      rdd = newPairRDD()
+      sc.setCheckpointDir(checkpointDir.toString)
+      rdd.checkpoint()
+      rdd.cache()
+      rdd.collect()
+      rddId = rdd.id
+
+      // Confirm the checkpoint directory exists
+      assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+      // Reference rdd to defeat any early collection by the JVM
+      rdd.count()
+
+      // Test that GC causes checkpoint data cleanup after dereferencing the RDD
+      postGCTester = new CleanerTester(sc, Seq(rddId))
+      rdd = null // Make RDD out of scope
+      runGC()
+      postGCTester.assertCleanup()
+      assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+    }
   }
 
   test("automatically clean up local checkpoint") {
19 changes: 10 additions & 9 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -306,17 +306,18 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       .set("spark.files.openCostInBytes", "0")
       .set("spark.default.parallelism", "1"))
 
-    val tempDir = Utils.createTempDir()
-    val tempDirPath = tempDir.getAbsolutePath
+    withTempDir { tempDir =>
+      val tempDirPath = tempDir.getAbsolutePath
 
-    for (i <- 0 until 8) {
-      val tempFile = new File(tempDir, s"part-0000$i")
-      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
-        StandardCharsets.UTF_8)
-    }
+      for (i <- 0 until 8) {
+        val tempFile = new File(tempDir, s"part-0000$i")
+        Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
+          StandardCharsets.UTF_8)
+      }
 
-    for (p <- Seq(1, 2, 8)) {
-      assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+      for (p <- Seq(1, 2, 8)) {
+        assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+      }
     }
   }
 
(Diffs for the remaining 20 changed files are not shown.)
