Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Set Max Limit on number of executors to be launched in spark plugin #101

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package spark
import (
"context"
"fmt"
"strconv"
"time"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
Expand Down Expand Up @@ -30,13 +31,20 @@ import (
const KindSparkApplication = "SparkApplication"
const sparkDriverUI = "sparkDriverUI"
const sparkHistoryUI = "sparkHistoryUI"
const sparkExecutorKey = "spark.executor.instances"

var sparkTaskType = "spark"

// Config holds Spark-plugin-specific configuration.
type Config struct {
	// DefaultSparkConfig is merged into every SparkJob's spark configuration.
	DefaultSparkConfig map[string]string `json:"spark-config-default" pflag:",Key value pairs of default spark configuration that should be applied to every SparkJob"`
	// SparkHistoryServerURL is where each job publishes its execution history.
	SparkHistoryServerURL string `json:"spark-history-server-url" pflag:",URL for SparkHistory Server that each job will publish the execution history to."`
	// SparkLimits caps user-supplied spark configuration values per job.
	SparkLimits Limits `json:"spark-limits" pflag:",Spark Limits that can be set for each job execution."`
}

// Limits describes plugin-enforced caps on user-supplied spark configuration.
type Limits struct {
	// ExecutorCountLimit is the maximum number of executors a job may request.
	// Empty string means "no limit". (NOTE(review): kept as a string to match
	// the string-valued spark config map; an int with 0 == unlimited would be
	// cleaner — confirm config-loading compatibility before changing.)
	ExecutorCountLimit string `json:"executor-count-limit" pflag:",Max limit on number of executors allowed to be launched."`
}

var (
Expand Down Expand Up @@ -71,6 +79,29 @@ func validateSparkJob(sparkJob *plugins.SparkJob) error {
return nil
}

// min returns the smaller of two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}

// checkLimits enforces plugin-configured limits on the user-supplied spark
// configuration, mutating sparkConfig in place.
//
// Currently only the executor count is enforced: when a limit is configured,
// the user's "spark.executor.instances" value is capped at that limit. A job
// that does not request an explicit executor count is left untouched.
// Returns an error when either value cannot be parsed as an integer.
func checkLimits(sparkConfig map[string]string, limits Limits) error {
	if limits.ExecutorCountLimit == "" {
		// No limit configured; nothing to enforce.
		return nil
	}

	requested := sparkConfig[sparkExecutorKey]
	if requested == "" {
		// The user did not request an explicit executor count, so there is
		// nothing to cap. (Previously this fell through to strconv.Atoi("")
		// and failed the whole job build.)
		return nil
	}

	userRequest, err := strconv.Atoi(requested)
	if err != nil {
		return fmt.Errorf("parsing requested %s value %q: %w", sparkExecutorKey, requested, err)
	}
	limit, err := strconv.Atoi(limits.ExecutorCountLimit)
	if err != nil {
		return fmt.Errorf("parsing configured executor count limit %q: %w", limits.ExecutorCountLimit, err)
	}
	sparkConfig[sparkExecutorKey] = strconv.Itoa(min(limit, userRequest))
	return nil
}

// Creates a new Job that will execute the main container as well as any generated types the result from the execution.
func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (k8s.Resource, error) {
taskTemplate, err := taskCtx.TaskReader().Read(ctx)
Expand Down Expand Up @@ -142,6 +173,11 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
sparkConfig[k] = v
}

err = checkLimits(sparkConfig, GetSparkConfig().SparkLimits)
if err != nil {
return nil, err
}

// Set pod limits.
if sparkConfig["spark.kubernetes.driver.limit.cores"] == "" && sparkConfig["spark.driver.cores"] != "" {
sparkConfig["spark.kubernetes.driver.limit.cores"] = sparkConfig["spark.driver.cores"]
Expand Down
36 changes: 36 additions & 0 deletions go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,39 @@ func TestBuildResourceSpark(t *testing.T) {
assert.NotNil(t, err)
assert.Nil(t, resource)
}

// TestPopulateSparkConfigExecutorCountLimit verifies that checkLimits caps the
// user-requested executor count at the configured limit and leaves it
// untouched when the request is within the limit or no limit is configured.
func TestPopulateSparkConfigExecutorCountLimit(t *testing.T) {
	testCases := []struct {
		description  string
		limitsConfig Limits
		sparkConfig  map[string]string
		want         string
	}{
		{
			// Description fixed: the request (5) exceeds the limit (3).
			description:  "User executor config greater than limit is capped",
			limitsConfig: Limits{ExecutorCountLimit: "3"},
			sparkConfig:  map[string]string{sparkExecutorKey: "5"},
			want:         "3",
		},
		{
			description:  "User executor config greater than limit of one is capped",
			limitsConfig: Limits{ExecutorCountLimit: "1"},
			sparkConfig:  map[string]string{sparkExecutorKey: "3"},
			want:         "1",
		},
		{
			description:  "User executor config less than limit is unchanged",
			limitsConfig: Limits{ExecutorCountLimit: "5"},
			sparkConfig:  map[string]string{sparkExecutorKey: "3"},
			want:         "3",
		},
		{
			description:  "No limit set",
			limitsConfig: Limits{},
			sparkConfig:  map[string]string{sparkExecutorKey: "3"},
			want:         "3",
		},
	}

	for _, tc := range testCases {
		tc := tc // capture range variable for the subtest closure
		t.Run(tc.description, func(t *testing.T) {
			// The error was previously discarded with `_ =`; valid inputs
			// must not produce one.
			if err := checkLimits(tc.sparkConfig, tc.limitsConfig); err != nil {
				t.Fatalf("checkLimits returned unexpected error: %v", err)
			}
			if got := tc.sparkConfig[sparkExecutorKey]; got != tc.want {
				t.Errorf("got %s, want %s", got, tc.want)
			}
		})
	}
}