Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Adding support for per-task PodTemplate configuration #308

Merged
merged 27 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1ea39af
implemented
hamersaw Jan 11, 2023
0231bf8
unit tests working
hamersaw Jan 11, 2023
d1047f7
updated flyteidl
hamersaw Jan 11, 2023
ff69656
Merge branch 'master' into feature/task-pod-template
hamersaw Jan 11, 2023
ec010e7
fixed lint issues
hamersaw Jan 11, 2023
602feac
updated documentation
hamersaw Jan 13, 2023
2f486a8
removing unnecessarily commented lines
hamersaw Jan 13, 2023
5a36297
updated unit tests
hamersaw Jan 13, 2023
57351fb
fixed lint issues
hamersaw Jan 13, 2023
9dbd1fc
updated docs
hamersaw Jan 13, 2023
1b65e71
if user provides PodTemplate name and it doesn't exist we should fail
hamersaw Jan 14, 2023
6c25a83
updated for new flyteidl definition
hamersaw Jan 20, 2023
f19716f
updated flyteidl
hamersaw Jan 20, 2023
c8528b9
working with either container or pod passed in task template
hamersaw Jan 24, 2023
2cc6563
cleaned up definitions
hamersaw Jan 26, 2023
be7c455
fixing up unit tests
hamersaw Jan 26, 2023
2d88777
fixed unit tests
hamersaw Jan 26, 2023
c410d97
fixed lint issues
hamersaw Jan 26, 2023
100f2d0
removed dead code
hamersaw Jan 26, 2023
94f5236
removing more dead code
hamersaw Jan 26, 2023
63118d6
remove more dead code
hamersaw Jan 26, 2023
0718751
correctly applying resources for container and pod tasks
hamersaw Jan 30, 2023
c33fed2
updated flyteidl version
hamersaw Jan 30, 2023
70040ae
moved sidecar parsing to pod plugin
hamersaw Feb 2, 2023
a04d903
Merge branch 'master' into feature/task-pod-template
hamersaw Feb 2, 2023
8f6f3a3
fixed unit tests
hamersaw Feb 2, 2023
02854c4
updating semantics to address review comments
hamersaw Feb 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.3.2
github.com/flyteorg/flyteidl v1.3.5
github.com/flyteorg/flytestdlib v1.0.11
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v1.3.2 h1:s4DC8go2ou5LtZ+CFcS31r0mhv3baelNV81C1KZS26U=
github.com/flyteorg/flyteidl v1.3.2/go.mod h1:OJAq333OpInPnMhvVz93AlEjmlQ+t0FAD4aakIYE4OU=
github.com/flyteorg/flyteidl v1.3.5 h1:rSaWMndeENr0QxRKj02kp6N/qQdbgDwpFeZsZbvU45A=
github.com/flyteorg/flyteidl v1.3.5/go.mod h1:OJAq333OpInPnMhvVz93AlEjmlQ+t0FAD4aakIYE4OU=
github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
github.com/flyteorg/flytestdlib v1.0.11 h1:f7B8x2/zMuimEVi4Jx0zqzvNhdi7aq7+ZWoqHsbp4F4=
github.com/flyteorg/flytestdlib v1.0.11/go.mod h1:nIBmBHtjTJvhZEn3e/EwVC/iMkR2tUX8hEiXjRBpH/s=
Expand Down
68 changes: 50 additions & 18 deletions go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package flytek8s
import (
"context"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
pluginscore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template"
"k8s.io/apimachinery/pkg/util/validation"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"k8s.io/apimachinery/pkg/util/validation"
)

const resourceGPU = "gpu"
Expand Down Expand Up @@ -193,22 +194,16 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements
return resources
}

// ToK8sContainer transforms a task template target of type core.Container into a bare-bones kubernetes container, which
// can be further modified with flyte-specific customizations specified by various static and run-time attributes.
func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *core.TypedInterface, parameters template.Parameters) (*v1.Container, error) {
// Perform preliminary validations
if parameters.TaskExecMetadata.GetOverrides() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "platform/compiler error, overrides not set for task")
}
if parameters.TaskExecMetadata.GetOverrides() == nil || parameters.TaskExecMetadata.GetOverrides().GetResources() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
}
// BuildRawContainer constructs a Container based on the definition passed by the taskContainer and
// TaskExecutionMetadata.
func BuildRawContainer(ctx context.Context, taskContainer *core.Container, taskExecMetadata pluginscore.TaskExecutionMetadata) (*v1.Container, error) {
// Make the container name the same as the pod name, unless it violates K8s naming conventions
// Container names are subject to the DNS-1123 standard
containerName := parameters.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
containerName := taskExecMetadata.GetTaskExecutionID().GetGeneratedName()
if errs := validation.IsDNS1123Label(containerName); len(errs) > 0 {
containerName = rand.String(4)
}

container := &v1.Container{
Name: containerName,
Image: taskContainer.GetImage(),
Expand All @@ -217,12 +212,49 @@ func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *c
Env: ToK8sEnvVar(taskContainer.GetEnv()),
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
}
if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, container, iFace, taskContainer.DataConfig); err != nil {

return container, nil
}

