Skip to content

Commit

Permalink
Only require default plugin configuration for task types with multipl…
Browse files Browse the repository at this point in the history
…e registered plugins (flyteorg#405)

* task types with only one registered plugin use it - otherwise declare default plugin in config

Signed-off-by: Daniel Rammer <[email protected]>

* fixed test and lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updated task-config flags

Signed-off-by: Daniel Rammer <[email protected]>

* converted to named return values

Signed-off-by: Daniel Rammer <[email protected]>

* failing to initialize plugins on nil TaskConfig

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins version

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins to merged version

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Mar 10, 2022
1 parent bd1f108 commit e30da6f
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 37 deletions.
4 changes: 2 additions & 2 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.23.0
github.com/flyteorg/flyteplugins v0.10.9
github.com/flyteorg/flyteplugins v0.10.10
github.com/flyteorg/flytestdlib v0.4.12
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/imdario/mergo v0.3.11 // indirect
github.com/imdario/mergo v0.3.11
github.com/magiconair/properties v1.8.4
github.com/mitchellh/mapstructure v1.4.1
github.com/pkg/errors v0.9.1
Expand Down
10 changes: 2 additions & 8 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,10 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.21.23/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.22.0 h1:lVAXyacTCIX6Fl0qWoFzyto9Hfx0ADlyPhHPmOMiuIY=
github.com/flyteorg/flyteidl v0.22.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.22.2-0.20220217205210-d817026005f3 h1:b/mCyBe9+Yw1LTBJRRXtsZO6TH6TqzzKaup8gkhus2U=
github.com/flyteorg/flyteidl v0.22.2-0.20220217205210-d817026005f3/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.22.4-0.20220301015125-8c9756723d27 h1:qrqTtWay6wRaJYYapPVeKOqMonsw+AnkYtkoYEj1k4Y=
github.com/flyteorg/flyteidl v0.22.4-0.20220301015125-8c9756723d27/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.23.0 h1:Pjl9Tq1pJfIK0au5PiqPVpl25xTYosN6BruZl+PgWAk=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.10.9 h1:Hj4kBc2pNAJOTUP14+KSd44RzSE+A6fMEnTMQXhE6r8=
github.com/flyteorg/flyteplugins v0.10.9/go.mod h1:RXgHGGUGC1akEnAd0yi4cLuYP1BF1rVkxhGjzIrm6VU=
github.com/flyteorg/flyteplugins v0.10.10 h1:sd4HMOFZo5VrjfCiPfTabyh3DcBDL5eVPNsubjPEyug=
github.com/flyteorg/flyteplugins v0.10.10/go.mod h1:RXgHGGUGC1akEnAd0yi4cLuYP1BF1rVkxhGjzIrm6VU=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.4.7/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs=
github.com/flyteorg/flytestdlib v0.4.12 h1:e88Tcu+8ug6T2tzzxRltbsuBkcz4coAquF9xqwIrmLw=
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/flyteorg/flytestdlib/config"
)

//go:generate pflags Config --default-var defaultConfig
//go:generate pflags Config

const SectionKey = "tasks"

Expand Down Expand Up @@ -50,7 +50,7 @@ type BarrierConfig struct {
}

type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",deprecated"`
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
// Maps task types to their plugin handler (by ID).
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {

// Create the resource negotiator here
// and then convert it to proxies later and pass them to plugins
enabledPlugins, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry)
enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry)
if err != nil {
logger.Errorf(ctx, "Failed to finalize enabled plugins. Error: %s", err)
return err
Expand All @@ -244,7 +244,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {
// For every default plugin for a task type specified in flytepropeller config we validate that the plugin's
// static definition includes that task type as something it is registered to handle.
for _, tt := range p.RegisteredTaskTypes {
for _, defaultTaskType := range p.DefaultForTaskTypes {
for _, defaultTaskType := range defaultForTaskTypes[cp.GetID()] {
if defaultTaskType == tt {
if existingHandler, alreadyDefaulted := t.defaultPlugins[tt]; alreadyDefaulted && existingHandler.GetID() != cp.GetID() {
logger.Errorf(ctx, "TaskType [%s] has multiple default handlers specified: [%s] and [%s]",
Expand Down
33 changes: 14 additions & 19 deletions flytepropeller/pkg/controller/nodes/task/plugin_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package task

import (
"context"
"fmt"
"strings"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff"
Expand All @@ -13,23 +14,21 @@ import (
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/k8s"
)

func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) ([]core.PluginEntry, error) {
allPluginsEnabled := false
pluginsConfigMeta := config.PluginsConfigMeta{
AllDefaultForTaskTypes: map[pluginID][]taskType{},
func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) {
if cfg == nil {
return nil, nil, fmt.Errorf("unable to initialize plugin list, cfg is a required argument")
}
var err error
if cfg != nil {
pluginsConfigMeta, err = cfg.GetEnabledPlugins()
if err != nil {
return nil, err
}

pluginsConfigMeta, err := cfg.GetEnabledPlugins()
if err != nil {
return nil, nil, err
}

allPluginsEnabled := false
if pluginsConfigMeta.EnabledPlugins.Len() == 0 {
allPluginsEnabled = true
}

var finalizedPlugins []core.PluginEntry
logger.Infof(ctx, "Enabled plugins: %v", pluginsConfigMeta.EnabledPlugins.List())
logger.Infof(ctx, "Loading core Plugins, plugin configuration [all plugins enabled: %v]", allPluginsEnabled)
for _, cpe := range pr.GetCorePlugins() {
Expand All @@ -38,10 +37,7 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
logger.Infof(ctx, "Plugin [%s] is DISABLED (not found in enabled plugins list).", id)
} else {
logger.Infof(ctx, "Plugin [%s] ENABLED", id)
if defaults, ok := pluginsConfigMeta.AllDefaultForTaskTypes[id]; ok {
cpe.DefaultForTaskTypes = defaults
}
finalizedPlugins = append(finalizedPlugins, cpe)
enabledPlugins = append(enabledPlugins, cpe)
}
}

Expand All @@ -65,11 +61,10 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) {
return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex)
},
IsDefault: kpe.IsDefault,
DefaultForTaskTypes: pluginsConfigMeta.AllDefaultForTaskTypes[id],
IsDefault: kpe.IsDefault,
}
finalizedPlugins = append(finalizedPlugins, plugin)
enabledPlugins = append(enabledPlugins, plugin)
}
}
return finalizedPlugins, nil
return enabledPlugins, pluginsConfigMeta.AllDefaultForTaskTypes, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) {
}{
{"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}},
{"no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: nil}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}},
{"no-config-no-plugins", args{}, want{}},
{"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}},
{"no-config-no-plugins", args{}, want{err: true}},
{"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{err: true}},
{"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}},
{"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
{"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
Expand All @@ -61,7 +61,7 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) {
core: tt.args.corePlugins,
k8s: tt.args.k8sPlugins,
}
got, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr)
got, _, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr)
if (err != nil) != tt.want.err {
t.Errorf("WranglePluginsAndGenerateFinalList() error = %v, wantErr %v", err, tt.want.err)
return
Expand Down

0 comments on commit e30da6f

Please sign in to comment.