Skip to content

Commit

Permalink
updating dask plugin to use container resources with overrides (flyte…
Browse files Browse the repository at this point in the history
  • Loading branch information
hamersaw authored May 24, 2023
1 parent f32f07f commit b346ae2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 30 deletions.
48 changes: 30 additions & 18 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,27 +298,39 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.

container.Env = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID())

// retrieve platformResources and overrideResources to use when aggregating container resources
platformResources := parameters.TaskExecMetadata.GetPlatformResources()
if platformResources == nil {
platformResources = &v1.ResourceRequirements{}
}

var overrideResources *v1.ResourceRequirements
if parameters.TaskExecMetadata.GetOverrides() != nil && parameters.TaskExecMetadata.GetOverrides().GetResources() != nil {
res := parameters.TaskExecMetadata.GetOverrides().GetResources()
platformResources := parameters.TaskExecMetadata.GetPlatformResources()
if platformResources == nil {
platformResources = &v1.ResourceRequirements{}
}
overrideResources = parameters.TaskExecMetadata.GetOverrides().GetResources()
}
if overrideResources == nil {
overrideResources = &v1.ResourceRequirements{}
}

logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+
" Resources [%v] with mode [%v]", res, platformResources, container.Resources, mode)

switch mode {
case ResourceCustomizationModeAssignResources:
container.Resources = ApplyResourceOverrides(*res, *platformResources, assignIfUnset)
case ResourceCustomizationModeMergeExistingResources:
MergeResources(*res, &container.Resources)
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, assignIfUnset)
case ResourceCustomizationModeEnsureExistingResourcesInRange:
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset)
}
logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+
" Resources [%v] with mode [%v]", overrideResources, platformResources, container.Resources, mode)

logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources)
switch mode {
case ResourceCustomizationModeAssignResources:
// this will use overrideResources to set container resources and fallback to the platformResource values.
// it is important to note that this ignores the existing container.Resources values.
container.Resources = ApplyResourceOverrides(*overrideResources, *platformResources, assignIfUnset)
case ResourceCustomizationModeMergeExistingResources:
// this merges the overrideResources on top of the existing container.Resources to apply the overrides, then it
// uses the platformResource values to set defaults for any missing resource.
MergeResources(*overrideResources, &container.Resources)
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, assignIfUnset)
case ResourceCustomizationModeEnsureExistingResourcesInRange:
// this use the platformResources defaults to ensure that the container.Resources values are within the
// platformResources limits. it will not override any existing container.Resources values.
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset)
}

logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources)
return nil
}
15 changes: 8 additions & 7 deletions flyteplugins/go/tasks/plugins/k8s/dask/dask.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ func getDefaults(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext,
}
}

defaultResources := executionMetadata.GetPlatformResources()
if executionMetadata.GetOverrides() != nil && executionMetadata.GetOverrides().GetResources() != nil {
defaultResources = executionMetadata.GetOverrides().GetResources()
containerResources, err := flytek8s.ToK8sResourceRequirements(defaultContainerSpec.GetResources())
if err != nil {
return nil, err
}

jobRunnerContainer := v1.Container{
Name: "job-runner",
Image: defaultImage,
Args: defaultContainerSpec.GetArgs(),
Env: defaultEnvVars,
Resources: *defaultResources,
Resources: *containerResources,
}

templateParameters := template.Parameters{
Expand All @@ -79,15 +79,16 @@ func getDefaults(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext,
OutputPath: taskCtx.OutputWriter(),
Task: taskCtx.TaskReader(),
}
err := flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters, flytek8s.ResourceCustomizationModeAssignResources, &jobRunnerContainer)
if err != nil {
if err = flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters,
flytek8s.ResourceCustomizationModeMergeExistingResources, &jobRunnerContainer); err != nil {

return nil, err
}

return &defaults{
Image: defaultImage,
JobRunnerContainer: jobRunnerContainer,
Resources: defaultResources,
Resources: &jobRunnerContainer.Resources,
Env: defaultEnvVars,
Annotations: executionMetadata.GetAnnotations(),
IsInterruptible: executionMetadata.IsInterruptible(),
Expand Down
14 changes: 9 additions & 5 deletions flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ var (
v1.ResourceMemory: resource.MustParse("17G"),
},
}
defaultResources = v1.ResourceRequirements{
Requests: testPlatformResources.Requests,
Limits: testPlatformResources.Requests,
}
)

func dummyDaskJob(status daskAPI.JobStatus) *daskAPI.DaskJob {
Expand Down Expand Up @@ -199,7 +203,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
assert.Equal(t, "job-runner", jobSpec.Containers[0].Name)
assert.Equal(t, defaultTestImage, jobSpec.Containers[0].Image)
assert.Equal(t, testArgs, jobSpec.Containers[0].Args)
assert.Equal(t, testPlatformResources, jobSpec.Containers[0].Resources)
assert.Equal(t, defaultResources, jobSpec.Containers[0].Resources)
assert.Equal(t, defaultTolerations, jobSpec.Tolerations)
assert.Equal(t, defaultNodeSelector, jobSpec.NodeSelector)
assert.Equal(t, defaultAffinity, jobSpec.Affinity)
Expand All @@ -226,7 +230,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
}
assert.Equal(t, v1.RestartPolicyNever, schedulerSpec.RestartPolicy)
assert.Equal(t, defaultTestImage, schedulerSpec.Containers[0].Image)
assert.Equal(t, testPlatformResources, schedulerSpec.Containers[0].Resources)
assert.Equal(t, defaultResources, schedulerSpec.Containers[0].Resources)
assert.Equal(t, []string{"dask-scheduler"}, schedulerSpec.Containers[0].Args)
assert.Equal(t, expectedPorts, schedulerSpec.Containers[0].Ports)
assert.Equal(t, testEnvVars, schedulerSpec.Containers[0].Env)
Expand Down Expand Up @@ -263,7 +267,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
assert.Equal(t, "dask-worker", workerSpec.Containers[0].Name)
assert.Equal(t, v1.PullIfNotPresent, workerSpec.Containers[0].ImagePullPolicy)
assert.Equal(t, defaultTestImage, workerSpec.Containers[0].Image)
assert.Equal(t, testPlatformResources, workerSpec.Containers[0].Resources)
assert.Equal(t, defaultResources, workerSpec.Containers[0].Resources)
assert.Equal(t, testEnvVars, workerSpec.Containers[0].Env)
assert.Equal(t, defaultTolerations, workerSpec.Tolerations)
assert.Equal(t, defaultNodeSelector, workerSpec.NodeSelector)
Expand All @@ -273,9 +277,9 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
"--name",
"$(DASK_WORKER_NAME)",
"--nthreads",
"5",
"4",
"--memory-limit",
"17G",
"1Gi",
}, workerSpec.Containers[0].Args)
}

Expand Down

0 comments on commit b346ae2

Please sign in to comment.