Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject offloading literal env vars #6027

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/coocood/freecache v1.1.1
github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.3
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.0 // indirect
Expand Down Expand Up @@ -117,7 +119,7 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
Expand Down
7 changes: 5 additions & 2 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 h1:cQyO5JQ2iuHnEcF3v24kdDMsgh04RjyFPDtuvD6PCE0=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625/go.mod h1:6PnrZv6zUDkrNMw0mIoGRmGBR7i9LulhKPmxFq4rUiM=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aws/aws-sdk-go v1.47.11 h1:Dol+MA+hQblbnXUI3Vk9qvoekU6O1uDEuAItezjiWNQ=
Expand Down Expand Up @@ -310,6 +312,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks=
github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
Expand Down Expand Up @@ -389,8 +392,8 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 h1:UNQQKPfTDe1J81ViolILjTKPr9WetKW6uei2hFgJmFs=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
propellerCfg "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
)

Expand Down Expand Up @@ -76,15 +77,6 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1
Name: "FLYTE_ATTEMPT_NUMBER",
Value: attemptNumber,
},
// TODO: Fill in these
// {
// Name: "FLYTE_INTERNAL_EXECUTION_WORKFLOW",
// Value: "",
// },
// {
// Name: "FLYTE_INTERNAL_EXECUTION_LAUNCHPLAN",
// Value: "",
// },
}

if len(consoleURL) > 0 {
Expand Down Expand Up @@ -139,9 +131,36 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1
return envVars
}

func GetLiteralOffloadingEnvVars() []v1.EnvVar {
propellerConfig := propellerCfg.GetConfig()
if !propellerConfig.LiteralOffloadingConfig.Enabled {
return []v1.EnvVar{}
}

envVars := []v1.EnvVar{}
if propellerConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading > 0 {
envVars = append(envVars,
v1.EnvVar{
Name: "_F_L_MIN_SIZE_MB",
Value: strconv.FormatInt(propellerConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading, 10),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could directly use strconv.Itoa() which defaults to base 10

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://pkg.go.dev/strconv#FormatInt handles int64 directly (which those two values are, instead of int).

},
)
}
if propellerConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading > 0 {
envVars = append(envVars,
v1.EnvVar{
Name: "_F_L_MAX_SIZE_MB",
Value: strconv.FormatInt(propellerConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading, 10),
},
)
}
return envVars
}

func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, envFroms []v1.EnvFromSource, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource) {
envVars = append(envVars, GetContextEnvVars(ctx)...)
envVars = append(envVars, GetExecutionEnvVars(id, consoleURL)...)
envVars = append(envVars, GetLiteralOffloadingEnvVars()...)

for k, v := range taskEnvironmentVariables {
envVars = append(envVars, v1.EnvVar{Name: k, Value: v})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
propellerCfg "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
)

Expand Down Expand Up @@ -304,6 +305,8 @@ func TestDecorateEnvVars(t *testing.T) {

expected := append(defaultEnv, GetContextEnvVars(ctx)...)
expected = append(expected, GetExecutionEnvVars(mockTaskExecutionIdentifier{}, "")...)
expectedOffloaded := append(expected, v12.EnvVar{Name: "_F_L_MIN_SIZE_MB", Value: "1"})
expectedOffloaded = append(expectedOffloaded, v12.EnvVar{Name: "_F_L_MAX_SIZE_MB", Value: "42"})

aggregated := append(expected, v12.EnvVar{Name: "k", Value: "v"})
type args struct {
Expand All @@ -315,17 +318,77 @@ func TestDecorateEnvVars(t *testing.T) {
args args
additionEnvVar map[string]string
additionEnvVarFromEnv map[string]string
offloadingEnabled bool
offloadingEnvVar map[string]string
executionEnvVar map[string]string
consoleURL string
want []v12.EnvVar
}{
{"no-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, emptyEnvVar, "", expected},
{"with-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, additionalEnv, emptyEnvVar, emptyEnvVar, "", aggregated},
{"from-env", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, envVarsFromEnv, emptyEnvVar, "", aggregated},
{"from-execution-metadata", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, additionalEnv, "", aggregated},
{
"no-additional",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
emptyEnvVar,
false,
emptyEnvVar,
emptyEnvVar,
"",
expected,
},
{
"no-additional-offloading-enabled",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
emptyEnvVar,
true,
emptyEnvVar,
emptyEnvVar,
"",
expectedOffloaded,
},
{
"with-additional",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
additionalEnv,
emptyEnvVar,
false,
emptyEnvVar,
emptyEnvVar,
"",
aggregated,
},
{
"from-env",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
envVarsFromEnv,
false,
emptyEnvVar,
emptyEnvVar,
"",
aggregated,
},
{
"from-execution-metadata",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
emptyEnvVar,
false,
emptyEnvVar,
additionalEnv,
"",
aggregated,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := propellerCfg.GetConfig()
cfg.LiteralOffloadingConfig = propellerCfg.LiteralOffloadingConfig{
Enabled: tt.offloadingEnabled,
MinSizeInMBForOffloading: 1,
MaxSizeInMBForOffloading: 42,
}

assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
DefaultEnvVars: tt.additionEnvVar,
DefaultEnvVarsFromEnv: tt.additionEnvVarFromEnv,
Expand Down
Loading