From fd4c1fae576889bfa67c5571887572ced44a1ec8 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Thu, 28 Feb 2019 19:29:14 -0800
Subject: [PATCH] Test uniqueness.

---
 .../spark/deploy/k8s/KubernetesUtils.scala    |  7 ++--
 .../features/DriverServiceFeatureStep.scala   |  7 ++--
 .../DriverServiceFeatureStepSuite.scala       | 34 ++++++++++++++-----
 3 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 878a616ec78fb..06e0d3aa39286 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -27,10 +27,11 @@ import org.apache.commons.codec.binary.Hex
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 private[spark] object KubernetesUtils extends Logging {
 
+  private val systemClock = new SystemClock()
   private lazy val RNG = new SecureRandom()
 
   /**
@@ -218,13 +219,13 @@ private[spark] object KubernetesUtils extends Logging {
    * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
    * (not unique enough).
    */
-  def uniqueID(): String = {
+  def uniqueID(clock: Clock = systemClock): String = {
     val random = new Array[Byte](3)
     synchronized {
       RNG.nextBytes(random)
     }
 
-    val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL)
+    val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL)
     Hex.encodeHexString(random) + time
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 55b2dd8067a59..cec8769b8378e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -23,8 +23,11 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.{Clock, SystemClock}
 
-private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverConf)
+private[spark] class DriverServiceFeatureStep(
+    kubernetesConf: KubernetesDriverConf,
+    clock: Clock = new SystemClock())
   extends KubernetesFeatureConfigStep with Logging {
   import DriverServiceFeatureStep._
 
@@ -39,7 +42,7 @@ private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverCo
   private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
     preferredServiceName
   } else {
-    val randomServiceId = KubernetesUtils.uniqueID()
+    val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
     val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
     logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
       s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index da562f558b67c..fbd99b73b37a4 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
+import com.google.common.net.InternetDomainName
 import io.fabric8.kubernetes.api.model.Service
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -26,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 import org.apache.spark.internal.config._
+import org.apache.spark.util.ManualClock
 
 class DriverServiceFeatureStepSuite extends SparkFunSuite {
 
@@ -90,23 +92,37 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
   }
 
-  test("Long prefixes should switch to using a generated name.") {
+  test("Long prefixes should switch to using a generated unique name.") {
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_NAMESPACE, "my-namespace")
     val kconf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
       resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
       labels = DRIVER_LABELS)
-    val configurationStep = new DriverServiceFeatureStep(kconf)
+    val clock = new ManualClock()
 
-    val driverService = configurationStep
-      .getAdditionalKubernetesResources()
-      .head
-      .asInstanceOf[Service]
-    assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix))
+    // Ensure that multiple services created at the same time generate unique names.
+    val services = (1 to 10).map { _ =>
+      val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
+      val serviceName = configurationStep
+        .getAdditionalKubernetesResources()
+        .head
+        .asInstanceOf[Service]
+        .getMetadata
+        .getName
 
-    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    assert(!additionalProps(DRIVER_HOST_ADDRESS.key).startsWith(kconf.resourceNamePrefix))
+      val hostAddress = configurationStep
+        .getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)
+
+      (serviceName -> hostAddress)
+    }.toMap
+
+    assert(services.size === 10)
+    services.foreach { case (name, address) =>
+      assert(!name.startsWith(kconf.resourceNamePrefix))
+      assert(!address.startsWith(kconf.resourceNamePrefix))
+      assert(InternetDomainName.isValid(address))
+    }
  }
 
   test("Disallow bind address and driver host to be set explicitly.") {
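Note: the sketch below is a minimal, hypothetical illustration of why the injected Clock keeps the generated names unique even when time is frozen. It mirrors the patched KubernetesUtils.uniqueID rather than calling it, and assumes it is compiled somewhere under the org.apache.spark package so the private[spark] Clock and ManualClock APIs are visible; it is not part of this change.

    import java.security.SecureRandom

    import org.apache.commons.codec.binary.Hex

    import org.apache.spark.util.{Clock, ManualClock}

    object UniqueIdSketch {
      private val rng = new SecureRandom()

      // Mirror of the patched uniqueID: 3 random bytes plus the low 40 bits of
      // the clock time, both hex-encoded.
      def uniqueID(clock: Clock): String = {
        val random = new Array[Byte](3)
        rng.nextBytes(random)
        val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL)
        Hex.encodeHexString(random) + time
      }

      def main(args: Array[String]): Unit = {
        // ManualClock never advances unless told to, matching the suite above.
        val clock = new ManualClock()
        val ids = (1 to 10).map(_ => uniqueID(clock)).toSet
        // The random prefix, not the timestamp, is what keeps the IDs distinct.
        assert(ids.size == 10)
      }
    }

The design choice in the patch is plain dependency injection: uniqueID and DriverServiceFeatureStep take a Clock with a SystemClock default, so production call sites are unchanged while the test suite can pin time with ManualClock and assert that service-name uniqueness comes from the random component alone.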