Skip to content

Commit

Permalink
Add Mesos labels support to the Spark Dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Gummelt committed Jun 6, 2017
1 parent 88a23d3 commit ee10af6
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 28 deletions.
10 changes: 10 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,16 @@ See the [configuration page](configuration.html) for information on Spark config
If unset it will point to Spark's internal web UI.
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.labels</code></td>
<td><code>(none)</code></td>
<td>
Mesos labels to add to the driver. Labels are free-form key-value
pairs. Key-value pairs should be separated by a colon, and commas used to
list more than one. Ex. key:value,key2:value2.
</td>
</tr>

<tr>
<td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
<td><code>(none)</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@ package object config {
.stringConf
.createOptional

private [spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
"pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2")
.stringConf
.createOptional
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
import org.apache.spark.deploy.mesos.config

/**
* Tracks the current state of a Mesos Task that runs a Spark driver.
Expand Down Expand Up @@ -525,15 +526,17 @@ private[spark] class MesosClusterScheduler(
offer.remainingResources = finalResources.asJava

val appName = desc.conf.get("spark.app.name")
val taskInfo = TaskInfo.newBuilder()

TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
taskInfo.build
.setLabels(buildMesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
.build
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName(s"${sc.appName} $taskId")

taskBuilder.addAllResources(resourcesToUse.asJava)
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))

val labelsBuilder = taskBuilder.getLabelsBuilder
val labels = buildMesosLabels().asJava

labelsBuilder.addAllLabels(labels)

taskBuilder.setLabels(labelsBuilder)
.setLabels(buildMesosLabels(taskLabels))
.addAllResources(resourcesToUse.asJava)
.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))

tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
Expand All @@ -444,21 +437,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks.toMap
}

private def buildMesosLabels(): List[Label] = {
taskLabels.split(",").flatMap(label =>
label.split(":") match {
case Array(key, value) =>
Some(Label.newBuilder()
.setKey(key)
.setValue(value)
.build())
case _ =>
logWarning(s"Unable to parse $label into a key:value label for the task.")
None
}
).toList
}

/** Extracts task needed resources from a list of available resources. */
private def partitionTaskResources(
resources: JList[Resource],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,23 @@ trait MesosSchedulerUtils extends Logging {
case TaskState.LOST => MesosTaskState.TASK_LOST
}

def buildMesosLabels(labelsStr: String): Labels.Builder = {
val labels = labelsStr.split(",").flatMap(label =>
label.split(":") match {
case Array(key, value) =>
Some(Label.newBuilder()
.setKey(key)
.setValue(value)
.build())
case _ =>
logWarning(s"Unable to parse $label into a key:value label for the task.")
None
}
).toList.asJava

Labels.newBuilder().addAllLabels(labels)
}

protected def declineOffer(
driver: org.apache.mesos.SchedulerDriver,
offer: Offer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(networkInfos.get(0).getName == "test-network-name")
}

test("supports spark.mesos.driver.labels") {
setScheduler()

val mem = 1000
val cpu = 1

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.labels" -> "key:value"),
"s1",
new Date()))

assert(response.success)

val offer = Utils.createOffer("o1", "s1", mem, cpu)
scheduler.resourceOffers(driver, List(offer).asJava)

val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
val labels = launchedTasks.head.getLabels
assert(labels.getLabelsCount == 1)
assert(labels.getLabels(0).getKey == "key")
assert(labels.getLabels(0).getValue == "value")
}

test("can kill supervised drivers") {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
Expand Down

0 comments on commit ee10af6

Please sign in to comment.