Skip to content

Commit

Permalink
Fix some style concerns
Browse files Browse the repository at this point in the history
  • Loading branch information
foxish committed Oct 18, 2017
1 parent 488c535 commit 82b79a7
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ private[spark] object ConfigurationUtils {
}

def requireBothOrNeitherDefined(
opt1: Option[_],
opt2: Option[_],
errMessageWhenFirstIsMissing: String,
errMessageWhenSecondIsMissing: String): Unit = {
opt1: Option[_],
opt2: Option[_],
errMessageWhenFirstIsMissing: String,
errMessageWhenSecondIsMissing: String): Unit = {
requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
}

def requireSecondIfFirstIsDefined(
opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
opt1: Option[_],
opt2: Option[_],
errMessageWhenSecondIsMissing: String): Unit = {
opt1.foreach { _ =>
require(opt2.isDefined, errMessageWhenSecondIsMissing)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,38 @@ private[spark] object SparkKubernetesClientFactory {
.orElse(maybeServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
ConfigurationUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
s" value $oauthTokenConf.")
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
s" value $oauthTokenConf.")

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
.orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
.orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
val clientKeyFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
val clientCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
val dispatcher = new Dispatcher(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
val config = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.withOption(namespace) {
(ns, configBuilder) => configBuilder.withNamespace(ns)
}.build()
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.withOption(namespace) {
(ns, configBuilder) => configBuilder.withNamespace(ns)
}.build()
val baseHttpClient = HttpClientUtils.createHttpClient(config)
val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
.dispatcher(dispatcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.util.Utils

// Configures executor pods. Construct one of these with a SparkConf to set up properties that are
// common across all executors. Then, pass in dynamic parameters into createExecutorPod.
/**
* Configures executor pods. Construct one of these with a SparkConf to set up properties that are
* common across all executors. Then, pass in dynamic parameters into createExecutorPod.
*/
private[spark] trait ExecutorPodFactory {
def createExecutorPod(
executorId: String,
Expand All @@ -44,52 +46,52 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
import ExecutorPodFactoryImpl._

private val executorExtraClasspath = sparkConf.get(
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)

private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_LABEL_PREFIX,
"executor label")
sparkConf,
KUBERNETES_EXECUTOR_LABEL_PREFIX,
"executor label")
require(
!executorLabels.contains(SPARK_APP_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
!executorLabels.contains(SPARK_APP_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
require(
!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
s" Spark.")
!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
s" Spark.")

private val executorAnnotations =
ConfigurationUtils.parsePrefixedKeyValuePairs (
sparkConf,
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
"executor annotation")
ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
"executor annotation")
private val nodeSelector =
ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_NODE_SELECTOR_PREFIX,
"node selector")
ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_NODE_SELECTOR_PREFIX,
"node selector")

private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
private val blockmanagerPort = sparkConf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
private val kubernetesDriverPodName = sparkConf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(throw new SparkException("Must specify the driver pod name"))
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(throw new SparkException("Must specify the driver pod name"))

private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)

private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
private val executorMemoryString = sparkConf.get(
org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)

private val memoryOverheadMiB = sparkConf
.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB

private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d)
Expand All @@ -109,10 +111,10 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
// executorId and applicationId
val hostname = name.substring(Math.max(0, name.length - 63))
val resolvedExecutorLabels = Map(
SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> applicationId,
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
executorLabels
SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> applicationId,
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
executorLabels
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${executorMemoryMiB}Mi")
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val sparkConf = sc.getConf

val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
sparkConf,
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
KUBERNETES_MASTER_INTERNAL_URL,
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
sparkConf,
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))

val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf)
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")
"kubernetes-executor-requests")
new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc.env.rpcEnv,
executorPodFactory,
kubernetesClient,
allocatorExecutor,
requestExecutorsService)
scheduler.asInstanceOf[TaskSchedulerImpl],
sc.env.rpcEnv,
executorPodFactory,
kubernetesClient,
allocatorExecutor,
requestExecutorsService)
}

override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the driver pod name"))
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
requestExecutorsService)

private val driverPod = try {
kubernetesClient.pods()
Expand All @@ -89,9 +89,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
protected var totalExpectedExecutors = new AtomicInteger(0)

private val driverUrl = RpcEndpointAddress(
conf.get("spark.driver.host"),
conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
conf.get("spark.driver.host"),
conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

private val initialExecutors = getInitialTargetExecutorNumber()

Expand Down Expand Up @@ -121,7 +121,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
for (i <- 0 until math.min(
totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
runningExecutorsToPods.put(executorId, pod)
runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
Expand Down Expand Up @@ -202,13 +202,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def start(): Unit = {
super.start()
executorWatchResource.set(
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.watch(new ExecutorPodsWatcher()))
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.watch(new ExecutorPodsWatcher()))

allocatorExecutor.scheduleWithFixedDelay(
allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)

if (!Utils.isDynamicAllocationEnabled(conf)) {
doRequestTotalExecutors(initialExecutors)
Expand Down Expand Up @@ -281,12 +281,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
val executorPod = executorPodFactory.createExecutorPod(
executorId,
applicationId(),
driverUrl,
conf.getExecutorEnv,
driverPod,
nodeToLocalTaskCount)
executorId,
applicationId(),
driverUrl,
conf.getExecutorEnv,
driverPod,
nodeToLocalTaskCount)
try {
(executorId, kubernetesClient.pods.create(executorPod))
} catch {
Expand Down
Loading

0 comments on commit 82b79a7

Please sign in to comment.