Skip to content

Commit

Permalink
Test uniqueness.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Mar 1, 2019
1 parent 92eb16d commit fd4c1fa
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import org.apache.commons.codec.binary.Hex

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

private[spark] object KubernetesUtils extends Logging {

private val systemClock = new SystemClock()
private lazy val RNG = new SecureRandom()

/**
Expand Down Expand Up @@ -218,13 +219,13 @@ private[spark] object KubernetesUtils extends Logging {
* This avoids using a UUID for uniqueness (too long), and relying solely on the current time
* (not unique enough).
*/
def uniqueID(): String = {
def uniqueID(clock: Clock = systemClock): String = {
val random = new Array[Byte](3)
synchronized {
RNG.nextBytes(random)
}

val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL)
val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL)
Hex.encodeHexString(random) + time
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{Clock, SystemClock}

private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverConf)
private[spark] class DriverServiceFeatureStep(
kubernetesConf: KubernetesDriverConf,
clock: Clock = new SystemClock())
extends KubernetesFeatureConfigStep with Logging {
import DriverServiceFeatureStep._

Expand All @@ -39,7 +42,7 @@ private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverCo
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = KubernetesUtils.uniqueID()
val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import com.google.common.net.InternetDomainName
import io.fabric8.kubernetes.api.model.Service

import org.apache.spark.{SparkConf, SparkFunSuite}
Expand All @@ -26,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.internal.config._
import org.apache.spark.util.ManualClock

class DriverServiceFeatureStepSuite extends SparkFunSuite {

Expand Down Expand Up @@ -90,23 +92,37 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
}

test("Long prefixes should switch to using a generated name.") {
test("Long prefixes should switch to using a generated unique name.") {
val sparkConf = new SparkConf(false)
.set(KUBERNETES_NAMESPACE, "my-namespace")
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
labels = DRIVER_LABELS)
val configurationStep = new DriverServiceFeatureStep(kconf)
val clock = new ManualClock()

val driverService = configurationStep
.getAdditionalKubernetesResources()
.head
.asInstanceOf[Service]
assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix))
// Ensure that multiple services created at the same time generate unique names.
val services = (1 to 10).map { _ =>
val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
val serviceName = configurationStep
.getAdditionalKubernetesResources()
.head
.asInstanceOf[Service]
.getMetadata
.getName

val additionalProps = configurationStep.getAdditionalPodSystemProperties()
assert(!additionalProps(DRIVER_HOST_ADDRESS.key).startsWith(kconf.resourceNamePrefix))
val hostAddress = configurationStep
.getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)

(serviceName -> hostAddress)
}.toMap

assert(services.size === 10)
services.foreach { case (name, address) =>
assert(!name.startsWith(kconf.resourceNamePrefix))
assert(!address.startsWith(kconf.resourceNamePrefix))
assert(InternetDomainName.isValid(address))
}
}

test("Disallow bind address and driver host to be set explicitly.") {
Expand Down

0 comments on commit fd4c1fa

Please sign in to comment.