From 886c8384ed2dcf88d6e53cb1c78232f67b45bf75 Mon Sep 17 00:00:00 2001 From: Miguel Toledo Date: Tue, 26 Oct 2021 19:15:08 -0400 Subject: [PATCH] add support for batchScheduler in Spark Tasks (#216) * add support for batchScheduler Signed-off-by: Miguel * upd test Signed-off-by: Miguel --- flyteplugins/go/tasks/plugins/k8s/spark/spark.go | 4 ++++ flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go | 2 ++ 2 files changed, 6 insertions(+) diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark.go index 11801dcb83..48ad9ee9ab 100755 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark.go @@ -210,6 +210,10 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo }, } + if val, ok := sparkConfig["spark.batchScheduler"]; ok { + j.Spec.BatchScheduler = &val + } + if sparkJob.MainApplicationFile != "" { j.Spec.MainApplicationFile = &sparkJob.MainApplicationFile } diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go index 394d06d5d2..4b6a662c15 100755 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go @@ -47,6 +47,7 @@ var ( "spark.flyte.feature1.enabled": "true", "spark.flyteorg.feature2.enabled": "true", "spark.flyteorg.feature3.enabled": "true", + "spark.batchScheduler": "volcano", } dummyEnvVars = []*core.KeyValuePair{ @@ -384,6 +385,7 @@ func TestBuildResourceSpark(t *testing.T) { assert.Equal(t, int32(execInstances), *sparkApp.Spec.Executor.Instances) assert.Equal(t, dummySparkConf["spark.driver.memory"], *sparkApp.Spec.Driver.Memory) assert.Equal(t, dummySparkConf["spark.executor.memory"], *sparkApp.Spec.Executor.Memory) + assert.Equal(t, dummySparkConf["spark.batchScheduler"], *sparkApp.Spec.BatchScheduler) // Validate Interruptible Toleration and NodeSelector set for Executor but not Driver. assert.Equal(t, 0, len(sparkApp.Spec.Driver.Tolerations))