diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 803d8a77f..e12139d69 100755 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -1,3 +1,2 @@ -This project is governed by [Lyft's code of -conduct](https://github.com/lyft/code-of-conduct). All contributors -and participants agree to abide by its terms. +This project is governed by LF AI Foundation's [code of conduct](https://lfprojects.org/policies/code-of-conduct/). +All contributors and participants agree to abide by its terms. diff --git a/boilerplate/flyte/code_of_conduct/CODE_OF_CONDUCT.md b/boilerplate/flyte/code_of_conduct/CODE_OF_CONDUCT.md new file mode 100644 index 000000000..e12139d69 --- /dev/null +++ b/boilerplate/flyte/code_of_conduct/CODE_OF_CONDUCT.md @@ -0,0 +1,2 @@ +This project is governed by LF AI Foundation's [code of conduct](https://lfprojects.org/policies/code-of-conduct/). +All contributors and participants agree to abide by its terms. diff --git a/boilerplate/flyte/code_of_conduct/README.rst b/boilerplate/flyte/code_of_conduct/README.rst new file mode 100644 index 000000000..0c9f2f1ec --- /dev/null +++ b/boilerplate/flyte/code_of_conduct/README.rst @@ -0,0 +1,2 @@ +CODE OF CONDUCT +~~~~~~~~~~~~~~~ diff --git a/boilerplate/flyte/code_of_conduct/update.sh b/boilerplate/flyte/code_of_conduct/update.sh new file mode 100755 index 000000000..42f615846 --- /dev/null +++ b/boilerplate/flyte/code_of_conduct/update.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES. +# ONLY EDIT THIS FILE FROM WITHIN THE 'FLYTEORG/BOILERPLATE' REPOSITORY: +# +# TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst + +set -e + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" + +cp ${DIR}/CODE_OF_CONDUCT.md ${DIR}/../../../CODE_OF_CONDUCT.md diff --git a/boilerplate/flyte/golang_test_targets/goimports b/boilerplate/flyte/golang_test_targets/goimports index ba0d6d871..af1829036 100755 --- a/boilerplate/flyte/golang_test_targets/goimports +++ b/boilerplate/flyte/golang_test_targets/goimports @@ -5,4 +5,4 @@ # # TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst -goimports -w $(find . -type f -name '*.go' -not -path "./vendor/*" -not -path "./pkg/client/*") +goimports -w $(find . 
-type f -name '*.go' -not -path "./vendor/*" -not -path "./pkg/client/*" -not -path "./boilerplate/*") diff --git a/boilerplate/update.cfg b/boilerplate/update.cfg index 6eb17e23c..98e151d74 100755 --- a/boilerplate/update.cfg +++ b/boilerplate/update.cfg @@ -3,3 +3,4 @@ flyte/golang_support_tools flyte/pull_request_template flyte/docker_build flyte/welcome_bot +flyte/code_of_conduct diff --git a/go.mod b/go.mod index a39cac3fa..27d940939 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,8 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.0.0 github.com/aws/aws-sdk-go-v2/service/athena v1.0.0 github.com/coocood/freecache v1.1.1 - github.com/flyteorg/flyteidl v0.0.0-00010101000000-000000000000 - github.com/flyteorg/flytestdlib v0.3.22 + github.com/flyteorg/flyteidl v0.19.2 + github.com/flyteorg/flytestdlib v0.3.33 github.com/go-logr/zapr v0.4.0 // indirect github.com/go-test/deep v1.0.7 github.com/golang/protobuf v1.4.3 diff --git a/go.sum b/go.sum index b3907feed..aae1bca75 100644 --- a/go.sum +++ b/go.sum @@ -303,8 +303,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= -github.com/flyteorg/flytestdlib v0.3.22 h1:nJEPaCdxzXBaeg2p4fdo3I3Ua09NedFRaUwuLafLEdw= -github.com/flyteorg/flytestdlib v0.3.22/go.mod h1:1XG0DwYTUm34Yrffm1Qy9Tdr/pWQypEqTq5dUxw3/cM= +github.com/flyteorg/flytestdlib v0.3.33 h1:+oCx3zXUIldL7CWmNMD7PMFPXvGqaPgYkSKn9wB6qvY= +github.com/flyteorg/flytestdlib v0.3.33/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= diff --git a/go/tasks/pluginmachinery/flytek8s/config/config.go b/go/tasks/pluginmachinery/flytek8s/config/config.go index 9d6bd77ec..264e178f8 100755 --- a/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -27,7 +27,7 @@ var ( }, CoPilot: FlyteCoPilotConfig{ NamePrefix: "flyte-copilot-", - Image: "docker.pkg.github.com/flyteorg/flyteplugins/operator:v0.4.0", + Image: "cr.flyte.org/flyteorg/flytecopilot:v0.0.9", DefaultInputDataPath: "/var/flyte/inputs", InputVolumeName: "flyte-inputs", DefaultOutputPath: "/var/flyte/outputs", @@ -40,6 +40,9 @@ var ( }, DefaultCPURequest: defaultCPURequest, DefaultMemoryRequest: defaultMemoryRequest, + CreateContainerErrorGracePeriod: config2.Duration{ + Duration: time.Minute * 3, + }, } // K8sPluginConfigSection provides a singular top level config section for all plugins. @@ -89,7 +92,12 @@ type K8sPluginConfig struct { InterruptibleTolerations []v1.Toleration `json:"interruptible-tolerations" pflag:"-,Tolerations to be applied for interruptible pods"` // Node Selector Labels for interruptible pods: Similar to InterruptibleTolerations, these node selector labels are added for pods that can tolerate // eviction. 
+ // Deprecated: Please use InterruptibleNodeSelectorRequirement/NonInterruptibleNodeSelectorRequirement InterruptibleNodeSelector map[string]string `json:"interruptible-node-selector" pflag:"-,Defines a set of node selector labels to add to the interruptible pods."` + // Node Selector Requirements to be added to interruptible and non-interruptible + // pods respectively + InterruptibleNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"interruptible-node-selector-requirement" pflag:"-,Node selector requirement to add to interruptible pods"` + NonInterruptibleNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"non-interruptible-node-selector-requirement" pflag:"-,Node selector requirement to add to non-interruptible pods"` // ---------------------------------------------------------------------- // Specific tolerations that are added for certain resources. Useful for maintaining gpu resources separate in the cluster @@ -105,6 +113,11 @@ type K8sPluginConfig struct { // are kept around (potentially consuming cluster resources). This, however, will cause k8s log links to expire as // soon as the resource is finalized. DeleteResourceOnFinalize bool `json:"delete-resource-on-finalize" pflag:",Instructs the system to delete the resource on finalize. This ensures that no resources are kept around (potentially consuming cluster resources). This, however, will cause k8s log links to expire as soon as the resource is finalized."` + + // Time to wait for transient CreateContainerError errors to be resolved. If the + // error persists past this grace period, it will be inferred to be a permanent + // one, and the corresponding task marked as failed + CreateContainerErrorGracePeriod config2.Duration `json:"create-container-error-grace-period" pflag:"-,Time to wait for transient CreateContainerError errors to be resolved."` } type FlyteCoPilotConfig struct { diff --git a/go/tasks/pluginmachinery/flytek8s/container_helper.go b/go/tasks/pluginmachinery/flytek8s/container_helper.go index 45d549800..a22b4bf8f 100755 --- a/go/tasks/pluginmachinery/flytek8s/container_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/container_helper.go @@ -22,6 +22,23 @@ const resourceGPU = "gpu" // Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go const ResourceNvidiaGPU = "nvidia.com/gpu" +func MergeResources(in v1.ResourceRequirements, out *v1.ResourceRequirements) { + if out.Limits == nil { + out.Limits = in.Limits + } else if in.Limits != nil { + for key, val := range in.Limits { + out.Limits[key] = val + } + } + if out.Requests == nil { + out.Requests = in.Requests + } else if in.Requests != nil { + for key, val := range in.Requests { + out.Requests[key] = val + } + } +} + func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements { // set memory and cpu to default if not provided by user. 
if len(resources.Requests) == 0 { @@ -43,7 +60,7 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen if _, found := resources.Requests[v1.ResourceMemory]; !found { // use memory limit if set else default to config - if _, limitSet := resources.Limits[v1.ResourceCPU]; limitSet { + if _, limitSet := resources.Limits[v1.ResourceMemory]; limitSet { resources.Requests[v1.ResourceMemory] = resources.Limits[v1.ResourceMemory] } else { resources.Requests[v1.ResourceMemory] = resource.MustParse(config.GetK8sPluginConfig().DefaultMemoryRequest) @@ -60,13 +77,20 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen resources.Limits[v1.ResourceMemory] = resources.Requests[v1.ResourceMemory] } + // Ephemeral storage resources aren't required but if one of requests or limits is set and the other isn't, we'll + // just use the same values. + if _, requested := resources.Requests[v1.ResourceEphemeralStorage]; !requested { + if _, limitSet := resources.Limits[v1.ResourceEphemeralStorage]; limitSet { + resources.Requests[v1.ResourceEphemeralStorage] = resources.Limits[v1.ResourceEphemeralStorage] + } + } else if _, limitSet := resources.Limits[v1.ResourceEphemeralStorage]; !limitSet { + resources.Limits[v1.ResourceEphemeralStorage] = resources.Requests[v1.ResourceEphemeralStorage] + } + // TODO: Make configurable. 1/15/2019 Flyte Cluster doesn't support setting storage requests/limits. // https://github.com/kubernetes/enhancements/issues/362 delete(resources.Requests, v1.ResourceStorage) - delete(resources.Requests, v1.ResourceEphemeralStorage) - delete(resources.Limits, v1.ResourceStorage) - delete(resources.Limits, v1.ResourceEphemeralStorage) // Override GPU if res, found := resources.Requests[resourceGPU]; found { @@ -81,53 +105,74 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen return &resources } -// Returns a K8s Container for the execution +// Transforms a task template target of type core.Container into a bare-bones kubernetes container, which can be further +// modified with flyte-specific customizations specified by various static and run-time attributes. 
func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *core.TypedInterface, parameters template.Parameters) (*v1.Container, error) { - modifiedCommand, err := template.Render(ctx, taskContainer.GetCommand(), parameters) - if err != nil { - return nil, err - } - - modifiedArgs, err := template.Render(ctx, taskContainer.GetArgs(), parameters) - if err != nil { - return nil, err - } - - envVars := DecorateEnvVars(ctx, ToK8sEnvVar(taskContainer.GetEnv()), parameters.TaskExecMetadata.GetTaskExecutionID()) - + // Perform preliminary validations if parameters.TaskExecMetadata.GetOverrides() == nil { return nil, errors.Errorf(errors.BadTaskSpecification, "platform/compiler error, overrides not set for task") } if parameters.TaskExecMetadata.GetOverrides() == nil || parameters.TaskExecMetadata.GetOverrides().GetResources() == nil { return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!") } - - res := parameters.TaskExecMetadata.GetOverrides().GetResources() - if res != nil { - res = ApplyResourceOverrides(ctx, *res) - } - // Make the container name the same as the pod name, unless it violates K8s naming conventions // Container names are subject to the DNS-1123 standard containerName := parameters.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName() if errs := validation.IsDNS1123Label(containerName); len(errs) > 0 { containerName = rand.String(4) } - c := &v1.Container{ + container := &v1.Container{ Name: containerName, Image: taskContainer.GetImage(), - Args: modifiedArgs, - Command: modifiedCommand, - Env: envVars, + Args: taskContainer.GetArgs(), + Command: taskContainer.GetCommand(), + Env: ToK8sEnvVar(taskContainer.GetEnv()), TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError, } + if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, container, iFace, taskContainer.DataConfig); err != nil { + return nil, err + } + return container, nil +} + +type ResourceCustomizationMode int + +const ( + AssignResources ResourceCustomizationMode = iota + MergeExistingResources + LeaveResourcesUnmodified +) - if res != nil { - c.Resources = *res +// Takes a container definition which specifies how to run a Flyte task and fills in templated command and argument +// values, updates resources and decorates environment variables with platform and task-specific customizations. 
+func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.Parameters, + mode ResourceCustomizationMode, container *v1.Container) error { + modifiedCommand, err := template.Render(ctx, container.Command, parameters) + if err != nil { + return err } + container.Command = modifiedCommand - if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, c, iFace, taskContainer.DataConfig); err != nil { - return nil, err + modifiedArgs, err := template.Render(ctx, container.Args, parameters) + if err != nil { + return err + } + container.Args = modifiedArgs + + container.Env = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetTaskExecutionID()) + + if parameters.TaskExecMetadata.GetOverrides() != nil && parameters.TaskExecMetadata.GetOverrides().GetResources() != nil { + res := parameters.TaskExecMetadata.GetOverrides().GetResources() + switch mode { + case AssignResources: + if res = ApplyResourceOverrides(ctx, *res); res != nil { + container.Resources = *res + } + case MergeExistingResources: + MergeResources(*res, &container.Resources) + container.Resources = *ApplyResourceOverrides(ctx, container.Resources) + case LeaveResourcesUnmodified: + } } - return c, nil + return nil } diff --git a/go/tasks/pluginmachinery/flytek8s/container_helper_test.go b/go/tasks/pluginmachinery/flytek8s/container_helper_test.go index 1f1e08419..fc05e144a 100755 --- a/go/tasks/pluginmachinery/flytek8s/container_helper_test.go +++ b/go/tasks/pluginmachinery/flytek8s/container_helper_test.go @@ -4,6 +4,14 @@ import ( "context" "testing" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template" + mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" + "github.com/flyteorg/flytestdlib/storage" + "github.com/stretchr/testify/mock" + "k8s.io/apimachinery/pkg/util/validation" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -29,13 +37,6 @@ func TestApplyResourceOverrides_OverrideCpu(t *testing.T) { }, }) assert.EqualValues(t, cpuRequest, overrides.Requests[v1.ResourceCPU]) - - // request equals limit if not set - overrides = ApplyResourceOverrides(context.Background(), v1.ResourceRequirements{ - Limits: v1.ResourceList{ - v1.ResourceCPU: cpuLimit, - }, - }) assert.EqualValues(t, cpuLimit, overrides.Limits[v1.ResourceCPU]) // request equals limit if not set @@ -45,6 +46,7 @@ func TestApplyResourceOverrides_OverrideCpu(t *testing.T) { }, }) assert.EqualValues(t, cpuLimit, overrides.Requests[v1.ResourceCPU]) + assert.EqualValues(t, cpuLimit, overrides.Limits[v1.ResourceCPU]) } func TestApplyResourceOverrides_OverrideMemory(t *testing.T) { @@ -70,15 +72,44 @@ func TestApplyResourceOverrides_OverrideMemory(t *testing.T) { assert.EqualValues(t, memoryLimit, overrides.Limits[v1.ResourceMemory]) // request equals limit if not set - cpuLimit := resource.MustParse("2") overrides = ApplyResourceOverrides(context.Background(), v1.ResourceRequirements{ Limits: v1.ResourceList{ v1.ResourceMemory: memoryLimit, - v1.ResourceCPU: cpuLimit, }, }) assert.EqualValues(t, memoryLimit, overrides.Requests[v1.ResourceMemory]) - assert.EqualValues(t, cpuLimit, overrides.Requests[v1.ResourceCPU]) + assert.EqualValues(t, memoryLimit, overrides.Limits[v1.ResourceMemory]) +} + +func TestApplyResourceOverrides_OverrideEphemeralStorage(t *testing.T) { + ephemeralStorageRequest := 
resource.MustParse("1") + overrides := ApplyResourceOverrides(context.Background(), v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceEphemeralStorage: ephemeralStorageRequest, + }, + }) + assert.EqualValues(t, ephemeralStorageRequest, overrides.Requests[v1.ResourceEphemeralStorage]) + assert.EqualValues(t, ephemeralStorageRequest, overrides.Limits[v1.ResourceEphemeralStorage]) + + ephemeralStorageLimit := resource.MustParse("2") + overrides = ApplyResourceOverrides(context.Background(), v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceEphemeralStorage: ephemeralStorageRequest, + }, + Limits: v1.ResourceList{ + v1.ResourceEphemeralStorage: ephemeralStorageLimit, + }, + }) + assert.EqualValues(t, ephemeralStorageRequest, overrides.Requests[v1.ResourceEphemeralStorage]) + assert.EqualValues(t, ephemeralStorageLimit, overrides.Limits[v1.ResourceEphemeralStorage]) + + // request equals limit if not set + overrides = ApplyResourceOverrides(context.Background(), v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceEphemeralStorage: ephemeralStorageLimit, + }, + }) + assert.EqualValues(t, ephemeralStorageLimit, overrides.Requests[v1.ResourceEphemeralStorage]) } func TestApplyResourceOverrides_RemoveStorage(t *testing.T) { @@ -97,13 +128,15 @@ func TestApplyResourceOverrides_RemoveStorage(t *testing.T) { }, }) assert.EqualValues(t, v1.ResourceList{ - v1.ResourceMemory: requestedResourceQuantity, - v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, }, overrides.Requests) assert.EqualValues(t, v1.ResourceList{ - v1.ResourceMemory: requestedResourceQuantity, - v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, }, overrides.Limits) } @@ -123,3 +156,212 @@ func TestApplyResourceOverrides_OverrideGpu(t *testing.T) { }) assert.EqualValues(t, gpuRequest, overrides.Limits[ResourceNvidiaGPU]) } + +func TestMergeResources_EmptyIn(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + expectedResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + Limits: v1.ResourceList{ + v1.ResourceStorage: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + } + outResources := expectedResources.DeepCopy() + MergeResources(v1.ResourceRequirements{}, outResources) + assert.EqualValues(t, *outResources, expectedResources) +} + +func TestMergeResources_EmptyOut(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + expectedResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + Limits: v1.ResourceList{ + v1.ResourceStorage: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + } + outResources := v1.ResourceRequirements{} + MergeResources(expectedResources, &outResources) + assert.EqualValues(t, outResources, expectedResources) +} + +func 
TestMergeResources_PartialRequirements(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + resourceList := v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + } + inResources := v1.ResourceRequirements{Requests: resourceList} + outResources := v1.ResourceRequirements{Limits: resourceList} + MergeResources(inResources, &outResources) + assert.EqualValues(t, outResources, v1.ResourceRequirements{ + Requests: resourceList, + Limits: resourceList, + }) +} + +func TestMergeResources_PartialResourceKeys(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + resourceList1 := v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + } + resourceList2 := v1.ResourceList{v1.ResourceCPU: requestedResourceQuantity} + expectedResourceList := v1.ResourceList{ + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + } + inResources := v1.ResourceRequirements{ + Requests: resourceList1, + Limits: resourceList2, + } + outResources := v1.ResourceRequirements{ + Requests: resourceList2, + Limits: resourceList1, + } + MergeResources(inResources, &outResources) + assert.EqualValues(t, outResources, v1.ResourceRequirements{ + Requests: expectedResourceList, + Limits: expectedResourceList, + }) +} + +func TestToK8sContainer(t *testing.T) { + taskContainer := &core.Container{ + Image: "myimage", + Args: []string{ + "arg1", + "arg2", + "arg3", + }, + Command: []string{ + "com1", + "com2", + "com3", + }, + Env: []*core.KeyValuePair{ + { + Key: "k", + Value: "v", + }, + }, + } + + mockTaskExecMetadata := mocks.TaskExecutionMetadata{} + mockTaskOverrides := mocks.TaskOverrides{} + mockTaskOverrides.OnGetResources().Return(&v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceEphemeralStorage: resource.MustParse("1024Mi"), + }, + }) + mockTaskExecMetadata.OnGetOverrides().Return(&mockTaskOverrides) + mockTaskExecutionID := mocks.TaskExecutionID{} + mockTaskExecutionID.OnGetGeneratedName().Return("gen_name") + mockTaskExecMetadata.OnGetTaskExecutionID().Return(&mockTaskExecutionID) + + templateParameters := template.Parameters{ + TaskExecMetadata: &mockTaskExecMetadata, + } + + container, err := ToK8sContainer(context.TODO(), taskContainer, nil, templateParameters) + assert.NoError(t, err) + assert.Equal(t, container.Image, "myimage") + assert.EqualValues(t, []string{ + "arg1", + "arg2", + "arg3", + }, container.Args) + assert.EqualValues(t, []string{ + "com1", + "com2", + "com3", + }, container.Command) + assert.EqualValues(t, []v1.EnvVar{ + { + Name: "k", + Value: "v", + }, + }, container.Env) + errs := validation.IsDNS1123Label(container.Name) + assert.Nil(t, errs) +} + +func TestAddFlyteCustomizationsToContainer(t *testing.T) { + mockTaskExecMetadata := mocks.TaskExecutionMetadata{} + mockTaskExecutionID := mocks.TaskExecutionID{} + mockTaskExecutionID.OnGetGeneratedName().Return("gen_name") + mockTaskExecutionID.OnGetID().Return(core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "p1", + Domain: "d1", + Name: "task_name", + Version: "v1", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + NodeId: "node_id", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "p2", + Domain: "d2", + Name: "n2", + }, + }, + 
RetryAttempt: 1, + }) + mockTaskExecMetadata.OnGetTaskExecutionID().Return(&mockTaskExecutionID) + + mockOverrides := mocks.TaskOverrides{} + mockOverrides.OnGetResources().Return(&v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceEphemeralStorage: resource.MustParse("1024Mi"), + }, + Limits: v1.ResourceList{ + v1.ResourceEphemeralStorage: resource.MustParse("2048Mi"), + }, + }) + mockTaskExecMetadata.OnGetOverrides().Return(&mockOverrides) + + mockInputReader := mocks2.InputReader{} + mockInputPath := storage.DataReference("s3://input/path") + mockInputReader.OnGetInputPath().Return(mockInputPath) + mockInputReader.OnGetInputPrefixPath().Return(mockInputPath) + mockInputReader.On("Get", mock.Anything).Return(nil, nil) + + mockOutputPath := mocks2.OutputFilePaths{} + mockOutputPathPrefix := storage.DataReference("s3://output/path") + mockOutputPath.OnGetRawOutputPrefix().Return(mockOutputPathPrefix) + mockOutputPath.OnGetOutputPrefixPath().Return(mockOutputPathPrefix) + + templateParameters := template.Parameters{ + TaskExecMetadata: &mockTaskExecMetadata, + Inputs: &mockInputReader, + OutputPath: &mockOutputPath, + } + container := &v1.Container{ + Command: []string{ + "{{ .Input }}", + }, + Args: []string{ + "{{ .OutputPrefix }}", + }, + } + err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, AssignResources, container) + assert.NoError(t, err) + assert.EqualValues(t, container.Args, []string{"s3://output/path"}) + assert.EqualValues(t, container.Command, []string{"s3://input/path"}) + assert.Len(t, container.Resources.Limits, 3) + assert.Len(t, container.Resources.Requests, 3) + assert.Len(t, container.Env, 12) +} diff --git a/go/tasks/pluginmachinery/flytek8s/copilot.go b/go/tasks/pluginmachinery/flytek8s/copilot.go index 5a0b31174..f4909867d 100644 --- a/go/tasks/pluginmachinery/flytek8s/copilot.go +++ b/go/tasks/pluginmachinery/flytek8s/copilot.go @@ -39,11 +39,10 @@ func FlyteCoPilotContainer(name string, cfg config.FlyteCoPilotConfig, args []st if err != nil { return v1.Container{}, err } - return v1.Container{ Name: cfg.NamePrefix + name, Image: cfg.Image, - Command: []string{"/bin/flyte-copilot", "--config", "/etc/flyte/config**/*"}, + Command: CopilotCommandArgs(storage.GetConfig()), Args: args, WorkingDir: "/", Resources: v1.ResourceRequirements{ @@ -62,6 +61,36 @@ func FlyteCoPilotContainer(name string, cfg config.FlyteCoPilotConfig, args []st }, nil } +func CopilotCommandArgs(storageConfig *storage.Config) []string { + var commands = []string{ + "/bin/flyte-copilot", + "--storage.limits.maxDownloadMBs=0", + } + if len(storageConfig.Stow.Config) > 0 && len(storageConfig.Stow.Kind) > 0 { + var cfg string + for key, val := range storageConfig.Stow.Config { + cfg += fmt.Sprintf("%s=%s,", key, val) + } + commands = append(commands, []string{ + fmt.Sprintf("--storage.stow.config %s", cfg), + fmt.Sprintf("--storage.stow.kind=%s", storageConfig.Stow.Kind), + }...) 
+ } + if storageConfig.MultiContainerEnabled { + commands = append(commands, "--storage.enable-multicontainer") + } + return append(commands, []string{ + fmt.Sprintf("--storage.type=%s", storageConfig.Type), + fmt.Sprintf("--storage.enable-multicontainer=%v", storageConfig.MultiContainerEnabled), + fmt.Sprintf("--storage.container=%s", storageConfig.InitContainer), + fmt.Sprintf("--storage.connection.secret-key=%s", storageConfig.Connection.SecretKey), + fmt.Sprintf("--storage.connection.access-key=%s", storageConfig.Connection.AccessKey), + fmt.Sprintf("--storage.connection.auth-type=%s", storageConfig.Connection.AuthType), + fmt.Sprintf("--storage.connection.region=%s", storageConfig.Connection.Region), + fmt.Sprintf("--storage.connection.endpoint=%s", storageConfig.Connection.Endpoint.String()), + }...) +} + func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference, startTimeout time.Duration, iface *core.TypedInterface) ([]string, error) { if iface == nil { return nil, fmt.Errorf("interface is required for CoPilot Sidecar") diff --git a/go/tasks/pluginmachinery/flytek8s/copilot_test.go b/go/tasks/pluginmachinery/flytek8s/copilot_test.go index e208f56bd..3c6803cac 100644 --- a/go/tasks/pluginmachinery/flytek8s/copilot_test.go +++ b/go/tasks/pluginmachinery/flytek8s/copilot_test.go @@ -49,7 +49,24 @@ func TestFlyteCoPilotContainer(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "test-x", c.Name) assert.Equal(t, "test", c.Image) - assert.Equal(t, []string{"/bin/flyte-copilot", "--config", "/etc/flyte/config**/*"}, c.Command) + assert.Equal(t, CopilotCommandArgs(storage.GetConfig()), c.Command) + assert.Equal(t, []string{"hello"}, c.Args) + assert.Equal(t, 0, len(c.VolumeMounts)) + assert.Equal(t, "/", c.WorkingDir) + assert.Equal(t, 2, len(c.Resources.Limits)) + assert.Equal(t, 2, len(c.Resources.Requests)) + }) + + t.Run("happy stow backend", func(t *testing.T) { + storage.GetConfig().Stow.Kind = "S3" + storage.GetConfig().Stow.Config = map[string]string{ + "path": "config.yaml", + } + c, err := FlyteCoPilotContainer("x", cfg, []string{"hello"}) + assert.NoError(t, err) + assert.Equal(t, "test-x", c.Name) + assert.Equal(t, "test", c.Image) + assert.Equal(t, CopilotCommandArgs(storage.GetConfig()), c.Command) assert.Equal(t, []string{"hello"}, c.Args) assert.Equal(t, 0, len(c.VolumeMounts)) assert.Equal(t, "/", c.WorkingDir) diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 2585e9177..bf680f7f4 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -23,6 +23,41 @@ const OOMKilled = "OOMKilled" const Interrupted = "Interrupted" const SIGKILL = 137 +func ApplyInterruptibleNodeAffinity(interruptible bool, podSpec *v1.PodSpec) { + // Determine node selector terms to add to node affinity + var nodeSelectorRequirement v1.NodeSelectorRequirement + if interruptible { + if config.GetK8sPluginConfig().InterruptibleNodeSelectorRequirement == nil { + return + } + nodeSelectorRequirement = *config.GetK8sPluginConfig().InterruptibleNodeSelectorRequirement + } else { + if config.GetK8sPluginConfig().NonInterruptibleNodeSelectorRequirement == nil { + return + } + nodeSelectorRequirement = *config.GetK8sPluginConfig().NonInterruptibleNodeSelectorRequirement + } + + if podSpec.Affinity == nil { + podSpec.Affinity = &v1.Affinity{} + } + if podSpec.Affinity.NodeAffinity == nil { + podSpec.Affinity.NodeAffinity = &v1.NodeAffinity{} + } + 
if podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{} + } + if len(podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 { + nodeSelectorTerms := podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + for i := range nodeSelectorTerms { + nst := &nodeSelectorTerms[i] + nst.MatchExpressions = append(nst.MatchExpressions, nodeSelectorRequirement) + } + } else { + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{nodeSelectorRequirement}}} + } +} + // Updates the base pod spec used to execute tasks. This is configured with plugins and task metadata-specific options func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, resourceRequirements []v1.ResourceRequirements, podSpec *v1.PodSpec) { @@ -41,9 +76,10 @@ func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, if taskExecutionMetadata.IsInterruptible() { podSpec.NodeSelector = utils.UnionMaps(podSpec.NodeSelector, config.GetK8sPluginConfig().InterruptibleNodeSelector) } - if podSpec.Affinity == nil { - podSpec.Affinity = config.GetK8sPluginConfig().DefaultAffinity + if podSpec.Affinity == nil && config.GetK8sPluginConfig().DefaultAffinity != nil { + podSpec.Affinity = config.GetK8sPluginConfig().DefaultAffinity.DeepCopy() } + ApplyInterruptibleNodeAffinity(taskExecutionMetadata.IsInterruptible(), podSpec) } func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, error) { @@ -56,12 +92,17 @@ func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (* logger.Errorf(ctx, "Default Pod creation logic works for default container in the task template only.") return nil, fmt.Errorf("container not specified in task template") } - c, err := ToK8sContainer(ctx, task.GetContainer(), task.Interface, template.Parameters{ + templateParameters := template.Parameters{ Task: tCtx.TaskReader(), Inputs: tCtx.InputReader(), OutputPath: tCtx.OutputWriter(), TaskExecMetadata: tCtx.TaskExecutionMetadata(), - }) + } + c, err := ToK8sContainer(ctx, task.GetContainer(), task.Interface, templateParameters) + if err != nil { + return nil, err + } + err = AddFlyteCustomizationsToContainer(ctx, templateParameters, AssignResources, c) if err != nil { return nil, err } @@ -179,8 +220,40 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // PodInitializing -> Init containers are running return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &c.LastTransitionTime.Time}), nil - case "CreateContainerConfigError", "CreateContainerError": - // This happens if for instance the command to the container is incorrect, ie doesn't run + case "CreateContainerError": + // This may consist of: + // 1. Transient errors: e.g. failed to reserve + // container name, container name [...] already in use + // by container + // 2. Permanent errors: e.g. no command specified + // To handle both types of errors gracefully without + // arbitrary pattern matching in the message, we simply + // allow a grace period for kubelet to resolve + // transient issues with the container runtime. 
If the + // error persists beyond this time, the corresponding + // task is marked as failed. + // NOTE: The current implementation checks for a timeout + // by comparing the condition's LastTransitionTime + // based on the corresponding kubelet's clock with the + // current time based on FlytePropeller's clock. This + // is not ideal given that these 2 clocks are NOT + // synced, and therefore, only provides an + // approximation of the elapsed time since the last + // transition. + t := c.LastTransitionTime.Time + if time.Since(t) >= config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration { + return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), nil + } + return pluginsCore.PhaseInfoInitializing( + t, + pluginsCore.DefaultPhaseVersion, + fmt.Sprintf("[%s]: %s", finalReason, finalMessage), + &pluginsCore.TaskInfo{OccurredAt: &t}, + ), nil + + case "CreateContainerConfigError": t := c.LastTransitionTime.Time return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index b0f41a3ff..483329d19 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "path/filepath" "testing" + "time" config1 "github.com/flyteorg/flytestdlib/config" "github.com/flyteorg/flytestdlib/config/viper" @@ -101,10 +102,95 @@ func TestPodSetup(t *testing.T) { err := configAccessor.UpdateConfig(context.TODO()) assert.NoError(t, err) + t.Run("ApplyInterruptibleNodeAffinity", TestApplyInterruptibleNodeAffinity) t.Run("UpdatePod", updatePod) t.Run("ToK8sPodInterruptible", toK8sPodInterruptible) } +func TestApplyInterruptibleNodeAffinity(t *testing.T) { + t.Run("WithInterruptibleNodeSelectorRequirement", func(t *testing.T) { + podSpec := v1.PodSpec{} + ApplyInterruptibleNodeAffinity(true, &podSpec) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "x/interruptible", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + }) + + t.Run("WithNonInterruptibleNodeSelectorRequirement", func(t *testing.T) { + podSpec := v1.PodSpec{} + ApplyInterruptibleNodeAffinity(false, &podSpec) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "x/interruptible", + Operator: v1.NodeSelectorOpDoesNotExist, + }, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + }) + + t.Run("WithExistingAffinityWithInterruptibleNodeSelectorRequirement", func(t *testing.T) { + podSpec := v1.PodSpec{ + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "node selector requirement", + Operator: v1.NodeSelectorOpIn, + Values: []string{"exists"}, + }, + }, + }, + }, + }, + }, + }, + } + ApplyInterruptibleNodeAffinity(true, &podSpec) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + 
v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "node selector requirement", + Operator: v1.NodeSelectorOpIn, + Values: []string{"exists"}, + }, + v1.NodeSelectorRequirement{ + Key: "x/interruptible", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + }) +} + func updatePod(t *testing.T) { taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{ Limits: v1.ResourceList{ @@ -150,6 +236,73 @@ func updatePod(t *testing.T) { "x/interruptible": "true", "user": "also configured", }, pod.Spec.NodeSelector) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "x/interruptible", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) +} + +func TestUpdatePodWithDefaultAffinityAndInterruptibleNodeSelectorRequirement(t *testing.T) { + taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{}) + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + DefaultAffinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "default node affinity", + Operator: v1.NodeSelectorOpIn, + Values: []string{"exists"}, + }, + }, + }, + }, + }, + }, + }, + InterruptibleNodeSelectorRequirement: &v1.NodeSelectorRequirement{ + Key: "x/interruptible", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + })) + for i := 0; i < 3; i++ { + podSpec := v1.PodSpec{} + UpdatePod(taskExecutionMetadata, []v1.ResourceRequirements{}, &podSpec) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "default node affinity", + Operator: v1.NodeSelectorOpIn, + Values: []string{"exists"}, + }, + v1.NodeSelectorRequirement{ + Key: "x/interruptible", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + } } func toK8sPodInterruptible(t *testing.T) { @@ -174,6 +327,21 @@ func toK8sPodInterruptible(t *testing.T) { assert.Equal(t, "interruptible", p.Tolerations[1].Value) assert.Equal(t, 1, len(p.NodeSelector)) assert.Equal(t, "true", p.NodeSelector["x/interruptible"]) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "x/interruptible", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + p.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) } func TestToK8sPod(t *testing.T) { @@ -197,8 +365,10 @@ func TestToK8sPod(t *testing.T) { ResourceTolerations: map[v1.ResourceName][]v1.Toleration{ v1.ResourceStorage: {tolStorage}, ResourceNvidiaGPU: {tolGPU}, - }}), - ) + }, + DefaultCPURequest: "1024m", + DefaultMemoryRequest: "1024Mi", + })) op := &pluginsIOMock.OutputFilePaths{} 
op.On("GetOutputPrefixPath").Return(storage.DataReference("")) @@ -262,7 +432,9 @@ func TestToK8sPod(t *testing.T) { DefaultNodeSelector: map[string]string{ "nodeId": "123", }, - SchedulerName: "myScheduler", + SchedulerName: "myScheduler", + DefaultCPURequest: "1024m", + DefaultMemoryRequest: "1024Mi", })) p, err := ToK8sPodSpec(ctx, x) @@ -275,6 +447,11 @@ func TestToK8sPod(t *testing.T) { } func TestDemystifyPending(t *testing.T) { + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + CreateContainerErrorGracePeriod: config1.Duration{ + Duration: time.Minute * 3, + }, + })) t.Run("PodNotScheduled", func(t *testing.T) { s := v1.PodStatus{ @@ -490,19 +667,40 @@ func TestDemystifyPending(t *testing.T) { assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) }) - t.Run("CreateContainerError", func(t *testing.T) { - s.ContainerStatuses = []v1.ContainerStatus{ + t.Run("CreateContainerErrorWithinGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime = metaV1.Now() + s2.ContainerStatuses = []v1.ContainerStatus{ { Ready: false, State: v1.ContainerState{ Waiting: &v1.ContainerStateWaiting{ Reason: "CreateContainerError", - Message: "this an error", + Message: "this is a transient error", }, }, }, } - taskStatus, err := DemystifyPending(s) + taskStatus, err := DemystifyPending(s2) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseInitializing, taskStatus.Phase()) + }) + + t.Run("CreateContainerErrorOutsideGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime.Time = metaV1.Now().Add(-config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration) + s2.ContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "CreateContainerError", + Message: "this a permanent error", + }, + }, + }, + } + taskStatus, err := DemystifyPending(s2) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) }) diff --git a/go/tasks/pluginmachinery/flytek8s/testdata/config.yaml b/go/tasks/pluginmachinery/flytek8s/testdata/config.yaml index 8037d898a..2401f1e00 100644 --- a/go/tasks/pluginmachinery/flytek8s/testdata/config.yaml +++ b/go/tasks/pluginmachinery/flytek8s/testdata/config.yaml @@ -6,6 +6,8 @@ plugins: # All k8s plugins default configuration k8s: scheduler-name: flyte-scheduler + default-cpus: 1024m + default-memory: 1024Mi default-annotations: - annotationKey1: annotationValue1 - annotationKey2: annotationValue2 @@ -30,6 +32,14 @@ plugins: value: interruptible operator: Equal effect: NoSchedule + interruptible-node-selector-requirement: + key: x/interruptible + operator: In + values: + - "true" + non-interruptible-node-selector-requirement: + key: x/interruptible + operator: DoesNotExist default-env-vars: - AWS_METADATA_SERVICE_TIMEOUT: 5 - AWS_METADATA_SERVICE_NUM_ATTEMPTS: 20 diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index ab0af8eb1..f8a1db07d 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -224,7 +224,7 @@ func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8s o, err := logPlugin.GetTaskLogs(tasklog.Input{ PodName: pod.Name, Namespace: pod.Namespace, - LogName: fmt.Sprintf(" #%d-%d", index, retryAttempt), + LogName: fmt.Sprintf(" #%d-%d", retryAttempt, index), PodUnixStartTime: pod.CreationTimestamp.Unix(), }) diff --git 
a/go/tasks/plugins/array/k8s/monitor_test.go b/go/tasks/plugins/array/k8s/monitor_test.go index b261f1dcf..80c4f01c4 100644 --- a/go/tasks/plugins/array/k8s/monitor_test.go +++ b/go/tasks/plugins/array/k8s/monitor_test.go @@ -169,10 +169,10 @@ func TestCheckSubTasksState(t *testing.T) { assert.NotEmpty(t, logLinks) assert.Equal(t, 10, len(logLinks)) for i := 0; i < 10; i = i + 2 { - assert.Equal(t, fmt.Sprintf("Kubernetes Logs #%d-0 (PhaseRunning)", i/2), logLinks[i].Name) + assert.Equal(t, fmt.Sprintf("Kubernetes Logs #0-%d (PhaseRunning)", i/2), logLinks[i].Name) assert.Equal(t, fmt.Sprintf("k8s/log/a-n-b/notfound-%d/pod?namespace=a-n-b", i/2), logLinks[i].Uri) - assert.Equal(t, fmt.Sprintf("Cloudwatch Logs #%d-0 (PhaseRunning)", i/2), logLinks[i+1].Name) + assert.Equal(t, fmt.Sprintf("Cloudwatch Logs #0-%d (PhaseRunning)", i/2), logLinks[i+1].Name) assert.Equal(t, fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.notfound-%d;streamFilter=typeLogStreamPrefix", i/2), logLinks[i+1].Uri) } diff --git a/go/tasks/plugins/array/k8s/task.go b/go/tasks/plugins/array/k8s/task.go index 120e6ea13..8db0ea7c3 100644 --- a/go/tasks/plugins/array/k8s/task.go +++ b/go/tasks/plugins/array/k8s/task.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -45,6 +46,29 @@ const ( LaunchReturnState ) +const finalizer = "flyte/array" + +func addPodFinalizer(pod *corev1.Pod) *corev1.Pod { + pod.Finalizers = append(pod.Finalizers, finalizer) + return pod +} + +func removeString(list []string, target string) []string { + ret := make([]string, 0) + for _, s := range list { + if s != target { + ret = append(ret, s) + } + } + + return ret +} + +func clearFinalizer(pod *corev1.Pod) *corev1.Pod { + pod.Finalizers = removeString(pod.Finalizers, finalizer) + return pod +} + const ( MonitorSuccess MonitorResult = iota MonitorError @@ -107,7 +131,7 @@ func (t Task) Launch(ctx context.Context, tCtx core.TaskExecutionContext, kubeCl pod = ApplyPodPolicies(ctx, t.Config, pod) pod = applyNodeSelectorLabels(ctx, t.Config, pod) pod = applyPodTolerations(ctx, t.Config, pod) - + pod = addPodFinalizer(pod) allocationStatus, err := allocateResource(ctx, tCtx, t.Config, podName) if err != nil { return LaunchError, err @@ -227,8 +251,34 @@ func (t Task) Finalize(ctx context.Context, tCtx core.TaskExecutionContext, kube indexStr := strconv.Itoa(t.ChildIdx) podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr) + pod := &v1.Pod{ + TypeMeta: metaV1.TypeMeta{ + Kind: PodKind, + APIVersion: v1.SchemeGroupVersion.String(), + }, + } + + err := kubeClient.GetClient().Get(ctx, k8sTypes.NamespacedName{ + Name: podName, + Namespace: GetNamespaceForExecution(tCtx, t.Config.NamespaceTemplate), + }, pod) + + if err != nil { + if !k8serrors.IsNotFound(err) { + logger.Errorf(ctx, "Error fetching pod [%s] in Finalize [%s]", podName, err) + return err + } + } else { + pod = clearFinalizer(pod) + err := kubeClient.GetClient().Update(ctx, pod) + if err != nil { + logger.Errorf(ctx, "Error updating pod finalizer [%s] in Finalize [%s]", podName, err) + return err + } + } + // Deallocate Resource - err := deallocateResource(ctx, tCtx, t.Config, t.ChildIdx) + err = deallocateResource(ctx, tCtx, t.Config, t.ChildIdx) if err != nil { logger.Errorf(ctx, "Error releasing 
allocation token [%s] in Finalize [%s]", podName, err) return err diff --git a/go/tasks/plugins/array/k8s/task_test.go b/go/tasks/plugins/array/k8s/task_test.go index d528045ab..dd3d00afb 100644 --- a/go/tasks/plugins/array/k8s/task_test.go +++ b/go/tasks/plugins/array/k8s/task_test.go @@ -1,13 +1,52 @@ package k8s import ( + "context" "testing" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + + "github.com/stretchr/testify/mock" + + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" + + "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func TestFinalize(t *testing.T) { + ctx := context.Background() + + tCtx := getMockTaskExecutionContext(ctx) + kubeClient := mocks.KubeClient{} + kubeClient.OnGetClient().Return(mocks.NewFakeKubeClient()) + + resourceManager := mocks.ResourceManager{} + podTemplate, _, _ := FlyteArrayJobToK8sPodTemplate(ctx, tCtx, "") + pod := addPodFinalizer(&podTemplate) + pod.Name = formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), "1") + assert.NoError(t, kubeClient.GetClient().Create(ctx, pod)) + + resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + tCtx.OnResourceManager().Return(&resourceManager) + + config := Config{ + MaxArrayJobSize: 100, + ResourceConfig: ResourceConfig{ + PrimaryLabel: "p", + Limit: 10, + }, + } + + task := &Task{ + Config: &config, + ChildIdx: 1, + } + + err := task.Finalize(ctx, tCtx, &kubeClient) + assert.NoError(t, err) +} + func TestGetTaskContainerIndex(t *testing.T) { t.Run("test container target", func(t *testing.T) { pod := &v1.Pod{ diff --git a/go/tasks/plugins/array/k8s/transformer.go b/go/tasks/plugins/array/k8s/transformer.go index 612330714..52ffccc03 100644 --- a/go/tasks/plugins/array/k8s/transformer.go +++ b/go/tasks/plugins/array/k8s/transformer.go @@ -94,40 +94,6 @@ func buildPodMapTask(task *idlCore.TaskTemplate, metadata core.TaskExecutionMeta return pod, nil } -// Here we customize the k8sPod primary container by templatizing args. -// The call to ToK8sPodSpec for the task container target -// case already handles this but we must explicitly do so for K8sPod task targets. -func modifyMapPodTaskPrimaryContainer(ctx context.Context, tCtx core.TaskExecutionContext, arrTCtx *arrayTaskContext, container *v1.Container) error { - var err error - container.Args, err = template.Render(ctx, container.Args, - template.Parameters{ - TaskExecMetadata: tCtx.TaskExecutionMetadata(), - Inputs: arrTCtx.arrayInputReader, - OutputPath: tCtx.OutputWriter(), - Task: tCtx.TaskReader(), - }) - if err != nil { - return err - } - container.Command, err = template.Render(ctx, container.Command, - template.Parameters{ - TaskExecMetadata: tCtx.TaskExecutionMetadata(), - Inputs: arrTCtx.arrayInputReader, - OutputPath: tCtx.OutputWriter(), - Task: tCtx.TaskReader(), - }) - if err != nil { - return err - } - resources := flytek8s.ApplyResourceOverrides(ctx, container.Resources) - if resources != nil { - container.Resources = *resources - } - - container.Env = flytek8s.DecorateEnvVars(ctx, container.Env, tCtx.TaskExecutionMetadata().GetTaskExecutionID()) - return nil -} - // Note that Name is not set on the result object. // It's up to the caller to set the Name before creating the object in K8s. 
func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionContext, namespaceTemplate string) ( @@ -193,7 +159,14 @@ func FlyteArrayJobToK8sPodTemplate(ctx context.Context, tCtx core.TaskExecutionC if err != nil { return v1.Pod{}, nil, err } - err = modifyMapPodTaskPrimaryContainer(ctx, tCtx, arrTCtx, &pod.Spec.Containers[containerIndex]) + templateParameters := template.Parameters{ + TaskExecMetadata: tCtx.TaskExecutionMetadata(), + Inputs: arrTCtx.arrayInputReader, + OutputPath: tCtx.OutputWriter(), + Task: tCtx.TaskReader(), + } + err = flytek8s.AddFlyteCustomizationsToContainer( + ctx, templateParameters, flytek8s.MergeExistingResources, &pod.Spec.Containers[containerIndex]) if err != nil { return v1.Pod{}, nil, err } diff --git a/go/tasks/plugins/array/k8s/transformer_test.go b/go/tasks/plugins/array/k8s/transformer_test.go index 8741c1197..7b167a362 100644 --- a/go/tasks/plugins/array/k8s/transformer_test.go +++ b/go/tasks/plugins/array/k8s/transformer_test.go @@ -3,6 +3,7 @@ package k8s import ( "context" "encoding/json" + "fmt" "testing" "k8s.io/apimachinery/pkg/api/resource" @@ -168,6 +169,16 @@ func TestFlyteArrayJobToK8sPodTemplate(t *testing.T) { tMeta.OnGetOwnerReference().Return(v12.OwnerReference{}) tMeta.OnGetSecurityContext().Return(core.SecurityContext{}) tMeta.OnGetK8sServiceAccount().Return("sa") + mockResourceOverrides := mocks.TaskOverrides{} + mockResourceOverrides.OnGetResources().Return(&v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "ephemeral-storage": resource.MustParse("1024Mi"), + }, + Limits: v1.ResourceList{ + "ephemeral-storage": resource.MustParse("2048Mi"), + }, + }) + tMeta.OnGetOverrides().Return(&mockResourceOverrides) tID := &mocks.TaskExecutionID{} tID.OnGetID().Return(core.TaskExecutionIdentifier{ NodeExecutionId: &core.NodeExecutionIdentifier{ @@ -214,14 +225,16 @@ func TestFlyteArrayJobToK8sPodTemplate(t *testing.T) { defaultMemoryFromConfig := resource.MustParse("1024Mi") assert.EqualValues(t, v1.ResourceRequirements{ Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("1"), - v1.ResourceMemory: defaultMemoryFromConfig, + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: defaultMemoryFromConfig, + v1.ResourceEphemeralStorage: resource.MustParse("1024Mi"), }, Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("1"), - v1.ResourceMemory: defaultMemoryFromConfig, + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: defaultMemoryFromConfig, + v1.ResourceEphemeralStorage: resource.MustParse("2048Mi"), }, - }, pod.Spec.Containers[0].Resources) + }, pod.Spec.Containers[0].Resources, fmt.Sprintf("%+v", pod.Spec.Containers[0].Resources)) assert.EqualValues(t, []v1.EnvVar{ { Name: "FLYTE_INTERNAL_EXECUTION_ID", diff --git a/go/tasks/plugins/k8s/sidecar/sidecar.go b/go/tasks/plugins/k8s/sidecar/sidecar.go index c33e814aa..a2161ec20 100755 --- a/go/tasks/plugins/k8s/sidecar/sidecar.go +++ b/go/tasks/plugins/k8s/sidecar/sidecar.go @@ -34,44 +34,29 @@ func validateAndFinalizePod( ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, primaryContainerName string, pod k8sv1.Pod) (*k8sv1.Pod, error) { var hasPrimaryContainer bool - finalizedContainers := make([]k8sv1.Container, len(pod.Spec.Containers)) resReqs := make([]k8sv1.ResourceRequirements, 0, len(pod.Spec.Containers)) for index, container := range pod.Spec.Containers { + var resourceMode = flytek8s.LeaveResourcesUnmodified if container.Name == primaryContainerName { hasPrimaryContainer = true + resourceMode = 
flytek8s.MergeExistingResources } - modifiedCommand, err := template.Render(ctx, container.Command, template.Parameters{ + templateParameters := template.Parameters{ TaskExecMetadata: taskCtx.TaskExecutionMetadata(), Inputs: taskCtx.InputReader(), OutputPath: taskCtx.OutputWriter(), Task: taskCtx.TaskReader(), - }) - if err != nil { - return nil, err } - container.Command = modifiedCommand - - modifiedArgs, err := template.Render(ctx, container.Args, template.Parameters{ - TaskExecMetadata: taskCtx.TaskExecutionMetadata(), - Inputs: taskCtx.InputReader(), - OutputPath: taskCtx.OutputWriter(), - Task: taskCtx.TaskReader(), - }) + err := flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters, resourceMode, &pod.Spec.Containers[index]) if err != nil { return nil, err } - container.Args = modifiedArgs - container.Env = flytek8s.DecorateEnvVars(ctx, container.Env, taskCtx.TaskExecutionMetadata().GetTaskExecutionID()) - resources := flytek8s.ApplyResourceOverrides(ctx, container.Resources) - resReqs = append(resReqs, *resources) - finalizedContainers[index] = container } if !hasPrimaryContainer { return nil, errors.Errorf(errors.BadTaskSpecification, "invalid Sidecar task, primary container [%s] not defined", primaryContainerName) } - pod.Spec.Containers = finalizedContainers flytek8s.UpdatePod(taskCtx.TaskExecutionMetadata(), resReqs, &pod.Spec) return &pod, nil } diff --git a/go/tasks/plugins/k8s/sidecar/sidecar_test.go b/go/tasks/plugins/k8s/sidecar/sidecar_test.go index 1d68b52d0..2c060be8d 100755 --- a/go/tasks/plugins/k8s/sidecar/sidecar_test.go +++ b/go/tasks/plugins/k8s/sidecar/sidecar_test.go @@ -35,8 +35,8 @@ const ResourceNvidiaGPU = "nvidia.com/gpu" var resourceRequirements = &v1.ResourceRequirements{ Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("1024m"), - v1.ResourceStorage: resource.MustParse("100M"), + v1.ResourceCPU: resource.MustParse("2048m"), + v1.ResourceEphemeralStorage: resource.MustParse("100M"), }, } @@ -243,6 +243,17 @@ func TestBuildSidecarResource_TaskType2(t *testing.T) { assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name) checkUserTolerations(t, res) + // Assert resource requirements are correctly set + expectedCPURequest := resource.MustParse("1") + assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value()) + expectedMemRequest := resource.MustParse("100Mi") + assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value()) + expectedCPULimit := resource.MustParse("2048m") + assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value()) + expectedMemLimit := resource.MustParse("200Mi") + assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value()) + expectedEphemeralStorageLimit := resource.MustParse("100M") + assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value()) } func TestBuildSidecarResource_TaskType2_Invalid_Spec(t *testing.T) { @@ -330,6 +341,18 @@ func TestBuildSidecarResource_TaskType1(t *testing.T) { assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name) checkUserTolerations(t, res) + + // Assert resource requirements are correctly set + expectedCPURequest := resource.MustParse("1") + assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value()) + 
expectedMemRequest := resource.MustParse("100Mi") + assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value()) + expectedCPULimit := resource.MustParse("2048m") + assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value()) + expectedMemLimit := resource.MustParse("200Mi") + assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value()) + expectedEphemeralStorageLimit := resource.MustParse("100M") + assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value()) } func TestBuildSideResource_TaskType1_InvalidSpec(t *testing.T) { @@ -453,6 +476,18 @@ func TestBuildSidecarResource(t *testing.T) { t.Fatalf("unexpected toleration [%+v]", tol) } } + + // Assert resource requirements are correctly set + expectedCPURequest := resource.MustParse("2048m") + assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value()) + expectedMemRequest := resource.MustParse("1024Mi") + assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value()) + expectedCPULimit := resource.MustParse("2048m") + assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value()) + expectedMemLimit := resource.MustParse("1024Mi") + assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value()) + expectedEphemeralStorageLimit := resource.MustParse("100M") + assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value()) } func TestBuildSidecarReosurceMissingAnnotationsAndLabels(t *testing.T) {
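Below (not part of the patch itself) is a hedged sketch of how the new K8sPluginConfig options introduced in this diff might appear in a deployment's plugin configuration. Key names are taken from the json tags added in go/tasks/pluginmachinery/flytek8s/config/config.go and from the updated testdata/config.yaml; the 3m value mirrors the new default CreateContainerErrorGracePeriod, and the x/interruptible label is only an illustrative example.

plugins:
  k8s:
    # Grace period before a CreateContainerError is treated as permanent (defaults to 3m in this patch).
    create-container-error-grace-period: 3m
    # Node affinity requirement appended to interruptible pods.
    interruptible-node-selector-requirement:
      key: x/interruptible
      operator: In
      values:
        - "true"
    # Node affinity requirement appended to non-interruptible pods.
    non-interruptible-node-selector-requirement:
      key: x/interruptible
      operator: DoesNotExist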