
Commit

Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak committed Jul 20, 2014
2 parents: 6635467 + 98ab411 · commit 717c9c3
Showing 15 changed files with 448 additions and 174 deletions.
28 changes: 10 additions & 18 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -27,9 +27,7 @@ import org.apache.spark.shuffle.ShuffleHandle
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable


/**
@@ -38,47 +36,41 @@ abstract class Dependency[T] extends Serializable {
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]

override def rdd: RDD[T] = _rdd
}


/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
@transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleId: Int = rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
val shuffleHandle: ShuffleHandle = rdd.context.env.shuffleManager.registerShuffle(
shuffleId, rdd.partitions.size, this)

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
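
A minimal sketch of a concrete narrow dependency written against the constructor-parameter variant of Dependency that appears in the hunk above (abstract class Dependency[T](val rdd: RDD[T])). It is illustrative only, not part of this commit; Spark's own OneToOneDependency plays the analogous role.

// Assumes org.apache.spark.rdd.RDD is in scope, as in Dependency.scala above.
class IdentityDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // Each partition of the child RDD depends on the single parent partition with the same index.
  override def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}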


2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -997,6 +997,8 @@ class SparkContext(config: SparkConf) extends Logging {
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
listenerBus.stop()
eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:

val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
val newCombiner = Array.fill(numRdds)(new CoGroup)
value match { case (v, depNum) => newCombiner(depNum) += v }
newCombiner(value._2) += value._1
newCombiner
}
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
(combiner, value) => {
value match { case (v, depNum) => combiner(depNum) += v }
combiner(value._2) += value._1
combiner
}
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
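
The two closure styles visible in the hunk above (a case (v, depNum) pattern match versus direct value._1/value._2 access) are behaviourally equivalent; the same contrast recurs in the MappedValuesRDD and PairRDDFunctions hunks below. A small illustrative sketch with simplified, assumed types, not taken from this commit:

// Both helpers append the value to the combiner slot selected by the dependency number.
def addByMatch(combiner: Array[List[Any]], value: (Any, Int)): Array[List[Any]] = {
  value match { case (v, depNum) => combiner(depNum) = v :: combiner(depNum) }
  combiner
}

def addByField(combiner: Array[List[Any]], value: (Any, Int)): Array[List[Any]] = {
  combiner(value._2) = value._1 :: combiner(value._2)
  combiner
}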
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
override val partitioner = firstParent[Product2[K, U]].partitioner

override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
}
}
60 changes: 30 additions & 30 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
iter.foreach { pair =>
val old = map.get(pair._1)
map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
Iterator(map)
} : Iterator[JHashMap[K, V]]

val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
m2.foreach { pair =>
val old = m1.get(pair._1)
m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
m1
} : JHashMap[K, V]
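
The hunk above builds one hash map per partition and then merges the maps pairwise. A self-contained sketch of that map-then-merge shape on plain Scala collections (illustrative only; names and types are assumptions, not Spark code):

def localReduce[K, V](partitions: Seq[Seq[(K, V)]], func: (V, V) => V): Map[K, V] = {
  // Reduce each partition into its own map ...
  val perPartition = partitions.map { part =>
    part.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(func) }
  }
  // ... then fold the per-partition maps together, combining values for shared keys.
  perPartition.reduce { (m1, m2) =>
    m2.foldLeft(m1) { case (acc, (k, v)) =>
      acc + (k -> acc.get(k).map(func(_, v)).getOrElse(v))
    }
  }
}

// localReduce(Seq(Seq(("a", 1), ("b", 2)), Seq(("a", 3))), (x: Int, y: Int) => x + y)
// yields Map("a" -> 4, "b" -> 2)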
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs; w <- ws) yield (v, w)
}
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1; w <- pair._2) yield (v, w)
)
}

/**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* partition the output RDD.
*/
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (ws.isEmpty) {
vs.map(v => (v, None))
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
pair._1.map(v => (v, None))
} else {
for (v <- vs; w <- ws) yield (v, Some(w))
for (v <- pair._1; w <- pair._2) yield (v, Some(w))
}
}
}
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty) {
ws.map(w => (None, w))
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._1.isEmpty) {
pair._2.map(w => (None, w))
} else {
for (v <- vs; w <- ws) yield (Some(v), w)
for (v <- pair._1; w <- pair._2) yield (Some(v), w)
}
}
}
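
A tiny usage sketch of the join semantics documented above. The SparkContext sc, the sample data, and the element ordering of the collected arrays are assumptions for illustration:

val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
val right = sc.parallelize(Seq(("a", "x")))

left.join(right).collect()            // Array(("a", (1, "x")))
left.leftOuterJoin(right).collect()   // Array(("a", (1, Some("x"))), ("b", (2, None)))
left.rightOuterJoin(right).collect()  // Array(("a", (Some(1), "x")))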
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val data = self.collect()
val map = new mutable.HashMap[K, V]
map.sizeHint(data.length)
data.foreach { case (k, v) => map.put(k, v) }
data.foreach { pair => map.put(pair._1, pair._2) }
map
}

@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]],
w3s.asInstanceOf[Seq[W3]])
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]],
w3s.asInstanceOf[Seq[W3]])
}
}

@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Seq(vs, ws) =>
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
cg.mapValues { case Seq(vs, w1s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
}
}

@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]])
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]])
}
}

@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val index = p.getPartition(key)
val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key) {
buf += v
for (pair <- it if pair._1 == key) {
buf += pair._2
}
buf
} : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
val pair = iter.next()
writer.write(pair._1, pair._2)
}
} finally {
writer.close(hadoopContext)
30 changes: 7 additions & 23 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -35,13 +35,12 @@ import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -1196,36 +1195,21 @@ abstract class RDD[T: ClassTag](
/**
* Return whether this RDD has been checkpointed or not
*/
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
def isCheckpointed: Boolean = {
checkpointData.map(_.isCheckpointed).getOrElse(false)
}

/**
* Gets the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
def getCheckpointFile: Option[String] = {
checkpointData.flatMap(_.getCheckpointFile)
}
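
The two formulations of isCheckpointed above behave identically: on any Option, exists(p) and map(p).getOrElse(false) produce the same Boolean. A one-off illustration, not taken from this commit:

val p = (n: Int) => n > 0
for (opt <- Seq(None, Some(-1), Some(1))) {
  assert(opt.exists(p) == opt.map(p).getOrElse(false))
}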

// =======================================================================
// Other internal methods and fields
// =======================================================================

/**
* Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
* the serialized copy of the RDD and for each task we will deserialize it, which means each
* task gets a different copy of the RDD. This provides stronger isolation between tasks that
* might modify state of objects referenced in their closures. This is necessary in Hadoop
* where the JobConf/Configuration object is not thread-safe.
*/
@transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = {
val ser = SparkEnv.get.closureSerializer.newInstance()
val bytes = ser.serialize(this).array()
val size = Utils.bytesToString(bytes.length)
if (bytes.length > (1L << 20)) {
logWarning(s"Broadcasting RDD $id ($size), which contains large objects")
} else {
logDebug(s"Broadcasting RDD $id ($size)")
}
sc.broadcast(bytes)
}
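
The comment block above (removed in this diff) describes a serialize-then-broadcast-then-deserialize flow. A hedged sketch of that flow follows; sc, someRdd, and the split between driver and task side are assumptions for illustration, not the committed implementation:

// Driver side: serialize the object once and broadcast only the raw bytes.
val ser = SparkEnv.get.closureSerializer.newInstance()
val rddBytes = sc.broadcast(ser.serialize(someRdd).array())

// Task side: every task deserializes its own private copy, so a task that mutates state
// reachable from the closure (e.g. a Hadoop JobConf) cannot affect other tasks.
val privateCopy = ser.deserialize[RDD[_]](java.nio.ByteBuffer.wrap(rddBytes.value))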

private var storageLevel: StorageLevel = StorageLevel.NONE

/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@@ -106,6 +106,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
cpRDD = Some(newRDD)
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
cpState = Checkpointed
RDDCheckpointData.clearTaskCaches()
}
logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
}
@@ -130,5 +131,9 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
}
}

// Used for synchronization
private[spark] object RDDCheckpointData
private[spark] object RDDCheckpointData {
def clearTaskCaches() {
ShuffleMapTask.clearCache()
ResultTask.clearCache()
}
}
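
clearTaskCaches() above, together with the clearCache() calls added to SparkContext.stop() and the removeStage() calls in the DAGScheduler hunk below, manages per-stage caches of serialized task information held on the ShuffleMapTask and ResultTask companion objects. A hedged sketch of that kind of cache; the names and layout are assumptions, not the actual companions:

object TaskInfoCache {
  // Serialized task payloads memoized per stage id. Entries become stale and must be dropped
  // when a stage is removed or when checkpointing rewrites an RDD's lineage.
  private val serializedInfoCache = scala.collection.mutable.HashMap[Int, Array[Byte]]()

  def put(stageId: Int, bytes: Array[Byte]): Unit = synchronized { serializedInfoCache(stageId) = bytes }
  def get(stageId: Int): Option[Array[Byte]] = synchronized { serializedInfoCache.get(stageId) }
  def removeStage(stageId: Int): Unit = synchronized { serializedInfoCache -= stageId }
  def clearCache(): Unit = synchronized { serializedInfoCache.clear() }
}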
@@ -376,6 +376,9 @@ class DAGScheduler(
stageIdToStage -= stageId
stageIdToJobIds -= stageId

ShuffleMapTask.removeStage(stageId)
ResultTask.removeStage(stageId)

logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}
@@ -720,6 +723,7 @@ class DAGScheduler(
}
}


/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")

0 comments on commit 717c9c3
