diff --git a/go.mod b/go.mod index 5c0070747..fad812291 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.25 - github.com/flyteorg/flyteplugins v0.5.39 + github.com/flyteorg/flyteplugins v0.5.40 github.com/flyteorg/flytestdlib v0.3.13 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index eb8e9eaa6..cd3a42f2f 100644 --- a/go.sum +++ b/go.sum @@ -72,7 +72,6 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA= @@ -233,8 +232,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8= github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= -github.com/flyteorg/flyteplugins v0.5.39 h1:nN8lK4SBtK3FvxSKHDiH/caNwTlb0V+DWAOIMCeFcu0= -github.com/flyteorg/flyteplugins v0.5.39/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= +github.com/flyteorg/flyteplugins v0.5.40 h1:3Vaat/CzMv87hIuloVRKsPussO0271TUmtbCzBMTAN8= +github.com/flyteorg/flyteplugins v0.5.40/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= @@ -1229,7 +1228,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78= k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78= diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index 6e190de74..a5c22039f 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -214,7 +214,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error { sCtxFinal := newNameSpacedSetupCtx( tSCtx, newResourceManagerBuilder.GetResourceRegistrar(pluginResourceNamespacePrefix)) logger.Infof(ctx, "Loading Plugin [%s] ENABLED", p.ID) - cp, err := p.LoadPlugin(ctx, sCtxFinal) + cp, err := pluginCore.LoadPlugin(ctx, sCtxFinal, p) if err != nil { return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID) } @@ -476,7 +476,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) logger.Infof(ctx, "Node level caching is disabled. Skipping catalog read.") } - tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID()) + tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p) if err != nil { return handler.UnknownTransition, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context") } @@ -662,7 +662,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r return errors.Wrapf(errors.UnsupportedTaskTypeError, nCtx.NodeID(), err, "unable to resolve plugin") } - tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID()) + tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p) if err != nil { return errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context") } @@ -718,7 +718,7 @@ func (t Handler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext return errors.Wrapf(errors.UnsupportedTaskTypeError, nCtx.NodeID(), err, "unable to resolve plugin") } - tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID()) + tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p) if err != nil { return errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context") } diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index d5981e4f3..00730ec91 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -106,16 +106,20 @@ func Test_task_Setup(t *testing.T) { corePluginType := "core" corePlugin := &pluginCoreMocks.Plugin{} corePlugin.On("GetID").Return(corePluginType) + corePlugin.OnGetProperties().Return(pluginCore.PluginProperties{}) corePluginDefaultType := "coredefault" corePluginDefault := &pluginCoreMocks.Plugin{} corePluginDefault.On("GetID").Return(corePluginDefaultType) + corePluginDefault.OnGetProperties().Return(pluginCore.PluginProperties{}) k8sPluginType := "k8s" k8sPlugin := &pluginK8sMocks.Plugin{} + k8sPlugin.OnGetProperties().Return(pluginK8s.PluginProperties{}) k8sPluginDefaultType := "k8sdefault" k8sPluginDefault := &pluginK8sMocks.Plugin{} + k8sPluginDefault.OnGetProperties().Return(pluginK8s.PluginProperties{}) corePluginEntry := pluginCore.PluginEntry{ ID: corePluginType, @@ -1173,7 +1177,10 @@ func Test_task_Handle_Barrier(t *testing.T) { assert.NoError(t, err) tk.resourceManager = noopRm - tctx, err := tk.newTaskExecutionContext(context.TODO(), nCtx, "plugin1") + p := &pluginCoreMocks.Plugin{} + p.On("GetID").Return("plugin1") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) + tctx, err := tk.newTaskExecutionContext(context.TODO(), nCtx, p) assert.NoError(t, err) id := tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() @@ -1322,12 +1329,14 @@ func Test_task_Abort(t *testing.T) { {"abort-fails", fields{defaultPluginCallback: func() pluginCore.Plugin { p := &pluginCoreMocks.Plugin{} p.On("GetID").Return("id") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) p.On("Abort", mock.Anything, mock.Anything).Return(fmt.Errorf("error")) return p }}, args{nil}, true, true}, {"abort-success", fields{defaultPluginCallback: func() pluginCore.Plugin { p := &pluginCoreMocks.Plugin{} p.On("GetID").Return("id") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) p.On("Abort", mock.Anything, mock.Anything).Return(nil) return p }}, args{ev: &fakeBufferedTaskEventRecorder{}}, false, true}, @@ -1464,12 +1473,14 @@ func Test_task_Abort_v1(t *testing.T) { {"abort-fails", fields{defaultPluginCallback: func() pluginCore.Plugin { p := &pluginCoreMocks.Plugin{} p.On("GetID").Return("id") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) p.On("Abort", mock.Anything, mock.Anything).Return(fmt.Errorf("error")) return p }}, args{nil}, true, true}, {"abort-success", fields{defaultPluginCallback: func() pluginCore.Plugin { p := &pluginCoreMocks.Plugin{} p.On("GetID").Return("id") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) p.On("Abort", mock.Anything, mock.Anything).Return(nil) return p }}, args{ev: &fakeBufferedTaskEventRecorder{}}, false, true}, @@ -1603,12 +1614,14 @@ func Test_task_Finalize(t *testing.T) { {"finalize-fails", fields{defaultPluginCallback: func() pluginCore.Plugin { p := &pluginCoreMocks.Plugin{} p.On("GetID").Return("id") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) p.On("Finalize", mock.Anything, mock.Anything).Return(fmt.Errorf("error")) return p }}, args{nCtx: nCtx}, true, true}, {"finalize-success", fields{defaultPluginCallback: func() pluginCore.Plugin { p := &pluginCoreMocks.Plugin{} p.On("GetID").Return("id") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) p.On("Finalize", mock.Anything, mock.Anything).Return(nil) return p }}, args{nCtx: nCtx}, false, true}, diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 1bf65bd4e..c9add56b4 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -97,10 +97,8 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor - disableInjectOwnerReferences bool - disableInjectFinalizer bool + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor } func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { @@ -109,18 +107,21 @@ func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) - if !e.disableInjectOwnerReferences { + if !e.plugin.GetProperties().DisableInjectOwnerReferences { o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) } - if cfg.InjectFinalizer && !e.disableInjectFinalizer { + if cfg.InjectFinalizer && !e.plugin.GetProperties().DisableInjectFinalizer { f := append(o.GetFinalizers(), finalizer) o.SetFinalizers(f) } } func (e *PluginManager) GetProperties() pluginsCore.PluginProperties { - return pluginsCore.PluginProperties{} + props := e.plugin.GetProperties() + return pluginsCore.PluginProperties{ + GeneratedNameMaxLength: props.GeneratedNameMaxLength, + } } func (e *PluginManager) GetID() string { @@ -432,7 +433,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry } workflowParentPredicate := func(o metav1.Object) bool { - if entry.DisableInjectOwnerReferences { + if entry.Plugin.GetProperties().DisableInjectOwnerReferences { return true } @@ -526,14 +527,12 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: kubeClient, - resourceLevelMonitor: rm, - disableInjectOwnerReferences: entry.DisableInjectOwnerReferences, - disableInjectFinalizer: entry.DisableInjectFinalizer, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, }, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 242e639cf..98ab54363 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -72,6 +72,10 @@ func (e extendedFakeClient) Delete(ctx context.Context, obj client.Object, opts type k8sSampleHandler struct { } +func (k8sSampleHandler) GetProperties() k8s.PluginProperties { + panic("implement me") +} + func (k8sSampleHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) { panic("implement me") } @@ -208,6 +212,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { tCtx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted) // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ @@ -236,6 +241,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted) // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ @@ -266,6 +272,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted) // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := extendedFakeClient{ Client: fake.NewClientBuilder().WithRuntimeObjects().Build(), @@ -299,6 +306,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted) // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := extendedFakeClient{ Client: fake.NewClientBuilder().WithRuntimeObjects().Build(), @@ -330,6 +338,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted) // Creating a mock k8s plugin mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: flytek8s.PodKind, @@ -397,6 +406,7 @@ func TestPluginManager_Abort(t *testing.T) { // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ @@ -417,6 +427,7 @@ func TestPluginManager_Abort(t *testing.T) { fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)} // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ @@ -538,6 +549,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { fc := tt.args.fakeClient() // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.On("BuildIdentityResource", mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) mockResourceHandler.On("GetTaskPhase", mock.Anything, mock.Anything, mock.Anything).Return(tt.args.getTaskPhaseCB()) pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ @@ -573,6 +585,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted) // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil) fakeClient := fake.NewClientBuilder().Build() newFakeClient := &pluginsCoreMock.KubeClient{} @@ -601,7 +614,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { t.Run("default", func(t *testing.T) { o := &v1.Pod{} - pluginManager := PluginManager{} + p := pluginsk8sMock.Plugin{} + p.OnGetProperties().Return(k8s.PluginProperties{}) + pluginManager := PluginManager{plugin: &p} pluginManager.AddObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) @@ -615,7 +630,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { }) t.Run("Disable OwnerReferences injection", func(t *testing.T) { - pluginManager := PluginManager{disableInjectOwnerReferences: true} + p := pluginsk8sMock.Plugin{} + p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectOwnerReferences: true}) + pluginManager := PluginManager{plugin: &p} o := &v1.Pod{} pluginManager.AddObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) @@ -631,7 +648,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { }) t.Run("Disable enabled InjectFinalizer", func(t *testing.T) { - pluginManager := PluginManager{disableInjectFinalizer: true} + p := pluginsk8sMock.Plugin{} + p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true}) + pluginManager := PluginManager{plugin: &p} // enable finalizer injection cfg.InjectFinalizer = true o := &v1.Pod{} @@ -649,7 +668,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { }) t.Run("Disable disabled InjectFinalizer", func(t *testing.T) { - pluginManager := PluginManager{disableInjectFinalizer: true} + p := pluginsk8sMock.Plugin{} + p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true}) + pluginManager := PluginManager{plugin: &p} // disable finalizer injection cfg.InjectFinalizer = false o := &v1.Pod{} diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index d2f222ef4..336b08c40 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -124,7 +124,7 @@ func (t taskExecutionContext) SecretManager() pluginCore.SecretManager { return t.sm } -func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, pluginID string) (*taskExecutionContext, error) { +func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, plugin pluginCore.Plugin) (*taskExecutionContext, error) { id := GetTaskExecutionIdentifier(nCtx) currentNodeUniqueID := nCtx.NodeID() @@ -136,7 +136,12 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node } } - uniqueID, err := utils.FixedLengthUniqueIDForParts(IDMaxLength, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(id.RetryAttempt))) + length := IDMaxLength + if l := plugin.GetProperties().GeneratedNameMaxLength; l != nil { + length = *l + } + + uniqueID, err := utils.FixedLengthUniqueIDForParts(length, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(id.RetryAttempt))) if err != nil { // SHOULD never really happen return nil, err @@ -157,7 +162,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node return nil, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "unable to initialize plugin state manager") } - resourceNamespacePrefix := pluginCore.ResourceNamespace(t.resourceManager.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(pluginID)) + resourceNamespacePrefix := pluginCore.ResourceNamespace(t.resourceManager.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(plugin.GetID())) maxAttempts := uint32(DefaultMaxAttempts) if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil { maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts) diff --git a/pkg/controller/nodes/task/taskexec_context_test.go b/pkg/controller/nodes/task/taskexec_context_test.go index 9c259207a..1c545a0ac 100644 --- a/pkg/controller/nodes/task/taskexec_context_test.go +++ b/pkg/controller/nodes/task/taskexec_context_test.go @@ -14,6 +14,7 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog/mocks" + pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/flyteorg/flytestdlib/promutils" @@ -121,7 +122,10 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { resourceManager: noopRm, } - got, err := tk.newTaskExecutionContext(context.TODO(), nCtx, "plugin1") + p := &pluginCoreMocks.Plugin{} + p.On("GetID").Return("plugin1") + p.OnGetProperties().Return(pluginCore.PluginProperties{}) + got, err := tk.newTaskExecutionContext(context.TODO(), nCtx, p) assert.NoError(t, err) assert.NotNil(t, got) @@ -168,4 +172,13 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { }, got.ResourceManager().(resourcemanager.TaskResourceManager).GetResourcePoolInfo()) assert.Nil(t, got.Catalog()) // assert.Equal(t, got.InputReader(), ir) + + anotherPlugin := &pluginCoreMocks.Plugin{} + anotherPlugin.On("GetID").Return("plugin2") + maxLength := 8 + anotherPlugin.OnGetProperties().Return(pluginCore.PluginProperties{ + GeneratedNameMaxLength: &maxLength, + }) + anotherTaskExecCtx, _ := tk.newTaskExecutionContext(context.TODO(), nCtx, anotherPlugin) + assert.Equal(t, anotherTaskExecCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), "fpmmhh6q") }