Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Browse files Browse the repository at this point in the history
  • Loading branch information
MLnick committed Jun 6, 2014
2 parents cde6af9 + 8d85359 commit d150431
Show file tree
Hide file tree
Showing 69 changed files with 1,184 additions and 468 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion bagel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))


/**
* Randomly splits this RDD with the provided weights.
*
* @param weights weights for splits, will be normalized if they don't sum to 1
*
* @return split RDDs in an array
*/
def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
randomSplit(weights, Utils.random.nextLong)

/**
* Randomly splits this RDD with the provided weights.
*
* @param weights weights for splits, will be normalized if they don't sum to 1
* @param seed random seed
*
* @return split RDDs in an array
*/
def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
rdd.randomSplit(weights, seed).map(wrapRDD)

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
Expand Down Expand Up @@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(conf)
conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
import org.apache.spark.util.Utils
import org.apache.spark.util.{SystemClock, Clock, Utils}

/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
Expand All @@ -61,7 +61,8 @@ class DAGScheduler(
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv)
env: SparkEnv,
clock: Clock = SystemClock)
extends Logging {

import DAGScheduler._
Expand Down Expand Up @@ -781,7 +782,7 @@ class DAGScheduler(
logDebug("New pending tasks: " + myPending)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
stageToInfos(stage).submissionTime = Some(clock.getTime())
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
Expand All @@ -807,11 +808,11 @@ class DAGScheduler(

def markStageAsFinished(stage: Stage) = {
val serviceTime = stageToInfos(stage).submissionTime match {
case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
case _ => "Unknown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
stageToInfos(stage).completionTime = Some(clock.getTime())
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
runningStages -= stage
}
Expand Down Expand Up @@ -1015,7 +1016,7 @@ class DAGScheduler(
return
}
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
stageToInfos(failedStage).completionTime = Some(clock.getTime())
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ private[spark] class FileLogger(
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None

// The Hadoop APIs have changed over time, so we use reflection to figure out
// the correct method to use to flush a hadoop data stream. See SPARK-1518
// for details.
private val hadoopFlushMethod = {
val cls = classOf[FSDataOutputStream]
scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
}

private var writer: Option[PrintWriter] = None

/**
Expand Down Expand Up @@ -149,13 +157,13 @@ private[spark] class FileLogger(
/**
* Flush the writer to disk manually.
*
* If the Hadoop FileSystem is used, the underlying FSDataOutputStream (r1.0.4) must be
* sync()'ed manually as it does not support flush(), which is invoked by when higher
* level streams are flushed.
* When using a Hadoop filesystem, we need to invoke the hflush or sync
* method. In HDFS, hflush guarantees that the data gets to all the
* DataNodes.
*/
def flush() {
writer.foreach(_.flush())
hadoopDataStream.foreach(_.sync())
hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
import java.util.Comparator

import scala.collection.BufferedIterator
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
private val inputStreams = Seq(sortedMap) ++ spilledMaps
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
Expand All @@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
*/
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = kc._1.hashCode()
while (it.hasNext && kc._1.hashCode() == minHash) {
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
}
Expand Down Expand Up @@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
private class StreamBuffer(
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {

def isEmpty = pairs.length == 0
Expand Down
Loading

0 comments on commit d150431

Please sign in to comment.