diff --git a/go.mod b/go.mod index 21eac55f1..0954f63dd 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.20 - github.com/flyteorg/flyteplugins v0.5.35 + github.com/flyteorg/flyteplugins v0.5.38 github.com/flyteorg/flytestdlib v0.3.13 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index fb8c58ce8..8e92f5de2 100644 --- a/go.sum +++ b/go.sum @@ -233,8 +233,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= github.com/flyteorg/flyteidl v0.18.20 h1:OGOb2FOHWL363Qp8uzbJeFbQBKYPT30+afv+8BnBlGs= github.com/flyteorg/flyteidl v0.18.20/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= -github.com/flyteorg/flyteplugins v0.5.35 h1:KEMOiA4B+lIxQ+l7FRHzVcPA234Td9+ursuJDm6I8dg= -github.com/flyteorg/flyteplugins v0.5.35/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo= +github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw= +github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 048852b2f..2a38ab430 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -84,18 +84,6 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics { } } -func AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { - o.SetNamespace(taskCtx.GetNamespace()) - o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) - o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) - o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) - o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) - if cfg.InjectFinalizer { - f := append(o.GetFinalizers(), finalizer) - o.SetFinalizers(f) - } -} - func IsK8sObjectNotExists(err error) bool { return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err) } @@ -109,8 +97,26 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor + disableInjectOwnerReferences bool + disableInjectFinalizer bool +} + +func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { + o.SetNamespace(taskCtx.GetNamespace()) + o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) + o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels)) + o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) + + if !e.disableInjectOwnerReferences { + o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()}) + } + + if cfg.InjectFinalizer && !e.disableInjectFinalizer { + f := append(o.GetFinalizers(), finalizer) + o.SetFinalizers(f) + } } func (e *PluginManager) GetProperties() pluginsCore.PluginProperties { @@ -175,7 +181,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.UnknownTransition, err } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GetObjectKind().GroupVersionKind(), o.GetNamespace(), o.GetName()) key := backoff.ComposeResourceKey(o) @@ -227,7 +233,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { @@ -314,7 +320,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution return nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) err = e.kubeClient.GetClient().Delete(ctx, o) if err != nil && !IsK8sObjectNotExists(err) { @@ -352,7 +358,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu return nil } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { @@ -392,7 +398,19 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.") } - if iCtx.KubeClient() == nil { + kubeClient := iCtx.KubeClient() + if entry.CustomKubeClient != nil { + kc, err := entry.CustomKubeClient(ctx) + if err != nil { + return nil, err + } + + if kc != nil { + kubeClient = kc + } + } + + if kubeClient == nil { return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize K8sResource Plugin, Kubeclient cannot be nil!") } @@ -401,14 +419,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry Type: entry.ResourceToWatch, } - ownerKind := iCtx.OwnerKind() workflowParentPredicate := func(o metav1.Object) bool { + if entry.DisableInjectOwnerReferences { + return true + } + ownerReference := metav1.GetControllerOf(o) if ownerReference != nil { - if ownerReference.Kind == ownerKind { + if ownerReference.Kind == iCtx.OwnerKind() { return true } } + return false } @@ -492,12 +514,14 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: iCtx.KubeClient(), - resourceLevelMonitor: rm, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, + disableInjectOwnerReferences: entry.DisableInjectOwnerReferences, + disableInjectFinalizer: entry.DisableInjectFinalizer, }, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 221f3d751..b0469528f 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -160,6 +160,25 @@ func getMockTaskExecutionMetadata() pluginsCore.TaskExecutionMetadata { return taskExecutionMetadata } +func getMockTaskExecutionMetadataCustom( + tid string, + ns string, + annotations map[string]string, + labels map[string]string, + ownerRef v12.OwnerReference) pluginsCore.TaskExecutionMetadata { + taskExecutionMetadata := &pluginsCoreMock.TaskExecutionMetadata{} + taskExecutionMetadata.On("GetNamespace").Return(ns) + taskExecutionMetadata.On("GetAnnotations").Return(annotations) + taskExecutionMetadata.On("GetLabels").Return(labels) + taskExecutionMetadata.On("GetOwnerReference").Return(ownerRef) + + id := &pluginsCoreMock.TaskExecutionID{} + id.On("GetGeneratedName").Return(tid) + id.On("GetID").Return(core.TaskExecutionIdentifier{}) + taskExecutionMetadata.On("GetTaskExecutionID").Return(id) + return taskExecutionMetadata +} + func dummySetupContext(fakeClient client.Client) pluginsCore.SetupContext { setupContext := &pluginsCoreMock.SetupContext{} var enqueueOwnerFunc = pluginsCore.EnqueueOwner(func(ownerId k8stypes.NamespacedName) error { return nil }) @@ -202,7 +221,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase()) createdPod := &v1.Pod{} - AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tctx.TaskExecutionMetadata().GetNamespace(), Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod)) assert.Equal(t, tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name) @@ -223,7 +242,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.NoError(t, err) createdPod := &v1.Pod{} - AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Create(ctx, createdPod)) transition, err := pluginManager.Handle(ctx, tctx) @@ -349,7 +368,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { // Build a reference resource that is supposed to be identical to the resource built by pluginManager referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx) assert.NoError(t, err) - AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) + pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) refKey := backoff.ComposeResourceKey(referenceResource) podBackOffHandler, found := backOffController.GetBackOffHandler(refKey) assert.True(t, found) @@ -545,34 +564,104 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { } } -func TestAddObjectMetadata(t *testing.T) { +func TestPluginManager_CustomKubeClient(t *testing.T) { + ctx := context.TODO() + tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted) + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil) + fakeClient := fake.NewClientBuilder().Build() + newFakeClient := &pluginsCoreMock.KubeClient{} + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { + return newFakeClient, nil + }, + }, NewResourceMonitorIndex()) + assert.NoError(t, err) + + assert.Equal(t, newFakeClient, pluginManager.kubeClient) +} + +func TestPluginManager_AddObjectMetadata(t *testing.T) { genName := "genName" - execID := &pluginsCoreMock.TaskExecutionID{} - execID.On("GetGeneratedName").Return(genName) - tm := &pluginsCoreMock.TaskExecutionMetadata{} - tm.On("GetTaskExecutionID").Return(execID) - or := v12.OwnerReference{} - tm.On("GetOwnerReference").Return(or) ns := "ns" - tm.On("GetNamespace").Return(ns) - tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"}) - - l := map[string]string{ - "l1": "lv1", - } - tm.On("GetLabels").Return(l) + or := v12.OwnerReference{} + l := map[string]string{"l1": "lv1"} + a := map[string]string{"aKey": "aVal"} + tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) - o := &v1.Pod{} cfg := config.GetK8sPluginConfig() - AddObjectMetadata(tm, o, cfg) - assert.Equal(t, genName, o.GetName()) - assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) - assert.Equal(t, ns, o.GetNamespace()) - assert.Equal(t, map[string]string{ - "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", - "aKey": "aVal", - }, o.GetAnnotations()) - assert.Equal(t, l, o.GetLabels()) + + t.Run("default", func(t *testing.T) { + o := &v1.Pod{} + pluginManager := PluginManager{} + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) + + t.Run("Disable OwnerReferences injection", func(t *testing.T) { + pluginManager := PluginManager{disableInjectOwnerReferences: true} + o := &v1.Pod{} + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 0, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) + + t.Run("Disable enabled InjectFinalizer", func(t *testing.T) { + pluginManager := PluginManager{disableInjectFinalizer: true} + // enable finalizer injection + cfg.InjectFinalizer = true + o := &v1.Pod{} + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 1, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) + + t.Run("Disable disabled InjectFinalizer", func(t *testing.T) { + pluginManager := PluginManager{disableInjectFinalizer: true} + // disable finalizer injection + cfg.InjectFinalizer = false + o := &v1.Pod{} + pluginManager.AddObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 1, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 0, len(o.GetFinalizers())) + }) + } func TestResourceManagerConstruction(t *testing.T) {