From 727a4d02afc10ce18c69d601f9f52f486a288f90 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 2 Mar 2021 15:11:42 +0000 Subject: [PATCH 1/7] Add support for custom KubeClient Signed-off-by: Filipe Regadas --- .../nodes/task/k8s/plugin_manager.go | 76 ++++++++++++------- .../nodes/task/k8s/plugin_manager_test.go | 64 ++++++++++++++-- 2 files changed, 108 insertions(+), 32 deletions(-) diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 048852b2f..68d1ea6e2 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -84,18 +84,6 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics { } } -func AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { - o.SetNamespace(taskCtx.GetNamespace()) - o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) - o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) - o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) - o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) - if cfg.InjectFinalizer { - f := append(o.GetFinalizers(), finalizer) - o.SetFinalizers(f) - } -} - func IsK8sObjectNotExists(err error) bool { return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err) } @@ -109,8 +97,25 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor + ignoreOwnerReferences bool +} + +func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o k8s.Resource, cfg *config.K8sPluginConfig) { + o.SetNamespace(taskCtx.GetNamespace()) + o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) + o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) + o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) + + if !e.ignoreOwnerReferences { + o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) + } + + if cfg.InjectFinalizer { + f := append(o.GetFinalizers(), finalizer) + o.SetFinalizers(f) + } } func (e *PluginManager) GetProperties() pluginsCore.PluginProperties { @@ -175,7 +180,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.UnknownTransition, err } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GetObjectKind().GroupVersionKind(), o.GetNamespace(), o.GetName()) key := backoff.ComposeResourceKey(o) @@ -227,7 +232,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { @@ -314,7 +319,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution return nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) err = e.kubeClient.GetClient().Delete(ctx, o) if err != nil && !IsK8sObjectNotExists(err) { @@ -352,7 +357,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu return nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { @@ -392,7 +397,19 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.") } - if iCtx.KubeClient() == nil { + kubeClient := iCtx.KubeClient() + if entry.NewKubeClient != nil { + kc, err := entry.NewKubeClient(ctx) + if err != nil { + return nil, err + } + + if kc != nil { + kubeClient = kc + } + } + + if kubeClient == nil { return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize K8sResource Plugin, Kubeclient cannot be nil!") } @@ -401,14 +418,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry Type: entry.ResourceToWatch, } - ownerKind := iCtx.OwnerKind() workflowParentPredicate := func(o metav1.Object) bool { + if entry.IgnoreOwnerReferences { + return true + } + ownerReference := metav1.GetControllerOf(o) if ownerReference != nil { - if ownerReference.Kind == ownerKind { + if ownerReference.Kind == iCtx.OwnerKind() { return true } } + return false } @@ -492,12 +513,13 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: iCtx.KubeClient(), - resourceLevelMonitor: rm, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, + ignoreOwnerReferences: entry.IgnoreOwnerReferences, }, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 221f3d751..5ab0aa091 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -202,7 +202,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase()) createdPod := &v1.Pod{} - AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tctx.TaskExecutionMetadata().GetNamespace(), Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod)) assert.Equal(t, tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name) @@ -223,7 +223,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.NoError(t, err) createdPod := &v1.Pod{} - AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Create(ctx, createdPod)) transition, err := pluginManager.Handle(ctx, tctx) @@ -349,7 +349,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { // Build a reference resource that is supposed to be identical to the resource built by pluginManager referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx) assert.NoError(t, err) - AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) refKey := backoff.ComposeResourceKey(referenceResource) podBackOffHandler, found := backOffController.GetBackOffHandler(refKey) assert.True(t, found) @@ -545,7 +545,61 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { } } -func TestAddObjectMetadata(t *testing.T) { +func TestPluginManager_CustomKubeClient(t *testing.T) { + ctx := context.TODO() + tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted) + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil) + fakeClient := fake.NewFakeClient() + newFakeClient := &pluginsCoreMock.KubeClient{} + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + NewKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { + return newFakeClient, nil + }, + }, NewResourceMonitorIndex()) + assert.NoError(t, err) + + assert.Equal(t, newFakeClient, pluginManager.kubeClient) +} + +func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { + pluginManager := PluginManager{ignoreOwnerReferences: true} + genName := "genName" + execID := &pluginsCoreMock.TaskExecutionID{} + execID.On("GetGeneratedName").Return(genName) + tm := &pluginsCoreMock.TaskExecutionMetadata{} + tm.On("GetTaskExecutionID").Return(execID) + or := v12.OwnerReference{} + tm.On("GetOwnerReference").Return(or) + ns := "ns" + tm.On("GetNamespace").Return(ns) + tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"}) + + l := map[string]string{ + "l1": "lv1", + } + tm.On("GetLabels").Return(l) + + o := &v1.Pod{} + cfg := config.GetK8sPluginConfig() + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) +} + +func TestPluginManager_AddObjectMetadata(t *testing.T) { + pluginManager := PluginManager{} genName := "genName" execID := &pluginsCoreMock.TaskExecutionID{} execID.On("GetGeneratedName").Return(genName) @@ -564,7 +618,7 @@ func TestAddObjectMetadata(t *testing.T) { o := &v1.Pod{} cfg := config.GetK8sPluginConfig() - AddObjectMetadata(tm, o, cfg) + pluginManager.AddObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) assert.Equal(t, ns, o.GetNamespace()) From 4cd03f4c5ffadd39f0c035c1c19abd3448706479 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 18 Mar 2021 14:12:21 +0000 Subject: [PATCH 2/7] Review and rebase Signed-off-by: Filipe Regadas --- go.mod | 2 +- go.sum | 4 +- .../nodes/task/k8s/plugin_manager.go | 35 ++++---- .../nodes/task/k8s/plugin_manager_test.go | 83 +++++++++++++------ pkg/controller/nodes/task/taskexec_context.go | 8 ++ 5 files changed, 88 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index 21eac55f1..395e37852 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.20 - github.com/flyteorg/flyteplugins v0.5.35 + github.com/flyteorg/flyteplugins v0.5.37 github.com/flyteorg/flytestdlib v0.3.13 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index fb8c58ce8..249aedc30 100644 --- a/go.sum +++ b/go.sum @@ -233,8 +233,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= github.com/flyteorg/flyteidl v0.18.20 h1:OGOb2FOHWL363Qp8uzbJeFbQBKYPT30+afv+8BnBlGs= github.com/flyteorg/flyteidl v0.18.20/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= -github.com/flyteorg/flyteplugins v0.5.35 h1:KEMOiA4B+lIxQ+l7FRHzVcPA234Td9+ursuJDm6I8dg= -github.com/flyteorg/flyteplugins v0.5.35/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo= +github.com/flyteorg/flyteplugins v0.5.37 h1:9kl91k2xG5QJbhdSwoTa7XFnLIynBsidcwBlbsysr5s= +github.com/flyteorg/flyteplugins v0.5.37/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 68d1ea6e2..795453e72 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -97,22 +97,24 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor - ignoreOwnerReferences bool + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor + overrideInjectOwnerReferences *bool + overrideInjectFinalizer *bool } -func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o k8s.Resource, cfg *config.K8sPluginConfig) { +func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { o.SetNamespace(taskCtx.GetNamespace()) o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) - if !e.ignoreOwnerReferences { + if e.overrideInjectOwnerReferences != nil && *e.overrideInjectOwnerReferences { o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) } - if cfg.InjectFinalizer { + overrideInjectFinalizer := e.overrideInjectFinalizer + if (overrideInjectFinalizer != nil && *overrideInjectFinalizer) || cfg.InjectFinalizer { f := append(o.GetFinalizers(), finalizer) o.SetFinalizers(f) } @@ -398,8 +400,8 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry } kubeClient := iCtx.KubeClient() - if entry.NewKubeClient != nil { - kc, err := entry.NewKubeClient(ctx) + if entry.CustomKubeClient != nil { + kc, err := entry.CustomKubeClient(ctx) if err != nil { return nil, err } @@ -419,7 +421,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry } workflowParentPredicate := func(o metav1.Object) bool { - if entry.IgnoreOwnerReferences { + if entry.OverrideInjectOwnerReferences != nil && *entry.OverrideInjectOwnerReferences { return true } @@ -513,13 +515,14 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: kubeClient, - resourceLevelMonitor: rm, - ignoreOwnerReferences: entry.IgnoreOwnerReferences, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, + overrideInjectOwnerReferences: entry.OverrideInjectOwnerReferences, + overrideInjectFinalizer: entry.OverrideInjectFinalizer, }, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 5ab0aa091..7ef52c20b 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -160,6 +160,25 @@ func getMockTaskExecutionMetadata() pluginsCore.TaskExecutionMetadata { return taskExecutionMetadata } +func getMockTaskExecutionMetadataCustom( + tid string, + ns string, + annotations map[string]string, + labels map[string]string, + ownerRef v12.OwnerReference) pluginsCore.TaskExecutionMetadata { + taskExecutionMetadata := &pluginsCoreMock.TaskExecutionMetadata{} + taskExecutionMetadata.On("GetNamespace").Return(ns) + taskExecutionMetadata.On("GetAnnotations").Return(annotations) + taskExecutionMetadata.On("GetLabels").Return(labels) + taskExecutionMetadata.On("GetOwnerReference").Return(ownerRef) + + id := &pluginsCoreMock.TaskExecutionID{} + id.On("GetGeneratedName").Return(tid) + id.On("GetID").Return(core.TaskExecutionIdentifier{}) + taskExecutionMetadata.On("GetTaskExecutionID").Return(id) + return taskExecutionMetadata +} + func dummySetupContext(fakeClient client.Client) pluginsCore.SetupContext { setupContext := &pluginsCoreMock.SetupContext{} var enqueueOwnerFunc = pluginsCore.EnqueueOwner(func(ownerId k8stypes.NamespacedName) error { return nil }) @@ -567,22 +586,43 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { } func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { - pluginManager := PluginManager{ignoreOwnerReferences: true} + override := true + pluginManager := PluginManager{overrideInjectOwnerReferences: &override} genName := "genName" - execID := &pluginsCoreMock.TaskExecutionID{} - execID.On("GetGeneratedName").Return(genName) - tm := &pluginsCoreMock.TaskExecutionMetadata{} - tm.On("GetTaskExecutionID").Return(execID) - or := v12.OwnerReference{} - tm.On("GetOwnerReference").Return(or) ns := "ns" - tm.On("GetNamespace").Return(ns) - tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"}) + or := v12.OwnerReference{} + l := map[string]string{"l1": "lv1"} + a := map[string]string{"aKey": "aVal"} + tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) - l := map[string]string{ - "l1": "lv1", + o := &v1.Pod{} + cfg := config.GetK8sPluginConfig() + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) +} + +func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { + overrideInjectOwnerRef := true + overrideInjectFinalizer := true + pluginManager := PluginManager{ + overrideInjectOwnerReferences: &overrideInjectOwnerRef, + overrideInjectFinalizer: &overrideInjectFinalizer, } - tm.On("GetLabels").Return(l) + genName := "genName" + ns := "ns" + or := v12.OwnerReference{} + l := map[string]string{"l1": "lv1"} + a := map[string]string{"aKey": "aVal"} + tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) o := &v1.Pod{} cfg := config.GetK8sPluginConfig() @@ -596,25 +636,17 @@ func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { "aKey": "aVal", }, o.GetAnnotations()) assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 1, len(o.GetFinalizers())) } func TestPluginManager_AddObjectMetadata(t *testing.T) { pluginManager := PluginManager{} genName := "genName" - execID := &pluginsCoreMock.TaskExecutionID{} - execID.On("GetGeneratedName").Return(genName) - tm := &pluginsCoreMock.TaskExecutionMetadata{} - tm.On("GetTaskExecutionID").Return(execID) - or := v12.OwnerReference{} - tm.On("GetOwnerReference").Return(or) ns := "ns" - tm.On("GetNamespace").Return(ns) - tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"}) - - l := map[string]string{ - "l1": "lv1", - } - tm.On("GetLabels").Return(l) + or := v12.OwnerReference{} + l := map[string]string{"l1": "lv1"} + a := map[string]string{"aKey": "aVal"} + tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) o := &v1.Pod{} cfg := config.GetK8sPluginConfig() @@ -627,6 +659,7 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { "aKey": "aVal", }, o.GetAnnotations()) assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) } func TestResourceManagerConstruction(t *testing.T) { diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index 39a60e0c8..4495c1e24 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -64,6 +64,14 @@ func (t taskExecutionMetadata) GetMaxAttempts() uint32 { return t.maxAttempts } +func (t taskExecutionMetadata) GetSecurityContext() core.SecurityContext { + return core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: t.GetK8sServiceAccount(), + }, + } +} + type taskExecutionContext struct { handler.NodeExecutionContext tm taskExecutionMetadata From 26342001c9fbfd3493bd666d4037cf577194de3a Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 18 Mar 2021 16:17:56 +0000 Subject: [PATCH 3/7] Fix test Signed-off-by: Filipe Regadas --- pkg/controller/nodes/task/k8s/plugin_manager_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 7ef52c20b..82c6eb621 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -570,13 +570,13 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil) - fakeClient := fake.NewFakeClient() + fakeClient := fake.NewClientBuilder().Build() newFakeClient := &pluginsCoreMock.KubeClient{} pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - NewKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { + CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { return newFakeClient, nil }, }, NewResourceMonitorIndex()) From bd6a8f0872544a8add555f1c6a9d545d73e09cd6 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 18 Mar 2021 20:24:21 +0000 Subject: [PATCH 4/7] Update and fix conditions Signed-off-by: Filipe Regadas --- go.mod | 2 +- go.sum | 4 +- .../nodes/task/k8s/plugin_manager.go | 31 ++++++----- .../nodes/task/k8s/plugin_manager_test.go | 52 +++++++++++++------ 4 files changed, 53 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 395e37852..0954f63dd 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.20 - github.com/flyteorg/flyteplugins v0.5.37 + github.com/flyteorg/flyteplugins v0.5.38 github.com/flyteorg/flytestdlib v0.3.13 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index 249aedc30..8e92f5de2 100644 --- a/go.sum +++ b/go.sum @@ -233,8 +233,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= github.com/flyteorg/flyteidl v0.18.20 h1:OGOb2FOHWL363Qp8uzbJeFbQBKYPT30+afv+8BnBlGs= github.com/flyteorg/flyteidl v0.18.20/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= -github.com/flyteorg/flyteplugins v0.5.37 h1:9kl91k2xG5QJbhdSwoTa7XFnLIynBsidcwBlbsysr5s= -github.com/flyteorg/flyteplugins v0.5.37/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo= +github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw= +github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 795453e72..2a38ab430 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -97,10 +97,10 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor - overrideInjectOwnerReferences *bool - overrideInjectFinalizer *bool + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor + disableInjectOwnerReferences bool + disableInjectFinalizer bool } func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { @@ -109,12 +109,11 @@ func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) - if e.overrideInjectOwnerReferences != nil && *e.overrideInjectOwnerReferences { + if !e.disableInjectOwnerReferences { o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) } - overrideInjectFinalizer := e.overrideInjectFinalizer - if (overrideInjectFinalizer != nil && *overrideInjectFinalizer) || cfg.InjectFinalizer { + if cfg.InjectFinalizer && !e.disableInjectFinalizer { f := append(o.GetFinalizers(), finalizer) o.SetFinalizers(f) } @@ -421,7 +420,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry } workflowParentPredicate := func(o metav1.Object) bool { - if entry.OverrideInjectOwnerReferences != nil && *entry.OverrideInjectOwnerReferences { + if entry.DisableInjectOwnerReferences { return true } @@ -515,14 +514,14 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: kubeClient, - resourceLevelMonitor: rm, - overrideInjectOwnerReferences: entry.OverrideInjectOwnerReferences, - overrideInjectFinalizer: entry.OverrideInjectFinalizer, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, + disableInjectOwnerReferences: entry.DisableInjectOwnerReferences, + disableInjectFinalizer: entry.DisableInjectFinalizer, }, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 82c6eb621..504e28332 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -586,8 +586,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { } func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { - override := true - pluginManager := PluginManager{overrideInjectOwnerReferences: &override} + pluginManager := PluginManager{disableInjectOwnerReferences: true} genName := "genName" ns := "ns" or := v12.OwnerReference{} @@ -611,11 +610,9 @@ func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { } func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { - overrideInjectOwnerRef := true - overrideInjectFinalizer := true pluginManager := PluginManager{ - overrideInjectOwnerReferences: &overrideInjectOwnerRef, - overrideInjectFinalizer: &overrideInjectFinalizer, + disableInjectOwnerReferences: true, + disableInjectFinalizer: true, } genName := "genName" ns := "ns" @@ -626,17 +623,38 @@ func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { o := &v1.Pod{} cfg := config.GetK8sPluginConfig() - pluginManager.AddObjectMetadata(tm, o, cfg) - assert.Equal(t, genName, o.GetName()) - // empty OwnerReference since we are ignoring - assert.Equal(t, 0, len(o.GetOwnerReferences())) - assert.Equal(t, ns, o.GetNamespace()) - assert.Equal(t, map[string]string{ - "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", - "aKey": "aVal", - }, o.GetAnnotations()) - assert.Equal(t, l, o.GetLabels()) - assert.Equal(t, 1, len(o.GetFinalizers())) + + t.Run("Disable enbaled InjectFinalizer", func(t *testing.T) { + // enable finalizer injection + cfg.InjectFinalizer = true + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) + + t.Run("Disable disabled InjectFinalizer", func(t *testing.T) { + // enable finalizer injection + cfg.InjectFinalizer = false + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) } func TestPluginManager_AddObjectMetadata(t *testing.T) { From 8f5c77e21cd07b3565d5a650e1056a7177ce6a37 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 18 Mar 2021 20:31:19 +0000 Subject: [PATCH 5/7] fix typo Signed-off-by: Filipe Regadas --- pkg/controller/nodes/task/k8s/plugin_manager_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 504e28332..4f922dfa0 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -624,7 +624,7 @@ func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { o := &v1.Pod{} cfg := config.GetK8sPluginConfig() - t.Run("Disable enbaled InjectFinalizer", func(t *testing.T) { + t.Run("Disable enabled InjectFinalizer", func(t *testing.T) { // enable finalizer injection cfg.InjectFinalizer = true pluginManager.AddObjectMetadata(tm, o, cfg) @@ -641,7 +641,7 @@ func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { }) t.Run("Disable disabled InjectFinalizer", func(t *testing.T) { - // enable finalizer injection + // disable finalizer injection cfg.InjectFinalizer = false pluginManager.AddObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) From 8650e249a2a6efad92bba30e54186f61bdfd131a Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 18 Mar 2021 21:02:59 +0000 Subject: [PATCH 6/7] Rework test Signed-off-by: Filipe Regadas --- .../nodes/task/k8s/plugin_manager_test.go | 88 ++++++++----------- 1 file changed, 36 insertions(+), 52 deletions(-) diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 4f922dfa0..b0469528f 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -585,8 +585,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { assert.Equal(t, newFakeClient, pluginManager.kubeClient) } -func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { - pluginManager := PluginManager{disableInjectOwnerReferences: true} +func TestPluginManager_AddObjectMetadata(t *testing.T) { genName := "genName" ns := "ns" or := v12.OwnerReference{} @@ -594,43 +593,48 @@ func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { a := map[string]string{"aKey": "aVal"} tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) - o := &v1.Pod{} cfg := config.GetK8sPluginConfig() - pluginManager.AddObjectMetadata(tm, o, cfg) - assert.Equal(t, genName, o.GetName()) - // empty OwnerReference since we are ignoring - assert.Equal(t, 0, len(o.GetOwnerReferences())) - assert.Equal(t, ns, o.GetNamespace()) - assert.Equal(t, map[string]string{ - "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", - "aKey": "aVal", - }, o.GetAnnotations()) - assert.Equal(t, l, o.GetLabels()) - assert.Equal(t, 0, len(o.GetFinalizers())) -} -func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { - pluginManager := PluginManager{ - disableInjectOwnerReferences: true, - disableInjectFinalizer: true, - } - genName := "genName" - ns := "ns" - or := v12.OwnerReference{} - l := map[string]string{"l1": "lv1"} - a := map[string]string{"aKey": "aVal"} - tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) + t.Run("default", func(t *testing.T) { + o := &v1.Pod{} + pluginManager := PluginManager{} + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) - o := &v1.Pod{} - cfg := config.GetK8sPluginConfig() + t.Run("Disable OwnerReferences injection", func(t *testing.T) { + pluginManager := PluginManager{disableInjectOwnerReferences: true} + o := &v1.Pod{} + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) t.Run("Disable enabled InjectFinalizer", func(t *testing.T) { + pluginManager := PluginManager{disableInjectFinalizer: true} // enable finalizer injection cfg.InjectFinalizer = true + o := &v1.Pod{} pluginManager.AddObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring - assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, 1, len(o.GetOwnerReferences())) assert.Equal(t, ns, o.GetNamespace()) assert.Equal(t, map[string]string{ "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", @@ -641,12 +645,14 @@ func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { }) t.Run("Disable disabled InjectFinalizer", func(t *testing.T) { + pluginManager := PluginManager{disableInjectFinalizer: true} // disable finalizer injection cfg.InjectFinalizer = false + o := &v1.Pod{} pluginManager.AddObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring - assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, 1, len(o.GetOwnerReferences())) assert.Equal(t, ns, o.GetNamespace()) assert.Equal(t, map[string]string{ "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", @@ -655,29 +661,7 @@ func TestPluginManager_AddObjectMetadata_InjectFinalizer(t *testing.T) { assert.Equal(t, l, o.GetLabels()) assert.Equal(t, 0, len(o.GetFinalizers())) }) -} -func TestPluginManager_AddObjectMetadata(t *testing.T) { - pluginManager := PluginManager{} - genName := "genName" - ns := "ns" - or := v12.OwnerReference{} - l := map[string]string{"l1": "lv1"} - a := map[string]string{"aKey": "aVal"} - tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) - - o := &v1.Pod{} - cfg := config.GetK8sPluginConfig() - pluginManager.AddObjectMetadata(tm, o, cfg) - assert.Equal(t, genName, o.GetName()) - assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) - assert.Equal(t, ns, o.GetNamespace()) - assert.Equal(t, map[string]string{ - "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", - "aKey": "aVal", - }, o.GetAnnotations()) - assert.Equal(t, l, o.GetLabels()) - assert.Equal(t, 0, len(o.GetFinalizers())) } func TestResourceManagerConstruction(t *testing.T) { From b9f8f50b5521bcea951d451e55f929f79c9b0dad Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 18 Mar 2021 21:45:18 +0000 Subject: [PATCH 7/7] rebase master Signed-off-by: Filipe Regadas --- pkg/controller/nodes/task/taskexec_context.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index 4495c1e24..39a60e0c8 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -64,14 +64,6 @@ func (t taskExecutionMetadata) GetMaxAttempts() uint32 { return t.maxAttempts } -func (t taskExecutionMetadata) GetSecurityContext() core.SecurityContext { - return core.SecurityContext{ - RunAs: &core.Identity{ - K8SServiceAccount: t.GetK8sServiceAccount(), - }, - } -} - type taskExecutionContext struct { handler.NodeExecutionContext tm taskExecutionMetadata