Skip to content

Commit

Permalink
add escaping support
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Gummelt committed Jun 6, 2017
1 parent 9c21758 commit e09f55a
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 24 deletions.
10 changes: 5 additions & 5 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for information on Spark config
<td>(none)</td>
<td>
Set the Mesos labels to add to each task. Labels are free-form key-value pairs.
Key-value pairs should be separated by a colon, and commas used to list more than one.
Ex. key:value,key2:value2.
Key-value pairs should be separated by a colon, and commas used to
list more than one. If your label includes a colon or comma, you
can escape it with a backslash. Ex. key:value,key2:a\:b.
</td>
</tr>
<tr>
Expand Down Expand Up @@ -472,9 +473,8 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.mesos.driver.labels</code></td>
<td><code>(none)</code></td>
<td>
Mesos labels to add to the driver. Labels are free-form key-value
pairs. Key-value pairs should be separated by a colon, and commas used to
list more than one. Ex. key:value,key2:value2.
Mesos labels to add to the driver. See spark.mesos.task.labels
for formatting information.
</td>
</tr>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ private[spark] class MesosClusterScheduler(
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
.setLabels(buildMesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
.setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
.build
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName(s"${sc.appName} $taskId")
.setLabels(buildMesosLabels(taskLabels))
.setLabels(MesosProtoUtils.mesosLabels(taskLabels))
.addAllResources(resourcesToUse.asJava)
.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging

object MesosProtoUtils extends Logging {

/** Parses a label string of the format specified in spark.mesos.task.labels. */
def mesosLabels(labelsStr: String): Protos.Labels.Builder = {
var key: Option[String] = None
var value: Option[String] = None
var currStr = ""
var i = 0
val labels = Protos.Labels.newBuilder()

// 0 -> parsing key
// 1 -> parsing value
var state = 0

def addLabel() = {
value = Some(currStr)
if (key.isEmpty) {
throw new SparkException(s"Error while parsing label string: ${labelsStr}. " +
s"Empty label key.")
} else {
val label = Protos.Label.newBuilder().setKey(key.get).setValue(value.get)
labels.addLabels(label)

key = None
value = None
currStr = ""
state = 0
}
}

while(i < labelsStr.length) {
val c = labelsStr(i)

if (c == ',') {
addLabel()
} else if (c == ':') {
key = Some(currStr)
currStr = ""
state = 1
} else if (c == '\\') {
if (i == labelsStr.length - 1) {
if (state == 1) {
value = value.map(_ + '\\')
} else {
throw new SparkException(s"Error while parsing label string: ${labelsStr}. " +
"Key has no value.")
}
} else {
val c2 = labelsStr(i + 1)
if (c2 == ',' || c2 == ':') {
currStr += c2
i += 1
} else {
currStr += c
}
}
} else {
currStr += c
}

i += 1
}

addLabel()
labels
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -527,23 +527,6 @@ trait MesosSchedulerUtils extends Logging {
case TaskState.LOST => MesosTaskState.TASK_LOST
}

def buildMesosLabels(labelsStr: String): Labels.Builder = {
val labels = labelsStr.split(",").flatMap(label =>
label.split(":") match {
case Array(key, value) =>
Some(Label.newBuilder()
.setKey(key)
.setValue(value)
.build())
case _ =>
logWarning(s"Unable to parse $label into a key:value label for the task.")
None
}
).toList.asJava

Labels.newBuilder().addAllLabels(labels)
}

protected def declineOffer(
driver: org.apache.mesos.SchedulerDriver,
offer: Offer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkFunSuite

class MesosProtoUtilsSuite extends SparkFunSuite {
test("mesosLabels") {
val labels = MesosProtoUtils.mesosLabels("key:value")
assert(labels.getLabelsCount == 1)
val label = labels.getLabels(0)
assert(label.getKey == "key")
assert(label.getValue == "value")

val labels2 = MesosProtoUtils.mesosLabels("key:value\\:value")
assert(labels2.getLabelsCount == 1)
val label2 = labels2.getLabels(0)
assert(label2.getKey == "key")
assert(label2.getValue == "value:value")

val labels3 = MesosProtoUtils.mesosLabels("key:value,key2:value2")
assert(labels3.getLabelsCount == 2)
assert(labels3.getLabels(0).getKey == "key")
assert(labels3.getLabels(0).getValue == "value")
assert(labels3.getLabels(1).getKey == "key2")
assert(labels3.getLabels(1).getValue == "value2")

val labels4 = MesosProtoUtils.mesosLabels("key:value\\,value")
assert(labels4.getLabelsCount == 1)
assert(labels4.getLabels(0).getKey == "key")
assert(labels4.getLabels(0).getValue == "value,value")
}
}

0 comments on commit e09f55a

Please sign in to comment.