diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 0f9378155c68d..939aa88b07973 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -88,7 +88,6 @@ private[spark] class BasicExecutorFeatureStep(
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCoresRequest)
       .build()
-<<<<<<< HEAD

     val executorEnv: Seq[EnvVar] = {
         (Seq(
@@ -98,8 +97,8 @@ private[spark] class BasicExecutorFeatureStep(
           (ENV_APPLICATION_ID, kubernetesConf.appId),
           // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
           (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-          (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId)
-        ) ++ kubernetesConf.roleEnvs).map { case (k, v) =>
+          (ENV_EXECUTOR_ID, kubernetesConf.executorId)
+        ) ++ kubernetesConf.environment).map { case (k, v) =>
           new EnvVarBuilder()
             .withName(k)
             .withValue(v)
@@ -111,43 +110,6 @@ private[spark] class BasicExecutorFeatureStep(
           .withValueFrom(new EnvVarSourceBuilder()
             .withNewFieldRef("v1", "status.podIP")
             .build())
-=======
-    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
-      new EnvVarBuilder()
-        .withName(ENV_CLASSPATH)
-        .withValue(cp)
-        .build()
-    }
-    val executorExtraJavaOptionsEnv = kubernetesConf
-      .get(EXECUTOR_JAVA_OPTIONS)
-      .map { opts =>
-        val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
-          kubernetesConf.executorId)
-        val delimitedOpts = Utils.splitCommandString(subsOpts)
-        delimitedOpts.zipWithIndex.map {
-          case (opt, index) =>
-            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
-        }
-      }.getOrElse(Seq.empty[EnvVar])
-    val executorEnv = (Seq(
-      (ENV_DRIVER_URL, driverUrl),
-      (ENV_EXECUTOR_CORES, executorCores.toString),
-      (ENV_EXECUTOR_MEMORY, executorMemoryString),
-      (ENV_APPLICATION_ID, kubernetesConf.appId),
-      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
-      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-      (ENV_EXECUTOR_ID, kubernetesConf.executorId)) ++
-      kubernetesConf.environment)
-      .map(env => new EnvVarBuilder()
-        .withName(env._1)
-        .withValue(env._2)
-        .build()
-      ) ++ Seq(
-      new EnvVarBuilder()
-        .withName(ENV_EXECUTOR_POD_IP)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
->>>>>>> master
           .build())
       } ++ {
        Option(secMgr.getSecretKey()).map { authSecret =>
@@ -166,7 +128,7 @@ private[spark] class BasicExecutorFeatureStep(
      } ++ {
        val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
          val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
-           kubernetesConf.roleSpecificConf.executorId)
+           kubernetesConf.executorId)
          Utils.splitCommandString(subsOpts)
        }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 4e16ec20d227b..ba273cad6a8e5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.spark.deploy.k8s.features._

 private[spark] class KubernetesExecutorBuilder(
     provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep =
-      new BasicExecutorFeatureStep(_),
+      new BasicExecutorFeatureStep(_, _),
     provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
       new MountSecretsFeatureStep(_),
     provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 2428ff7aff3d5..6aa862643c788 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -126,7 +126,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())

-    checkEnv(executor, conf,
+    checkEnv(executor, baseConf,
       Map("SPARK_JAVA_OPT_0" -> "foo=bar",
         ENV_CLASSPATH -> "bar=baz",
         "qux" -> "quux"))
@@ -151,19 +151,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val secMgr = new SecurityManager(conf)
     secMgr.initializeAuth()

-    val step = new BasicExecutorFeatureStep(
-      KubernetesConf(
-        conf,
-        KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
-        RESOURCE_NAME_PREFIX,
-        APP_ID,
-        LABELS,
-        ANNOTATIONS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None),
+    val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
       secMgr)

     val executor = step.configurePod(SparkPod.initialPod())
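For reference, the env-var wiring that the resolved side keeps can be distilled into a minimal, self-contained sketch. It compiles against the same fabric8 `kubernetes-model` builders the patch already uses (`EnvVarBuilder`, `EnvVarSourceBuilder`); the hard-coded names and sample values are stand-ins for Spark's `ENV_*` constants and `KubernetesExecutorConf` fields, not code taken from this diff:

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, EnvVarSourceBuilder}

object ExecutorEnvSketch {
  // Stand-ins for values the real step reads from KubernetesExecutorConf.
  val appId = "spark-app-1"            // kubernetesConf.appId
  val executorId = "1"                 // kubernetesConf.executorId
  val extraEnv = Map("qux" -> "quux")  // kubernetesConf.environment

  // Literal env vars: (name, value) pairs mapped through EnvVarBuilder,
  // mirroring the `(Seq(...) ++ kubernetesConf.environment).map { ... }` shape above.
  val literalVars: Seq[EnvVar] =
    (Seq(
      ("SPARK_APPLICATION_ID", appId),
      ("SPARK_EXECUTOR_ID", executorId)
    ) ++ extraEnv).map { case (k, v) =>
      new EnvVarBuilder()
        .withName(k)
        .withValue(v)
        .build()
    }

  // The pod IP is only known at runtime, so it is injected via the Kubernetes
  // downward API (a fieldRef on status.podIP) instead of a literal value.
  val podIpVar: EnvVar =
    new EnvVarBuilder()
      .withName("SPARK_EXECUTOR_POD_IP")
      .withValueFrom(new EnvVarSourceBuilder()
        .withNewFieldRef("v1", "status.podIP")
        .build())
      .build()

  val executorEnv: Seq[EnvVar] = literalVars :+ podIpVar

  def main(args: Array[String]): Unit =
    executorEnv.foreach { e =>
      println(s"${e.getName} -> ${Option(e.getValue).getOrElse("<fieldRef: status.podIP>")}")
    }
}
```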