From 900121f5e595f46630db57d23cc96f37c462856b Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Nov 2024 18:25:24 -0500 Subject: [PATCH 1/2] Inject offloading literal env vars Signed-off-by: Eduardo Apolinario --- flyteplugins/go.mod | 4 +- flyteplugins/go.sum | 7 +- .../flytek8s/k8s_resource_adds.go | 37 +++++++--- .../flytek8s/k8s_resource_adds_test.go | 72 +++++++++++++++++-- .../pkg/controller/config/config.go | 5 ++ 5 files changed, 109 insertions(+), 16 deletions(-) diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index 7616900390..11962b93c9 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -11,6 +11,7 @@ require ( github.com/coocood/freecache v1.1.1 github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26 github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000 + github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000 github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000 github.com/go-test/deep v1.0.7 github.com/golang/protobuf v1.5.3 @@ -53,6 +54,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect + github.com/Masterminds/semver v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.0 // indirect @@ -117,7 +119,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect diff --git a/flyteplugins/go.sum b/flyteplugins/go.sum index d11f6b60a3..18242a8638 100644 --- a/flyteplugins/go.sum +++ b/flyteplugins/go.sum @@ -62,6 +62,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 h1:cQyO5JQ2iuHnEcF3v24kdDMsgh04RjyFPDtuvD6PCE0= github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625/go.mod h1:6PnrZv6zUDkrNMw0mIoGRmGBR7i9LulhKPmxFq4rUiM= +github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= +github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aws/aws-sdk-go v1.47.11 h1:Dol+MA+hQblbnXUI3Vk9qvoekU6O1uDEuAItezjiWNQ= @@ -310,6 +312,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks= github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= @@ -389,8 +392,8 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 h1:UNQQKPfTDe1J81ViolILjTKPr9WetKW6uei2hFgJmFs= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go index 3cd000dd40..f22183601e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go @@ -12,6 +12,7 @@ import ( pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + propellerCfg "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytestdlib/contextutils" ) @@ -76,15 +77,6 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1 Name: "FLYTE_ATTEMPT_NUMBER", Value: attemptNumber, }, - // TODO: Fill in these - // { - // Name: "FLYTE_INTERNAL_EXECUTION_WORKFLOW", - // Value: "", - // }, - // { - // Name: "FLYTE_INTERNAL_EXECUTION_LAUNCHPLAN", - // Value: "", - // }, } if len(consoleURL) > 0 { @@ -139,9 +131,36 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1 return envVars } +func GetLiteralOffloadingEnvVars() []v1.EnvVar { + propellerConfig := propellerCfg.GetConfig() + if !propellerConfig.LiteralOffloadingConfig.Enabled { + return []v1.EnvVar{} + } + + envVars := []v1.EnvVar{} + if propellerConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading > 0 { + envVars = append(envVars, + v1.EnvVar{ + Name: "_F_L_MIN_SIZE_MB", + Value: strconv.FormatInt(propellerConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading, 10), + }, + ) + } + if propellerConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading > 0 { + envVars = append(envVars, + v1.EnvVar{ + Name: "_F_L_MAX_SIZE_MB", + Value: strconv.FormatInt(propellerConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading, 10), + }, + ) + } + return envVars +} + func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, envFroms []v1.EnvFromSource, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource) { envVars = append(envVars, GetContextEnvVars(ctx)...) envVars = append(envVars, GetExecutionEnvVars(id, consoleURL)...) + envVars = append(envVars, GetLiteralOffloadingEnvVars()...) for k, v := range taskEnvironmentVariables { envVars = append(envVars, v1.EnvVar{Name: k, Value: v}) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go index fd4828fbbd..e215e23ee6 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go @@ -14,6 +14,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + propellerCfg "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytestdlib/contextutils" ) @@ -304,6 +305,8 @@ func TestDecorateEnvVars(t *testing.T) { expected := append(defaultEnv, GetContextEnvVars(ctx)...) expected = append(expected, GetExecutionEnvVars(mockTaskExecutionIdentifier{}, "")...) + expectedOffloaded := append(expected, v12.EnvVar{Name: "_F_L_MIN_SIZE_MB", Value: "1"}) + expectedOffloaded = append(expectedOffloaded, v12.EnvVar{Name: "_F_L_MAX_SIZE_MB", Value: "42"}) aggregated := append(expected, v12.EnvVar{Name: "k", Value: "v"}) type args struct { @@ -315,17 +318,78 @@ func TestDecorateEnvVars(t *testing.T) { args args additionEnvVar map[string]string additionEnvVarFromEnv map[string]string + offloadingEnabled bool + offloadingEnvVar map[string]string executionEnvVar map[string]string consoleURL string want []v12.EnvVar }{ - {"no-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, emptyEnvVar, "", expected}, - {"with-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, additionalEnv, emptyEnvVar, emptyEnvVar, "", aggregated}, - {"from-env", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, envVarsFromEnv, emptyEnvVar, "", aggregated}, - {"from-execution-metadata", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, additionalEnv, "", aggregated}, + { + "no-additional", + args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, + emptyEnvVar, + emptyEnvVar, + false, + emptyEnvVar, + emptyEnvVar, + "", + expected, + }, + { + "no-additional-offloading-enabled", + args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, + emptyEnvVar, + emptyEnvVar, + true, + emptyEnvVar, + emptyEnvVar, + "", + expectedOffloaded, + }, + { + "with-additional", + args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, + additionalEnv, + emptyEnvVar, + false, + emptyEnvVar, + emptyEnvVar, + "", + aggregated, + }, + { + "from-env", + args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, + emptyEnvVar, + envVarsFromEnv, + false, + emptyEnvVar, + emptyEnvVar, + "", + aggregated, + }, + { + "from-execution-metadata", + args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, + emptyEnvVar, + emptyEnvVar, + false, + emptyEnvVar, + additionalEnv, + "", + aggregated, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + assert.NoError(t, propellerCfg.SetConfig(&propellerCfg.Config{ + LiteralOffloadingConfig: propellerCfg.LiteralOffloadingConfig{ + Enabled: tt.offloadingEnabled, + MinSizeInMBForOffloading: 1, + MaxSizeInMBForOffloading: 42, + }, + })) + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ DefaultEnvVars: tt.additionEnvVar, DefaultEnvVarsFromEnv: tt.additionEnvVarFromEnv, diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 4801b8993a..24b8e0032a 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -356,6 +356,11 @@ func GetConfig() *Config { return configSection.GetConfig().(*Config) } +// SetConfig should be used for TESTING ONLY as it sets current value for the config. +func SetConfig(cfg *Config) error { + return configSection.SetConfig(cfg) +} + // MustRegisterSubSection can be used to configure any subsections the the propeller configuration func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section { return configSection.MustRegisterSection(subSectionKey, section) From 9b1f1083e6890821977da5db2e028825354a87ac Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Nov 2024 19:02:30 -0500 Subject: [PATCH 2/2] Remove SetConfig Signed-off-by: Eduardo Apolinario --- .../flytek8s/k8s_resource_adds_test.go | 13 ++++++------- flytepropeller/pkg/controller/config/config.go | 5 ----- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go index e215e23ee6..09506d155a 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go @@ -382,13 +382,12 @@ func TestDecorateEnvVars(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.NoError(t, propellerCfg.SetConfig(&propellerCfg.Config{ - LiteralOffloadingConfig: propellerCfg.LiteralOffloadingConfig{ - Enabled: tt.offloadingEnabled, - MinSizeInMBForOffloading: 1, - MaxSizeInMBForOffloading: 42, - }, - })) + cfg := propellerCfg.GetConfig() + cfg.LiteralOffloadingConfig = propellerCfg.LiteralOffloadingConfig{ + Enabled: tt.offloadingEnabled, + MinSizeInMBForOffloading: 1, + MaxSizeInMBForOffloading: 42, + } assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ DefaultEnvVars: tt.additionEnvVar, diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 24b8e0032a..4801b8993a 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -356,11 +356,6 @@ func GetConfig() *Config { return configSection.GetConfig().(*Config) } -// SetConfig should be used for TESTING ONLY as it sets current value for the config. -func SetConfig(cfg *Config) error { - return configSection.SetConfig(cfg) -} - // MustRegisterSubSection can be used to configure any subsections the the propeller configuration func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section { return configSection.MustRegisterSection(subSectionKey, section)