diff --git a/cmd/operator/controller/start.go b/cmd/operator/controller/start.go
index 8fb54d7ea..19db4fed9 100644
--- a/cmd/operator/controller/start.go
+++ b/cmd/operator/controller/start.go
@@ -53,6 +53,7 @@ import (
 	"github.com/kubeflow/spark-operator/internal/metrics"
 	"github.com/kubeflow/spark-operator/internal/scheduler"
 	"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
+	"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
 	"github.com/kubeflow/spark-operator/pkg/common"
 	"github.com/kubeflow/spark-operator/pkg/util"
 	// +kubebuilder:scaffold:imports
@@ -206,9 +207,8 @@ func start() {
 	var registry *scheduler.Registry
 	if enableBatchScheduler {
 		registry = scheduler.GetRegistry()
-
-		// Register volcano scheduler.
 		registry.Register(common.VolcanoSchedulerName, volcano.Factory)
+		registry.Register(yunikorn.SchedulerName, yunikorn.Factory)
 	}
 
 	// Setup controller for SparkApplication.
diff --git a/examples/spark-pi-yunikorn.yaml b/examples/spark-pi-yunikorn.yaml
new file mode 100644
index 000000000..43ed1cfde
--- /dev/null
+++ b/examples/spark-pi-yunikorn.yaml
@@ -0,0 +1,45 @@
+#
+# Copyright 2024 The Kubeflow authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: sparkoperator.k8s.io/v1beta2
+kind: SparkApplication
+metadata:
+  name: spark-pi-yunikorn
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: spark:3.5.0
+  imagePullPolicy: IfNotPresent
+  mainClass: org.apache.spark.examples.SparkPi
+  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar
+  sparkVersion: 3.5.0
+  driver:
+    labels:
+      version: 3.5.0
+    cores: 1
+    coreLimit: 1200m
+    memory: 512m
+    serviceAccount: spark-operator-spark
+  executor:
+    labels:
+      version: 3.5.0
+    instances: 2
+    cores: 1
+    coreLimit: 1200m
+    memory: 512m
+  batchScheduler: yunikorn
+  batchSchedulerOptions:
+    queue: root.default
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 72c1d2548..df65323b3 100644
--- a/go.mod
+++ b/go.mod
@@ -227,8 +227,11 @@ replace (
 	k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.29.3
 	k8s.io/code-generator => k8s.io/code-generator v0.29.3
 	k8s.io/component-base => k8s.io/component-base v0.29.3
+	k8s.io/controller-manager => k8s.io/controller-manager v0.29.3
 	k8s.io/cri-api => k8s.io/cri-api v0.29.3
 	k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.29.3
+	k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.29.3
+	k8s.io/endpointslice => k8s.io/endpointslice v0.29.3
 	k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.29.3
 	k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.29.3
 	k8s.io/kube-proxy => k8s.io/kube-proxy v0.29.3
@@ -237,7 +240,9 @@
 	k8s.io/kubelet => k8s.io/kubelet v0.29.3
 	k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.29.3
 	k8s.io/metrics => k8s.io/metrics v0.29.3
+	k8s.io/mount-utils => k8s.io/mount-utils v0.29.3
 	k8s.io/node-api => k8s.io/node-api v0.29.3
+	k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.29.3
 	k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.29.3
 	k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.29.3
 	k8s.io/sample-controller => k8s.io/sample-controller v0.29.3
diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go
index 753108a90..f938e4c96 100644
--- a/internal/controller/sparkapplication/controller.go
+++ b/internal/controller/sparkapplication/controller.go
@@ -43,6 +43,7 @@ import (
 	"github.com/kubeflow/spark-operator/internal/metrics"
 	"github.com/kubeflow/spark-operator/internal/scheduler"
 	"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
+	"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
 	"github.com/kubeflow/spark-operator/pkg/common"
 	"github.com/kubeflow/spark-operator/pkg/util"
 )
@@ -1197,6 +1198,8 @@ func (r *Reconciler) shouldDoBatchScheduling(app *v1beta2.SparkApplication) (boo
 			RestConfig: r.manager.GetConfig(),
 		}
 		scheduler, err = r.registry.GetScheduler(schedulerName, config)
+	case yunikorn.SchedulerName:
+		scheduler, err = r.registry.GetScheduler(schedulerName, nil)
 	}
 
 	if err != nil || scheduler == nil {
diff --git a/internal/scheduler/yunikorn/resourceusage/java.go b/internal/scheduler/yunikorn/resourceusage/java.go
new file mode 100644
index 000000000..8b56d64aa
--- /dev/null
+++ b/internal/scheduler/yunikorn/resourceusage/java.go
@@ -0,0 +1,56 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceusage
+
+import (
+	"fmt"
+	"regexp"
+	"strconv"
+	"strings"
+)
+
+var (
+	javaStringSuffixes = map[string]int64{
+		"b":  1,
+		"kb": 1 << 10,
+		"k":  1 << 10,
+		"mb": 1 << 20,
+		"m":  1 << 20,
+		"gb": 1 << 30,
+		"g":  1 << 30,
+		"tb": 1 << 40,
+		"t":  1 << 40,
+		"pb": 1 << 50,
+		"p":  1 << 50,
+	}
+
+	javaStringPattern = regexp.MustCompile(`^([0-9]+)([a-z]+)?$`)
+)
+
+func byteStringAsBytes(byteString string) (int64, error) {
+	matches := javaStringPattern.FindStringSubmatch(strings.ToLower(byteString))
+	if matches != nil {
+		value, err := strconv.ParseInt(matches[1], 10, 64)
+		if err != nil {
+			return 0, err
+		}
+		if multiplier, present := javaStringSuffixes[matches[2]]; present {
+			return value * multiplier, nil
+		}
+	}
+	return 0, fmt.Errorf("unable to parse byte string: %s", byteString)
+}
diff --git a/internal/scheduler/yunikorn/resourceusage/java_test.go b/internal/scheduler/yunikorn/resourceusage/java_test.go
new file mode 100644
index 000000000..d9d1ae59e
--- /dev/null
+++ b/internal/scheduler/yunikorn/resourceusage/java_test.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceusage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestByteStringAsBytes(t *testing.T) {
+	testCases := []struct {
+		input    string
+		expected int64
+	}{
+		{"1k", 1024},
+		{"1m", 1024 * 1024},
+		{"1g", 1024 * 1024 * 1024},
+		{"1t", 1024 * 1024 * 1024 * 1024},
+		{"1p", 1024 * 1024 * 1024 * 1024 * 1024},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.input, func(t *testing.T) {
+			actual, err := byteStringAsBytes(tc.input)
+			assert.Nil(t, err)
+			assert.Equal(t, tc.expected, actual)
+		})
+	}
+}
+
+func TestByteStringAsBytesInvalid(t *testing.T) {
+	invalidInputs := []string{
+		"0.064",
+		"0.064m",
+		"500ub",
+		"This breaks 600b",
+		"This breaks 600",
+		"600gb This breaks",
+		"This 123mb breaks",
+	}
+
+	for _, input := range invalidInputs {
+		t.Run(input, func(t *testing.T) {
+			_, err := byteStringAsBytes(input)
+			assert.NotNil(t, err)
+		})
+	}
+}
diff --git a/internal/scheduler/yunikorn/resourceusage/memory.go b/internal/scheduler/yunikorn/resourceusage/memory.go
new file mode 100644
index 000000000..397f1e209
--- /dev/null
+++ b/internal/scheduler/yunikorn/resourceusage/memory.go
@@ -0,0 +1,108 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceusage
+
+import (
+	"fmt"
+	"math"
+	"strconv"
+
+	"github.com/kubeflow/spark-operator/api/v1beta2"
+	"github.com/kubeflow/spark-operator/pkg/common"
+)
+
+func isJavaApp(appType v1beta2.SparkApplicationType) bool {
+	return appType == v1beta2.SparkApplicationTypeJava || appType == v1beta2.SparkApplicationTypeScala
+}
+
+func getMemoryOverheadFactor(app *v1beta2.SparkApplication) (float64, error) {
+	if app.Spec.MemoryOverheadFactor != nil {
+		parsed, err := strconv.ParseFloat(*app.Spec.MemoryOverheadFactor, 64)
+		if err != nil {
+			return 0, fmt.Errorf("failed to parse memory overhead factor as float: %w", err)
+		}
+		return parsed, nil
+	} else if isJavaApp(app.Spec.Type) {
+		return common.DefaultJVMMemoryOverheadFactor, nil
+	}
+
+	return common.DefaultNonJVMMemoryOverheadFactor, nil
+}
+
+func memoryRequestBytes(podSpec *v1beta2.SparkPodSpec, memoryOverheadFactor float64) (int64, error) {
+	var memoryBytes, memoryOverheadBytes int64
+
+	if podSpec.Memory != nil {
+		parsed, err := byteStringAsBytes(*podSpec.Memory)
+		if err != nil {
+			return 0, err
+		}
+		memoryBytes = parsed
+	}
+
+	if podSpec.MemoryOverhead != nil {
+		parsed, err := byteStringAsBytes(*podSpec.MemoryOverhead)
+		if err != nil {
+			return 0, err
+		}
+		memoryOverheadBytes = parsed
+	} else {
+		memoryOverheadBytes = int64(math.Max(
+			float64(memoryBytes)*memoryOverheadFactor,
+			common.MinMemoryOverhead,
+		))
+	}
+
+	return memoryBytes + memoryOverheadBytes, nil
+}
+
+func bytesToMi(b int64) string {
+	// this floors the value to the nearest mebibyte
+	return fmt.Sprintf("%dMi", b/1024/1024)
+}
+
+func driverMemoryRequest(app *v1beta2.SparkApplication) (string, error) {
+	memoryOverheadFactor, err := getMemoryOverheadFactor(app)
+	if err != nil {
+		return "", err
+	}
+
+	requestBytes, err := memoryRequestBytes(&app.Spec.Driver.SparkPodSpec, memoryOverheadFactor)
+	if err != nil {
+		return "", err
+	}
+
+	// Convert memory quantity to mebibytes even if larger than a gibibyte to match Spark
+	// https://github.com/apache/spark/blob/11b682cf5b7c5360a02410be288b7905eecc1d28/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala#L88
+	// https://github.com/apache/spark/blob/11b682cf5b7c5360a02410be288b7905eecc1d28/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala#L121
+	return bytesToMi(requestBytes), nil
+}
+
+func executorMemoryRequest(app *v1beta2.SparkApplication) (string, error) {
+	memoryOverheadFactor, err := getMemoryOverheadFactor(app)
+	if err != nil {
+		return "", err
+	}
+
+	requestBytes, err := memoryRequestBytes(&app.Spec.Executor.SparkPodSpec, memoryOverheadFactor)
+	if err != nil {
+		return "", err
+	}
+
+	// See comment above in driver
+	return bytesToMi(requestBytes), nil
+}
diff --git a/internal/scheduler/yunikorn/resourceusage/memory_test.go b/internal/scheduler/yunikorn/resourceusage/memory_test.go
new file mode 100644
index 000000000..f7fa64b7d
--- /dev/null
+++ b/internal/scheduler/yunikorn/resourceusage/memory_test.go
@@ -0,0 +1,39 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceusage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestBytesToMi(t *testing.T) {
+	testCases := []struct {
+		input    int64
+		expected string
+	}{
+		{(2 * 1024 * 1024) - 1, "1Mi"},
+		{2 * 1024 * 1024, "2Mi"},
+		{(1024 * 1024 * 1024) - 1, "1023Mi"},
+		{1024 * 1024 * 1024, "1024Mi"},
+	}
+
+	for _, tc := range testCases {
+		assert.Equal(t, tc.expected, bytesToMi(tc.input))
+	}
+}
diff --git a/internal/scheduler/yunikorn/resourceusage/resource_usage.go b/internal/scheduler/yunikorn/resourceusage/resource_usage.go
new file mode 100644
index 000000000..adc3e0e98
--- /dev/null
+++ b/internal/scheduler/yunikorn/resourceusage/resource_usage.go
@@ -0,0 +1,76 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceusage
+
+import (
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/kubeflow/spark-operator/api/v1beta2"
+)
+
+func cpuRequest(cores *int32, coreRequest *string) (string, error) {
+	// coreRequest takes precedence over cores if specified
+	// coreLimit is not relevant as pods are scheduled based on request values
+	if coreRequest != nil {
+		// Fail fast by validating coreRequest before app submission even though
+		// both Spark and Yunikorn validate this field anyway
+		if _, err := resource.ParseQuantity(*coreRequest); err != nil {
+			return "", fmt.Errorf("failed to parse %s: %w", *coreRequest, err)
+		}
+		return *coreRequest, nil
+	}
+	if cores != nil {
+		return fmt.Sprintf("%d", *cores), nil
+	}
+	return "1", nil
+}
+
+func DriverPodRequests(app *v1beta2.SparkApplication) (map[string]string, error) {
+	cpuValue, err := cpuRequest(app.Spec.Driver.Cores, app.Spec.Driver.CoreRequest)
+	if err != nil {
+		return nil, err
+	}
+
+	memoryValue, err := driverMemoryRequest(app)
+	if err != nil {
+		return nil, err
+	}
+
+	return map[string]string{
+		"cpu":    cpuValue,
+		"memory": memoryValue,
+	}, nil
+}
+
+func ExecutorPodRequests(app *v1beta2.SparkApplication) (map[string]string, error) {
+	cpuValue, err := cpuRequest(app.Spec.Executor.Cores, app.Spec.Executor.CoreRequest)
+	if err != nil {
+		return nil, err
+	}
+
+	memoryValue, err := executorMemoryRequest(app)
+	if err != nil {
+		return nil, err
+	}
+
+	return map[string]string{
+		"cpu":    cpuValue,
+		"memory": memoryValue,
+	}, nil
+}
diff --git a/internal/scheduler/yunikorn/resourceusage/resource_usage_test.go b/internal/scheduler/yunikorn/resourceusage/resource_usage_test.go
new file mode 100644
index 000000000..8cadb800f
--- /dev/null
+++ b/internal/scheduler/yunikorn/resourceusage/resource_usage_test.go
@@ -0,0 +1,57 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourceusage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/kubeflow/spark-operator/pkg/util"
+)
+
+func TestCpuRequest(t *testing.T) {
+	testCases := []struct {
+		cores       *int32
+		coreRequest *string
+		expected    string
+	}{
+		{nil, nil, "1"},
+		{util.Int32Ptr(1), nil, "1"},
+		{nil, util.StringPtr("1"), "1"},
+		{util.Int32Ptr(1), util.StringPtr("500m"), "500m"},
+	}
+
+	for _, tc := range testCases {
+		actual, err := cpuRequest(tc.cores, tc.coreRequest)
+		assert.Nil(t, err)
+		assert.Equal(t, tc.expected, actual)
+	}
+}
+
+func TestCpuRequestInvalid(t *testing.T) {
+	invalidInputs := []string{
+		"",
+		"asd",
+		"Random 500m",
+	}
+
+	for _, input := range invalidInputs {
+		_, err := cpuRequest(nil, &input)
+		assert.NotNil(t, err)
+	}
+}
diff --git a/internal/scheduler/yunikorn/scheduler.go b/internal/scheduler/yunikorn/scheduler.go
new file mode 100644
index 000000000..664ab44c7
--- /dev/null
+++ b/internal/scheduler/yunikorn/scheduler.go
@@ -0,0 +1,178 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package yunikorn
+
+import (
+	"encoding/json"
+	"fmt"
+	"maps"
+
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/kubeflow/spark-operator/api/v1beta2"
+	"github.com/kubeflow/spark-operator/internal/scheduler"
+	"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn/resourceusage"
+	"github.com/kubeflow/spark-operator/pkg/util"
+)
+
+const (
+	SchedulerName = "yunikorn"
+
+	// The names are set to match the Yunikorn gang scheduling example for Spark, but these can be any
+	// value as long as what's on the pod matches the task group definition
+	// https://yunikorn.apache.org/docs/next/user_guide/gang_scheduling/#enable-gang-scheduling-for-spark-jobs
+	driverTaskGroupName   = "spark-driver"
+	executorTaskGroupName = "spark-executor"
+
+	// https://yunikorn.apache.org/docs/next/user_guide/labels_and_annotations_in_yunikorn/
+	taskGroupNameAnnotation = "yunikorn.apache.org/task-group-name"
+	taskGroupsAnnotation    = "yunikorn.apache.org/task-groups"
+	queueLabel              = "queue"
+)
+
+// This struct has been defined separately rather than imported so that tags can be included for JSON marshalling
+// https://github.com/apache/yunikorn-k8shim/blob/207e4031c6484c965fca4018b6b8176afc5956b4/pkg/cache/amprotocol.go#L47-L56
+type taskGroup struct {
+	Name         string            `json:"name"`
+	MinMember    int32             `json:"minMember"`
+	MinResource  map[string]string `json:"minResource,omitempty"`
+	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
+	Tolerations  []v1.Toleration   `json:"tolerations,omitempty"`
+	Affinity     *v1.Affinity      `json:"affinity,omitempty"`
+	Labels       map[string]string `json:"labels,omitempty"`
+}
+
+type Scheduler struct{}
+
+func Factory(_ scheduler.Config) (scheduler.Interface, error) {
+	return &Scheduler{}, nil
+}
+
+func (s *Scheduler) Name() string {
+	return SchedulerName
+}
+
+func (s *Scheduler) ShouldSchedule(_ *v1beta2.SparkApplication) bool {
+	// Yunikorn gets all the information it needs from pod annotations on the originating pod,
+	// so no additional resources need to be created
+	return true
+}
+
+func (s *Scheduler) Schedule(app *v1beta2.SparkApplication) error {
+	driverMinResources, err := resourceusage.DriverPodRequests(app)
+	if err != nil {
+		return fmt.Errorf("failed to calculate driver minResources: %w", err)
+	}
+
+	taskGroups := []taskGroup{
+		{
+			Name:         driverTaskGroupName,
+			MinMember:    1,
+			MinResource:  driverMinResources,
+			NodeSelector: mergeNodeSelector(app.Spec.NodeSelector, app.Spec.Driver.NodeSelector),
+			Tolerations:  app.Spec.Driver.Tolerations,
+			Affinity:     app.Spec.Driver.Affinity,
+			Labels:       app.Spec.Driver.Labels,
+		},
+	}
+
+	// A minMember of zero is not a valid config for a Yunikorn task group, so we should leave out
+	// the executor task group completely if the initial number of executors is zero
+	if numInitialExecutors := util.GetInitialExecutorNumber(app); numInitialExecutors > 0 {
+		executorMinResources, err := resourceusage.ExecutorPodRequests(app)
+		if err != nil {
+			return fmt.Errorf("failed to calculate executor minResources: %w", err)
+		}
+
+		taskGroups = append(taskGroups, taskGroup{
+			Name:         executorTaskGroupName,
+			MinMember:    numInitialExecutors,
+			MinResource:  executorMinResources,
+			NodeSelector: mergeNodeSelector(app.Spec.NodeSelector, app.Spec.Executor.NodeSelector),
+			Tolerations:  app.Spec.Executor.Tolerations,
+			Affinity:     app.Spec.Executor.Affinity,
+			Labels:       app.Spec.Executor.Labels,
+		})
+	}
+
+	// Yunikorn re-uses the application ID set by the driver under the label "spark-app-selector",
+	// so there is no need to set an application ID
+	// https://github.com/apache/yunikorn-k8shim/blob/2278b3217c702ccb796e4d623bc7837625e5a4ec/pkg/common/utils/utils.go#L168-L171
+	addQueueLabels(app)
+	if err := addTaskGroupAnnotations(app, taskGroups); err != nil {
+		return fmt.Errorf("failed to add task group annotations: %w", err)
+	}
+
+	return nil
+}
+
+func (s *Scheduler) Cleanup(_ *v1beta2.SparkApplication) error {
+	// No additional resources are created, so there is nothing to clean up
+	return nil
+}
+
+func addTaskGroupAnnotations(app *v1beta2.SparkApplication, taskGroups []taskGroup) error {
+	marshalledTaskGroups, err := json.Marshal(taskGroups)
+	if err != nil {
+		return fmt.Errorf("failed to marshal taskGroups: %w", err)
+	}
+
+	if app.Spec.Driver.Annotations == nil {
+		app.Spec.Driver.Annotations = make(map[string]string)
+	}
+	if app.Spec.Executor.Annotations == nil {
+		app.Spec.Executor.Annotations = make(map[string]string)
+	}
+
+	app.Spec.Driver.Annotations[taskGroupNameAnnotation] = driverTaskGroupName
+	app.Spec.Executor.Annotations[taskGroupNameAnnotation] = executorTaskGroupName
+
+	// The task group definition only needs to be present on the originating pod
+	// https://yunikorn.apache.org/docs/next/user_guide/gang_scheduling/#app-configuration
+	app.Spec.Driver.Annotations[taskGroupsAnnotation] = string(marshalledTaskGroups)
+
+	return nil
+}
+
+func addQueueLabels(app *v1beta2.SparkApplication) {
+	if app.Spec.BatchSchedulerOptions != nil && app.Spec.BatchSchedulerOptions.Queue != nil {
+		if app.Spec.Driver.Labels == nil {
+			app.Spec.Driver.Labels = make(map[string]string)
+		}
+		if app.Spec.Executor.Labels == nil {
+			app.Spec.Executor.Labels = make(map[string]string)
+		}
+
+		app.Spec.Driver.Labels[queueLabel] = *app.Spec.BatchSchedulerOptions.Queue
+		app.Spec.Executor.Labels[queueLabel] = *app.Spec.BatchSchedulerOptions.Queue
+	}
+}
+
+func mergeNodeSelector(appNodeSelector map[string]string, podNodeSelector map[string]string) map[string]string {
+	// app.Spec.NodeSelector is passed to Spark as "spark.kubernetes.node.selector.%s" conf properties,
+	// which means it will already be present in the pod definition before the mutating webhook runs.
+	// The mutating webhook then merges the driver/executor-specific NodeSelector with what's already present
+	nodeSelector := make(map[string]string)
+	maps.Copy(nodeSelector, appNodeSelector)
+	maps.Copy(nodeSelector, podNodeSelector)
+
+	// Return nil if there are no entries in the map so that the field is skipped during JSON marshalling
+	if len(nodeSelector) == 0 {
+		return nil
+	}
+	return nodeSelector
+}
diff --git a/internal/scheduler/yunikorn/scheduler_test.go b/internal/scheduler/yunikorn/scheduler_test.go
new file mode 100644
index 000000000..4213848fe
--- /dev/null
+++ b/internal/scheduler/yunikorn/scheduler_test.go
@@ -0,0 +1,285 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package yunikorn
+
+import (
+	"encoding/json"
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/kubeflow/spark-operator/api/v1beta2"
+	"github.com/kubeflow/spark-operator/pkg/util"
+)
+
+func TestSchedule(t *testing.T) {
+	testCases := []struct {
+		name     string
+		app      *v1beta2.SparkApplication
+		expected []taskGroup
+	}{
+		{
+			name: "spark-pi-yunikorn",
+			app: &v1beta2.SparkApplication{
+				Spec: v1beta2.SparkApplicationSpec{
+					Type: v1beta2.SparkApplicationTypeScala,
+					Driver: v1beta2.DriverSpec{
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							Cores:     util.Int32Ptr(1),
+							CoreLimit: util.StringPtr("1200m"),
+							Memory:    util.StringPtr("512m"),
+						},
+					},
+					Executor: v1beta2.ExecutorSpec{
+						Instances: util.Int32Ptr(2),
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							Cores:     util.Int32Ptr(1),
+							CoreLimit: util.StringPtr("1200m"),
+							Memory:    util.StringPtr("512m"),
+						},
+					},
+					BatchSchedulerOptions: &v1beta2.BatchSchedulerConfiguration{
+						Queue: util.StringPtr("root.default"),
+					},
+				},
+			},
+			expected: []taskGroup{
+				{
+					Name:      "spark-driver",
+					MinMember: 1,
+					MinResource: map[string]string{
+						"cpu":    "1",
+						"memory": "896Mi", // 512Mi + 384Mi min overhead
+					},
+				},
+				{
+					Name:      "spark-executor",
+					MinMember: 2,
+					MinResource: map[string]string{
+						"cpu":    "1",
+						"memory": "896Mi", // 512Mi + 384Mi min overhead
+					},
+				},
+			},
+		},
+		{
+			name: "Dynamic allocation and memory overhead",
+			app: &v1beta2.SparkApplication{
+				Spec: v1beta2.SparkApplicationSpec{
+					Type:                 v1beta2.SparkApplicationTypePython,
+					MemoryOverheadFactor: util.StringPtr("0.3"),
+					Driver: v1beta2.DriverSpec{
+						CoreRequest: util.StringPtr("2000m"),
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							Cores:  util.Int32Ptr(4),
+							Memory: util.StringPtr("8g"),
+						},
+					},
+					Executor: v1beta2.ExecutorSpec{
+						Instances: util.Int32Ptr(4),
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							MemoryOverhead: util.StringPtr("2g"),
+							Cores:          util.Int32Ptr(8),
+							Memory:         util.StringPtr("64g"),
+						},
+					},
+					DynamicAllocation: &v1beta2.DynamicAllocation{
+						Enabled:          true,
+						InitialExecutors: util.Int32Ptr(8),
+						MinExecutors:     util.Int32Ptr(2),
+					},
+					BatchSchedulerOptions: &v1beta2.BatchSchedulerConfiguration{
+						Queue: util.StringPtr("root.default"),
+					},
+				},
+			},
+			expected: []taskGroup{
+				{
+					Name:      "spark-driver",
+					MinMember: 1,
+					MinResource: map[string]string{
+						"cpu":    "2000m",   // CoreRequest takes precedence over Cores
+						"memory": "10649Mi", // 1024Mi * 8 * 1.3 (manually specified overhead)
+					},
+				},
+				{
+					Name:      "spark-executor",
+					MinMember: 8, // Max of instances, dynamic allocation min and initial
+					MinResource: map[string]string{
+						"cpu":    "8",
+						"memory": "67584Mi", // 1024Mi * 64 + 1024Mi * 2 (executor memory overhead takes precedence)
+					},
+				},
+			},
+		},
+		{
+			name: "Node selectors, tolerations, affinity and labels",
+			app: &v1beta2.SparkApplication{
+				Spec: v1beta2.SparkApplicationSpec{
+					Type:         v1beta2.SparkApplicationTypePython,
+					NodeSelector: map[string]string{"key": "value"},
+					Driver: v1beta2.DriverSpec{
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							Cores:        util.Int32Ptr(1),
+							Memory:       util.StringPtr("1g"),
+							NodeSelector: map[string]string{"key": "newvalue", "key2": "value2"},
+							Tolerations: []v1.Toleration{
+								{
+									Key:      "example-key",
+									Operator: v1.TolerationOpEqual,
+									Value:    "example-value",
+									Effect:   v1.TaintEffectNoSchedule,
+								},
+							},
+						},
+					},
+					Executor: v1beta2.ExecutorSpec{
+						Instances: util.Int32Ptr(1),
+						SparkPodSpec: v1beta2.SparkPodSpec{
+							Cores:  util.Int32Ptr(1),
+							Memory: util.StringPtr("1g"),
+							Affinity: &v1.Affinity{
+								NodeAffinity: &v1.NodeAffinity{
+									RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+										NodeSelectorTerms: []v1.NodeSelectorTerm{
+											{
+												MatchExpressions: []v1.NodeSelectorRequirement{
+													{
+														Key:      "another-key",
+														Operator: v1.NodeSelectorOpIn,
+														Values:   []string{"value1", "value2"},
+													},
+												},
+											},
+										},
+									},
+								},
+							},
+							Labels: map[string]string{"label": "value"},
+						},
+					},
+				},
+			},
+			expected: []taskGroup{
+				{
+					Name:      "spark-driver",
+					MinMember: 1,
+					MinResource: map[string]string{
+						"cpu":    "1",
+						"memory": "1433Mi", // 1024Mi * 1.4 non-JVM overhead
+					},
+					NodeSelector: map[string]string{"key": "newvalue", "key2": "value2"},
+					Tolerations: []v1.Toleration{
+						{
+							Key:      "example-key",
+							Operator: v1.TolerationOpEqual,
+							Value:    "example-value",
+							Effect:   v1.TaintEffectNoSchedule,
+						},
+					},
+				},
+				{
+					Name:      "spark-executor",
+					MinMember: 1,
+					MinResource: map[string]string{
+						"cpu":    "1",
+						"memory": "1433Mi", // 1024Mi * 1.4 non-JVM overhead
+					},
+					NodeSelector: map[string]string{"key": "value"}, // No executor-specific node selector
+					Affinity: &v1.Affinity{
+						NodeAffinity: &v1.NodeAffinity{
+							RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+								NodeSelectorTerms: []v1.NodeSelectorTerm{
+									{
+										MatchExpressions: []v1.NodeSelectorRequirement{
+											{
+												Key:      "another-key",
+												Operator: v1.NodeSelectorOpIn,
+												Values:   []string{"value1", "value2"},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+					Labels: map[string]string{"label": "value"},
+				},
+			},
+		},
+	}
+
+	scheduler := &Scheduler{}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			marshalledExpected, err := json.Marshal(tc.expected)
+			if err != nil {
+				t.Fatalf("Failed to marshal expected task groups: %v", err)
+			}
+
+			err = scheduler.Schedule(tc.app)
+			assert.Nil(t, err)
+			assert.JSONEq(t, string(marshalledExpected), tc.app.Spec.Driver.Annotations[taskGroupsAnnotation])
+
+			options := tc.app.Spec.BatchSchedulerOptions
+			if options != nil && options.Queue != nil {
+				assert.Equal(t, *options.Queue, tc.app.Spec.Driver.Labels[queueLabel])
+				assert.Equal(t, *options.Queue, tc.app.Spec.Executor.Labels[queueLabel])
+			}
+		})
+	}
+}
+
+func TestMergeNodeSelector(t *testing.T) {
+	testCases := []struct {
+		appNodeSelector map[string]string
+		podNodeSelector map[string]string
+		expected        map[string]string
+	}{
+		{
+			appNodeSelector: map[string]string{},
+			podNodeSelector: map[string]string{},
+			expected:        nil,
+		},
+		{
+			appNodeSelector: map[string]string{"key1": "value1"},
+			podNodeSelector: map[string]string{},
+			expected:        map[string]string{"key1": "value1"},
+		},
+		{
+			appNodeSelector: map[string]string{},
+			podNodeSelector: map[string]string{"key1": "value1"},
+			expected:        map[string]string{"key1": "value1"},
+		},
+		{
+			appNodeSelector: map[string]string{"key1": "value1"},
+			podNodeSelector: map[string]string{"key2": "value2"},
+			expected:        map[string]string{"key1": "value1", "key2": "value2"},
+		},
+		{
+			appNodeSelector: map[string]string{"key1": "value1"},
+			podNodeSelector: map[string]string{"key1": "value2", "key2": "value2"},
+			expected:        map[string]string{"key1": "value2", "key2": "value2"},
+		},
+	}
+
+	for _, tc := range testCases {
+		assert.Equal(t, tc.expected, mergeNodeSelector(tc.appNodeSelector, tc.podNodeSelector))
+	}
+}
diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go
index 273ad7401..aaa8b9456 100644
--- a/pkg/util/sparkapplication.go
+++ b/pkg/util/sparkapplication.go
@@ -428,3 +428,29 @@ func GetExecutorRequestResource(app *v1beta2.SparkApplication) corev1.ResourceLi
 	}
 	return SumResourceList(resourceList)
 }
+
+// GetInitialExecutorNumber calculates the initial number of executor pods that will be requested by the driver on startup.
+func GetInitialExecutorNumber(app *v1beta2.SparkApplication) int32 {
+	// The reference for this implementation: https://github.com/apache/spark/blob/ba208b9ca99990fa329c36b28d0aa2a5f4d0a77e/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala#L31
+	var initialNumExecutors int32
+
+	dynamicAllocationEnabled := app.Spec.DynamicAllocation != nil && app.Spec.DynamicAllocation.Enabled
+	if dynamicAllocationEnabled {
+		if app.Spec.Executor.Instances != nil {
+			initialNumExecutors = max(initialNumExecutors, *app.Spec.Executor.Instances)
+		}
+		if app.Spec.DynamicAllocation.InitialExecutors != nil {
+			initialNumExecutors = max(initialNumExecutors, *app.Spec.DynamicAllocation.InitialExecutors)
+		}
+		if app.Spec.DynamicAllocation.MinExecutors != nil {
+			initialNumExecutors = max(initialNumExecutors, *app.Spec.DynamicAllocation.MinExecutors)
+		}
+	} else {
+		initialNumExecutors = 2
+		if app.Spec.Executor.Instances != nil {
+			initialNumExecutors = *app.Spec.Executor.Instances
+		}
+	}
+
+	return initialNumExecutors
+}
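For reference, a minimal sketch of how the scheduler rewrites an application end to end. This is not part of the patch; it assumes the file lives inside the spark-operator module (the internal/ packages cannot be imported from outside it), and the spec mirrors examples/spark-pi-yunikorn.yaml. The expected output follows from the unit tests above.

package main

import (
	"fmt"

	"github.com/kubeflow/spark-operator/api/v1beta2"
	"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
	"github.com/kubeflow/spark-operator/pkg/util"
)

func main() {
	// Spec mirrors examples/spark-pi-yunikorn.yaml
	app := &v1beta2.SparkApplication{
		Spec: v1beta2.SparkApplicationSpec{
			Type: v1beta2.SparkApplicationTypeScala,
			Driver: v1beta2.DriverSpec{
				SparkPodSpec: v1beta2.SparkPodSpec{
					Cores:  util.Int32Ptr(1),
					Memory: util.StringPtr("512m"),
				},
			},
			Executor: v1beta2.ExecutorSpec{
				Instances: util.Int32Ptr(2),
				SparkPodSpec: v1beta2.SparkPodSpec{
					Cores:  util.Int32Ptr(1),
					Memory: util.StringPtr("512m"),
				},
			},
			BatchSchedulerOptions: &v1beta2.BatchSchedulerConfiguration{
				Queue: util.StringPtr("root.default"),
			},
		},
	}

	s := &yunikorn.Scheduler{}
	if err := s.Schedule(app); err != nil {
		panic(err)
	}

	// Per the unit tests above, this prints the gang-scheduling payload that is
	// later copied onto the driver pod:
	// [{"name":"spark-driver","minMember":1,"minResource":{"cpu":"1","memory":"896Mi"}},
	//  {"name":"spark-executor","minMember":2,"minResource":{"cpu":"1","memory":"896Mi"}}]
	fmt.Println(app.Spec.Driver.Annotations["yunikorn.apache.org/task-groups"])
}

The queue label ("queue: root.default") also ends up on both driver and executor pod specs via addQueueLabels, which is how Yunikorn routes the gang into the requested queue.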