Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add support for custom KubeClient (#225)
Browse files Browse the repository at this point in the history
* Add support for custom KubeClient

Signed-off-by: Filipe Regadas <[email protected]>

* Review and rebase

Signed-off-by: Filipe Regadas <[email protected]>

* Fix test

Signed-off-by: Filipe Regadas <[email protected]>

* Update and fix conditions

Signed-off-by: Filipe Regadas <[email protected]>

* fix typo

Signed-off-by: Filipe Regadas <[email protected]>

* Rework test

Signed-off-by: Filipe Regadas <[email protected]>

* rebase master

Signed-off-by: Filipe Regadas <[email protected]>
  • Loading branch information
regadas authored Mar 19, 2021
1 parent 11ee7f1 commit 0d1eeab
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.20
github.com/flyteorg/flyteplugins v0.5.35
github.com/flyteorg/flyteplugins v0.5.38
github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.20 h1:OGOb2FOHWL363Qp8uzbJeFbQBKYPT30+afv+8BnBlGs=
github.com/flyteorg/flyteidl v0.18.20/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.35 h1:KEMOiA4B+lIxQ+l7FRHzVcPA234Td9+ursuJDm6I8dg=
github.com/flyteorg/flyteplugins v0.5.35/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
78 changes: 51 additions & 27 deletions pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,6 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics {
}
}

func AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())
if cfg.InjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func IsK8sObjectNotExists(err error) bool {
return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err)
}
Expand All @@ -109,8 +97,26 @@ type PluginManager struct {
kubeClient pluginsCore.KubeClient
metrics PluginMetrics
// Per namespace-resource
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
disableInjectOwnerReferences bool
disableInjectFinalizer bool
}

func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())

if !e.disableInjectOwnerReferences {
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
}

if cfg.InjectFinalizer && !e.disableInjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func (e *PluginManager) GetProperties() pluginsCore.PluginProperties {
Expand Down Expand Up @@ -175,7 +181,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas
return pluginsCore.UnknownTransition, err
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GetObjectKind().GroupVersionKind(), o.GetNamespace(), o.GetName())

key := backoff.ComposeResourceKey(o)
Expand Down Expand Up @@ -227,7 +233,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore
return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
Expand Down Expand Up @@ -314,7 +320,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())

err = e.kubeClient.GetClient().Delete(ctx, o)
if err != nil && !IsK8sObjectNotExists(err) {
Expand Down Expand Up @@ -352,7 +358,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
Expand Down Expand Up @@ -392,7 +398,19 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.")
}

if iCtx.KubeClient() == nil {
kubeClient := iCtx.KubeClient()
if entry.CustomKubeClient != nil {
kc, err := entry.CustomKubeClient(ctx)
if err != nil {
return nil, err
}

if kc != nil {
kubeClient = kc
}
}

if kubeClient == nil {
return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize K8sResource Plugin, Kubeclient cannot be nil!")
}

Expand All @@ -401,14 +419,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
Type: entry.ResourceToWatch,
}

ownerKind := iCtx.OwnerKind()
workflowParentPredicate := func(o metav1.Object) bool {
if entry.DisableInjectOwnerReferences {
return true
}

ownerReference := metav1.GetControllerOf(o)
if ownerReference != nil {
if ownerReference.Kind == ownerKind {
if ownerReference.Kind == iCtx.OwnerKind() {
return true
}
}

return false
}

Expand Down Expand Up @@ -492,12 +514,14 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
rm.RunCollectorOnce(ctx)

return &PluginManager{
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: iCtx.KubeClient(),
resourceLevelMonitor: rm,
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
disableInjectOwnerReferences: entry.DisableInjectOwnerReferences,
disableInjectFinalizer: entry.DisableInjectFinalizer,
}, nil
}

Expand Down
143 changes: 116 additions & 27 deletions pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ func getMockTaskExecutionMetadata() pluginsCore.TaskExecutionMetadata {
return taskExecutionMetadata
}

