From b705844ea0a518a18eda55c705b4d0e24f0df07a Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 14 Sep 2021 13:39:16 -0400 Subject: [PATCH] Use PluginCleanupPolicy from flyteorg/flyteplugins#203 (#311) * use PluginCleanupPolicy if it exists Signed-off-by: Claire McGinty * add unit test Signed-off-by: Claire McGinty * cleanup test code Signed-off-by: Claire McGinty * assert OnAbort is not attempted by default Signed-off-by: Claire McGinty * update plugin override interface Signed-off-by: Claire McGinty * lint Signed-off-by: Claire McGinty * Apply PR suggestions Signed-off-by: Claire McGinty * update flyteplugins lib Signed-off-by: Claire McGinty --- flytepropeller/go.mod | 2 +- flytepropeller/go.sum | 4 +- .../nodes/task/k8s/plugin_manager.go | 34 ++++- .../nodes/task/k8s/plugin_manager_test.go | 136 ++++++++++++++++++ 4 files changed, 171 insertions(+), 5 deletions(-) diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index 897c2f024..48e4211ea 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.21.0 - github.com/flyteorg/flyteplugins v0.6.0 + github.com/flyteorg/flyteplugins v0.6.1 github.com/flyteorg/flytestdlib v0.3.34 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/flytepropeller/go.sum b/flytepropeller/go.sum index 67ee85bbd..723576d50 100644 --- a/flytepropeller/go.sum +++ b/flytepropeller/go.sum @@ -231,8 +231,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.21.0 h1:AwHNusfxJMfRRSDk2QWfb3aIlyLJrFWVGtpXCbCtJ5A= github.com/flyteorg/flyteidl v0.21.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteplugins v0.6.0 
h1:eoMmqJIw3K+J4JWokDcd4Y1YGLiicE6p5vEYhOUHZ4s= -github.com/flyteorg/flyteplugins v0.6.0/go.mod h1:rPzV/KS6h0BkgK0Z+CnO6JjY58tzUdYvDLMYS10IKG0= +github.com/flyteorg/flyteplugins v0.6.1 h1:Mq9uM/IN6fXHo03NlXSa+to2GHEom2NAcRWlr+bVH6g= +github.com/flyteorg/flyteplugins v0.6.1/go.mod h1:rPzV/KS6h0BkgK0Z+CnO6JjY58tzUdYvDLMYS10IKG0= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.3.33/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q= github.com/flyteorg/flytestdlib v0.3.34 h1:OOuV03X8c1AWInzBU6IRsqpEF6y8WDJngbPcdL4VktY= diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index 2e91939e6..a5e67c80e 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -340,10 +340,40 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) - err = e.kubeClient.GetClient().Delete(ctx, o) + deleteResource := true + abortOverride, hasAbortOverride := e.plugin.(k8s.PluginAbortOverride) + + resourceToFinalize := o + var behavior k8s.AbortBehavior + + if hasAbortOverride { + behavior, err = abortOverride.OnAbort(ctx, tCtx, o) + deleteResource = err == nil && behavior.DeleteResource + if err == nil && behavior.Resource != nil { + resourceToFinalize = behavior.Resource + } + } + + if err != nil { + } else if deleteResource { + err = e.kubeClient.GetClient().Delete(ctx, resourceToFinalize) + } else { + if behavior.Patch != nil && behavior.Update == nil { + err = e.kubeClient.GetClient().Patch(ctx, resourceToFinalize, behavior.Patch.Patch, behavior.Patch.Options...) + } else if behavior.Patch == nil && behavior.Update != nil { + err = e.kubeClient.GetClient().Update(ctx, resourceToFinalize, behavior.Update.Options...) 
+ } else { + err = errors.Errorf(errors.RuntimeFailure, "AbortBehavior for resource %v must specify either a Patch or an Update operation if Delete is set to false. Only one can be supplied.", resourceToFinalize.GetName()) + } + if behavior.DeleteOnErr && err != nil { + logger.Warningf(ctx, "Failed to apply AbortBehavior for resource %v with error %v. Will attempt to delete resource.", resourceToFinalize.GetName(), err) + err = e.kubeClient.GetClient().Delete(ctx, resourceToFinalize) + } + } + if err != nil && !IsK8sObjectNotExists(err) { logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", - o.GetNamespace(), o.GetName(), err) + resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err) return err } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 443021acd..4ef7a759c 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -45,6 +45,8 @@ type extendedFakeClient struct { CreateError error GetError error DeleteError error + PatchError error + UpdateError error } func (e extendedFakeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { @@ -69,6 +71,22 @@ func (e extendedFakeClient) Delete(ctx context.Context, obj client.Object, opts return e.Client.Delete(ctx, obj, opts...) } +func (e extendedFakeClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + if e.PatchError != nil { + return e.PatchError + } + + return e.Client.Patch(ctx, obj, patch, opts...) +} + +func (e extendedFakeClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if e.UpdateError != nil { + return e.UpdateError + } + + return e.Client.Update(ctx, obj, opts...) 
+} + type k8sSampleHandler struct { } @@ -88,6 +106,32 @@ func (k8sSampleHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.Plug panic("implement me") } +type pluginWithAbortOverride struct { + mock.Mock +} + +func (p *pluginWithAbortOverride) GetProperties() k8s.PluginProperties { + return p.Called().Get(0).(k8s.PluginProperties) +} + +func (p *pluginWithAbortOverride) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) { + panic("implement me") +} + +func (p *pluginWithAbortOverride) BuildIdentityResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionMetadata) (client.Object, error) { + args := p.Called(ctx, taskCtx) + return args.Get(0).(client.Object), args.Error(1) +} + +func (p *pluginWithAbortOverride) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) { + panic("implement me") +} + +func (p *pluginWithAbortOverride) OnAbort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, resource client.Object) (behavior k8s.AbortBehavior, err error) { + args := p.Called(ctx, tCtx, resource) + return args.Get(0).(k8s.AbortBehavior), args.Error(1) +} + func ExampleNewPluginManager() { sCtx := &pluginsCoreMock.SetupContext{} fakeKubeClient := mocks.NewFakeKubeClient() @@ -203,6 +247,32 @@ func dummySetupContext(fakeClient client.Client) pluginsCore.SetupContext { return setupContext } +func buildPluginWithAbortOverride(ctx context.Context, tctx pluginsCore.TaskExecutionContext, abortBehavior k8s.AbortBehavior, client client.Client) (*PluginManager, error) { + pluginResource := &v1.Pod{} + + mockResourceHandler := new(pluginWithAbortOverride) + + mockResourceHandler.On( + "OnAbort", ctx, tctx, pluginResource, + ).Return(abortBehavior, nil) + + mockResourceHandler.On( + "BuildIdentityResource", ctx, tctx.TaskExecutionMetadata(), + ).Return(pluginResource, nil) + + 
mockResourceHandler.On("GetProperties").Return(k8s.PluginProperties{}) + + mockClient := extendedFakeClient{ + Client: client, + } + + return NewPluginManager(ctx, dummySetupContext(mockClient), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: pluginResource, + Plugin: mockResourceHandler, + }, NewResourceMonitorIndex()) +} + func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { ctx := context.TODO() /*var tmpl *core.TaskTemplate @@ -419,6 +489,9 @@ func TestPluginManager_Abort(t *testing.T) { err = pluginManager.Abort(ctx, tctx) assert.NoError(t, err) + + // no custom cleanup policy has been specified + mockResourceHandler.AssertNumberOfCalls(t, "OnAbort", 0) }) t.Run("Abort Pod doesn't exist", func(t *testing.T) { @@ -441,6 +514,69 @@ func TestPluginManager_Abort(t *testing.T) { err = pluginManager.Abort(ctx, tctx) assert.NoError(t, err) }) + + t.Run("Abort Plugin has Patch PluginAbortOverride", func(t *testing.T) { + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + expectedErr := errors.New("client-side patch error") + pluginManager, err := buildPluginWithAbortOverride( + ctx, + tctx, + k8s.AbortBehaviorPatchDefaultResource(k8s.PatchResourceOperation{ + Patch: nil, + Options: nil, + }, false), + extendedFakeClient{ + DeleteError: errors.New( + "kubeClient.Delete() should not be called if custom cleanup policy exists"), + PatchError: expectedErr, + }) + + assert.NotNil(t, res) + assert.NoError(t, err) + + err = pluginManager.Abort(ctx, tctx) + assert.Equal(t, expectedErr, err) + }) + + t.Run("Abort Plugin has Update PluginAbortOverride", func(t *testing.T) { + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + expectedErr := errors.New("client-side update error") + pluginManager, err := buildPluginWithAbortOverride( + ctx, + tctx, + k8s.AbortBehaviorUpdateDefaultResource(k8s.UpdateResourceOperation{ + Options: nil, + }, false), + extendedFakeClient{ + DeleteError: errors.New( + "kubeClient.Delete() should not be called 
if custom cleanup policy exists"), + UpdateError: expectedErr, + }) + + assert.NotNil(t, res) + assert.NoError(t, err) + + err = pluginManager.Abort(ctx, tctx) + assert.Equal(t, expectedErr, err) + }) + + t.Run("Abort Plugin has Delete PluginAbortOverride", func(t *testing.T) { + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + expectedErr := errors.New("client-side delete error") + pluginManager, err := buildPluginWithAbortOverride( + ctx, + tctx, + k8s.AbortBehaviorDeleteDefaultResource(), + extendedFakeClient{ + DeleteError: expectedErr, + }) + + assert.NotNil(t, res) + assert.NoError(t, err) + + err = pluginManager.Abort(ctx, tctx) + assert.Equal(t, expectedErr, err) + }) } func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {