diff --git a/internal/scheduler/yunikorn/resourceusage/memory.go b/internal/scheduler/yunikorn/resourceusage/memory.go
index 397f1e209..88e17cdc9 100644
--- a/internal/scheduler/yunikorn/resourceusage/memory.go
+++ b/internal/scheduler/yunikorn/resourceusage/memory.go
@@ -70,6 +70,26 @@ func memoryRequestBytes(podSpec *v1beta2.SparkPodSpec, memoryOverheadFactor floa
 	return memoryBytes + memoryOverheadBytes, nil
 }
 
+func executorPysparkMemoryBytes(app *v1beta2.SparkApplication) (int64, error) {
+	pysparkMemory, found := app.Spec.SparkConf["spark.executor.pyspark.memory"]
+	if app.Spec.Type != v1beta2.SparkApplicationTypePython || !found {
+		return 0, nil
+	}
+
+	// This field defaults to mebibytes if no resource suffix is specified
+	// https://github.com/apache/spark/blob/7de71a2ec78d985c2a045f13c1275101b126cec4/docs/configuration.md?plain=1#L289-L305
+	if _, err := strconv.Atoi(pysparkMemory); err == nil {
+		pysparkMemory = pysparkMemory + "m"
+	}
+
+	pysparkMemoryBytes, err := byteStringAsBytes(pysparkMemory)
+	if err != nil {
+		return 0, err
+	}
+
+	return pysparkMemoryBytes, nil
+}
+
 func bytesToMi(b int64) string {
 	// this floors the value to the nearest mebibyte
 	return fmt.Sprintf("%dMi", b/1024/1024)
@@ -103,6 +123,11 @@ func executorMemoryRequest(app *v1beta2.SparkApplication) (string, error) {
 		return "", err
 	}
 
+	pysparkMemoryBytes, err := executorPysparkMemoryBytes(app)
+	if err != nil {
+		return "", err
+	}
+
 	// See comment above in driver
-	return bytesToMi(requestBytes), nil
+	return bytesToMi(requestBytes + pysparkMemoryBytes), nil
 }
diff --git a/internal/scheduler/yunikorn/scheduler_test.go b/internal/scheduler/yunikorn/scheduler_test.go
index ce98cbbc8..8d44e9df9 100644
--- a/internal/scheduler/yunikorn/scheduler_test.go
+++ b/internal/scheduler/yunikorn/scheduler_test.go
@@ -223,6 +223,49 @@ func TestSchedule(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "spark.executor.pyspark.memory",
+			app: &v1beta2.SparkApplication{
+				Spec: v1beta2.SparkApplicationSpec{
+					Type: v1beta2.SparkApplicationTypePython,
+					Driver: v1beta2.DriverSpec{
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							Cores:  util.Int32Ptr(1),
+							Memory: util.StringPtr("512m"),
+						},
+					},
+					Executor: v1beta2.ExecutorSpec{
+						Instances: util.Int32Ptr(2),
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							Cores:  util.Int32Ptr(1),
+							Memory: util.StringPtr("512m"),
+						},
+					},
+					SparkConf: map[string]string{
+						"spark.executor.pyspark.memory": "500m",
+					},
+				},
+			},
+			expected: []taskGroup{
+				{
+					Name:      "spark-driver",
+					MinMember: 1,
+					MinResource: map[string]string{
+						"cpu":    "1",
+						"memory": "896Mi", // 512Mi + 384Mi min overhead
+					},
+				},
+				{
+					Name:      "spark-executor",
+					MinMember: 2,
+					MinResource: map[string]string{
+						"cpu": "1",
+						// 512Mi + 384Mi min overhead + 500Mi spark.executor.pyspark.memory
+						"memory": "1396Mi",
+					},
+				},
+			},
+		},
 	}
 
 	scheduler := &Scheduler{}
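---

A note on the arithmetic in the new test case, with a small standalone sketch of the suffix handling the helper relies on. Per the Spark docs linked in the comment, a bare integer value for spark.executor.pyspark.memory is read as mebibytes, so "500" and "500m" both resolve to 500Mi; the executor expectation is then 512Mi pod memory + 384Mi minimum overhead + 500Mi pyspark memory = 1396Mi. The parsePysparkMemory function below is a hypothetical stand-in, not the patched code: it mirrors only the strconv.Atoi guard plus a simplified byte-string parser, whereas the real helper delegates to the package-internal byteStringAsBytes, which supports Spark's full suffix set.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePysparkMemory is a simplified, hypothetical stand-in for the
// helper in this patch: bare integers default to mebibytes, mirroring
// Spark's documented behavior. Only the "k"/"m"/"g" suffixes are
// handled here; the real byteStringAsBytes covers the full set.
func parsePysparkMemory(v string) (int64, error) {
	// Mirror the strconv.Atoi guard from the patch: a value with no
	// suffix is a bare integer and is treated as mebibytes.
	if _, err := strconv.Atoi(v); err == nil {
		v += "m"
	}

	multipliers := map[string]int64{
		"k": 1024,
		"m": 1024 * 1024,
		"g": 1024 * 1024 * 1024,
	}
	for suffix, mult := range multipliers {
		if strings.HasSuffix(v, suffix) {
			n, err := strconv.ParseInt(strings.TrimSuffix(v, suffix), 10, 64)
			if err != nil {
				return 0, err
			}
			return n * mult, nil
		}
	}
	return 0, fmt.Errorf("unsupported byte string: %q", v)
}

func main() {
	// "500" and "500m" resolve to the same byte count.
	for _, v := range []string{"500", "500m", "1g"} {
		b, err := parsePysparkMemory(v)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %d bytes (%dMi)\n", v, b, b/1024/1024)
	}

	// Executor expectation from the test case above:
	// 512Mi memory + 384Mi min overhead + 500Mi pyspark memory.
	fmt.Println(512 + 384 + 500) // 1396
}

Running this prints 524288000 bytes (500Mi) for both "500" and "500m", which is why the test can use either form and still expect "1396Mi" for the executor task group.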