func getMockTaskExecutionMetadataCustom(
tid string,
ns string,
annotations map[string]string,
labels map[string]string,
ownerRef v12.OwnerReference) pluginsCore.TaskExecutionMetadata {
taskExecutionMetadata := &pluginsCoreMock.TaskExecutionMetadata{}
taskExecutionMetadata.On("GetNamespace").Return(ns)
taskExecutionMetadata.On("GetAnnotations").Return(annotations)
taskExecutionMetadata.On("GetLabels").Return(labels)
taskExecutionMetadata.On("GetOwnerReference").Return(ownerRef)

id := &pluginsCoreMock.TaskExecutionID{}
id.On("GetGeneratedName").Return(tid)
id.On("GetID").Return(core.TaskExecutionIdentifier{})
taskExecutionMetadata.On("GetTaskExecutionID").Return(id)
return taskExecutionMetadata
}

func dummySetupContext(fakeClient client.Client) pluginsCore.SetupContext {
setupContext := &pluginsCoreMock.SetupContext{}
var enqueueOwnerFunc = pluginsCore.EnqueueOwner(func(ownerId k8stypes.NamespacedName) error { return nil })
Expand Down Expand Up @@ -202,7 +221,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase())
createdPod := &v1.Pod{}

AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tctx.TaskExecutionMetadata().GetNamespace(),
Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod))
assert.Equal(t, tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name)
Expand All @@ -223,7 +242,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
assert.NoError(t, err)

createdPod := &v1.Pod{}
AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
assert.NoError(t, fakeClient.Create(ctx, createdPod))

transition, err := pluginManager.Handle(ctx, tctx)
Expand Down Expand Up @@ -349,7 +368,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
// Build a reference resource that is supposed to be identical to the resource built by pluginManager
referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx)
assert.NoError(t, err)
AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig())
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig())
refKey := backoff.ComposeResourceKey(referenceResource)
podBackOffHandler, found := backOffController.GetBackOffHandler(refKey)
assert.True(t, found)
Expand Down Expand Up @@ -545,34 +564,104 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
}
}

func TestAddObjectMetadata(t *testing.T) {
func TestPluginManager_CustomKubeClient(t *testing.T) {
ctx := context.TODO()
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().Build()
newFakeClient := &pluginsCoreMock.KubeClient{}
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) {
return newFakeClient, nil
},
}, NewResourceMonitorIndex())
assert.NoError(t, err)

assert.Equal(t, newFakeClient, pluginManager.kubeClient)
}

func TestPluginManager_AddObjectMetadata(t *testing.T) {
genName := "genName"
execID := &pluginsCoreMock.TaskExecutionID{}
execID.On("GetGeneratedName").Return(genName)
tm := &pluginsCoreMock.TaskExecutionMetadata{}
tm.On("GetTaskExecutionID").Return(execID)
or := v12.OwnerReference{}
tm.On("GetOwnerReference").Return(or)
ns := "ns"
tm.On("GetNamespace").Return(ns)
tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"})

l := map[string]string{
"l1": "lv1",
}
tm.On("GetLabels").Return(l)
or := v12.OwnerReference{}
l := map[string]string{"l1": "lv1"}
a := map[string]string{"aKey": "aVal"}
tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or)

o := &v1.Pod{}
cfg := config.GetK8sPluginConfig()
AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences())
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())

t.Run("default", func(t *testing.T) {
o := &v1.Pod{}
pluginManager := PluginManager{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences())
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

t.Run("Disable OwnerReferences injection", func(t *testing.T) {
pluginManager := PluginManager{disableInjectOwnerReferences: true}
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
// empty OwnerReference since we are ignoring
assert.Equal(t, 0, len(o.GetOwnerReferences()))
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

t.Run("Disable enabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
// enable finalizer injection
cfg.InjectFinalizer = true
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
// empty OwnerReference since we are ignoring
assert.Equal(t, 1, len(o.GetOwnerReferences()))
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

t.Run("Disable disabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
// disable finalizer injection
cfg.InjectFinalizer = false
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
// empty OwnerReference since we are ignoring
assert.Equal(t, 1, len(o.GetOwnerReferences()))
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
assert.Equal(t, 0, len(o.GetFinalizers()))
})

}

func TestResourceManagerConstruction(t *testing.T) {
Expand Down

0 comments on commit 0d1eeab

Please sign in to comment.