From 92eb16dca654f3638ddd289099531184a2f5c068 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 15 Feb 2019 14:08:04 -0800 Subject: [PATCH 1/2] [SPARK-26420][k8s] Generate more unique IDs when creating k8s resource names. Using the current time as an ID is more prone to clashes than people generally realize, so try to make things a bit more unique without necessarily using a UUID, which would eat too much space in the names otherwise. The implemented approach uses some bits from the current time, plus some random bits, which should be more resistant to clashes. --- .../spark/deploy/k8s/KubernetesConf.scala | 4 +-- .../spark/deploy/k8s/KubernetesUtils.scala | 23 ++++++++++++++ .../features/DriverServiceFeatureStep.scala | 9 ++---- .../DriverServiceFeatureStepSuite.scala | 30 +++++++------------ 4 files changed, 38 insertions(+), 28 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 6febad981af56..1539dd15628c9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -194,8 +194,8 @@ private[spark] object KubernetesConf { } def getResourceNamePrefix(appName: String): String = { - val launchTime = System.currentTimeMillis() - s"$appName-$launchTime" + val id = KubernetesUtils.uniqueID() + s"$appName-$id" .trim .toLowerCase(Locale.ROOT) .replaceAll("\\s+", "-") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 6fafac3ee13c9..878a616ec78fb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -17,11 +17,13 @@ package org.apache.spark.deploy.k8s import java.io.File +import java.security.SecureRandom import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder} import io.fabric8.kubernetes.client.KubernetesClient +import org.apache.commons.codec.binary.Hex import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging @@ -29,6 +31,8 @@ import org.apache.spark.util.Utils private[spark] object KubernetesUtils extends Logging { + private lazy val RNG = new SecureRandom() + /** * Extract and parse Spark configuration properties with a given name prefix and * return the result as a Map. Keys must not have more than one value. @@ -205,4 +209,23 @@ private[spark] object KubernetesUtils extends Logging { def formatTime(time: String): String = { if (time != null) time else "N/A" } + + /** + * Generates a unique ID to be used as part of identifiers. The returned ID is a hex string + * of a 64-bit value containing the 40 LSBs from the current time + 24 random bits from a + * cryptographically strong RNG. (40 bits gives about 30 years worth of "unique" timestamps.) + * + * This avoids using a UUID for uniqueness (too long), and relying solely on the current time + * (not unique enough). + */ + def uniqueID(): String = { + val random = new Array[Byte](3) + synchronized { + RNG.nextBytes(random) + } + + val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL) + Hex.encodeHexString(random) + time + } + } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index 15671179b18b3..55b2dd8067a59 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -20,14 +20,11 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} -import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.{config, Logging} -import org.apache.spark.util.{Clock, SystemClock} -private[spark] class DriverServiceFeatureStep( - kubernetesConf: KubernetesDriverConf, - clock: Clock = new SystemClock) +private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverConf) extends KubernetesFeatureConfigStep with Logging { import DriverServiceFeatureStep._ @@ -42,7 +39,7 @@ private[spark] class DriverServiceFeatureStep( private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { preferredServiceName } else { - val randomServiceId = clock.getTimeMillis() + val randomServiceId = KubernetesUtils.uniqueID() val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " + diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 822f1e32968c2..da562f558b67c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.internal.config._ -import org.apache.spark.util.ManualClock class DriverServiceFeatureStepSuite extends SparkFunSuite { @@ -71,7 +70,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX val expectedHostName = s"$expectedServiceName.my-namespace.svc" val additionalProps = configurationStep.getAdditionalPodSystemProperties() - verifySparkConfHostNames(additionalProps, expectedHostName) + assert(additionalProps(DRIVER_HOST_ADDRESS.key) === expectedHostName) } test("Ports should resolve to defaults in SparkConf and in the service.") { @@ -92,25 +91,22 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { } test("Long prefixes should switch to using a generated name.") { - val clock = new ManualClock() - clock.setTime(10000) val sparkConf = new SparkConf(false) .set(KUBERNETES_NAMESPACE, "my-namespace") - val configurationStep = new DriverServiceFeatureStep( - KubernetesTestConf.createDriverConf( - sparkConf = sparkConf, - resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX), - labels = DRIVER_LABELS), - clock) + val kconf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX), + labels = DRIVER_LABELS) + val configurationStep = new DriverServiceFeatureStep(kconf) + val driverService = configurationStep .getAdditionalKubernetesResources() .head .asInstanceOf[Service] - val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}" - assert(driverService.getMetadata.getName === expectedServiceName) - val expectedHostName = s"$expectedServiceName.my-namespace.svc" + assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix)) + val additionalProps = configurationStep.getAdditionalPodSystemProperties() - verifySparkConfHostNames(additionalProps, expectedHostName) + assert(!additionalProps(DRIVER_HOST_ADDRESS.key).startsWith(kconf.resourceNamePrefix)) } test("Disallow bind address and driver host to be set explicitly.") { @@ -156,10 +152,4 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { assert(driverServicePorts(1).getPort.intValue() === blockManagerPort) assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort) } - - private def verifySparkConfHostNames( - driverSparkConf: Map[String, String], expectedHostName: String): Unit = { - assert(driverSparkConf( - org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName) - } } From fd4c1fae576889bfa67c5571887572ced44a1ec8 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 28 Feb 2019 19:29:14 -0800 Subject: [PATCH 2/2] Test uniqueness. --- .../spark/deploy/k8s/KubernetesUtils.scala | 7 ++-- .../features/DriverServiceFeatureStep.scala | 7 ++-- .../DriverServiceFeatureStepSuite.scala | 34 ++++++++++++++----- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 878a616ec78fb..06e0d3aa39286 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -27,10 +27,11 @@ import org.apache.commons.codec.binary.Hex import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, SystemClock, Utils} private[spark] object KubernetesUtils extends Logging { + private val systemClock = new SystemClock() private lazy val RNG = new SecureRandom() /** @@ -218,13 +219,13 @@ private[spark] object KubernetesUtils extends Logging { * This avoids using a UUID for uniqueness (too long), and relying solely on the current time * (not unique enough). */ - def uniqueID(): String = { + def uniqueID(clock: Clock = systemClock): String = { val random = new Array[Byte](3) synchronized { RNG.nextBytes(random) } - val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL) + val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL) Hex.encodeHexString(random) + time } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index 55b2dd8067a59..cec8769b8378e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -23,8 +23,11 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.{Clock, SystemClock} -private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverConf) +private[spark] class DriverServiceFeatureStep( + kubernetesConf: KubernetesDriverConf, + clock: Clock = new SystemClock()) extends KubernetesFeatureConfigStep with Logging { import DriverServiceFeatureStep._ @@ -39,7 +42,7 @@ private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverCo private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { preferredServiceName } else { - val randomServiceId = KubernetesUtils.uniqueID() + val randomServiceId = KubernetesUtils.uniqueID(clock = clock) val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " + diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index da562f558b67c..fbd99b73b37a4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ +import com.google.common.net.InternetDomainName import io.fabric8.kubernetes.api.model.Service import org.apache.spark.{SparkConf, SparkFunSuite} @@ -26,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.internal.config._ +import org.apache.spark.util.ManualClock class DriverServiceFeatureStepSuite extends SparkFunSuite { @@ -90,23 +92,37 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString) } - test("Long prefixes should switch to using a generated name.") { + test("Long prefixes should switch to using a generated unique name.") { val sparkConf = new SparkConf(false) .set(KUBERNETES_NAMESPACE, "my-namespace") val kconf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX), labels = DRIVER_LABELS) - val configurationStep = new DriverServiceFeatureStep(kconf) + val clock = new ManualClock() - val driverService = configurationStep - .getAdditionalKubernetesResources() - .head - .asInstanceOf[Service] - assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix)) + // Ensure that multiple services created at the same time generate unique names. + val services = (1 to 10).map { _ => + val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock) + val serviceName = configurationStep + .getAdditionalKubernetesResources() + .head + .asInstanceOf[Service] + .getMetadata + .getName - val additionalProps = configurationStep.getAdditionalPodSystemProperties() - assert(!additionalProps(DRIVER_HOST_ADDRESS.key).startsWith(kconf.resourceNamePrefix)) + val hostAddress = configurationStep + .getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key) + + (serviceName -> hostAddress) + }.toMap + + assert(services.size === 10) + services.foreach { case (name, address) => + assert(!name.startsWith(kconf.resourceNamePrefix)) + assert(!address.startsWith(kconf.resourceNamePrefix)) + assert(InternetDomainName.isValid(address)) + } } test("Disallow bind address and driver host to be set explicitly.") {