Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add support for custom KubeClient #225

Merged
merged 7 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.20
github.com/flyteorg/flyteplugins v0.5.35
github.com/flyteorg/flyteplugins v0.5.38
github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.20 h1:OGOb2FOHWL363Qp8uzbJeFbQBKYPT30+afv+8BnBlGs=
github.com/flyteorg/flyteidl v0.18.20/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.35 h1:KEMOiA4B+lIxQ+l7FRHzVcPA234Td9+ursuJDm6I8dg=
github.com/flyteorg/flyteplugins v0.5.35/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
78 changes: 51 additions & 27 deletions pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,6 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics {
}
}

func AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())
if cfg.InjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func IsK8sObjectNotExists(err error) bool {
return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err)
}
Expand All @@ -109,8 +97,26 @@ type PluginManager struct {
kubeClient pluginsCore.KubeClient
metrics PluginMetrics
// Per namespace-resource
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
disableInjectOwnerReferences bool
disableInjectFinalizer bool
}

func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())

if !e.disableInjectOwnerReferences {
regadas marked this conversation as resolved.
Show resolved Hide resolved
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
}

if cfg.InjectFinalizer && !e.disableInjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func (e *PluginManager) GetProperties() pluginsCore.PluginProperties {
Expand Down Expand Up @@ -175,7 +181,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas
return pluginsCore.UnknownTransition, err
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GetObjectKind().GroupVersionKind(), o.GetNamespace(), o.GetName())

key := backoff.ComposeResourceKey(o)
Expand Down Expand Up @@ -227,7 +233,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore
return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
Expand Down Expand Up @@ -314,7 +320,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())

err = e.kubeClient.GetClient().Delete(ctx, o)
if err != nil && !IsK8sObjectNotExists(err) {
Expand Down Expand Up @@ -352,7 +358,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
Expand Down Expand Up @@ -392,7 +398,19 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.")
}

if iCtx.KubeClient() == nil {
kubeClient := iCtx.KubeClient()
if entry.CustomKubeClient != nil {
kc, err := entry.CustomKubeClient(ctx)
if err != nil {
return nil, err
}

if kc != nil {
kubeClient = kc
}
}

if kubeClient == nil {
return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize K8sResource Plugin, Kubeclient cannot be nil!")
}

Expand All @@ -401,14 +419,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
Type: entry.ResourceToWatch,
}

ownerKind := iCtx.OwnerKind()
workflowParentPredicate := func(o metav1.Object) bool {
if entry.DisableInjectOwnerReferences {
return true
}

ownerReference := metav1.GetControllerOf(o)
if ownerReference != nil {
if ownerReference.Kind == ownerKind {
if ownerReference.Kind == iCtx.OwnerKind() {
return true
}
}

return false
}

Expand Down Expand Up @@ -492,12 +514,14 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
rm.RunCollectorOnce(ctx)

return &PluginManager{
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: iCtx.KubeClient(),
resourceLevelMonitor: rm,
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
disableInjectOwnerReferences: entry.DisableInjectOwnerReferences,
disableInjectFinalizer: entry.DisableInjectFinalizer,
}, nil
}

Expand Down
143 changes: 116 additions & 27 deletions pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ func getMockTaskExecutionMetadata() pluginsCore.TaskExecutionMetadata {
return taskExecutionMetadata
}

func getMockTaskExecutionMetadataCustom(
tid string,
ns string,
annotations map[string]string,
labels map[string]string,
ownerRef v12.OwnerReference) pluginsCore.TaskExecutionMetadata {
taskExecutionMetadata := &pluginsCoreMock.TaskExecutionMetadata{}
taskExecutionMetadata.On("GetNamespace").Return(ns)
taskExecutionMetadata.On("GetAnnotations").Return(annotations)
taskExecutionMetadata.On("GetLabels").Return(labels)
taskExecutionMetadata.On("GetOwnerReference").Return(ownerRef)

id := &pluginsCoreMock.TaskExecutionID{}
id.On("GetGeneratedName").Return(tid)
id.On("GetID").Return(core.TaskExecutionIdentifier{})
taskExecutionMetadata.On("GetTaskExecutionID").Return(id)
return taskExecutionMetadata
}

func dummySetupContext(fakeClient client.Client) pluginsCore.SetupContext {
setupContext := &pluginsCoreMock.SetupContext{}
var enqueueOwnerFunc = pluginsCore.EnqueueOwner(func(ownerId k8stypes.NamespacedName) error { return nil })
Expand Down Expand Up @@ -202,7 +221,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase())
createdPod := &v1.Pod{}

AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tctx.TaskExecutionMetadata().GetNamespace(),
Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod))
assert.Equal(t, tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name)
Expand All @@ -223,7 +242,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
assert.NoError(t, err)

createdPod := &v1.Pod{}
AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
assert.NoError(t, fakeClient.Create(ctx, createdPod))

transition, err := pluginManager.Handle(ctx, tctx)
Expand Down Expand Up @@ -349,7 +368,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
// Build a reference resource that is supposed to be identical to the resource built by pluginManager
referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx)
assert.NoError(t, err)
AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig())
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig())
refKey := backoff.ComposeResourceKey(referenceResource)
podBackOffHandler, found := backOffController.GetBackOffHandler(refKey)
assert.True(t, found)
Expand Down Expand Up @@ -545,34 +564,104 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
}
}

func TestAddObjectMetadata(t *testing.T) {
func TestPluginManager_CustomKubeClient(t *testing.T) {
ctx := context.TODO()
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().Build()
newFakeClient := &pluginsCoreMock.KubeClient{}
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) {
return newFakeClient, nil
},
}, NewResourceMonitorIndex())
assert.NoError(t, err)

assert.Equal(t, newFakeClient, pluginManager.kubeClient)
}

func TestPluginManager_AddObjectMetadata(t *testing.T) {
genName := "genName"
execID := &pluginsCoreMock.TaskExecutionID{}
execID.On("GetGeneratedName").Return(genName)
tm := &pluginsCoreMock.TaskExecutionMetadata{}
tm.On("GetTaskExecutionID").Return(execID)
or := v12.OwnerReference{}
tm.On("GetOwnerReference").Return(or)
ns := "ns"
tm.On("GetNamespace").Return(ns)
tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"})

l := map[string]string{
"l1": "lv1",
}
tm.On("GetLabels").Return(l)
or := v12.OwnerReference{}
l := map[string]string{"l1": "lv1"}
a := map[string]string{"aKey": "aVal"}
tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or)

o := &v1.Pod{}
cfg := config.GetK8sPluginConfig()
AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences())
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())

t.Run("default", func(t *testing.T) {
o := &v1.Pod{}
pluginManager := PluginManager{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences())
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

t.Run("Disable OwnerReferences injection", func(t *testing.T) {
pluginManager := PluginManager{disableInjectOwnerReferences: true}
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
// empty OwnerReference since we are ignoring
assert.Equal(t, 0, len(o.GetOwnerReferences()))
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

t.Run("Disable enabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
// enable finalizer injection
cfg.InjectFinalizer = true
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
// empty OwnerReference since we are ignoring
assert.Equal(t, 1, len(o.GetOwnerReferences()))
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

t.Run("Disable disabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
// disable finalizer injection
cfg.InjectFinalizer = false
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
// empty OwnerReference since we are ignoring
assert.Equal(t, 1, len(o.GetOwnerReferences()))
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

}

func TestResourceManagerConstruction(t *testing.T) {
Expand Down