Skip to content

Commit

Permalink
Add cpu logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobsalway committed Aug 12, 2024
1 parent 99df7eb commit 24293a4
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
35 changes: 27 additions & 8 deletions internal/scheduler/yunikorn/resource_usage.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package yunikorn

import "github.com/kubeflow/spark-operator/api/v1beta2"
import (
"fmt"

v1 "k8s.io/api/core/v1"

"github.com/kubeflow/spark-operator/api/v1beta2"
)

func getInitialExecutors(app *v1beta2.SparkApplication) int32 {
// Take the max of the number of executors and both the initial and minimum number of executors from
// dynamic allocation. See the upstream Spark code below for reference.
// https://github.com/apache/spark/blob/bc187013da821eba0ffff2408991e8ec6d2749fe/core/src/main/scala/org/apache/spark/util/Utils.scala#L2539-L2542
initialExecutors := int32(0)

if app.Spec.Executor.Instances != nil {
Expand All @@ -24,10 +27,26 @@ func getInitialExecutors(app *v1beta2.SparkApplication) int32 {
return initialExecutors
}

func driverPodResourceUsage(_ *v1beta2.SparkApplication) (map[string]string, error) {
return nil, nil
func cpuRequired(cores *int32, coreRequest *string) string {
if coreRequest != nil {
return *coreRequest
}
if cores != nil {
return fmt.Sprintf("%d", *cores)
}
return "1"
}

func driverPodResourceUsage(app *v1beta2.SparkApplication) (map[string]string, error) {
minResources := make(map[string]string)
minResources[string(v1.ResourceCPU)] = cpuRequired(app.Spec.Driver.Cores, app.Spec.Driver.CoreRequest)

return minResources, nil
}

func executorPodResourceUsage(_ *v1beta2.SparkApplication) (map[string]string, error) {
return nil, nil
func executorPodResourceUsage(app *v1beta2.SparkApplication) (map[string]string, error) {
minResources := make(map[string]string)
minResources[string(v1.ResourceCPU)] = cpuRequired(app.Spec.Executor.Cores, app.Spec.Executor.CoreRequest)

return minResources, nil
}
5 changes: 3 additions & 2 deletions internal/scheduler/yunikorn/resource_usage_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package yunikorn

import (
"github.com/kubeflow/spark-operator/pkg/util"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"

"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/pkg/util"
)

func TestGetInitialExecutors(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/yunikorn/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func mergeMaps(m1, m2 map[string]string) map[string]string {
maps.Copy(out, m1)
maps.Copy(out, m2)

// Return nil if there are no keys so the struct field is skipped JSON marshalling
// Return nil if there are no entries in the map so that the field is skipped during JSON marshalling
if len(out) == 0 {
return nil
}
Expand Down

0 comments on commit 24293a4

Please sign in to comment.