Skip to content

Commit

Permalink
Inject offloading literal env vars (flyteorg#6027)
Browse files Browse the repository at this point in the history
* Inject offloading literal env vars

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove SetConfig

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario and eapolinario authored Nov 19, 2024
1 parent 859da05 commit 45935e4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 16 deletions.
4 changes: 3 additions & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/coocood/freecache v1.1.1
github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.3
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.0 // indirect
Expand Down Expand Up @@ -117,7 +119,7 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
Expand Down
7 changes: 5 additions & 2 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 h1:cQyO5JQ2iuHnEcF3v24kdDMsgh04RjyFPDtuvD6PCE0=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625/go.mod h1:6PnrZv6zUDkrNMw0mIoGRmGBR7i9LulhKPmxFq4rUiM=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aws/aws-sdk-go v1.47.11 h1:Dol+MA+hQblbnXUI3Vk9qvoekU6O1uDEuAItezjiWNQ=
Expand Down Expand Up @@ -310,6 +312,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks=
github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
Expand Down Expand Up @@ -389,8 +392,8 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 h1:UNQQKPfTDe1J81ViolILjTKPr9WetKW6uei2hFgJmFs=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
propellerCfg "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
)

Expand Down Expand Up @@ -68,15 +69,6 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1
Name: "FLYTE_ATTEMPT_NUMBER",
Value: attemptNumber,
},
// TODO: Fill in these
// {
// Name: "FLYTE_INTERNAL_EXECUTION_WORKFLOW",
// Value: "",
// },
// {
// Name: "FLYTE_INTERNAL_EXECUTION_LAUNCHPLAN",
// Value: "",
// },
}

if len(consoleURL) > 0 {
Expand Down Expand Up @@ -131,9 +123,36 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1
return envVars
}

func GetLiteralOffloadingEnvVars() []v1.EnvVar {
propellerConfig := propellerCfg.GetConfig()
if !propellerConfig.LiteralOffloadingConfig.Enabled {
return []v1.EnvVar{}
}

envVars := []v1.EnvVar{}
if propellerConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading > 0 {
envVars = append(envVars,
v1.EnvVar{
Name: "_F_L_MIN_SIZE_MB",
Value: strconv.FormatInt(propellerConfig.LiteralOffloadingConfig.MinSizeInMBForOffloading, 10),
},
)
}
if propellerConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading > 0 {
envVars = append(envVars,
v1.EnvVar{
Name: "_F_L_MAX_SIZE_MB",
Value: strconv.FormatInt(propellerConfig.LiteralOffloadingConfig.MaxSizeInMBForOffloading, 10),
},
)
}
return envVars
}

func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, envFroms []v1.EnvFromSource, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource) {
envVars = append(envVars, GetContextEnvVars(ctx)...)
envVars = append(envVars, GetExecutionEnvVars(id, consoleURL)...)
envVars = append(envVars, GetLiteralOffloadingEnvVars()...)

for k, v := range taskEnvironmentVariables {
envVars = append(envVars, v1.EnvVar{Name: k, Value: v})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
propellerCfg "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
)

Expand Down Expand Up @@ -304,6 +305,8 @@ func TestDecorateEnvVars(t *testing.T) {

expected := append(defaultEnv, GetContextEnvVars(ctx)...)
expected = append(expected, GetExecutionEnvVars(mockTaskExecutionIdentifier{}, "")...)
expectedOffloaded := append(expected, v12.EnvVar{Name: "_F_L_MIN_SIZE_MB", Value: "1"})
expectedOffloaded = append(expectedOffloaded, v12.EnvVar{Name: "_F_L_MAX_SIZE_MB", Value: "42"})

aggregated := append(expected, v12.EnvVar{Name: "k", Value: "v"})
type args struct {
Expand All @@ -315,17 +318,77 @@ func TestDecorateEnvVars(t *testing.T) {
args args
additionEnvVar map[string]string
additionEnvVarFromEnv map[string]string
offloadingEnabled bool
offloadingEnvVar map[string]string
executionEnvVar map[string]string
consoleURL string
want []v12.EnvVar
}{
{"no-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, emptyEnvVar, "", expected},
{"with-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, additionalEnv, emptyEnvVar, emptyEnvVar, "", aggregated},
{"from-env", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, envVarsFromEnv, emptyEnvVar, "", aggregated},
{"from-execution-metadata", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, additionalEnv, "", aggregated},
{
"no-additional",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
emptyEnvVar,
false,
emptyEnvVar,
emptyEnvVar,
"",
expected,
},
{
"no-additional-offloading-enabled",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
emptyEnvVar,
true,
emptyEnvVar,
emptyEnvVar,
"",
expectedOffloaded,
},
{
"with-additional",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
additionalEnv,
emptyEnvVar,
false,
emptyEnvVar,
emptyEnvVar,
"",
aggregated,
},
{
"from-env",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
envVarsFromEnv,
false,
emptyEnvVar,
emptyEnvVar,
"",
aggregated,
},
{
"from-execution-metadata",
args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}},
emptyEnvVar,
emptyEnvVar,
false,
emptyEnvVar,
additionalEnv,
"",
aggregated,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := propellerCfg.GetConfig()
cfg.LiteralOffloadingConfig = propellerCfg.LiteralOffloadingConfig{
Enabled: tt.offloadingEnabled,
MinSizeInMBForOffloading: 1,
MaxSizeInMBForOffloading: 42,
}

assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
DefaultEnvVars: tt.additionEnvVar,
DefaultEnvVarsFromEnv: tt.additionEnvVarFromEnv,
Expand Down

0 comments on commit 45935e4

Please sign in to comment.