From 4e9ca5f4d21b496350079abecdd9499f976f4107 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 2 Mar 2021 15:11:42 +0000 Subject: [PATCH] Add support for custom KubeClient --- go.mod | 2 + .../nodes/task/k8s/plugin_manager.go | 76 ++++++++++++------- .../nodes/task/k8s/plugin_manager_test.go | 64 ++++++++++++++-- 3 files changed, 110 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index f152f03a7..eacc89f1b 100644 --- a/go.mod +++ b/go.mod @@ -62,4 +62,6 @@ replace ( k8s.io/api => github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f k8s.io/client-go => k8s.io/client-go v0.0.0-20191016111102-bec269661e48 + + github.com/lyft/flyteplugins => ../flyteplugins ) diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index ac2760dc2..48cba7070 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -82,18 +82,6 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics { } } -func AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o k8s.Resource, cfg *config.K8sPluginConfig) { - o.SetNamespace(taskCtx.GetNamespace()) - o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) - o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) - o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) - o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) - if cfg.InjectFinalizer { - f := append(o.GetFinalizers(), finalizer) - o.SetFinalizers(f) - } -} - func IsK8sObjectNotExists(err error) bool { return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err) } @@ -107,8 +95,25 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor + ignoreOwnerReferences bool +} + +func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o k8s.Resource, cfg *config.K8sPluginConfig) { + o.SetNamespace(taskCtx.GetNamespace()) + o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) + o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) + o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) + + if !e.ignoreOwnerReferences { + o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) + } + + if cfg.InjectFinalizer { + f := append(o.GetFinalizers(), finalizer) + o.SetFinalizers(f) + } } func (e *PluginManager) GetProperties() pluginsCore.PluginProperties { @@ -173,7 +178,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.UnknownTransition, err } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GroupVersionKind(), o.GetNamespace(), o.GetName()) key := backoff.ComposeResourceKey(o) @@ -225,7 +230,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { @@ -312,7 +317,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution return nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) err = e.kubeClient.GetClient().Delete(ctx, o) if err != nil && !IsK8sObjectNotExists(err) { @@ -350,7 +355,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu return nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { @@ -390,7 +395,19 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.") } - if iCtx.KubeClient() == nil { + kubeClient := iCtx.KubeClient() + if entry.NewKubeClient != nil { + kc, err := entry.NewKubeClient(ctx) + if err != nil { + return nil, err + } + + if kc != nil { + kubeClient = kc + } + } + + if kubeClient == nil { return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize K8sResource Plugin, Kubeclient cannot be nil!") } @@ -399,14 +416,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry Type: entry.ResourceToWatch, } - ownerKind := iCtx.OwnerKind() workflowParentPredicate := func(o metav1.Object) bool { + if entry.IgnoreOwnerReferences { + return true + } + ownerReference := metav1.GetControllerOf(o) if ownerReference != nil { - if ownerReference.Kind == ownerKind { + if ownerReference.Kind == iCtx.OwnerKind() { return true } } + return false } @@ -489,12 +510,13 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: iCtx.KubeClient(), - resourceLevelMonitor: rm, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, + ignoreOwnerReferences: entry.IgnoreOwnerReferences, }, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index c31bd5c57..8e9861910 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -203,7 +203,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase()) createdPod := &v1.Pod{} - AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tctx.TaskExecutionMetadata().GetNamespace(), Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod)) assert.Equal(t, tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name) @@ -224,7 +224,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.NoError(t, err) createdPod := &v1.Pod{} - AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Create(ctx, createdPod)) transition, err := pluginManager.Handle(ctx, tctx) @@ -350,7 +350,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { // Build a reference resource that is supposed to be identical to the resource built by pluginManager referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx) assert.NoError(t, err) - AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) refKey := backoff.ComposeResourceKey(referenceResource) podBackOffHandler, found := backOffController.GetBackOffHandler(refKey) assert.True(t, found) @@ -546,7 +546,61 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { } } -func TestAddObjectMetadata(t *testing.T) { +func TestPluginManager_CustomKubeClient(t *testing.T) { + ctx := context.TODO() + tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted) + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil) + fakeClient := fake.NewFakeClient() + newFakeClient := &pluginsCoreMock.KubeClient{} + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + NewKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { + return newFakeClient, nil + }, + }, NewResourceMonitorIndex()) + assert.NoError(t, err) + + assert.Equal(t, newFakeClient, pluginManager.kubeClient) +} + +func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) { + pluginManager := PluginManager{ignoreOwnerReferences: true} + genName := "genName" + execID := &pluginsCoreMock.TaskExecutionID{} + execID.On("GetGeneratedName").Return(genName) + tm := &pluginsCoreMock.TaskExecutionMetadata{} + tm.On("GetTaskExecutionID").Return(execID) + or := v12.OwnerReference{} + tm.On("GetOwnerReference").Return(or) + ns := "ns" + tm.On("GetNamespace").Return(ns) + tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"}) + + l := map[string]string{ + "l1": "lv1", + } + tm.On("GetLabels").Return(l) + + o := &v1.Pod{} + cfg := config.GetK8sPluginConfig() + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) +} + +func TestPluginManager_AddObjectMetadata(t *testing.T) { + pluginManager := PluginManager{} genName := "genName" execID := &pluginsCoreMock.TaskExecutionID{} execID.On("GetGeneratedName").Return(genName) @@ -565,7 +619,7 @@ func TestAddObjectMetadata(t *testing.T) { o := &v1.Pod{} cfg := config.GetK8sPluginConfig() - AddObjectMetadata(tm, o, cfg) + pluginManager.AddObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) assert.Equal(t, ns, o.GetNamespace())