Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into feature/mpi
Browse files Browse the repository at this point in the history
  • Loading branch information
yindia authored Aug 26, 2021
2 parents c4e1aea + e5085cb commit 187b708
Show file tree
Hide file tree
Showing 24 changed files with 876 additions and 138 deletions.
5 changes: 2 additions & 3 deletions CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
This project is governed by [Lyft's code of
conduct](https://github.com/lyft/code-of-conduct). All contributors
and participants agree to abide by its terms.
This project is governed by LF AI Foundation's [code of conduct](https://lfprojects.org/policies/code-of-conduct/).
All contributors and participants agree to abide by its terms.
2 changes: 2 additions & 0 deletions boilerplate/flyte/code_of_conduct/CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
This project is governed by LF AI Foundation's [code of conduct](https://lfprojects.org/policies/code-of-conduct/).
All contributors and participants agree to abide by its terms.
2 changes: 2 additions & 0 deletions boilerplate/flyte/code_of_conduct/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CODE OF CONDUCT
~~~~~~~~~~~~~~~
12 changes: 12 additions & 0 deletions boilerplate/flyte/code_of_conduct/update.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env bash

# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES.
# ONLY EDIT THIS FILE FROM WITHIN THE 'FLYTEORG/BOILERPLATE' REPOSITORY:
#
# TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst

# Abort on the first failing command so a failed copy is reported to the caller.
set -e

# Absolute path of the directory that contains this script, regardless of the caller's working directory.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

# Copy the boilerplate code of conduct three directory levels up from this script.
# Both operands are quoted so that a path containing spaces or glob characters is passed to cp as a single argument.
cp "${DIR}/CODE_OF_CONDUCT.md" "${DIR}/../../../CODE_OF_CONDUCT.md"
2 changes: 1 addition & 1 deletion boilerplate/flyte/golang_test_targets/goimports
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
#
# TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst

goimports -w $(find . -type f -name '*.go' -not -path "./vendor/*" -not -path "./pkg/client/*")
goimports -w $(find . -type f -name '*.go' -not -path "./vendor/*" -not -path "./pkg/client/*" -not -path "./boilerplate/*")
1 change: 1 addition & 0 deletions boilerplate/update.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ flyte/golang_support_tools
flyte/pull_request_template
flyte/docker_build
flyte/welcome_bot
flyte/code_of_conduct
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.0.0
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v0.0.0-00010101000000-000000000000
github.com/flyteorg/flytestdlib v0.3.22
github.com/flyteorg/flyteidl v0.19.2
github.com/flyteorg/flytestdlib v0.3.33
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.4.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.22 h1:nJEPaCdxzXBaeg2p4fdo3I3Ua09NedFRaUwuLafLEdw=
github.com/flyteorg/flytestdlib v0.3.22/go.mod h1:1XG0DwYTUm34Yrffm1Qy9Tdr/pWQypEqTq5dUxw3/cM=
github.com/flyteorg/flytestdlib v0.3.33 h1:+oCx3zXUIldL7CWmNMD7PMFPXvGqaPgYkSKn9wB6qvY=
github.com/flyteorg/flytestdlib v0.3.33/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
Expand Down
15 changes: 14 additions & 1 deletion go/tasks/pluginmachinery/flytek8s/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
},
CoPilot: FlyteCoPilotConfig{
NamePrefix: "flyte-copilot-",
Image: "docker.pkg.github.com/flyteorg/flyteplugins/operator:v0.4.0",
Image: "cr.flyte.org/flyteorg/flytecopilot:v0.0.9",
DefaultInputDataPath: "/var/flyte/inputs",
InputVolumeName: "flyte-inputs",
DefaultOutputPath: "/var/flyte/outputs",
Expand All @@ -40,6 +40,9 @@ var (
},
DefaultCPURequest: defaultCPURequest,
DefaultMemoryRequest: defaultMemoryRequest,
CreateContainerErrorGracePeriod: config2.Duration{
Duration: time.Minute * 3,
},
}

