Skip to content


Merge branch 'master' into api
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Apr 9, 2014
2 parents ef1a717 + 9689b66 commit 555e0fe
Show file tree
Hide file tree
Showing 191 changed files with 6,494 additions and 2,719 deletions.
2 changes: 1 addition & 1 deletion .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ work
20 changes: 12 additions & 8 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,27 +220,31 @@ object Bagel extends Logging {
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
grouped: RDD[(K, (Iterable[C], Iterable[V]))],
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
val processed = grouped.flatMapValues {
case (_, vs) if vs.size == 0 => None
case (c, vs) =>
val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
.flatMapValues {
case (_, vs) if !vs.hasNext => None
case (c, vs) => {
val (newVert, newMsgs) =
compute(vs(0), c match {
case Seq(comb) => Some(comb)
case Seq() => None
c.hasNext match {
case true => Some(
case false => None

numMsgs += newMsgs.size
if ( {
numActiveVerts += 1

Some((newVert, newMsgs))

// Force evaluation of processed RDD for accurate performance measurements
Expand Down
2 changes: 1 addition & 1 deletion bin/
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fi
# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ | grep "datanucleus-.*\\.jar" | wc -l)
num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ 2>/dev/null | grep "datanucleus-.*\\.jar" | wc -l)
if [ $num_datanucleus_jars -gt 0 ]; then
num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
Expand Down
4 changes: 2 additions & 2 deletions bin/spark-shell
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ set -o posix
FWDIR="$(cd `dirname $0`/..; pwd)"


Expand Down Expand Up @@ -64,7 +64,7 @@ ${txtbld}OPTIONS${txtrst}:
is followed by m for megabytes or g for gigabytes, e.g. "1g".
-dm --driver-memory : The memory used by the Spark Shell, the number is followed
by m for megabytes or g for gigabytes, e.g. "1g".
-m --master : A full string that describes the Spark Master, defaults to "local"
-m --master : A full string that describes the Spark Master, defaults to "local[*]"
e.g. "spark://localhost:7077".
--log-conf : Enables logging of the supplied SparkConf as INFO at start of the
Spark Context.
Expand Down
192 changes: 192 additions & 0 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

package org.apache.spark

import java.lang.ref.{ReferenceQueue, WeakReference}

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

* Classes that represent cleaning tasks.
private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask

* A WeakReference associated with a CleanupTask.
* When the referent object becomes only weakly reachable, the corresponding
* CleanupTaskWeakReference is automatically added to the given reference queue.
private class CleanupTaskWeakReference(
val task: CleanupTask,
referent: AnyRef,
referenceQueue: ReferenceQueue[AnyRef])
extends WeakReference(referent, referenceQueue)

* An asynchronous cleaner for RDD, shuffle, and broadcast state.
* This maintains a weak reference for each RDD, ShuffleDependency, and Broadcast of interest,
* to be processed when the associated object goes out of scope of the application. Actual
* cleanup is performed in a separate daemon thread.
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference]
with SynchronizedBuffer[CleanupTaskWeakReference]

private val referenceQueue = new ReferenceQueue[AnyRef]

private val listeners = new ArrayBuffer[CleanerListener]
with SynchronizedBuffer[CleanerListener]

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

* Whether the cleaning thread will block on cleanup tasks.
* This is set to true only for tests.
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", false)

@volatile private var stopped = false

/** Attach a listener object to get information of when objects are cleaned. */
def attachListener(listener: CleanerListener) {
listeners += listener

/** Start the cleaner. */
def start() {
cleaningThread.setName("Spark Context Cleaner")

/** Stop the cleaner. */
def stop() {
stopped = true

/** Register a RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]) {
registerForCleanup(rdd, CleanRDD(

/** Register a ShuffleDependency for cleanup when it is garbage collected. */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _]) {
registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))

/** Register a Broadcast for cleanup when it is garbage collected. */
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
registerForCleanup(broadcast, CleanBroadcast(

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning() {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference]) { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
} catch {
case t: Throwable => logError("Error in cleaning thread", t)

/** Perform RDD cleanup. */
def doCleanupRDD(rddId: Int, blocking: Boolean) {
try {
logDebug("Cleaning RDD " + rddId)
sc.unpersistRDD(rddId, blocking)
logInfo("Cleaned RDD " + rddId)
} catch {
case t: Throwable => logError("Error cleaning RDD " + rddId, t)

/** Perform shuffle cleanup, asynchronously. */
def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
try {
logDebug("Cleaning shuffle " + shuffleId)
blockManagerMaster.removeShuffle(shuffleId, blocking)
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)

/** Perform broadcast cleanup. */
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
try {
logDebug("Cleaning broadcast " + broadcastId)
broadcastManager.unbroadcast(broadcastId, true, blocking)
logInfo("Cleaned broadcast " + broadcastId)
} catch {
case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

// Used for testing. These methods explicitly blocks until cleanup is completed
// to ensure that more reliable testing.

private object ContextCleaner {
private val REF_QUEUE_POLL_TIMEOUT = 100

* Listener class used for testing when any item has been cleaned by the Cleaner class.
private[spark] trait CleanerListener {
def rddCleaned(rddId: Int)
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class ShuffleDependency[K, V](
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()


Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
private def awaitResult(): Try[T] = {
jobWaiter.awaitResult() match {
case JobSucceeded => scala.util.Success(resultFunc)
case JobFailed(e: Exception, _) => scala.util.Failure(e)
case JobFailed(e: Exception) => scala.util.Failure(e)
Expand Down

0 comments on commit 555e0fe

Please sign in to comment.