From e09f55af9df27e616105421d6e1492c5b391f07c Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Tue, 6 Jun 2017 13:24:21 -0700 Subject: [PATCH] add escaping support --- docs/running-on-mesos.md | 10 +- .../cluster/mesos/MesosClusterScheduler.scala | 2 +- .../MesosCoarseGrainedSchedulerBackend.scala | 2 +- .../cluster/mesos/MesosProtoUtils.scala | 91 +++++++++++++++++++ .../cluster/mesos/MesosSchedulerUtils.scala | 17 ---- .../cluster/mesos/MesosProtoUtilsSuite.scala | 48 ++++++++++ 6 files changed, 146 insertions(+), 24 deletions(-) create mode 100644 resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala create mode 100644 resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index e459a023b56df..6844285a9dcb8 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for information on Spark config (none) Set the Mesos labels to add to each task. Labels are free-form key-value pairs. - Key-value pairs should be separated by a colon, and commas used to list more than one. - Ex. key:value,key2:value2. + Key-value pairs should be separated by a colon, and commas used to + list more than one. If your label includes a colon or comma, you + can escape it with a backslash. Ex. key:value,key2:a\:b. @@ -472,9 +473,8 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.driver.labels (none) - Mesos labels to add to the driver. Labels are free-form key-value - pairs. Key-value pairs should be separated by a colon, and commas used to - list more than one. Ex. key:value,key2:value2. + Mesos labels to add to the driver. See spark.mesos.task.labels + for formatting information. diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 1ea13f7ba1042..577f9a876b381 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -535,7 +535,7 @@ private[spark] class MesosClusterScheduler( .setCommand(buildDriverCommand(desc)) .addAllResources(cpuResourcesToUse.asJava) .addAllResources(memResourcesToUse.asJava) - .setLabels(buildMesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse(""))) + .setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse(""))) .setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf)) .build } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index d9bcea5c766c2..871685c6cccc0 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -419,7 +419,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) .setName(s"${sc.appName} $taskId") - .setLabels(buildMesosLabels(taskLabels)) + .setLabels(MesosProtoUtils.mesosLabels(taskLabels)) .addAllResources(resourcesToUse.asJava) .setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf)) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala new file mode 100644 index 0000000000000..e08813928c34f --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.apache.mesos.Protos + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging + +object MesosProtoUtils extends Logging { + + /** Parses a label string of the format specified in spark.mesos.task.labels. */ + def mesosLabels(labelsStr: String): Protos.Labels.Builder = { + var key: Option[String] = None + var value: Option[String] = None + var currStr = "" + var i = 0 + val labels = Protos.Labels.newBuilder() + + // 0 -> parsing key + // 1 -> parsing value + var state = 0 + + def addLabel() = { + value = Some(currStr) + if (key.isEmpty) { + throw new SparkException(s"Error while parsing label string: ${labelsStr}. " + + s"Empty label key.") + } else { + val label = Protos.Label.newBuilder().setKey(key.get).setValue(value.get) + labels.addLabels(label) + + key = None + value = None + currStr = "" + state = 0 + } + } + + while(i < labelsStr.length) { + val c = labelsStr(i) + + if (c == ',') { + addLabel() + } else if (c == ':') { + key = Some(currStr) + currStr = "" + state = 1 + } else if (c == '\\') { + if (i == labelsStr.length - 1) { + if (state == 1) { + value = value.map(_ + '\\') + } else { + throw new SparkException(s"Error while parsing label string: ${labelsStr}. " + + "Key has no value.") + } + } else { + val c2 = labelsStr(i + 1) + if (c2 == ',' || c2 == ':') { + currStr += c2 + i += 1 + } else { + currStr += c + } + } + } else { + currStr += c + } + + i += 1 + } + + addLabel() + labels + } +} diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 016a192da9a5e..062ed1f93fa52 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -527,23 +527,6 @@ trait MesosSchedulerUtils extends Logging { case TaskState.LOST => MesosTaskState.TASK_LOST } - def buildMesosLabels(labelsStr: String): Labels.Builder = { - val labels = labelsStr.split(",").flatMap(label => - label.split(":") match { - case Array(key, value) => - Some(Label.newBuilder() - .setKey(key) - .setValue(value) - .build()) - case _ => - logWarning(s"Unable to parse $label into a key:value label for the task.") - None - } - ).toList.asJava - - Labels.newBuilder().addAllLabels(labels) - } - protected def declineOffer( driver: org.apache.mesos.SchedulerDriver, offer: Offer, diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala new file mode 100644 index 0000000000000..36a4c1ab1ad25 --- /dev/null +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.apache.spark.SparkFunSuite + +class MesosProtoUtilsSuite extends SparkFunSuite { + test("mesosLabels") { + val labels = MesosProtoUtils.mesosLabels("key:value") + assert(labels.getLabelsCount == 1) + val label = labels.getLabels(0) + assert(label.getKey == "key") + assert(label.getValue == "value") + + val labels2 = MesosProtoUtils.mesosLabels("key:value\\:value") + assert(labels2.getLabelsCount == 1) + val label2 = labels2.getLabels(0) + assert(label2.getKey == "key") + assert(label2.getValue == "value:value") + + val labels3 = MesosProtoUtils.mesosLabels("key:value,key2:value2") + assert(labels3.getLabelsCount == 2) + assert(labels3.getLabels(0).getKey == "key") + assert(labels3.getLabels(0).getValue == "value") + assert(labels3.getLabels(1).getKey == "key2") + assert(labels3.getLabels(1).getValue == "value2") + + val labels4 = MesosProtoUtils.mesosLabels("key:value\\,value") + assert(labels4.getLabelsCount == 1) + assert(labels4.getLabels(0).getKey == "key") + assert(labels4.getLabels(0).getValue == "value,value") + } +}