Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add support for custom KubeClient
Browse files Browse the repository at this point in the history
Signed-off-by: Filipe Regadas <[email protected]>
  • Loading branch information
regadas committed Mar 2, 2021
1 parent 631e8cf commit 845d3ed
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 32 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ replace (
k8s.io/api => github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0
k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f
k8s.io/client-go => k8s.io/client-go v0.0.0-20191016111102-bec269661e48

github.com/lyft/flyteplugins => ../flyteplugins
)
76 changes: 49 additions & 27 deletions pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,6 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics {
}
}

func AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o k8s.Resource, cfg *config.K8sPluginConfig) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())
if cfg.InjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func IsK8sObjectNotExists(err error) bool {
return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err)
}
Expand All @@ -107,8 +95,25 @@ type PluginManager struct {
kubeClient pluginsCore.KubeClient
metrics PluginMetrics
// Per namespace-resource
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
ignoreOwnerReferences bool
}

func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o k8s.Resource, cfg *config.K8sPluginConfig) {
o.SetNamespace(taskCtx.GetNamespace())
o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations())))
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())

if !e.ignoreOwnerReferences {
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
}

if cfg.InjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func (e *PluginManager) GetProperties() pluginsCore.PluginProperties {
Expand Down Expand Up @@ -173,7 +178,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas
return pluginsCore.UnknownTransition, err
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GroupVersionKind(), o.GetNamespace(), o.GetName())

key := backoff.ComposeResourceKey(o)
Expand Down Expand Up @@ -225,7 +230,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore
return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
Expand Down Expand Up @@ -312,7 +317,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())

err = e.kubeClient.GetClient().Delete(ctx, o)
if err != nil && !IsK8sObjectNotExists(err) {
Expand Down Expand Up @@ -350,7 +355,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
Expand Down Expand Up @@ -390,7 +395,19 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.")
}

if iCtx.KubeClient() == nil {
kubeClient := iCtx.KubeClient()
if entry.NewKubeClient != nil {
kc, err := entry.NewKubeClient(ctx)
if err != nil {
return nil, err
}

if kc != nil {
kubeClient = kc
}
}

if kubeClient == nil {
return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize K8sResource Plugin, Kubeclient cannot be nil!")
}

Expand All @@ -399,14 +416,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
Type: entry.ResourceToWatch,
}

ownerKind := iCtx.OwnerKind()
workflowParentPredicate := func(o metav1.Object) bool {
if entry.IgnoreOwnerReferences {
return true
}

ownerReference := metav1.GetControllerOf(o)
if ownerReference != nil {
if ownerReference.Kind == ownerKind {
if ownerReference.Kind == iCtx.OwnerKind() {
return true
}
}

return false
}

Expand Down Expand Up @@ -489,12 +510,13 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
rm.RunCollectorOnce(ctx)

return &PluginManager{
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: iCtx.KubeClient(),
resourceLevelMonitor: rm,
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
ignoreOwnerReferences: entry.IgnoreOwnerReferences,
}, nil
}

Expand Down
64 changes: 59 additions & 5 deletions pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase())
createdPod := &v1.Pod{}

AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tctx.TaskExecutionMetadata().GetNamespace(),
Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod))
assert.Equal(t, tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name)
Expand All @@ -224,7 +224,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
assert.NoError(t, err)

createdPod := &v1.Pod{}
AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{})
assert.NoError(t, fakeClient.Create(ctx, createdPod))

transition, err := pluginManager.Handle(ctx, tctx)
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
// Build a reference resource that is supposed to be identical to the resource built by pluginManager
referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx)
assert.NoError(t, err)
AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig())
pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig())
refKey := backoff.ComposeResourceKey(referenceResource)
podBackOffHandler, found := backOffController.GetBackOffHandler(refKey)
assert.True(t, found)
Expand Down Expand Up @@ -546,7 +546,61 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
}
}

func TestAddObjectMetadata(t *testing.T) {
func TestPluginManager_CustomKubeClient(t *testing.T) {
ctx := context.TODO()
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil)
fakeClient := fake.NewFakeClient()
newFakeClient := &pluginsCoreMock.KubeClient{}
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
NewKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) {
return newFakeClient, nil
},
}, NewResourceMonitorIndex())
assert.NoError(t, err)

assert.Equal(t, newFakeClient, pluginManager.kubeClient)
}

func TestPluginManager_AddObjectMetadata_IgnoreOwnerReferences(t *testing.T) {
pluginManager := PluginManager{ignoreOwnerReferences: true}
genName := "genName"
execID := &pluginsCoreMock.TaskExecutionID{}
execID.On("GetGeneratedName").Return(genName)
tm := &pluginsCoreMock.TaskExecutionMetadata{}
tm.On("GetTaskExecutionID").Return(execID)
or := v12.OwnerReference{}
tm.On("GetOwnerReference").Return(or)
ns := "ns"
tm.On("GetNamespace").Return(ns)
tm.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"})

l := map[string]string{
"l1": "lv1",
}
tm.On("GetLabels").Return(l)

o := &v1.Pod{}
cfg := config.GetK8sPluginConfig()
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
// empty OwnerReference since we are ignoring
assert.Equal(t, 0, len(o.GetOwnerReferences()))
assert.Equal(t, ns, o.GetNamespace())
assert.Equal(t, map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
"aKey": "aVal",
}, o.GetAnnotations())
assert.Equal(t, l, o.GetLabels())
}

func TestPluginManager_AddObjectMetadata(t *testing.T) {
pluginManager := PluginManager{}
genName := "genName"
execID := &pluginsCoreMock.TaskExecutionID{}
execID.On("GetGeneratedName").Return(genName)
Expand All @@ -565,7 +619,7 @@ func TestAddObjectMetadata(t *testing.T) {

o := &v1.Pod{}
cfg := config.GetK8sPluginConfig()
AddObjectMetadata(tm, o, cfg)
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences())
assert.Equal(t, ns, o.GetNamespace())
Expand Down

0 comments on commit 845d3ed

Please sign in to comment.