Skip to content

Commit

Permalink
[SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Add Mesos labels support to the Spark Dispatcher

## How was this patch tested?

unit tests

Author: Michael Gummelt <[email protected]>

Closes apache#18220 from mgummelt/SPARK-21000-dispatcher-labels.
  • Loading branch information
Michael Gummelt authored and wangzejie committed Jun 16, 2017
1 parent 0f2da15 commit e5940a0
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 53 deletions.
14 changes: 12 additions & 2 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for information on Spark config
<td>(none)</td>
<td>
Set the Mesos labels to add to each task. Labels are free-form key-value pairs.
Key-value pairs should be separated by a colon, and commas used to list more than one.
Ex. key:value,key2:value2.
Key-value pairs should be separated by a colon, and commas used to
list more than one. If your label includes a colon or comma, you
can escape it with a backslash. Ex. key:value,key2:a\:b.
</td>
</tr>
<tr>
Expand Down Expand Up @@ -468,6 +469,15 @@ See the [configuration page](configuration.html) for information on Spark config
If unset it will point to Spark's internal web UI.
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.labels</code></td>
<td><code>(none)</code></td>
<td>
Mesos labels to add to the driver. See <code>spark.mesos.task.labels</code>
for formatting information.
</td>
</tr>

<tr>
<td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
<td><code>(none)</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@ package object config {
.stringConf
.createOptional

private [spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
"pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2")
.stringConf
.createOptional
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason

import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils


/**
* Tracks the current state of a Mesos Task that runs a Spark driver.
* @param driverDescription Submitted driver description from
Expand Down Expand Up @@ -525,15 +527,17 @@ private[spark] class MesosClusterScheduler(
offer.remainingResources = finalResources.asJava

val appName = desc.conf.get("spark.app.name")
val taskInfo = TaskInfo.newBuilder()

TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
taskInfo.build
.setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
.build
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName(s"${sc.appName} $taskId")

taskBuilder.addAllResources(resourcesToUse.asJava)
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))

val labelsBuilder = taskBuilder.getLabelsBuilder
val labels = buildMesosLabels().asJava

labelsBuilder.addAllLabels(labels)

taskBuilder.setLabels(labelsBuilder)
.setLabels(MesosProtoUtils.mesosLabels(taskLabels))
.addAllResources(resourcesToUse.asJava)
.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))

tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
Expand All @@ -444,21 +437,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks.toMap
}

private def buildMesosLabels(): List[Label] = {
taskLabels.split(",").flatMap(label =>
label.split(":") match {
case Array(key, value) =>
Some(Label.newBuilder()
.setKey(key)
.setValue(value)
.build())
case _ =>
logWarning(s"Unable to parse $label into a key:value label for the task.")
None
}
).toList
}

/** Extracts task needed resources from a list of available resources. */
private def partitionTaskResources(
resources: JList[Resource],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import scala.collection.JavaConverters._

import org.apache.mesos.Protos

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging

object MesosProtoUtils extends Logging {

/** Parses a label string of the format specified in spark.mesos.task.labels. */
def mesosLabels(labelsStr: String): Protos.Labels.Builder = {
val labels: Seq[Protos.Label] = if (labelsStr == "") {
Seq()
} else {
labelsStr.split("""(?<!\\),""").toSeq.map { labelStr =>
val parts = labelStr.split("""(?<!\\):""")
if (parts.length != 2) {
throw new SparkException(s"Malformed label: ${labelStr}")
}

val cleanedParts = parts
.map(part => part.replaceAll("""\\,""", ","))
.map(part => part.replaceAll("""\\:""", ":"))

Protos.Label.newBuilder()
.setKey(cleanedParts(0))
.setValue(cleanedParts(1))
.build()
}
}

Protos.Labels.newBuilder().addAllLabels(labels.asJava)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(networkInfos.get(0).getName == "test-network-name")
}

test("supports spark.mesos.driver.labels") {
setScheduler()

val mem = 1000
val cpu = 1

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.labels" -> "key:value"),
"s1",
new Date()))

assert(response.success)

val offer = Utils.createOffer("o1", "s1", mem, cpu)
scheduler.resourceOffers(driver, List(offer).asJava)

val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
val labels = launchedTasks.head.getLabels
assert(labels.getLabelsCount == 1)
assert(labels.getLabels(0).getKey == "key")
assert(labels.getLabels(0).getValue == "value")
}

test("can kill supervised drivers") {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,29 +532,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getLabels.equals(taskLabels))
}

test("mesos ignored invalid labels and sets configurable labels on tasks") {
val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
setBackend(Map(
"spark.mesos.task.labels" -> taskLabelsString
))

// Build up the labels
val taskLabels = Protos.Labels.newBuilder()
.addLabels(Protos.Label.newBuilder()
.setKey("mesos").setValue("test").build())
.addLabels(Protos.Label.newBuilder()
.setKey("label").setValue("test").build())
.build()

val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")

val labels = launchedTasks.head.getLabels

assert(launchedTasks.head.getLabels.equals(taskLabels))
}

test("mesos supports spark.mesos.network.name") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkFunSuite

class MesosProtoUtilsSuite extends SparkFunSuite {
test("mesosLabels") {
val labels = MesosProtoUtils.mesosLabels("key:value")
assert(labels.getLabelsCount == 1)
val label = labels.getLabels(0)
assert(label.getKey == "key")
assert(label.getValue == "value")

val labels2 = MesosProtoUtils.mesosLabels("key:value\\:value")
assert(labels2.getLabelsCount == 1)
val label2 = labels2.getLabels(0)
assert(label2.getKey == "key")
assert(label2.getValue == "value:value")

val labels3 = MesosProtoUtils.mesosLabels("key:value,key2:value2")
assert(labels3.getLabelsCount == 2)
assert(labels3.getLabels(0).getKey == "key")
assert(labels3.getLabels(0).getValue == "value")
assert(labels3.getLabels(1).getKey == "key2")
assert(labels3.getLabels(1).getValue == "value2")

val labels4 = MesosProtoUtils.mesosLabels("key:value\\,value")
assert(labels4.getLabelsCount == 1)
assert(labels4.getLabels(0).getKey == "key")
assert(labels4.getLabels(0).getValue == "value,value")
}
}

0 comments on commit e5940a0

Please sign in to comment.