
Merge branch 'master' of github.com:apache/spark into demarcate-tests
Conflicts:
	mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
	sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
	sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
Andrew Or committed May 27, 2015
2 parents 12d1e1b + db3fd05 · commit fa9450e
Showing 118 changed files with 1,610 additions and 1,805 deletions.
core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

@@ -193,7 +193,7 @@ function renderDagVizForJob(svgContainer) {
       // Use the link from the stage table so it also works for the history server
       var attemptId = 0
       var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
-        .select("a")
+        .select("a.name-link")
         .attr("href") + "&expandDagViz=true";
       container = svgContainer
         .append("a")
core/src/main/resources/org/apache/spark/ui/static/timeline-view.js

@@ -46,7 +46,7 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {
     };
 
     $(this).click(function() {
-      var jobPagePath = $(getSelectorForJobEntry(this)).find("a").attr("href")
+      var jobPagePath = $(getSelectorForJobEntry(this)).find("a.name-link").attr("href")
       window.location.href = jobPagePath
     });
 
@@ -105,7 +105,7 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {
     };
 
     $(this).click(function() {
-      var stagePagePath = $(getSelectorForStageEntry(this)).find("a").attr("href")
+      var stagePagePath = $(getSelectorForStageEntry(this)).find("a.name-link").attr("href")
       window.location.href = stagePagePath
     });
 
58 changes: 27 additions & 31 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -208,19 +208,17 @@ class DAGScheduler(
 
   /**
    * Get or create a shuffle map stage for the given shuffle dependency's map side.
-   * The jobId value passed in will be used if the stage doesn't already exist with
-   * a lower jobId (jobId always increases across jobs.)
    */
   private def getShuffleMapStage(
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int): ShuffleMapStage = {
+      firstJobId: Int): ShuffleMapStage = {
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
         // We are going to register ancestor shuffle dependencies
-        registerShuffleDependencies(shuffleDep, jobId)
+        registerShuffleDependencies(shuffleDep, firstJobId)
         // Then register current shuffleDep
-        val stage = newOrUsedShuffleStage(shuffleDep, jobId)
+        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
 
         stage
@@ -230,37 +228,35 @@ class DAGScheduler(
   /**
    * Helper function to eliminate some code re-use when creating new stages.
    */
-  private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
-    val parentStages = getParentStages(rdd, jobId)
+  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
+    val parentStages = getParentStages(rdd, firstJobId)
     val id = nextStageId.getAndIncrement()
     (parentStages, id)
   }
 
   /**
    * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
-   * newOrUsedShuffleStage. The stage will be associated with the provided jobId.
+   * newOrUsedShuffleStage. The stage will be associated with the provided firstJobId.
    * Production of shuffle map stages should always use newOrUsedShuffleStage, not
    * newShuffleMapStage directly.
    */
   private def newShuffleMapStage(
       rdd: RDD[_],
       numTasks: Int,
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int,
+      firstJobId: Int,
       callSite: CallSite): ShuffleMapStage = {
-    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
     val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
-      jobId, callSite, shuffleDep)
+      firstJobId, callSite, shuffleDep)
 
     stageIdToStage(id) = stage
-    updateJobIdStageIdMaps(jobId, stage)
+    updateJobIdStageIdMaps(firstJobId, stage)
     stage
   }
 
   /**
-   * Create a ResultStage -- either directly for use as a result stage, or as part of the
-   * (re)-creation of a shuffle map stage in newOrUsedShuffleStage. The stage will be associated
-   * with the provided jobId.
+   * Create a ResultStage associated with the provided jobId.
    */
   private def newResultStage(
       rdd: RDD[_],
@@ -277,16 +273,16 @@ class DAGScheduler(
 
   /**
    * Create a shuffle map Stage for the given RDD. The stage will also be associated with the
-   * provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
+   * provided firstJobId. If a stage for the shuffleId existed previously so that the shuffleId is
    * present in the MapOutputTracker, then the number and location of available outputs are
    * recovered from the MapOutputTracker
   */
   private def newOrUsedShuffleStage(
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int): ShuffleMapStage = {
+      firstJobId: Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
     val numTasks = rdd.partitions.size
-    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
+    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
@@ -304,10 +300,10 @@ class DAGScheduler(
   }
 
   /**
-   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
-   * provided jobId if they haven't already been created with a lower jobId.
+   * Get or create the list of parent stages for a given RDD. The new Stages will be created with
+   * the provided firstJobId.
   */
-  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
+  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
     val parents = new HashSet[Stage]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
@@ -321,7 +317,7 @@ class DAGScheduler(
         for (dep <- r.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              parents += getShuffleMapStage(shufDep, jobId)
+              parents += getShuffleMapStage(shufDep, firstJobId)
             case _ =>
               waitingForVisit.push(dep.rdd)
           }
@@ -336,11 +332,11 @@ class DAGScheduler(
   }
 
   /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
-  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
+  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
     val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
     while (parentsWithNoMapStage.nonEmpty) {
       val currentShufDep = parentsWithNoMapStage.pop()
-      val stage = newOrUsedShuffleStage(currentShufDep, jobId)
+      val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
       shuffleToMapStage(currentShufDep.shuffleId) = stage
     }
   }
@@ -390,7 +386,7 @@ class DAGScheduler(
         for (dep <- rdd.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
               if (!mapStage.isAvailable) {
                 missing += mapStage
               }
@@ -577,7 +573,7 @@ class DAGScheduler(
 
   private[scheduler] def doCancelAllJobs() {
     // Cancel all running jobs.
-    runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+    runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
       reason = "as part of cancellation of all jobs"))
     activeJobs.clear() // These should already be empty by this point,
     jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
@@ -603,7 +599,7 @@ class DAGScheduler(
     clearCacheLocs()
     val failedStagesCopy = failedStages.toArray
     failedStages.clear()
-    for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+    for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -623,7 +619,7 @@ class DAGScheduler(
     logTrace("failed: " + failedStages)
     val waitingStagesCopy = waitingStages.toArray
     waitingStages.clear()
-    for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
+    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -843,7 +839,7 @@ class DAGScheduler(
       }
     }
 
-    val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull
+    val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -909,7 +905,7 @@ class DAGScheduler(
       stage.pendingTasks ++= tasks
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
@@ -1323,7 +1319,7 @@ class DAGScheduler(
         for (dep <- rdd.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
               if (!mapStage.isAvailable) {
                 waitingForVisit.push(mapStage.rdd)
               } // Otherwise there's no need to follow the dependency back
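Aside from the diff itself: the rename is mechanical, but it spells out a real invariant. A stage can be shared by several jobs, and the field records only the first job that submitted it; the sortBy(_.firstJobId) calls above use that ordering to resubmit stages belonging to earlier jobs first under FIFO scheduling. A toy sketch of that ordering (the case class and values are illustrative, not Spark's API):

    object FirstJobIdSketch {
      // Toy stand-in for a stage: it remembers only the first job that submitted it.
      final case class StageSketch(id: Int, firstJobId: Int)

      def main(args: Array[String]): Unit = {
        // Stage 7 was first submitted by job 4 (a later job may also need it);
        // stage 3 came from job 1, so it is resubmitted first.
        val waiting = Seq(StageSketch(id = 7, firstJobId = 4), StageSketch(id = 3, firstJobId = 1))
        val resubmitOrder = waiting.sortBy(_.firstJobId)
        println(resubmitOrder.map(_.id)) // List(3, 7)
      }
    }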
core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala

@@ -28,9 +28,9 @@ private[spark] class ResultStage(
     rdd: RDD[_],
     numTasks: Int,
     parents: List[Stage],
-    jobId: Int,
+    firstJobId: Int,
     callSite: CallSite)
-  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
 
   // The active job for this result stage. Will be empty if the job has already finished
   // (e.g., because the job was cancelled).
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala

@@ -30,10 +30,10 @@ private[spark] class ShuffleMapStage(
     rdd: RDD[_],
     numTasks: Int,
     parents: List[Stage],
-    jobId: Int,
+    firstJobId: Int,
     callSite: CallSite,
     val shuffleDep: ShuffleDependency[_, _, _])
-  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
 
   override def toString: String = "ShuffleMapStage " + id
 
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.CallSite
  * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
  * that each output partition is on.
  *
- * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
+ * Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
  * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
  * faster on failure.
  *
@@ -51,7 +51,7 @@ private[spark] abstract class Stage(
     val rdd: RDD[_],
     val numTasks: Int,
     val parents: List[Stage],
-    val jobId: Int,
+    val firstJobId: Int,
     val callSite: CallSite)
   extends Logging {
 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -19,9 +19,9 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.{TimerTask, Timer}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.concurrent.duration._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
@@ -32,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 
@@ -64,6 +64,9 @@ private[spark] class TaskSchedulerImpl(
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
+  private val speculationScheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
+
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
 
@@ -142,10 +145,11 @@ private[spark] class TaskSchedulerImpl(
 
     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
-      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
-        SPECULATION_INTERVAL_MS milliseconds) {
-        Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
-      }(sc.env.actorSystem.dispatcher)
+      speculationScheduler.scheduleAtFixedRate(new Runnable {
+        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
+          checkSpeculatableTasks()
+        }
+      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
     }
   }
 
@@ -412,6 +416,7 @@ private[spark] class TaskSchedulerImpl(
   }
 
   override def stop() {
+    speculationScheduler.shutdown()
    if (backend != null) {
      backend.stop()
    }
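The TaskSchedulerImpl change above swaps the Akka scheduler for a dedicated daemon ScheduledExecutorService that drives the periodic speculation check. A minimal sketch of the same pattern using only java.util.concurrent (the thread name and interval mirror the diff; ThreadUtils itself is not reproduced, a hand-rolled daemon thread factory stands in for it):

    import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

    object SpeculationSchedulerSketch {
      // Single-threaded scheduler whose worker is a daemon thread, so it never keeps
      // the JVM alive on its own; this is the property ThreadUtils provides in the diff.
      private def daemonScheduler(name: String): ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, name)
            t.setDaemon(true)
            t
          }
        })

      def main(args: Array[String]): Unit = {
        val intervalMs = 100L // stand-in for SPECULATION_INTERVAL_MS
        val scheduler = daemonScheduler("task-scheduler-speculation")

        // Schedule the recurring check at a fixed rate, as scheduleAtFixedRate does above.
        scheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = println("checking for speculatable tasks...")
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

        Thread.sleep(350)    // let a few ticks fire
        scheduler.shutdown() // mirrors the shutdown added to stop()
      }
    }

Shutting the executor down explicitly matters because scheduleAtFixedRate keeps resubmitting the task until the executor is stopped.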
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

@@ -25,9 +25,10 @@ import scala.collection.mutable.{HashMap, HashSet}
 
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
@@ -115,11 +116,9 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = AkkaUtils.address(
-      AkkaUtils.protocol(sc.env.actorSystem),
+    val driverUrl = sc.env.rpcEnv.uriOf(
       SparkEnv.driverActorSystemName,
-      conf.get("spark.driver.host"),
-      conf.get("spark.driver.port"),
+      RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
 
     val uri = conf.getOption("spark.executor.uri")
core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala

@@ -231,7 +231,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       </td>
       <td>
         <span class="description-input" title={lastStageDescription}>{lastStageDescription}</span>
-        <a href={detailUrl}>{lastStageName}</a>
+        <a href={detailUrl} class="name-link">{lastStageName}</a>
       </td>
       <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
         {formattedSubmissionTime}
core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

@@ -93,7 +93,7 @@ private[ui] class StageTableBase(
     }
 
     val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"
-    val nameLink = <a href={nameLinkUri}>{s.name}</a>
+    val nameLink = <a href={nameLinkUri} class="name-link">{s.name}</a>
 
     val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
     val details = if (s.details.nonEmpty) {
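The class="name-link" attribute added in AllJobsPage and StageTableBase is what the JavaScript selectors at the top of this commit key on: it lets them pick the name anchor even when other links or controls share the same cell. A small illustrative sketch, assuming scala-xml on the classpath (the helper and the extra link are made up for the example, not Spark's actual markup):

    import scala.xml.Elem

    object NameLinkSketch {
      // Hypothetical cell renderer: only the stage-name anchor carries class="name-link",
      // so a client-side selector like "a.name-link" skips the other anchor in the cell.
      def nameCell(nameLinkUri: String, name: String): Elem =
        <td>
          <a href={nameLinkUri} class="name-link">{name}</a>
          <a href="#details" class="expand-details">+details</a>
        </td>

      def main(args: Array[String]): Unit =
        println(nameCell("/stages/stage?id=1&attempt=0", "count at Example.scala:10"))
    }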
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -235,7 +235,7 @@ private[spark] object AkkaUtils extends Logging {
       protocol: String,
       systemName: String,
       host: String,
-      port: Any,
+      port: Int,
       actorName: String): String = {
     s"$protocol://$systemName@$host:$port/user/$actorName"
   }
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

@@ -82,7 +82,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     "running app list json" -> "applications?status=running",
     "minDate app list json" -> "applications?minDate=2015-02-10",
     "maxDate app list json" -> "applications?maxDate=2015-02-10",
-    "maxDate2 app list json" -> "applications?maxDate=2015-02-03T10:42:40.000CST",
+    "maxDate2 app list json" -> "applications?maxDate=2015-02-03T16:42:40.000GMT",
     "one app json" -> "applications/local-1422981780767",
     "one app multi-attempt json" -> "applications/local-1426533911241",
     "job list json" -> "applications/local-1422981780767/jobs",
6 changes: 1 addition & 5 deletions docs/building-spark.md
@@ -118,14 +118,10 @@ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests
 # Building With Hive and JDBC Support
 To enable Hive integration for Spark SQL along with its JDBC server and CLI,
 add the `-Phive` and `Phive-thriftserver` profiles to your existing build options.
-By default Spark will build with Hive 0.13.1 bindings. You can also build for
-Hive 0.12.0 using the `-Phive-0.12.0` profile.
+By default Spark will build with Hive 0.13.1 bindings.
 {% highlight bash %}
 # Apache Hadoop 2.4.X with Hive 13 support
 mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
-
-# Apache Hadoop 2.4.X with Hive 12 support
-mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package
 {% endhighlight %}
 
 # Building for Scala 2.11
