Commit 1649850

review commit

witgo committed Apr 10, 2015
1 parent c0087e0
Showing 2 changed files with 17 additions and 21 deletions.
30 changes: 15 additions & 15 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -95,12 +95,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
@volatile private var stopped = false

/** Attach a listener object to get information of when objects are cleaned. */
- def attachListener(listener: CleanerListener) {
+ def attachListener(listener: CleanerListener): Unit = {
listeners += listener
}

/** Start the cleaner. */
- def start() {
+ def start(): Unit = {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
@@ -109,7 +109,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/**
* Stop the cleaning thread and wait until the thread has finished running its current task.
*/
- def stop() {
+ def stop(): Unit = {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
@@ -122,7 +122,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Register a RDD for cleanup when it is garbage collected. */
- def registerRDDForCleanup(rdd: RDD[_]) {
+ def registerRDDForCleanup(rdd: RDD[_]): Unit = {
registerForCleanup(rdd, CleanRDD(rdd.id))
}

@@ -131,22 +131,22 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Register a ShuffleDependency for cleanup when it is garbage collected. */
- def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
+ def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
}

/** Register a Broadcast for cleanup when it is garbage collected. */
- def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
+ def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {
registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
}

/** Register a RDDCheckpointData for cleanup when it is garbage collected. */
- def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int) {
+ def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
registerForCleanup(rdd, CleanCheckpoint(parentId))
}

/** Register an object for cleanup. */
- private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
+ private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
}

@@ -171,7 +171,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
- doCleanCheckpoint(rddId, blocking = blockOnCleanupTasks)
+ doCleanCheckpoint(rddId)
}
}
}
@@ -183,7 +183,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform RDD cleanup. */
- def doCleanupRDD(rddId: Int, blocking: Boolean) {
+ def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning RDD " + rddId)
sc.unpersistRDD(rddId, blocking)
@@ -195,7 +195,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform shuffle cleanup, asynchronously. */
- def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
+ def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -208,7 +208,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform broadcast cleanup. */
- def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
+ def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
try {
logDebug(s"Cleaning broadcast $broadcastId")
broadcastManager.unbroadcast(broadcastId, true, blocking)
@@ -220,7 +220,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform accumulator cleanup. */
- def doCleanupAccum(accId: Long, blocking: Boolean) {
+ def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
try {
logDebug("Cleaning accumulator " + accId)
Accumulators.remove(accId)
@@ -232,10 +232,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform checkpoint cleanup. */
- def doCleanCheckpoint(rddId: Int, blocking: Boolean) {
+ def doCleanCheckpoint(rddId: Int): Unit = {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
- RDDCheckpointData.clearRDDCheckpointData(sc,rddId, blocking)
+ RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
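
Every change in ContextCleaner.scala above is the same mechanical edit: Scala's procedure syntax (a method body with no '=') is replaced by an explicit Unit return type, and doCleanCheckpoint additionally drops its blocking flag. A minimal sketch of the syntax change, with illustrative names that are not part of the patch:

class Cleaner {
  // Procedure syntax: no '=', so the result type is implicitly Unit.
  def startOld() {
    println("starting")
  }

  // Style the commit adopts: '=' plus an explicit ': Unit' return type.
  def startNew(): Unit = {
    println("starting")
  }
}

Both methods compile to the same thing; the explicit form simply states the return type instead of relying on procedure syntax, which was later deprecated.
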
8 changes: 2 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -137,14 +137,10 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])

private[spark] object RDDCheckpointData {
def rddCheckpointDataPath(sc: SparkContext, rddId: Int): Option[Path] = {
- if (sc.checkpointDir.isDefined) {
- Some(new Path(sc.checkpointDir.get, "rdd-" + rddId))
- } else {
- None
- }
+ sc.checkpointDir.map { dir => new Path(dir, "rdd-" + rddId) }
}

- def clearRDDCheckpointData(sc: SparkContext, rddId: Int, blocking: Boolean = true) = {
+ def clearRDDCheckpointData(sc: SparkContext, rddId: Int): Unit = {
rddCheckpointDataPath(sc, rddId).foreach { path =>
val fs = path.getFileSystem(sc.hadoopConfiguration)
if (fs.exists(path)) {
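
The rddCheckpointDataPath change above rewrites an isDefined/get branch as Option.map. A dependency-free sketch of the equivalence, using plain strings in place of Hadoop's Path and made-up values standing in for sc.checkpointDir and the RDD id:

val checkpointDir: Option[String] = Some("/tmp/checkpoints") // stand-in for sc.checkpointDir
val rddId = 42

// Old shape: branch on isDefined and reach into the Option with .get.
val before: Option[String] =
  if (checkpointDir.isDefined) Some(checkpointDir.get + "/rdd-" + rddId) else None

// New shape: map over the Option; a None checkpoint dir simply propagates.
val after: Option[String] = checkpointDir.map(dir => dir + "/rdd-" + rddId)

assert(before == after) // both are Some("/tmp/checkpoints/rdd-42")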
