
add support for batchScheduler in Spark Tasks #216

Merged · 2 commits · Oct 26, 2021
4 changes: 4 additions & 0 deletions go/tasks/plugins/k8s/spark/spark.go
@@ -210,6 +210,10 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
},
}

if val, ok := sparkConfig["spark.batchScheduler"]; ok {
j.Spec.BatchScheduler = &val
}

if sparkJob.MainApplicationFile != "" {
j.Spec.MainApplicationFile = &sparkJob.MainApplicationFile
}
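
For context, the field this hunk sets lives on the operator's SparkApplication CRD spec rather than in Spark conf proper. A minimal standalone sketch of the same logic, assuming the spark-on-k8s-operator v1beta2 API (the package name and the applyBatchScheduler helper are illustrative, not part of this PR):

package spark

import (
	sparkOp "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
)

// applyBatchScheduler copies the opt-in key from the task's Spark conf
// onto the CRD spec, mirroring the merged BuildResource change.
func applyBatchScheduler(spec *sparkOp.SparkApplicationSpec, sparkConfig map[string]string) {
	// When the key is absent, spec.BatchScheduler stays nil and the operator
	// schedules pods with the default Kubernetes scheduler.
	if val, ok := sparkConfig["spark.batchScheduler"]; ok {
		spec.BatchScheduler = &val
	}
}
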
2 changes: 2 additions & 0 deletions go/tasks/plugins/k8s/spark/spark_test.go
@@ -47,6 +47,7 @@ var (
"spark.flyte.feature1.enabled": "true",
"spark.flyteorg.feature2.enabled": "true",
"spark.flyteorg.feature3.enabled": "true",
"spark.batchScheduler": "volcano",
Contributor:
Why add it to spark-conf? Can we add it to just the regular config? I would also prefer the config to say scheduler: default as the default, and users can then supplement it as scheduler: volcano.
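
For comparison, a minimal sketch of the plugin-level default being suggested here; the Config struct and its field are assumptions for illustration, not code from this repo:

// Hypothetical plugin-level config: one cluster-wide default that a task
// could override, instead of a per-task Spark conf key.
type Config struct {
	// BatchScheduler would default to "default" (the standard Kubernetes
	// scheduler) and could be set to e.g. "volcano" deployment-wide.
	BatchScheduler string `json:"batchScheduler" pflag:",Default batch scheduler for Spark applications."`
}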

Contributor Author:

The reason for Spark conf is that, for our use case, we don't want to enable it for everyone; we'd like to try it out with a specific workflow. Adding it to the regular config would set it for all workflows unless we moved to a per-project/per-workflow config. I have no preference between batchScheduler and scheduler; Spark calls it a batch scheduler, so I figured I'd keep the same naming convention.
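
Concretely, the per-workflow opt-in would look something like this in a single task's Spark conf (values are illustrative):

// Only this task's SparkApplication gets Spec.BatchScheduler populated;
// every other workflow keeps the default Kubernetes scheduler.
sparkConf := map[string]string{
	"spark.executor.memory": "1g",
	"spark.batchScheduler":  "volcano",
}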

Contributor Author:

@kumare3 @akhurana001 any thoughts on this?

Contributor:

Can we reuse the existing Spark feature support here instead of explicitly handling this?

Contributor:

Can you check whether the SparkOperator lets you set the scheduler via a conf, or whether it is CRD-spec only?

If it's spec-only, then I don't think we have a perfect solution here, but it probably still makes sense to have the same UX for customers even if we have to handle the implementation separately (unless @kumare3 has any other thoughts?).

Contributor Author:

I spent some time looking at the various configs and tried setting it as part of the config in our deployment, but it looks like it can only be set in the CRD spec.

I agree that there is no perfect solution. For that reason I proposed this change, where a user can specify spark.batchScheduler and that value is used in the CRD spec.

Contributor:

Makes sense

Contributor Author:

@kumare3 @akhurana001 in that case are there any further changes needed for this PR?

Contributor:

lgtm

}

dummyEnvVars = []*core.KeyValuePair{
@@ -384,6 +385,7 @@ func TestBuildResourceSpark(t *testing.T) {
assert.Equal(t, int32(execInstances), *sparkApp.Spec.Executor.Instances)
assert.Equal(t, dummySparkConf["spark.driver.memory"], *sparkApp.Spec.Driver.Memory)
assert.Equal(t, dummySparkConf["spark.executor.memory"], *sparkApp.Spec.Executor.Memory)
assert.Equal(t, dummySparkConf["spark.batchScheduler"], *sparkApp.Spec.BatchScheduler)

// Validate Interruptible Toleration and NodeSelector set for Executor but not Driver.
assert.Equal(t, 0, len(sparkApp.Spec.Driver.Tolerations))