// ToK8sContainer builds a Container based on the definition passed by the TaskExecutionContext. This involves applying
// all Flyte configuration including k8s plugins and resource requests.
func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error) {
taskTemplate, err := tCtx.TaskReader().Read(ctx)
if err != nil {
logger.Warnf(ctx, "failed to read task information when trying to construct container, err: %s", err.Error())
return nil, err
}

// validate arguments
if taskTemplate.GetContainer() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "unable to create container with no definition in TaskTemplate")
}
if tCtx.TaskExecutionMetadata().GetOverrides() == nil || tCtx.TaskExecutionMetadata().GetOverrides().GetResources() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
}

// build raw container
container, err := BuildRawContainer(ctx, taskTemplate.GetContainer(), tCtx.TaskExecutionMetadata())
if err != nil {
return nil, err
}

if container.SecurityContext == nil && config.GetK8sPluginConfig().DefaultSecurityContext != nil {
container.SecurityContext = config.GetK8sPluginConfig().DefaultSecurityContext.DeepCopy()
}

// add flyte resource customizations to the container
templateParameters := template.Parameters{
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
Inputs: tCtx.InputReader(),
OutputPath: tCtx.OutputWriter(),
Task: tCtx.TaskReader(),
}

if err := AddFlyteCustomizationsToContainer(ctx, templateParameters, ResourceCustomizationModeAssignResources, container); err != nil {
return nil, err
}

return container, nil
}

Expand Down
63 changes: 43 additions & 20 deletions go/tasks/pluginmachinery/flytek8s/container_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,26 +335,45 @@ func TestMergeResources_PartialResourceKeys(t *testing.T) {
}

func TestToK8sContainer(t *testing.T) {
taskContainer := &core.Container{
Image: "myimage",
Args: []string{
"arg1",
"arg2",
"arg3",
},
Command: []string{
"com1",
"com2",
"com3",
},
Env: []*core.KeyValuePair{
{
Key: "k",
Value: "v",
taskTemplate := &core.TaskTemplate{
Type: "test",
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Image: "myimage",
Args: []string{
"arg1",
"arg2",
"arg3",
},
Command: []string{
"com1",
"com2",
"com3",
},
Env: []*core.KeyValuePair{
{
Key: "k",
Value: "v",
},
},
},
},
}

taskReader := &mocks.TaskReader{}
taskReader.On("Read", mock.Anything).Return(taskTemplate, nil)

inputReader := &mocks2.InputReader{}
inputReader.OnGetInputPath().Return(storage.DataReference("test-data-reference"))
inputReader.OnGetInputPrefixPath().Return(storage.DataReference("test-data-reference-prefix"))
inputReader.OnGetMatch(mock.Anything).Return(&core.LiteralMap{}, nil)

outputWriter := &mocks2.OutputWriter{}
outputWriter.OnGetOutputPrefixPath().Return("")
outputWriter.OnGetRawOutputPrefix().Return("")
outputWriter.OnGetCheckpointPrefix().Return("/checkpoint")
outputWriter.OnGetPreviousCheckpointsPrefix().Return("/prev")

mockTaskExecMetadata := mocks.TaskExecutionMetadata{}
mockTaskOverrides := mocks.TaskOverrides{}
mockTaskOverrides.OnGetResources().Return(&v1.ResourceRequirements{
Expand All @@ -364,12 +383,16 @@ func TestToK8sContainer(t *testing.T) {
})
mockTaskExecMetadata.OnGetOverrides().Return(&mockTaskOverrides)
mockTaskExecutionID := mocks.TaskExecutionID{}
mockTaskExecutionID.OnGetID().Return(core.TaskExecutionIdentifier{})
mockTaskExecutionID.OnGetGeneratedName().Return("gen_name")
mockTaskExecMetadata.OnGetTaskExecutionID().Return(&mockTaskExecutionID)
mockTaskExecMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{})

templateParameters := template.Parameters{
TaskExecMetadata: &mockTaskExecMetadata,
}
tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata)
tCtx.OnInputReader().Return(inputReader)
tCtx.OnTaskReader().Return(taskReader)
tCtx.OnOutputWriter().Return(outputWriter)

cfg := config.GetK8sPluginConfig()
allow := false
Expand All @@ -378,7 +401,7 @@ func TestToK8sContainer(t *testing.T) {
}
assert.NoError(t, config.SetK8sPluginConfig(cfg))

container, err := ToK8sContainer(context.TODO(), taskContainer, nil, templateParameters)
container, err := ToK8sContainer(context.TODO(), tCtx)
assert.NoError(t, err)
assert.Equal(t, container.Image, "myimage")
assert.EqualValues(t, []string{
Expand Down
Loading