Skip to content

Commit

Permalink
Support DeleteResourceOnFinalize k8s plugin config (flyteorg#278)
Browse files Browse the repository at this point in the history
* Support DeleteResourceOnFinalize k8s plugin config

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Support per plugin DisableDeleteResourceOnFinalize

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Update flyteplugins released version

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Actually use the latest flyteplugins

Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored Jun 16, 2021
1 parent 5c82679 commit 6770354
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 11 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.19.2
github.com/flyteorg/flyteplugins v0.5.54
github.com/flyteorg/flyteplugins v0.5.55
github.com/flyteorg/flytestdlib v0.3.17
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/flyteorg/flyteidl v0.18.48/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.2 h1:jXuRrLJEzSo33N9pw7bMEd6mRYSL7LCz/vnazz5XcOg=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.54 h1:QQRh4RRnLxW89A/D/SjPmbPETTp2ypNZnHpGGg1vA84=
github.com/flyteorg/flyteplugins v0.5.54/go.mod h1:dcAWfANpOlrPemHmegNXUhrkWjVWIPvLGaX6rHPlA/E=
github.com/flyteorg/flyteplugins v0.5.55 h1:IfxegVW8Cp+btlXiiIpdZZJmr4ENM5NjKAEbwBlnMcA=
github.com/flyteorg/flyteplugins v0.5.55/go.mod h1:dcAWfANpOlrPemHmegNXUhrkWjVWIPvLGaX6rHPlA/E=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.17 h1:7OexDLAjTBzJNGMmKKFmUTkss0I9IFo1LdTMpvH4qqA=
github.com/flyteorg/flytestdlib v0.3.17/go.mod h1:VlbQuHTE+z2N5qusfwi+6WEkeJoqr8Q0E4NtBAsdwkU=
Expand Down
44 changes: 36 additions & 8 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,18 +361,28 @@ func (e *PluginManager) ClearFinalizers(ctx context.Context, o client.Object) er
return nil
}

func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error {
// If you change InjectFinalizer on the
if config.GetK8sPluginConfig().InjectFinalizer {
o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata())
func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (err error) {
errs := stdErrors.ErrorCollection{}
var o client.Object
var nsName k8stypes.NamespacedName
cfg := config.GetK8sPluginConfig()
if cfg.InjectFinalizer || cfg.DeleteResourceOnFinalize {
o, err = e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata())
if err != nil {
// This will recurrent, so we will skip further finalize
logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
return nil
}

e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
nsName = k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
}

// In InjectFinalizer is on, it means we may have added the finalizers when we launched this resource. Attempt to
// clear them to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// after the resource was created, we will not find any finalizers to clear and the object may have already been
// deleted at this point. Therefore, account for these cases and do not consider them errors.
if cfg.InjectFinalizer {
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
if IsK8sObjectNotExists(err) {
Expand All @@ -389,10 +399,26 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu
// the same event (idempotent) and then come here again...
err = e.ClearFinalizers(ctx, o)
if err != nil {
return err
errs.Append(err)
}
}
return nil

// If we should delete the resource when finalize is called, do a best effort delete.
if cfg.DeleteResourceOnFinalize && !e.plugin.GetProperties().DisableDeleteResourceOnFinalize {
// Attempt to delete resource, if not found, return success.
if err := e.kubeClient.GetClient().Delete(ctx, o); err != nil {
if IsK8sObjectNotExists(err) {
return errs.ErrorOrDefault()
}

// This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a
// Pod does not exist error. This should be retried using the retry policy
logger.Warningf(ctx, "Failed in finalizing. Failed to delete Resource with name: %v. Error: %v", nsName, err)
errs.Append(fmt.Errorf("finalize: failed to delete resource with name [%v]. Error: %w", nsName, err))
}
}

return errs.ErrorOrDefault()
}

func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, backOffController *backoff.Controller,
Expand All @@ -406,7 +432,9 @@ func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupCont
}

// Creates a K8s generic task executor. This provides an easier way to build task executors that create K8s resources.
func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, monitorIndex *ResourceMonitorIndex) (*PluginManager, error) {
func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry,
monitorIndex *ResourceMonitorIndex) (*PluginManager, error) {

if iCtx.EnqueueOwner() == nil {
return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,85 @@ func TestResourceManagerConstruction(t *testing.T) {
assert.NotNil(t, rm)
}

func TestFinalize(t *testing.T) {
t.Run("DeleteResourceOnFinalize=True", func(t *testing.T) {
ctx := context.Background()
sCtx := &pluginsCoreMock.SetupContext{}
fakeKubeClient := mocks.NewFakeKubeClient()
sCtx.OnKubeClient().Return(fakeKubeClient)

assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{DeleteResourceOnFinalize: true}))
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true})
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
o := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "abc",
Namespace: "xyz",
},
}

assert.NoError(t, fakeKubeClient.GetClient().Create(ctx, o))

p.OnBuildIdentityResource(ctx, tctx.TaskExecutionMetadata()).Return(o, nil)
pluginManager := PluginManager{plugin: &p, kubeClient: fakeKubeClient}
actualO := &v1.Pod{}
// Assert the object exists before calling finalize
assert.NoError(t, fakeKubeClient.GetClient().Get(ctx, k8stypes.NamespacedName{
Name: o.Name,
Namespace: o.Namespace,
}, actualO))

// Finalize should now delete the object
assert.NoError(t, pluginManager.Finalize(ctx, tctx))

// Assert the object is now deleted.
assert.Error(t, fakeKubeClient.GetClient().Get(ctx, k8stypes.NamespacedName{
Name: o.Name,
Namespace: o.Namespace,
}, actualO))
})

t.Run("DeleteResourceOnFinalize=False", func(t *testing.T) {
ctx := context.Background()
sCtx := &pluginsCoreMock.SetupContext{}
fakeKubeClient := mocks.NewFakeKubeClient()
sCtx.OnKubeClient().Return(fakeKubeClient)

assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{DeleteResourceOnFinalize: false}))
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true})
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
o := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "abc",
Namespace: "xyz",
},
}

assert.NoError(t, fakeKubeClient.GetClient().Create(ctx, o))

p.OnBuildIdentityResource(ctx, tctx.TaskExecutionMetadata()).Return(o, nil)
pluginManager := PluginManager{plugin: &p, kubeClient: fakeKubeClient}
actualO := &v1.Pod{}
// Assert the object exists before calling finalize
assert.NoError(t, fakeKubeClient.GetClient().Get(ctx, k8stypes.NamespacedName{
Name: o.Name,
Namespace: o.Namespace,
}, actualO))

// Finalize should now delete the object
assert.NoError(t, pluginManager.Finalize(ctx, tctx))

// Assert the object is still here.
assert.NoError(t, fakeKubeClient.GetClient().Get(ctx, k8stypes.NamespacedName{
Name: o.Name,
Namespace: o.Namespace,
}, actualO))
})

}

func init() {
labeled.SetMetricKeys(contextutils.ProjectKey)
}

0 comments on commit 6770354

Please sign in to comment.