{e}} + .getOrElse(Seq.empty) + }.getOrElse(defaultContent) + + UIUtils.basicSparkPage(content, title) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index a18b39fc95d64..16aa0493370dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -21,7 +21,7 @@ import org.apache.spark.Logging import org.apache.spark.deploy.master.Master import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.AkkaUtils /** * Web UI server for the standalone master. @@ -38,6 +38,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) /** Initialize all components of the server. */ def initialize() { attachPage(new ApplicationPage(this)) + attachPage(new HistoryNotFoundPage(this)) attachPage(new MasterPage(this)) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) master.masterMetricsSystem.getServletHandlers.foreach(attachHandler) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8d31bd05fdbec..b455c9fcf4bd6 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -71,7 +71,7 @@ private[spark] class CoarseGrainedExecutorBackend( val ser = SparkEnv.get.closureSerializer.newInstance() val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) - executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask) } case KillTask(taskId, _, interruptThread) => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4d3ba11633bf5..b16133b20cc02 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -107,8 +107,9 @@ private[spark] class Executor( // Maintains the list of running tasks. 
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] - def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { - val tr = new TaskRunner(context, taskId, serializedTask) + def launchTask( + context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) { + val tr = new TaskRunner(context, taskId, taskName, serializedTask) runningTasks.put(taskId, tr) threadPool.execute(tr) } @@ -135,14 +136,15 @@ private[spark] class Executor( localDirs } - class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) + class TaskRunner( + execBackend: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) extends Runnable { @volatile private var killed = false @volatile private var task: Task[Any] = _ def kill(interruptThread: Boolean) { - logInfo("Executor is trying to kill task " + taskId) + logInfo(s"Executor is trying to kill $taskName (TID $taskId)") killed = true if (task != null) { task.kill(interruptThread) @@ -154,7 +156,7 @@ private[spark] class Executor( SparkEnv.set(env) Thread.currentThread.setContextClassLoader(replClassLoader) val ser = SparkEnv.get.closureSerializer.newInstance() - logInfo("Running task ID " + taskId) + logInfo(s"Running $taskName (TID $taskId)") execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var attemptedTask: Option[Task[Any]] = None var taskStart: Long = 0 @@ -207,25 +209,30 @@ private[spark] class Executor( val accumUpdates = Accumulators.values - val directResult = new DirectTaskResult(valueBytes, accumUpdates, - task.metrics.getOrElse(null)) + val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) val serializedDirectResult = ser.serialize(directResult) - logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) - val serializedResult = { - if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { - logInfo("Storing result for " + taskId + " in local BlockManager") + val resultSize = serializedDirectResult.limit + + // directSend = sending directly back to the driver + val (serializedResult, directSend) = { + if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) - ser.serialize(new IndirectTaskResult[Any](blockId)) + (ser.serialize(new IndirectTaskResult[Any](blockId)), false) } else { - logInfo("Sending result for " + taskId + " directly to driver") - serializedDirectResult + (serializedDirectResult, true) } } execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) - logInfo("Finished task ID " + taskId) + + if (directSend) { + logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") + } else { + logInfo( + s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") + } } catch { case ffe: FetchFailedException => { val reason = ffe.toTaskEndReason @@ -233,7 +240,7 @@ private[spark] class Executor( } case _: TaskKilledException | _: InterruptedException if task.killed => { - logInfo("Executor killed task " + taskId) + logInfo(s"Executor killed $taskName (TID $taskId)") execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) } @@ -241,7 +248,7 @@ private[spark] class Executor( // Attempt to exit cleanly by informing the driver of our failure. 
// If anything goes wrong (or this was a fatal exception), we will delegate to // the default uncaught exception handler, which will terminate the Executor. - logError("Exception in task ID " + taskId, t) + logError(s"Exception in $taskName (TID $taskId)", t) val serviceTime = System.currentTimeMillis() - taskStart val metrics = attemptedTask.flatMap(t => t.metrics) @@ -249,7 +256,7 @@ private[spark] class Executor( m.executorRunTime = serviceTime m.jvmGCTime = gcTime - startGCTime } - val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) + val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) // Don't forcibly exit unless the exception was inherently fatal, to avoid diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index 2232e6237bf26..a42c8b43bbf7f 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -64,7 +64,7 @@ private[spark] class MesosExecutorBackend if (executor == null) { logError("Received launchTask but executor was null") } else { - executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer) + executor.launchTask(this, taskId, taskInfo.getName, taskInfo.getData.asReadOnlyByteBuffer) } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index ac73288442a74..5d59e00636ee6 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -75,7 +75,9 @@ class TaskMetrics extends Serializable { /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ - var shuffleReadMetrics: Option[ShuffleReadMetrics] = None + private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None + + def shuffleReadMetrics = _shuffleReadMetrics /** * If this task writes to shuffle output, metrics on the written shuffle data will be collected @@ -87,6 +89,22 @@ class TaskMetrics extends Serializable { * Storage statuses of any blocks that have been updated as a result of this task. */ var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None + + /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. 
*/ + def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized { + _shuffleReadMetrics match { + case Some(existingMetrics) => + existingMetrics.shuffleFinishTime = math.max( + existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime) + existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime + existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched + existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched + existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched + existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead + case None => + _shuffleReadMetrics = Some(newMetrics) + } + } } private[spark] object TaskMetrics { diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 4b0fe1ab82999..1b66218d86dd9 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -20,6 +20,7 @@ package org.apache.spark.io import java.io.{InputStream, OutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} +import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf @@ -55,7 +56,28 @@ private[spark] object CompressionCodec { ctor.newInstance(conf).asInstanceOf[CompressionCodec] } - val DEFAULT_COMPRESSION_CODEC = classOf[LZFCompressionCodec].getName + val DEFAULT_COMPRESSION_CODEC = classOf[SnappyCompressionCodec].getName +} + + +/** + * :: DeveloperApi :: + * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]]. + * Block size can be configured by `spark.io.compression.lz4.block.size`. + * + * Note: The wire protocol for this codec is not guaranteed to be compatible across versions + * of Spark. This is intended for use as an internal compression utility within a single Spark + * application. + */ +@DeveloperApi +class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { + + override def compressedOutputStream(s: OutputStream): OutputStream = { + val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768) + new LZ4BlockOutputStream(s, blockSize) + } + + override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s) } @@ -81,7 +103,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { /** * :: DeveloperApi :: * Snappy implementation of [[org.apache.spark.io.CompressionCodec]]. - * Block size can be configured by spark.io.compression.snappy.block.size. + * Block size can be configured by `spark.io.compression.snappy.block.size`. * * Note: The wire protocol for this codec is not guaranteed to be compatible across versions * of Spark. 
This is intended for use as an internal compression utility within a single Spark diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 5951865e56c9d..aca235a62a6a8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -170,17 +170,22 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: val createCombiner: (CoGroupValue => CoGroupCombiner) = value => { val newCombiner = Array.fill(numRdds)(new CoGroup) - value match { case (v, depNum) => newCombiner(depNum) += v } + newCombiner(value._2) += value._1 newCombiner } val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (combiner, value) => { - value match { case (v, depNum) => combiner(depNum) += v } + combiner(value._2) += value._1 combiner } val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (combiner1, combiner2) => { - combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 } + var depNum = 0 + while (depNum < numRdds) { + combiner1(depNum) ++= combiner2(depNum) + depNum += 1 + } + combiner1 } new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner]( createCombiner, mergeValue, mergeCombiners) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index c45b759f007cc..e7221e3032c11 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup addPartToPGroup(nxt_part, pgroup) - groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple + groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple numCreated += 1 } } @@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc var (nxt_replica, nxt_part) = rotIt.next() val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup - groupHash.get(nxt_replica).get += pgroup + groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup var tries = 0 while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part nxt_part = rotIt.next()._2 diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 041028514399b..e521612ffc27c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -140,8 +140,8 @@ class HadoopRDD[K, V]( // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) - conf.synchronized { + // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456). 
+ HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { val newJobConf = new JobConf(conf) initLocalJobConfFuncOpt.map(f => f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) @@ -246,6 +246,9 @@ class HadoopRDD[K, V]( } private[spark] object HadoopRDD { + /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */ + val CONFIGURATION_INSTANTIATION_LOCK = new Object() + /** * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala index 2bc47eb9fcd74..a60952eee5901 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala @@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U) override val partitioner = firstParent[Product2[K, U]].partitioner override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = { - firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) } + firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index fc9beb166befe..a6b920467283e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -125,7 +125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) zeroBuffer.get(zeroArray) lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance() - def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) + val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner) } @@ -171,7 +171,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) // When deserializing, use a lazy val to create just one instance of the serializer per task lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance() - def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) + val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner) } @@ -214,22 +214,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) throw new SparkException("reduceByKeyLocally() does not support array keys") } - def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = { + val reducePartition = (iter: Iterator[(K, V)]) => { val map = new JHashMap[K, V] - iter.foreach { case (k, v) => - val old = map.get(k) - map.put(k, if (old == null) v else func(old, v)) + iter.foreach { pair => + val old = map.get(pair._1) + map.put(pair._1, if (old == null) pair._2 else func(old, pair._2)) } Iterator(map) - } + } : Iterator[JHashMap[K, V]] - def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = { - m2.foreach { case (k, v) => - val old = m1.get(k) - m1.put(k, if (old == null) v else func(old, v)) + val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => { + m2.foreach { pair => + val old = m1.get(pair._1) + m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2)) } m1 - } + } : JHashMap[K, V] self.mapPartitions(reducePartition).reduce(mergeMaps) } 
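For reference, a small self-contained sketch of the reduceByKeyLocally path rewritten just above. The sample data, reduce function, app name, and local master are assumptions made only for this illustration; the behavior described in the comments follows the reducePartition/mergeMaps code in the hunk.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

// Hypothetical driver program; only the reduceByKeyLocally call relates to the hunk above.
object ReduceByKeyLocallySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("rbkl-sketch"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 2)
    // Each partition is folded into a JHashMap (reducePartition above), and the per-partition
    // maps are then merged on the driver with the same function (mergeMaps above).
    val counts: scala.collection.Map[String, Int] = pairs.reduceByKeyLocally(_ + _)
    assert(counts("a") == 4 && counts("b") == 2)
    sc.stop()
  }
}
```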
@@ -353,19 +353,19 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. * - * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over - * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]] - * will provide much better performance. + * Note: This operation may be very expensive. If you are grouping in order to perform an + * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] + * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2 + val createCombiner = (v: V) => ArrayBuffer(v) + val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v + val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2 val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false) + createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false) bufs.mapValues(_.toIterable) } @@ -373,9 +373,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. * - * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over - * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]] - * will provide much better performance. + * Note: This operation may be very expensive. If you are grouping in order to perform an + * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] + * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { groupByKey(new HashPartitioner(numPartitions)) @@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { - this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - for (v <- vs; w <- ws) yield (v, w) - } + this.cogroup(other, partitioner).flatMapValues( pair => + for (v <- pair._1; w <- pair._2) yield (v, w) + ) } /** @@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * partition the output RDD. 
*/ def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { - this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - if (ws.isEmpty) { - vs.map(v => (v, None)) + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._2.isEmpty) { + pair._1.map(v => (v, None)) } else { - for (v <- vs; w <- ws) yield (v, Some(w)) + for (v <- pair._1; w <- pair._2) yield (v, Some(w)) } } } @@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { - this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - if (vs.isEmpty) { - ws.map(w => (None, w)) + this.cogroup(other, partitioner).flatMapValues { pair => + if (pair._1.isEmpty) { + pair._2.map(w => (None, w)) } else { - for (v <- vs; w <- ws) yield (Some(v), w) + for (v <- pair._1; w <- pair._2) yield (Some(v), w) } } } @@ -462,9 +462,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. * - * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over - * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]] - * will provide much better performance, + * Note: This operation may be very expensive. If you are grouping in order to perform an + * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] + * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupByKey(): RDD[(K, Iterable[V])] = { groupByKey(defaultPartitioner(self)) @@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val data = self.collect() val map = new mutable.HashMap[K, V] map.sizeHint(data.length) - data.foreach { case (k, v) => map.put(k, v) } + data.foreach { pair => map.put(pair._1, pair._2) } map } @@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner) cg.mapValues { case Seq(vs, w1s, w2s, w3s) => - (vs.asInstanceOf[Seq[V]], - w1s.asInstanceOf[Seq[W1]], - w2s.asInstanceOf[Seq[W2]], - w3s.asInstanceOf[Seq[W3]]) + (vs.asInstanceOf[Seq[V]], + w1s.asInstanceOf[Seq[W1]], + w2s.asInstanceOf[Seq[W2]], + w3s.asInstanceOf[Seq[W3]]) } } @@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) - cg.mapValues { case Seq(vs, ws) => - (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) + cg.mapValues { case Seq(vs, w1s) => + (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]]) } } @@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) cg.mapValues { case Seq(vs, w1s, w2s) => (vs.asInstanceOf[Seq[V]], - w1s.asInstanceOf[Seq[W1]], - w2s.asInstanceOf[Seq[W2]]) + w1s.asInstanceOf[Seq[W1]], + w2s.asInstanceOf[Seq[W2]]) } } @@ -710,14 +710,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) self.partitioner match { case Some(p) => val index = p.getPartition(key) - def process(it: Iterator[(K, V)]): Seq[V] = { + val process = (it: Iterator[(K, V)]) => { val buf = new ArrayBuffer[V] - for ((k, v) <- it if k == key) { - buf += v + for (pair <- it if pair._1 == key) { + buf += pair._2 } buf - } - val res = 
self.context.runJob(self, process _, Array(index), false) + } : Seq[V] + val res = self.context.runJob(self, process, Array(index), false) res(0) case None => self.filter(_._1 == key).map(_._2).collect() @@ -840,7 +840,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) jobFormat.checkOutputSpecs(job) } - def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = { + val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt @@ -858,22 +858,21 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] try { while (iter.hasNext) { - val (k, v) = iter.next() - writer.write(k, v) + val pair = iter.next() + writer.write(pair._1, pair._2) } - } - finally { + } finally { writer.close(hadoopContext) } committer.commitTask(hadoopContext) - return 1 - } + 1 + } : Int val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) - self.context.runJob(self, writeShard _) + self.context.runJob(self, writeShard) jobCommitter.commitJob(jobTaskContext) } @@ -912,7 +911,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val writer = new SparkHadoopWriter(hadoopConf) writer.preSetup() - def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) { + val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt @@ -921,19 +920,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.open() try { var count = 0 - while(iter.hasNext) { + while (iter.hasNext) { val record = iter.next() count += 1 writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) } - } - finally { + } finally { writer.close() } writer.commit() } - self.context.runJob(self, writeToFile _) + self.context.runJob(self, writeToFile) writer.commitJob() } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4e841bc992bff..88a918aebf763 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -328,7 +328,7 @@ abstract class RDD[T: ClassTag]( : RDD[T] = { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ - def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = { + val distributePartition = (index: Int, items: Iterator[T]) => { var position = (new Random(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner @@ -336,7 +336,7 @@ abstract class RDD[T: ClassTag]( position = position + 1 (position, t) } - } + } : Iterator[(Int, T)] // include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD( @@ -509,6 +509,10 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped items. 
Each group consists of a key and a sequence of elements * mapping to that key. + * + * Note: This operation may be very expensive. If you are grouping in order to perform an + * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] + * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this)) @@ -516,6 +520,10 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. + * + * Note: This operation may be very expensive. If you are grouping in order to perform an + * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] + * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = groupBy(f, new HashPartitioner(numPartitions)) @@ -523,6 +531,10 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements * mapping to that key. + * + * Note: This operation may be very expensive. If you are grouping in order to perform an + * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] + * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null) : RDD[(K, Iterable[T])] = { @@ -907,19 +919,19 @@ abstract class RDD[T: ClassTag]( throw new SparkException("countByValue() does not support arrays") } // TODO: This should perhaps be distributed by default. - def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = { + val countPartition = (iter: Iterator[T]) => { val map = new OpenHashMap[T,Long] iter.foreach { t => map.changeValue(t, 1L, _ + 1L) } Iterator(map) - } - def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = { + }: Iterator[OpenHashMap[T,Long]] + val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => { m2.foreach { case (key, value) => m1.changeValue(key, value, _ + value) } m1 - } + }: OpenHashMap[T,Long] val myResult = mapPartitions(countPartition).reduce(mergeMaps) // Convert to a Scala mutable map val mutableResult = scala.collection.mutable.Map[T,Long]() diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f72bfde572c96..ede3c7d9f01ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -816,7 +816,6 @@ class DAGScheduler( } event.reason match { case Success => - logInfo("Completed " + task) if (event.accumUpdates != null) { // TODO: fail the stage if the accumulator update fails... 
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 6a6d8e609bc39..e41e0a9841691 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend { def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException + def isReady(): Boolean = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 29de0453ac19a..ca0595f35143e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -84,6 +84,8 @@ class TaskInfo( } } + def id: String = s"$index.$attempt" + def duration: Long = { if (!finished) { throw new UnsupportedOperationException("duration() called on unfinished task") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 5ed2803d76afc..be3673c48eda8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -88,6 +88,8 @@ private[spark] class TaskSchedulerImpl( // in turn is used to decide when we can attain data locality on a given host private val executorsByHost = new HashMap[String, HashSet[String]] + protected val hostsByRack = new HashMap[String, HashSet[String]] + private val executorIdToHost = new HashMap[String, String] // Listener object to pass upcalls into @@ -145,6 +147,10 @@ private[spark] class TaskSchedulerImpl( } } + override def postStartHook() { + waitBackendReady() + } + override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") @@ -219,6 +225,9 @@ private[spark] class TaskSchedulerImpl( executorAdded(o.executorId, o.host) newExecAvail = true } + for (rack <- getRackForHost(o.host)) { + hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host + } } // Randomly shuffle offers to avoid always placing tasks on the same set of workers. 
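The rack bookkeeping added in the surrounding TaskSchedulerImpl hunks (registration above, cleanup on executor loss below) can be exercised in isolation. This is only a sketch of the pattern: `rackOf` stands in for getRackForHost (which returns None by default), and the host names are made up.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Standalone exercise of the rack bookkeeping pattern used by the new hostsByRack field.
object RackBookkeepingSketch {
  // rack -> hosts that still have live executors.
  val hostsByRack = new HashMap[String, HashSet[String]]

  // Stand-in for TaskSchedulerImpl.getRackForHost; exists only for this sketch.
  def rackOf(host: String): Option[String] = Some("rack-" + host.last)

  def hostRegistered(host: String): Unit =
    for (rack <- rackOf(host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
    }

  def hostLost(host: String): Unit =
    for (rack <- rackOf(host); hosts <- hostsByRack.get(rack)) {
      hosts -= host
      if (hosts.isEmpty) {
        hostsByRack -= rack   // same cleanup the executor-lost path performs
      }
    }

  def main(args: Array[String]): Unit = {
    hostRegistered("node1"); hostRegistered("node2"); hostLost("node1")
    // hostsByRack.contains(rack) is what the new hasHostAliveOnRack check answers when
    // deciding whether RACK_LOCAL is a valid locality level.
    println(hostsByRack)
  }
}
```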
@@ -414,6 +423,12 @@ private[spark] class TaskSchedulerImpl( execs -= executorId if (execs.isEmpty) { executorsByHost -= host + for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) { + hosts -= host + if (hosts.isEmpty) { + hostsByRack -= rack + } + } } executorIdToHost -= executorId rootPool.executorLost(executorId, host) @@ -431,12 +446,27 @@ private[spark] class TaskSchedulerImpl( executorsByHost.contains(host) } + def hasHostAliveOnRack(rack: String): Boolean = synchronized { + hostsByRack.contains(rack) + } + def isExecutorAlive(execId: String): Boolean = synchronized { activeExecutorIds.contains(execId) } // By default, rack is unknown def getRackForHost(value: String): Option[String] = None + + private def waitBackendReady(): Unit = { + if (backend.isReady) { + return + } + while (!backend.isReady) { + synchronized { + this.wait(100) + } + } + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 059cc9085a2e7..8b5e8cb802a45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -26,8 +26,7 @@ import scala.collection.mutable.HashSet import scala.math.max import scala.math.min -import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, - SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} +import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.{Clock, SystemClock} @@ -52,8 +51,8 @@ private[spark] class TaskSetManager( val taskSet: TaskSet, val maxTaskFailures: Int, clock: Clock = SystemClock) - extends Schedulable with Logging -{ + extends Schedulable with Logging { + val conf = sched.sc.conf /* @@ -191,7 +190,9 @@ private[spark] class TaskSetManager( addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) for (rack <- sched.getRackForHost(loc.host)) { addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) - hadAliveLocations = true + if(sched.hasHostAliveOnRack(rack)){ + hadAliveLocations = true + } } } @@ -401,14 +402,11 @@ private[spark] class TaskSetManager( // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() - // Figure out whether this should count as a preferred launch - logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format( - taskSet.id, index, taskId, execId, host, taskLocality)) // Do various bookkeeping copiesRunning(index) += 1 val attemptNum = taskAttempts(index).size - val info = new TaskInfo( - taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative) + val info = new TaskInfo(taskId, index, attemptNum, curTime, + execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling @@ -427,11 +425,15 @@ private[spark] class TaskSetManager( s"(${serializedTask.limit / 1024} KB). 
The maximum recommended task size is " + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") } - val timeTaken = clock.getTime() - startTime addRunningTask(taskId) - logInfo("Serialized task %s:%d as %d bytes in %d ms".format( - taskSet.id, index, serializedTask.limit, timeTaken)) - val taskName = "task %s:%d".format(taskSet.id, index) + + // We used to log the time it takes to serialize the task, but task size is already + // a good proxy to task serialization time. + // val timeTaken = clock.getTime() - startTime + val taskName = s"task ${info.id} in stage ${taskSet.id}" + logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format( + taskName, taskId, host, taskLocality, serializedTask.limit)) + sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) } @@ -490,19 +492,19 @@ private[spark] class TaskSetManager( info.markSuccessful() removeRunningTask(tid) sched.dagScheduler.taskEnded( - tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) + tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics) if (!successful(index)) { tasksSuccessful += 1 - logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format( - tid, info.duration, info.host, tasksSuccessful, numTasks)) + logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( + info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks)) // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { isZombie = true } } else { - logInfo("Ignorning task-finished event for TID " + tid + " because task " + - index + " has already completed successfully") + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + + " because task " + index + " has already completed successfully") } failedExecutors.remove(index) maybeFinishTaskSet() @@ -521,14 +523,13 @@ private[spark] class TaskSetManager( info.markFailed() val index = info.index copiesRunning(index) -= 1 - if (!isZombie) { - logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) - } var taskMetrics : TaskMetrics = null - var failureReason: String = null + + val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + + reason.asInstanceOf[TaskFailedReason].toErrorString reason match { case fetchFailed: FetchFailed => - logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress) + logWarning(failureReason) if (!successful(index)) { successful(index) = true tasksSuccessful += 1 @@ -536,23 +537,17 @@ private[spark] class TaskSetManager( // Not adding to failed executors for FetchFailed. isZombie = true - case TaskKilled => - // Not adding to failed executors for TaskKilled. - logWarning("Task %d was killed.".format(tid)) - case ef: ExceptionFailure => - taskMetrics = ef.metrics.getOrElse(null) - if (ef.className == classOf[NotSerializableException].getName()) { + taskMetrics = ef.metrics.orNull + if (ef.className == classOf[NotSerializableException].getName) { // If the task result wasn't serializable, there's no point in trying to re-execute it. 
- logError("Task %s:%s had a not serializable result: %s; not retrying".format( - taskSet.id, index, ef.description)) - abort("Task %s:%s had a not serializable result: %s".format( - taskSet.id, index, ef.description)) + logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying" + .format(info.id, taskSet.id, tid, ef.description)) + abort("Task %s in stage %s (TID %d) had a not serializable result: %s".format( + info.id, taskSet.id, tid, ef.description)) return } val key = ef.description - failureReason = "Exception failure in TID %s on host %s: %s\n%s".format( - tid, info.host, ef.description, ef.stackTrace.map(" " + _).mkString("\n")) val now = clock.getTime() val (printFull, dupCount) = { if (recentExceptions.contains(key)) { @@ -570,19 +565,18 @@ private[spark] class TaskSetManager( } } if (printFull) { - val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString)) - logWarning("Loss was due to %s\n%s\n%s".format( - ef.className, ef.description, locs.mkString("\n"))) + logWarning(failureReason) } else { - logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) + logInfo( + s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on executor ${info.host}: " + + s"${ef.className} (${ef.description}) [duplicate $dupCount]") } - case TaskResultLost => - failureReason = "Lost result for TID %s on host %s".format(tid, info.host) + case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others logWarning(failureReason) - case _ => - failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host) + case e: TaskEndReason => + logError("Unknown TaskEndReason: " + e) } // always add to failed executors failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). @@ -593,10 +587,10 @@ private[spark] class TaskSetManager( assert (null != failureReason) numFailures(index) += 1 if (numFailures(index) >= maxTaskFailures) { - logError("Task %s:%d failed %d times; aborting job".format( - taskSet.id, index, maxTaskFailures)) - abort("Task %s:%d failed %d times, most recent failure: %s\nDriver stacktrace:".format( - taskSet.id, index, maxTaskFailures, failureReason)) + logError("Task %d in stage %s failed %d times; aborting job".format( + index, taskSet.id, maxTaskFailures)) + abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" + .format(index, taskSet.id, maxTaskFailures, failureReason)) return } } @@ -709,8 +703,8 @@ private[spark] class TaskSetManager( if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && !speculatableTasks.contains(index)) { logInfo( - "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format( - taskSet.id, index, info.host, threshold)) + "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" + .format(index, taskSet.id, info.host, threshold)) speculatableTasks += index foundTasks = true } @@ -748,7 +742,8 @@ private[spark] class TaskSetManager( pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } - if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) { + if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 && + pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL } levels += ANY @@ -761,7 +756,8 @@ private[spark] class TaskSetManager( def newLocAvail(index: Int): Boolean = { for (loc <- tasks(index).preferredLocations) { if 
(sched.hasExecutorsAliveOnHost(loc.host) || - sched.getRackForHost(loc.host).isDefined) { + (sched.getRackForHost(loc.host).isDefined && + sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) { return true } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 318e16552201c..6abf6d930c155 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -66,4 +66,7 @@ private[spark] object CoarseGrainedClusterMessages { case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage + case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String) + extends CoarseGrainedClusterMessage + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 05d01b0c821f9..9f085eef46720 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState} import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils} +import org.apache.spark.ui.JettyUtils /** * A scheduler backend that waits for coarse grained executors to connect to it through Akka. @@ -46,9 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) + var totalExpectedExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) + // Submit tasks only after (registered executors / total expected executors) + // is equal to at least this value, that is double between 0 and 1. + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + if (minRegisteredRatio > 1) minRegisteredRatio = 1 + // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds). 
+ val maxRegisteredWaitingTime = + conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000) + val createTime = System.currentTimeMillis() + var ready = if (minRegisteredRatio <= 0) true else false class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -83,6 +94,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) + if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) { + ready = true + logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " + + executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() + + ", minRegisteredExecutorsRatio: " + minRegisteredRatio) + } makeOffers() } @@ -120,6 +137,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A removeExecutor(executorId, reason) sender ! true + case AddWebUIFilter(filterName, filterParams, proxyBase) => + addWebUIFilter(filterName, filterParams, proxyBase) + sender ! true case DisassociatedEvent(_, address, _) => addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated")) @@ -247,6 +267,33 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A throw new SparkException("Error notifying standalone scheduler's driver actor", e) } } + + override def isReady(): Boolean = { + if (ready) { + return true + } + if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { + ready = true + logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + + "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime) + return true + } + false + } + + // Add filters to the SparkUI + def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) { + if (proxyBase != null && proxyBase.nonEmpty) { + System.setProperty("spark.ui.proxyBase", proxyBase) + } + + if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) { + logInfo(s"Add WebUI Filter. 
$filterName, $filterParams, $proxyBase") + conf.set("spark.ui.filters", filterName) + conf.set(s"spark.$filterName.params", filterParams) + JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf) + } + } } private[spark] object CoarseGrainedSchedulerBackend { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9c07b3f7b695a..bf2dc88e29048 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { + totalExpectedExecutors.addAndGet(1) logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 9b95ccca0443e..e9f6273bfd9f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -69,7 +69,7 @@ private[spark] class LocalActor( val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= 1 - executor.launchTask(executorBackend, task.taskId, task.serializedTask) + executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask) } } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index a932455776e34..3795994cd920f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -84,7 +84,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks - context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics) + context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics) }) new InterruptibleIterator[T](context, completionIter) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 408a797088059..2f0296c20f2e2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -180,9 +180,9 @@ object BlockFetcherIterator { if (curRequestSize >= targetRequestSize) { // Add this FetchRequest remoteRequests += new FetchRequest(address, curBlocks) - curRequestSize = 0 curBlocks = new ArrayBuffer[(BlockId, Long)] logDebug(s"Creating fetch request of $curRequestSize at $address") + curRequestSize = 0 } } // Add in the final request diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 6aed322eeb185..de1cc5539fb48 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -336,6 +336,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case None => blockManagerIdByExecutor(id.executorId) = id } + + logInfo("Registering block manager %s with %s RAM".format( + id.hostPort, Utils.bytesToString(maxMemSize))) + blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) } @@ -432,9 +436,6 @@ private[spark] class BlockManagerInfo( // Mapping from block id to its status. private val _blocks = new JHashMap[BlockId, BlockStatus] - logInfo("Registering block manager %s with %s RAM".format( - blockManagerId.hostPort, Utils.bytesToString(maxMem))) - def getStatus(blockId: BlockId) = Option(_blocks.get(blockId)) def updateLastSeenMs() { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 9cb50d9b83dda..e07aa2ee3a5a2 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -136,7 +136,16 @@ private[spark] object UIUtils extends Logging { } // Yarn has to go through a proxy so the base uri is provided and has to be on all links - val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("") + def uiRoot: String = { + if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) { + System.getenv("APPLICATION_WEB_PROXY_BASE") + } else if (System.getProperty("spark.ui.proxyBase") != null) { + System.getProperty("spark.ui.proxyBase") + } + else { + "" + } + } def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 52020954ea57c..0cc51c873727d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import scala.xml.Node import org.apache.spark.ui.{ToolTips, UIUtils} +import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils /** Page showing executor summary */ @@ -64,11 +65,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { executorIdToAddress.put(executorId, address) } - val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId) - executorIdToSummary match { - case Some(x) => - x.toSeq.sortBy(_._1).map { case (k, v) => { - // scalastyle:off + listener.stageIdToData.get(stageId) match { + case Some(stageData: StageUIData) => + stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
[The rest of this ExecutorTable.scala hunk and the neighbouring stage-table hunk were Scala XML markup that did not survive extraction; the only remaining fragments are "{s.details}" and "listener.stageIdToDescription.get(s.stageId).map(d => ...)".]

[The docs/configuration.md and docs/spark-standalone.md hunks were HTML tables whose markup was also lost. The recoverable content:]

docs/configuration.md
  - spark.io.compression.codec: the removed description listed org.apache.spark.io.LZFCompressionCodec
    and org.apache.spark.io.SnappyCompressionCodec ("Of these two choices, Snappy offers faster
    compression and decompression, while LZF offers a better compression ratio."); the added description
    reads "By default, Spark provides three codecs: org.apache.spark.io.LZ4CompressionCodec,
    org.apache.spark.io.LZFCompressionCodec, and org.apache.spark.io.SnappyCompressionCodec."
  - New entry: spark.io.compression.lz4.block.size (block size for the new LZ4 codec).
  - New entries: spark.scheduler.minRegisteredExecutorsRatio and
    spark.scheduler.maxRegisteredExecutorsWaitingTime (descriptions not recoverable).
  - Unchanged context entries: spark.broadcast.factory, spark.streaming.receiver.maxRate,
    spark.streaming.unpersist.

docs/spark-standalone.md (property table: Property Name | Default | Meaning)
  - New row: spark.deploy.retainedApplications | 200 | The maximum number of completed applications to
    display. Older applications will be dropped from the UI to maintain this limit.
  - New row: spark.deploy.retainedDrivers | 200 | The maximum number of completed drivers to display.
    Older drivers will be dropped from the UI to maintain this limit.
  - Unchanged context row: spark.deploy.spreadOut | true
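To make the new knobs above concrete, here is a minimal, illustrative sketch of setting them from application code. The property names and the default values quoted in the comments come from this patch; the chosen values, app name, and local master are assumptions for the example only, not recommendations.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; the property names are the ones introduced or changed in this patch.
object ConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")     // assumption: a self-contained local run (the registration
                                 // settings below only matter for cluster scheduler backends)
      .setAppName("config-sketch")
      .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
      .set("spark.io.compression.lz4.block.size", "32768")   // 32768 is the codec's default
      // Per the CoarseGrainedSchedulerBackend changes: submit tasks once this fraction of the
      // expected executors has registered, or after the waiting time (ms) elapses.
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")
    val sc = new SparkContext(conf)
    // ... job code ...
    sc.stop()
  }
}
```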
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 522c83884ef42..38728534a46e0 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -474,7 +474,7 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
 Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
 However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
-In order to use Hive you must first run '`SPARK_HIVE=true sbt/sbt assembly/assembly`' (or use `-Phive` for maven).
+In order to use Hive you must first run '`sbt/sbt -Phive assembly/assembly`' (or use `-Phive` for maven).
 This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be
 present on all of the worker nodes, as they will need access to the Hive serialization and
 deserialization libraries (SerDes) in order to acccess data stored in Hive.
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 44775ea479ece..02cfe4ec39c7d 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -240,7 +240,10 @@ def get_spark_ami(opts):
         "r3.xlarge": "hvm",
         "r3.2xlarge": "hvm",
         "r3.4xlarge": "hvm",
-        "r3.8xlarge": "hvm"
+        "r3.8xlarge": "hvm",
+        "t2.micro": "hvm",
+        "t2.small": "hvm",
+        "t2.medium": "hvm"
     }
     if opts.instance_type in instance_types:
         instance_type = instance_types[opts.instance_type]
diff --git a/examples/pom.xml b/examples/pom.xml
index 4f6d7fdb87d47..bd1c387c2eb91 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -27,6 +27,9 @@