This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Use the new initContainers field instead of the deprecated annotation #528

Merged · 2 commits · Oct 20, 2017
resource-managers/kubernetes/core/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
- <kubernetes.client.version>2.2.13</kubernetes.client.version>
+ <kubernetes.client.version>3.0.0</kubernetes.client.version>
</properties>

<dependencies>
@@ -74,7 +74,6 @@ package object constants {
private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

// Bootstrapping dependencies with the init-container
- private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH =
"/mnt/secrets/spark-init"
private[spark] val INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY =
@@ -16,34 +16,15 @@
*/
package org.apache.spark.deploy.k8s.submit

- import com.fasterxml.jackson.databind.ObjectMapper
- import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
- import scala.collection.JavaConverters._

- import org.apache.spark.deploy.k8s.constants._

private[spark] object InitContainerUtil {

- private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)

def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
- val resolvedInitContainers = originalPodSpec
-   .getMetadata
-   .getAnnotations
-   .asScala
-   .get(INIT_CONTAINER_ANNOTATION)
-   .map { existingInitContainerAnnotation =>
-     val existingInitContainers = OBJECT_MAPPER.readValue(
-       existingInitContainerAnnotation, classOf[List[Container]])
-     existingInitContainers ++ Seq(initContainer)
-   }.getOrElse(Seq(initContainer))
- val resolvedSerializedInitContainers = OBJECT_MAPPER.writeValueAsString(resolvedInitContainers)
new PodBuilder(originalPodSpec)
- .editMetadata()
-   .removeFromAnnotations(INIT_CONTAINER_ANNOTATION)
-   .addToAnnotations(INIT_CONTAINER_ANNOTATION, resolvedSerializedInitContainers)
-   .endMetadata()
+ .editOrNewSpec()
+   .addToInitContainers(initContainer)
+ .endSpec()
.build()
}
}
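
For context, a minimal self-contained sketch of the simplified helper in action, assuming the fabric8 kubernetes-client 3.0.0 API this PR upgrades to (pod, container, and image names are illustrative). The old code JSON-serialized the container list into the deprecated pod.beta.kubernetes.io/init-containers annotation; the new code appends to the typed spec.initContainers field:

    import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

    object InitContainerExample extends App {
      // A bare driver pod with an empty spec (names are illustrative).
      val driverPod: Pod = new PodBuilder()
        .withNewMetadata().withName("spark-driver").endMetadata()
        .withNewSpec().endSpec()
        .build()

      val initContainer: Container = new ContainerBuilder()
        .withName("spark-init")
        .withImage("spark-init:v1")
        .build()

      // Same pattern as appendInitContainer above: editOrNewSpec() tolerates
      // a pod without a spec, and addToInitContainers appends to the typed
      // spec.initContainers list instead of rewriting an annotation blob.
      val resolvedPod: Pod = new PodBuilder(driverPod)
        .editOrNewSpec()
          .addToInitContainers(initContainer)
        .endSpec()
        .build()

      println(resolvedPod.getSpec.getInitContainers.get(0).getName) // spark-init
    }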
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import java.util.concurrent.{CountDownLatch, TimeUnit}

- import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
+ import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._
@@ -109,15 +109,15 @@ private[k8s] class LoggingPodStatusWatcherImpl(
("namespace", pod.getMetadata.getNamespace()),
("labels", pod.getMetadata.getLabels().asScala.mkString(", ")),
("pod uid", pod.getMetadata.getUid),
("creation time", pod.getMetadata.getCreationTimestamp()),
("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),

// spec details
("service account name", pod.getSpec.getServiceAccountName()),
("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")),
("node name", pod.getSpec.getNodeName()),

// status
("start time", pod.getStatus.getStartTime),
("start time", formatTime(pod.getStatus.getStartTime)),
("container images",
pod.getStatus.getContainerStatuses()
.asScala
@@ -162,7 +162,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
case running: ContainerStateRunning =>
Seq(
("Container state", "Running"),
("Container started at", running.getStartedAt))
("Container started at", formatTime(running.getStartedAt)))
case waiting: ContainerStateWaiting =>
Seq(
("Container state", "Waiting"),
@@ -175,4 +175,8 @@
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
}.getOrElse(Seq(("Container state", "N/A")))
}

+ private def formatTime(time: Time): String = {
+   if (time != null) time.getTime else "N/A"
+ }
}
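
The new formatTime guard matters because status timestamps are null until the pod has actually been scheduled and started. A small sketch, assuming fabric8's Time wrapper exposes its single-argument String constructor:

    import io.fabric8.kubernetes.api.model.Time

    // Standalone mirror of the watcher's helper above.
    def formatTime(time: Time): String =
      if (time != null) time.getTime else "N/A"

    // A pod that has not started yet reports null timestamps.
    assert(formatTime(null) == "N/A")
    assert(formatTime(new Time("2017-10-20T12:00:00Z")) == "2017-10-20T12:00:00Z")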
@@ -31,9 +31,8 @@ import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
import org.apache.spark.util.Utils

- private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
+ private[spark] class InitContainerBootstrapStepSuite extends SparkFunSuite {

- private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)
private val CONFIG_MAP_NAME = "spark-init-config-map"
private val CONFIG_MAP_KEY = "spark-init-config-map-key"

@@ -59,12 +58,9 @@ private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvKey)
assert(additionalDriverEnv.head.getValue ===
FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvValue)
- val driverAnnotations = preparedDriverSpec.driverPod.getMetadata.getAnnotations.asScala
- assert(driverAnnotations.size === 1)
- val initContainers = OBJECT_MAPPER.readValue(
-   driverAnnotations(INIT_CONTAINER_ANNOTATION), classOf[Array[Container]])
- assert(initContainers.length === 1)
- val initContainerEnv = initContainers.head.getEnv.asScala
+ val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
+ assert(initContainers.size() === 1)
+ val initContainerEnv = initContainers.get(0).getEnv.asScala
assert(initContainerEnv.size === 1)
assert(initContainerEnv.head.getName ===
SecondTestInitContainerConfigurationStep$.additionalInitContainerEnvKey)
@@ -179,8 +179,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
verify(nodeAffinityExecutorPodModifier, times(1))
.addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

- assert(executor.getMetadata.getAnnotations.size() === 1)
- assert(executor.getMetadata.getAnnotations.containsKey(INIT_CONTAINER_ANNOTATION))
+ assert(executor.getSpec.getInitContainers.size() === 1)
checkOwnerReferences(executor, driverPodUid)
}

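With init containers on the typed spec, follow-up assertions in tests like this one can work against the Java list directly. A sketch reusing the executor pod built in the test above; the container name "spark-init" is hypothetical:

    import scala.collection.JavaConverters._

    // getSpec.getInitContainers returns a java.util.List[Container].
    val initContainerNames = executor.getSpec.getInitContainers.asScala.map(_.getName)
    assert(initContainerNames === Seq("spark-init"))
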
resource-managers/kubernetes/integration-tests/pom.xml (4 changes: 2 additions & 2 deletions)
@@ -351,7 +351,7 @@
<goal>wget</goal>
</goals>
<configuration>
- <url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-linux-amd64</url>
+ <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
@@ -363,7 +363,7 @@
<goal>wget</goal>
</goals>
<configuration>
- <url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-darwin-amd64</url>
+ <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
@@ -60,9 +60,9 @@ private[spark] object Minikube extends Logging {
def getMinikubeStatus: MinikubeStatus.Value = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
val statusString = executeMinikube("status")
.filter(_.contains("minikubeVM: "))
.filter(_.contains("minikube: "))
.head
.replaceFirst("minikubeVM: ", "")
.replaceFirst("minikube: ", "")
MinikubeStatus.unapply(statusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
}
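
A quick sketch of the updated parsing; the sample status lines below are illustrative of minikube >= 0.22 output, which labels the VM line "minikube:" rather than the old "minikubeVM:":

    // Illustrative `minikube status` output (not captured from a real run).
    val sampleOutput = Seq(
      "minikube: Running",
      "cluster: Running",
      "kubectl: Correctly Configured: pointing to minikube-vm")

    val statusString = sampleOutput
      .filter(_.contains("minikube: "))
      .head
      .replaceFirst("minikube: ", "")

    // statusString == "Running", which MinikubeStatus.unapply maps to RUNNING.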
@@ -78,7 +78,7 @@

def deleteMinikube(): Unit = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
- if (getMinikubeStatus != MinikubeStatus.DOES_NOT_EXIST) {
+ if (getMinikubeStatus != MinikubeStatus.NONE) {
executeMinikube("delete")
} else {
logInfo("Minikube was already not running.")
@@ -115,10 +115,17 @@

private[spark] object MinikubeStatus extends Enumeration {

+ // The following states are listed according to
+ // https://github.com/docker/machine/blob/master/libmachine/state/state.go.
+ val STARTING = status("Starting")
val RUNNING = status("Running")
+ val PAUSED = status("Paused")
+ val STOPPING = status("Stopping")
val STOPPED = status("Stopped")
- val DOES_NOT_EXIST = status("Does Not Exist")
+ val ERROR = status("Error")
+ val TIMEOUT = status("Timeout")
+ val SAVED = status("Saved")
+ val NONE = status("")

def status(value: String): Value = new Val(nextId, value)
def unapply(s: String): Option[Value] = values.find(s == _.toString)
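Finally, a sketch of how unapply resolves the parsed strings, assuming an absent VM surfaces as an empty status value (which is what the NONE state and the updated deleteMinikube check rely on):

    // unapply is a straight string match against the values declared above.
    assert(MinikubeStatus.unapply("Running") == Some(MinikubeStatus.RUNNING))
    // An absent VM parses to "", which resolves to NONE, so deleteMinikube
    // skips `minikube delete`.
    assert(MinikubeStatus.unapply("") == Some(MinikubeStatus.NONE))
    // Unrecognized strings yield None, triggering the IllegalStateException.
    assert(MinikubeStatus.unapply("Bogus") == None)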