diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index 05c6933acf..cf31d285e3 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.23.0 - github.com/flyteorg/flyteplugins v0.10.9 + github.com/flyteorg/flyteplugins v0.10.10 github.com/flyteorg/flytestdlib v0.4.12 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible @@ -15,7 +15,7 @@ require ( github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.2.0 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 - github.com/imdario/mergo v0.3.11 // indirect + github.com/imdario/mergo v0.3.11 github.com/magiconair/properties v1.8.4 github.com/mitchellh/mapstructure v1.4.1 github.com/pkg/errors v0.9.1 diff --git a/flytepropeller/go.sum b/flytepropeller/go.sum index 1e82c22cd4..6ebc34267e 100644 --- a/flytepropeller/go.sum +++ b/flytepropeller/go.sum @@ -237,16 +237,10 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.21.23/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.22.0 h1:lVAXyacTCIX6Fl0qWoFzyto9Hfx0ADlyPhHPmOMiuIY= -github.com/flyteorg/flyteidl v0.22.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.22.2-0.20220217205210-d817026005f3 h1:b/mCyBe9+Yw1LTBJRRXtsZO6TH6TqzzKaup8gkhus2U= -github.com/flyteorg/flyteidl v0.22.2-0.20220217205210-d817026005f3/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.22.4-0.20220301015125-8c9756723d27 h1:qrqTtWay6wRaJYYapPVeKOqMonsw+AnkYtkoYEj1k4Y= -github.com/flyteorg/flyteidl v0.22.4-0.20220301015125-8c9756723d27/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.23.0 h1:Pjl9Tq1pJfIK0au5PiqPVpl25xTYosN6BruZl+PgWAk= github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteplugins v0.10.9 h1:Hj4kBc2pNAJOTUP14+KSd44RzSE+A6fMEnTMQXhE6r8= -github.com/flyteorg/flyteplugins v0.10.9/go.mod h1:RXgHGGUGC1akEnAd0yi4cLuYP1BF1rVkxhGjzIrm6VU= +github.com/flyteorg/flyteplugins v0.10.10 h1:sd4HMOFZo5VrjfCiPfTabyh3DcBDL5eVPNsubjPEyug= +github.com/flyteorg/flyteplugins v0.10.10/go.mod h1:RXgHGGUGC1akEnAd0yi4cLuYP1BF1rVkxhGjzIrm6VU= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.4.7/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs= github.com/flyteorg/flytestdlib v0.4.12 h1:e88Tcu+8ug6T2tzzxRltbsuBkcz4coAquF9xqwIrmLw= diff --git a/flytepropeller/pkg/controller/nodes/task/config/config.go b/flytepropeller/pkg/controller/nodes/task/config/config.go index 5615d8a7d6..36bdfdb48f 100644 --- a/flytepropeller/pkg/controller/nodes/task/config/config.go +++ b/flytepropeller/pkg/controller/nodes/task/config/config.go @@ -12,7 +12,7 @@ import ( "github.com/flyteorg/flytestdlib/config" ) -//go:generate pflags Config --default-var defaultConfig +//go:generate pflags Config const SectionKey = "tasks" @@ -50,7 +50,7 @@ type BarrierConfig struct { } type TaskPluginConfig struct { - EnabledPlugins []string `json:"enabled-plugins" pflag:",deprecated"` + EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"` // Maps task types to their plugin handler (by ID). DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"` } diff --git a/flytepropeller/pkg/controller/nodes/task/config/config_flags.go b/flytepropeller/pkg/controller/nodes/task/config/config_flags.go index 9d96d8a058..059eac2932 100755 --- a/flytepropeller/pkg/controller/nodes/task/config/config_flags.go +++ b/flytepropeller/pkg/controller/nodes/task/config/config_flags.go @@ -50,7 +50,7 @@ func (Config) mustMarshalJSON(v json.Marshaler) string { // flags is json-name.json-sub-name... etc. func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags := pflag.NewFlagSet("Config", pflag.ExitOnError) - cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "task-plugins.enabled-plugins"), []string{}, "deprecated") + cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "task-plugins.enabled-plugins"), []string{}, "Plugins enabled currently") cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "max-plugin-phase-versions"), defaultConfig.MaxPluginPhaseVersions, "Maximum number of plugin phase versions allowed for one phase.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "barrier.enabled"), defaultConfig.BarrierConfig.Enabled, "Enable Barrier transitions using inmemory context") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "barrier.cache-size"), defaultConfig.BarrierConfig.CacheSize, "Max number of barrier to preserve in memory") diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index f2c62a106b..ebe9cebd54 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -221,7 +221,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error { // Create the resource negotiator here // and then convert it to proxies later and pass them to plugins - enabledPlugins, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry) + enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry) if err != nil { logger.Errorf(ctx, "Failed to finalize enabled plugins. Error: %s", err) return err @@ -244,7 +244,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error { // For every default plugin for a task type specified in flytepropeller config we validate that the plugin's // static definition includes that task type as something it is registered to handle. for _, tt := range p.RegisteredTaskTypes { - for _, defaultTaskType := range p.DefaultForTaskTypes { + for _, defaultTaskType := range defaultForTaskTypes[cp.GetID()] { if defaultTaskType == tt { if existingHandler, alreadyDefaulted := t.defaultPlugins[tt]; alreadyDefaulted && existingHandler.GetID() != cp.GetID() { logger.Errorf(ctx, "TaskType [%s] has multiple default handlers specified: [%s] and [%s]", diff --git a/flytepropeller/pkg/controller/nodes/task/plugin_config.go b/flytepropeller/pkg/controller/nodes/task/plugin_config.go index 0c2e925d47..e6f546219d 100644 --- a/flytepropeller/pkg/controller/nodes/task/plugin_config.go +++ b/flytepropeller/pkg/controller/nodes/task/plugin_config.go @@ -2,6 +2,7 @@ package task import ( "context" + "fmt" "strings" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" @@ -13,23 +14,21 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/k8s" ) -func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) ([]core.PluginEntry, error) { - allPluginsEnabled := false - pluginsConfigMeta := config.PluginsConfigMeta{ - AllDefaultForTaskTypes: map[pluginID][]taskType{}, +func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) { + if cfg == nil { + return nil, nil, fmt.Errorf("unable to initialize plugin list, cfg is a required argument") } - var err error - if cfg != nil { - pluginsConfigMeta, err = cfg.GetEnabledPlugins() - if err != nil { - return nil, err - } + + pluginsConfigMeta, err := cfg.GetEnabledPlugins() + if err != nil { + return nil, nil, err } + + allPluginsEnabled := false if pluginsConfigMeta.EnabledPlugins.Len() == 0 { allPluginsEnabled = true } - var finalizedPlugins []core.PluginEntry logger.Infof(ctx, "Enabled plugins: %v", pluginsConfigMeta.EnabledPlugins.List()) logger.Infof(ctx, "Loading core Plugins, plugin configuration [all plugins enabled: %v]", allPluginsEnabled) for _, cpe := range pr.GetCorePlugins() { @@ -38,10 +37,7 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu logger.Infof(ctx, "Plugin [%s] is DISABLED (not found in enabled plugins list).", id) } else { logger.Infof(ctx, "Plugin [%s] ENABLED", id) - if defaults, ok := pluginsConfigMeta.AllDefaultForTaskTypes[id]; ok { - cpe.DefaultForTaskTypes = defaults - } - finalizedPlugins = append(finalizedPlugins, cpe) + enabledPlugins = append(enabledPlugins, cpe) } } @@ -65,11 +61,10 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) { return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex) }, - IsDefault: kpe.IsDefault, - DefaultForTaskTypes: pluginsConfigMeta.AllDefaultForTaskTypes[id], + IsDefault: kpe.IsDefault, } - finalizedPlugins = append(finalizedPlugins, plugin) + enabledPlugins = append(enabledPlugins, plugin) } } - return finalizedPlugins, nil + return enabledPlugins, pluginsConfigMeta.AllDefaultForTaskTypes, nil } diff --git a/flytepropeller/pkg/controller/nodes/task/plugin_config_test.go b/flytepropeller/pkg/controller/nodes/task/plugin_config_test.go index 11b27f8e92..2dbf9f31c8 100644 --- a/flytepropeller/pkg/controller/nodes/task/plugin_config_test.go +++ b/flytepropeller/pkg/controller/nodes/task/plugin_config_test.go @@ -49,8 +49,8 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) { }{ {"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}}, {"no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: nil}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}}, - {"no-config-no-plugins", args{}, want{}}, - {"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}}, + {"no-config-no-plugins", args{}, want{err: true}}, + {"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{err: true}}, {"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}}, {"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, {"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, @@ -61,7 +61,7 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) { core: tt.args.corePlugins, k8s: tt.args.k8sPlugins, } - got, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr) + got, _, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr) if (err != nil) != tt.want.err { t.Errorf("WranglePluginsAndGenerateFinalList() error = %v, wantErr %v", err, tt.want.err) return