// K8sPluginConfigSection provides a singular top level config section for all plugins.
Expand Down Expand Up @@ -89,7 +92,12 @@ type K8sPluginConfig struct {
InterruptibleTolerations []v1.Toleration `json:"interruptible-tolerations" pflag:"-,Tolerations to be applied for interruptible pods"`
// Node Selector Labels for interruptible pods: Similar to InterruptibleTolerations, these node selector labels are added for pods that can tolerate
// eviction.
// Deprecated: Please use InterruptibleNodeSelectorRequirement/NonInterruptibleNodeSelectorRequirement
InterruptibleNodeSelector map[string]string `json:"interruptible-node-selector" pflag:"-,Defines a set of node selector labels to add to the interruptible pods."`
// Node Selector Requirements to be added to interruptible and non-interruptible
// pods respectively
InterruptibleNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"interruptible-node-selector-requirement" pflag:"-,Node selector requirement to add to interruptible pods"`
NonInterruptibleNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"non-interruptible-node-selector-requirement" pflag:"-,Node selector requirement to add to non-interruptible pods"`

// ----------------------------------------------------------------------
// Specific tolerations that are added for certain resources. Useful for maintaining gpu resources separate in the cluster
Expand All @@ -105,6 +113,11 @@ type K8sPluginConfig struct {
// are kept around (potentially consuming cluster resources). This, however, will cause k8s log links to expire as
// soon as the resource is finalized.
DeleteResourceOnFinalize bool `json:"delete-resource-on-finalize" pflag:",Instructs the system to delete the resource on finalize. This ensures that no resources are kept around (potentially consuming cluster resources). This, however, will cause k8s log links to expire as soon as the resource is finalized."`

// Time to wait for transient CreateContainerError errors to be resolved. If the
// error persists past this grace period, it will be inferred to be a permanent
// one, and the corresponding task marked as failed
CreateContainerErrorGracePeriod config2.Duration `json:"create-container-error-grace-period" pflag:"-,Time to wait for transient CreateContainerError errors to be resolved."`
}

type FlyteCoPilotConfig struct {
Expand Down
109 changes: 77 additions & 32 deletions go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ const resourceGPU = "gpu"
// Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go
const ResourceNvidiaGPU = "nvidia.com/gpu"

// MergeResources overlays the limits and requests of in onto out. An entry in in overwrites the entry with the same
// resource name in out; entries present only in out are kept. A nil section in in leaves the matching section of out
// untouched.
//
// The maps held by in are never shared with out. When out has no map for a section, a new map is allocated and
// filled from in, so that later writes or deletes on out (ApplyResourceOverrides performs both) do not modify the
// maps owned by in.
func MergeResources(in v1.ResourceRequirements, out *v1.ResourceRequirements) {
	out.Limits = mergeResourceList(in.Limits, out.Limits)
	out.Requests = mergeResourceList(in.Requests, out.Requests)
}

// mergeResourceList copies every entry of src into dst and returns the resulting map. It returns dst unchanged when
// src is nil, and allocates a new map when dst is nil so that the result never aliases src.
func mergeResourceList(src, dst v1.ResourceList) v1.ResourceList {
	if src == nil {
		return dst
	}
	if dst == nil {
		dst = make(v1.ResourceList, len(src))
	}
	for name, quantity := range src {
		dst[name] = quantity
	}
	return dst
}

func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements {
// set memory and cpu to default if not provided by user.
if len(resources.Requests) == 0 {
Expand All @@ -43,7 +60,7 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen

if _, found := resources.Requests[v1.ResourceMemory]; !found {
// use memory limit if set else default to config
if _, limitSet := resources.Limits[v1.ResourceCPU]; limitSet {
if _, limitSet := resources.Limits[v1.ResourceMemory]; limitSet {
resources.Requests[v1.ResourceMemory] = resources.Limits[v1.ResourceMemory]
} else {
resources.Requests[v1.ResourceMemory] = resource.MustParse(config.GetK8sPluginConfig().DefaultMemoryRequest)
Expand All @@ -60,13 +77,20 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen
resources.Limits[v1.ResourceMemory] = resources.Requests[v1.ResourceMemory]
}

// Ephemeral storage resources aren't required but if one of requests or limits is set and the other isn't, we'll
// just use the same values.
if _, requested := resources.Requests[v1.ResourceEphemeralStorage]; !requested {
if _, limitSet := resources.Limits[v1.ResourceEphemeralStorage]; limitSet {
resources.Requests[v1.ResourceEphemeralStorage] = resources.Limits[v1.ResourceEphemeralStorage]
}
} else if _, limitSet := resources.Limits[v1.ResourceEphemeralStorage]; !limitSet {
resources.Limits[v1.ResourceEphemeralStorage] = resources.Requests[v1.ResourceEphemeralStorage]
}

// TODO: Make configurable. 1/15/2019 Flyte Cluster doesn't support setting storage requests/limits.
// https://github.com/kubernetes/enhancements/issues/362
delete(resources.Requests, v1.ResourceStorage)
delete(resources.Requests, v1.ResourceEphemeralStorage)

delete(resources.Limits, v1.ResourceStorage)
delete(resources.Limits, v1.ResourceEphemeralStorage)

// Override GPU
if res, found := resources.Requests[resourceGPU]; found {
Expand All @@ -81,53 +105,74 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen
return &resources
}

// Returns a K8s Container for the execution
// Transforms a task template target of type core.Container into a bare-bones kubernetes container, which can be further
// modified with flyte-specific customizations specified by various static and run-time attributes.
func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *core.TypedInterface, parameters template.Parameters) (*v1.Container, error) {
modifiedCommand, err := template.Render(ctx, taskContainer.GetCommand(), parameters)
if err != nil {
return nil, err
}

modifiedArgs, err := template.Render(ctx, taskContainer.GetArgs(), parameters)
if err != nil {
return nil, err
}

envVars := DecorateEnvVars(ctx, ToK8sEnvVar(taskContainer.GetEnv()), parameters.TaskExecMetadata.GetTaskExecutionID())

// Perform preliminary validations
if parameters.TaskExecMetadata.GetOverrides() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "platform/compiler error, overrides not set for task")
}
if parameters.TaskExecMetadata.GetOverrides() == nil || parameters.TaskExecMetadata.GetOverrides().GetResources() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
}

res := parameters.TaskExecMetadata.GetOverrides().GetResources()
if res != nil {
res = ApplyResourceOverrides(ctx, *res)
}

// Make the container name the same as the pod name, unless it violates K8s naming conventions
// Container names are subject to the DNS-1123 standard
containerName := parameters.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
if errs := validation.IsDNS1123Label(containerName); len(errs) > 0 {
containerName = rand.String(4)
}
c := &v1.Container{
container := &v1.Container{
Name: containerName,
Image: taskContainer.GetImage(),
Args: modifiedArgs,
Command: modifiedCommand,
Env: envVars,
Args: taskContainer.GetArgs(),
Command: taskContainer.GetCommand(),
Env: ToK8sEnvVar(taskContainer.GetEnv()),
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
}
if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, container, iFace, taskContainer.DataConfig); err != nil {
return nil, err
}
return container, nil
}

// ResourceCustomizationMode selects how AddFlyteCustomizationsToContainer treats a container's resource requirements
// when the task execution metadata carries resource overrides.
type ResourceCustomizationMode int

const (
// AssignResources replaces the container's resources with the overrides after they have been passed through
// ApplyResourceOverrides.
AssignResources ResourceCustomizationMode = iota
// MergeExistingResources merges the overrides into the container's existing resources with MergeResources and then
// passes the merged result through ApplyResourceOverrides.
MergeExistingResources
// LeaveResourcesUnmodified keeps the container's resources exactly as they are.
LeaveResourcesUnmodified
)

if res != nil {
c.Resources = *res
// Takes a container definition which specifies how to run a Flyte task and fills in templated command and argument
// values, updates resources and decorates environment variables with platform and task-specific customizations.
func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.Parameters,
mode ResourceCustomizationMode, container *v1.Container) error {
modifiedCommand, err := template.Render(ctx, container.Command, parameters)
if err != nil {
return err
}
container.Command = modifiedCommand

if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, c, iFace, taskContainer.DataConfig); err != nil {
return nil, err
modifiedArgs, err := template.Render(ctx, container.Args, parameters)
if err != nil {
return err
}
container.Args = modifiedArgs

container.Env = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetTaskExecutionID())

if parameters.TaskExecMetadata.GetOverrides() != nil && parameters.TaskExecMetadata.GetOverrides().GetResources() != nil {
res := parameters.TaskExecMetadata.GetOverrides().GetResources()
switch mode {
case AssignResources:
if res = ApplyResourceOverrides(ctx, *res); res != nil {
container.Resources = *res
}
case MergeExistingResources:
MergeResources(*res, &container.Resources)
container.Resources = *ApplyResourceOverrides(ctx, container.Resources)
case LeaveResourcesUnmodified:
}
}
return c, nil
return nil
}
Loading

0 comments on commit 187b708

Please sign in to comment.