From 2687cebb7ff43a0b1ad5f2f203fe7b8f34a74630 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 13 Mar 2023 16:41:47 -0500 Subject: [PATCH 1/5] persisting k8s plugin state between evaluations Signed-off-by: Daniel Rammer --- go.mod | 2 + go.sum | 4 +- .../nodes/task/k8s/plugin_context.go | 32 ++++++++++++- .../nodes/task/k8s/plugin_manager.go | 48 +++++++++++++++++-- 4 files changed, 79 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index f6fef25fc..ef7e45070 100644 --- a/go.mod +++ b/go.mod @@ -147,3 +147,5 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d + +replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.41-0.20230313213632-e021db003b46 diff --git a/go.sum b/go.sum index b6aa787a8..f9a670c36 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk= github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s= -github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= +github.com/flyteorg/flyteplugins v1.0.41-0.20230313213632-e021db003b46 h1:4OVLZ+yvm7QJVyiFkaEzaR80fkcrN6xOagFdnFO/dDQ= +github.com/flyteorg/flyteplugins v1.0.41-0.20230313213632-e021db003b46/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/pkg/controller/nodes/task/k8s/plugin_context.go b/pkg/controller/nodes/task/k8s/plugin_context.go index cb90edfb3..ac2d06f7a 100644 --- a/pkg/controller/nodes/task/k8s/plugin_context.go +++ b/pkg/controller/nodes/task/k8s/plugin_context.go @@ -16,6 +16,7 @@ type pluginContext struct { pluginsCore.TaskExecutionContext // Lazily creates a buffered outputWriter, overriding the input outputWriter. ow *ioutils.BufferedOutputWriter + k8sPluginState *k8s.PluginState } // Provides an output sync of type io.OutputWriter @@ -26,9 +27,38 @@ func (p *pluginContext) OutputWriter() io.OutputWriter { return buf } -func newPluginContext(tCtx pluginsCore.TaskExecutionContext) *pluginContext { +// TODO @hamersaw docs +type pluginStateReader struct { + k8sPluginState *k8s.PluginState +} + +// TODO @hamersaw docs +func (p pluginStateReader) GetStateVersion() uint8 { + return 0; +} + +// TODO @hamersaw docs +func (p pluginStateReader) Get(t interface{}) (stateVersion uint8, err error) { + if pointer, ok := t.(*k8s.PluginState); ok { + *pointer = *p.k8sPluginState + } else { + // TODO @hamersaw err + } + + return 0, nil +} + +// TODO @hamersaw docs +func (p *pluginContext) PluginStateReader() pluginsCore.PluginStateReader { + return pluginStateReader { + k8sPluginState: p.k8sPluginState, + } +} + +func newPluginContext(tCtx pluginsCore.TaskExecutionContext, k8sPluginState *k8s.PluginState) *pluginContext { return &pluginContext{ TaskExecutionContext: tCtx, ow: nil, + k8sPluginState: k8sPluginState, } } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index e0d786858..02c08177e 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -59,7 +59,8 @@ const ( ) type PluginState struct { - Phase PluginPhase + Phase PluginPhase + K8sPluginState k8s.PluginState } type PluginMetrics struct { @@ -247,7 +248,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.DoTransition(pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "task submitted to K8s")), nil } -func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { +func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, k8sPluginState *k8s.PluginState) (pluginsCore.Transition, error) { o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) if err != nil { @@ -274,7 +275,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore e.metrics.ResourceDeleted.Inc(ctx) } - pCtx := newPluginContext(tCtx) + pCtx := newPluginContext(tCtx, k8sPluginState) p, err := e.plugin.GetTaskPhase(ctx, pCtx, o) if err != nil { logger.Warnf(ctx, "failed to check status of resource in plugin [%s], with error: %s", e.GetID(), err.Error()) @@ -311,6 +312,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore } func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { + // read phase state ps := PluginState{} if v, err := tCtx.PluginStateReader().Get(&ps); err != nil { if v != pluginStateVersion { @@ -318,7 +320,45 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio } return pluginsCore.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state") } + + // evaluate plugin + var err error + var transition pluginsCore.Transition + pluginPhase := ps.Phase if ps.Phase == PluginPhaseNotStarted { + transition, err = e.LaunchResource(ctx, tCtx) + pluginPhase = PluginPhaseStarted + } else { + transition, err = e.CheckResourcePhase(ctx, tCtx, &ps.K8sPluginState) + } + + if err != nil { + return transition, err + } + + // persist any changes in phase state + k8sPluginState := ps.K8sPluginState + if ps.Phase != pluginPhase || k8sPluginState.Phase != transition.Info().Phase() || + k8sPluginState.PhaseVersion != transition.Info().Version() || k8sPluginState.Reason != transition.Info().Reason() { + + newPluginState := PluginState{ + Phase: pluginPhase, + K8sPluginState: k8s.PluginState{ + Phase: transition.Info().Phase(), + PhaseVersion: transition.Info().Version(), + Reason: transition.Info().Reason(), + }, + } + + if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &newPluginState); err != nil { + return pluginsCore.UnknownTransition, err + } + } + + return transition, err + + // TODO @hamersaw - dead code + /*if ps.Phase == PluginPhaseNotStarted { t, err := e.LaunchResource(ctx, tCtx) if err == nil && t.Info().Phase() == pluginsCore.PhaseQueued { if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &PluginState{Phase: PluginPhaseStarted}); err != nil { @@ -327,7 +367,7 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio } return t, err } - return e.CheckResourcePhase(ctx, tCtx) + return e.CheckResourcePhase(ctx, tCtx)*/ } func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { From 1360dcac07637c03eb5e29c4822a320526b9abfd Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 14 Mar 2023 17:28:02 -0500 Subject: [PATCH 2/5] fixed unit tests and linter Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 4 ++-- pkg/controller/nodes/task/k8s/plugin_context.go | 9 +++++---- pkg/controller/nodes/task/k8s/plugin_manager.go | 8 +++++--- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index ef7e45070..6621f9906 100644 --- a/go.mod +++ b/go.mod @@ -148,4 +148,4 @@ require ( replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d -replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.41-0.20230313213632-e021db003b46 +replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.41-0.20230314215832-73c496550087 diff --git a/go.sum b/go.sum index f9a670c36..3fc942177 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk= github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.41-0.20230313213632-e021db003b46 h1:4OVLZ+yvm7QJVyiFkaEzaR80fkcrN6xOagFdnFO/dDQ= -github.com/flyteorg/flyteplugins v1.0.41-0.20230313213632-e021db003b46/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= +github.com/flyteorg/flyteplugins v1.0.41-0.20230314215832-73c496550087 h1:rgOGaYVQTOe/HJbChlQt/LcURQnmSk7JpBsUPSsDHnM= +github.com/flyteorg/flyteplugins v1.0.41-0.20230314215832-73c496550087/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/pkg/controller/nodes/task/k8s/plugin_context.go b/pkg/controller/nodes/task/k8s/plugin_context.go index ac2d06f7a..a2851c206 100644 --- a/pkg/controller/nodes/task/k8s/plugin_context.go +++ b/pkg/controller/nodes/task/k8s/plugin_context.go @@ -2,6 +2,7 @@ package k8s import ( "context" + "fmt" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" @@ -15,7 +16,7 @@ var _ k8s.PluginContext = &pluginContext{} type pluginContext struct { pluginsCore.TaskExecutionContext // Lazily creates a buffered outputWriter, overriding the input outputWriter. - ow *ioutils.BufferedOutputWriter + ow *ioutils.BufferedOutputWriter k8sPluginState *k8s.PluginState } @@ -34,7 +35,7 @@ type pluginStateReader struct { // TODO @hamersaw docs func (p pluginStateReader) GetStateVersion() uint8 { - return 0; + return 0 } // TODO @hamersaw docs @@ -42,7 +43,7 @@ func (p pluginStateReader) Get(t interface{}) (stateVersion uint8, err error) { if pointer, ok := t.(*k8s.PluginState); ok { *pointer = *p.k8sPluginState } else { - // TODO @hamersaw err + return 0, fmt.Errorf("unexpected type when reading plugin state") } return 0, nil @@ -50,7 +51,7 @@ func (p pluginStateReader) Get(t interface{}) (stateVersion uint8, err error) { // TODO @hamersaw docs func (p *pluginContext) PluginStateReader() pluginsCore.PluginStateReader { - return pluginStateReader { + return pluginStateReader{ k8sPluginState: p.k8sPluginState, } } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 02c08177e..9a0c00684 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -327,7 +327,9 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio pluginPhase := ps.Phase if ps.Phase == PluginPhaseNotStarted { transition, err = e.LaunchResource(ctx, tCtx) - pluginPhase = PluginPhaseStarted + if err == nil && transition.Info().Phase() == pluginsCore.PhaseQueued { + pluginPhase = PluginPhaseStarted + } } else { transition, err = e.CheckResourcePhase(ctx, tCtx, &ps.K8sPluginState) } @@ -338,11 +340,11 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio // persist any changes in phase state k8sPluginState := ps.K8sPluginState - if ps.Phase != pluginPhase || k8sPluginState.Phase != transition.Info().Phase() || + if ps.Phase != pluginPhase || k8sPluginState.Phase != transition.Info().Phase() || k8sPluginState.PhaseVersion != transition.Info().Version() || k8sPluginState.Reason != transition.Info().Reason() { newPluginState := PluginState{ - Phase: pluginPhase, + Phase: pluginPhase, K8sPluginState: k8s.PluginState{ Phase: transition.Info().Phase(), PhaseVersion: transition.Info().Version(), From 930778005dce3fc24095b3954f7f87ba59cd4c79 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 14 Mar 2023 17:31:08 -0500 Subject: [PATCH 3/5] added docs Signed-off-by: Daniel Rammer --- pkg/controller/nodes/task/k8s/plugin_context.go | 8 ++++---- pkg/controller/nodes/task/k8s/plugin_manager.go | 12 ------------ 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/pkg/controller/nodes/task/k8s/plugin_context.go b/pkg/controller/nodes/task/k8s/plugin_context.go index a2851c206..aed5bc468 100644 --- a/pkg/controller/nodes/task/k8s/plugin_context.go +++ b/pkg/controller/nodes/task/k8s/plugin_context.go @@ -28,17 +28,17 @@ func (p *pluginContext) OutputWriter() io.OutputWriter { return buf } -// TODO @hamersaw docs +// pluginStateReader overrides the default PluginStateReader to return a pre-assigned PluginState. This allows us to +// encapsulate plugin state persistence in the existing k8s PluginManager and only expose the ability to read the +// previous Phase, PhaseVersion, and Reason for all k8s plugins. type pluginStateReader struct { k8sPluginState *k8s.PluginState } -// TODO @hamersaw docs func (p pluginStateReader) GetStateVersion() uint8 { return 0 } -// TODO @hamersaw docs func (p pluginStateReader) Get(t interface{}) (stateVersion uint8, err error) { if pointer, ok := t.(*k8s.PluginState); ok { *pointer = *p.k8sPluginState @@ -49,7 +49,7 @@ func (p pluginStateReader) Get(t interface{}) (stateVersion uint8, err error) { return 0, nil } -// TODO @hamersaw docs +// PluginStateReader overrides the default behavior to return our k8s plugin specific reader. func (p *pluginContext) PluginStateReader() pluginsCore.PluginStateReader { return pluginStateReader{ k8sPluginState: p.k8sPluginState, diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 9a0c00684..67b0356a3 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -358,18 +358,6 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio } return transition, err - - // TODO @hamersaw - dead code - /*if ps.Phase == PluginPhaseNotStarted { - t, err := e.LaunchResource(ctx, tCtx) - if err == nil && t.Info().Phase() == pluginsCore.PhaseQueued { - if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &PluginState{Phase: PluginPhaseStarted}); err != nil { - return pluginsCore.UnknownTransition, err - } - } - return t, err - } - return e.CheckResourcePhase(ctx, tCtx)*/ } func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { From 3b6380d2264dc554f7c8be6723b211f74dce6344 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 27 Mar 2023 19:57:44 -0500 Subject: [PATCH 4/5] updating flyteplugins dep Signed-off-by: Daniel Rammer --- go.mod | 6 ++---- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 6621f9906..7a5caddbf 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 - github.com/flyteorg/flyteidl v1.3.9 - github.com/flyteorg/flyteplugins v1.0.40 + github.com/flyteorg/flyteidl v1.3.14 + github.com/flyteorg/flyteplugins v1.0.44 github.com/flyteorg/flytestdlib v1.0.15 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible @@ -147,5 +147,3 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d - -replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.41-0.20230314215832-73c496550087 diff --git a/go.sum b/go.sum index 3fc942177..6b4e8cba5 100644 --- a/go.sum +++ b/go.sum @@ -260,10 +260,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk= -github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.41-0.20230314215832-73c496550087 h1:rgOGaYVQTOe/HJbChlQt/LcURQnmSk7JpBsUPSsDHnM= -github.com/flyteorg/flyteplugins v1.0.41-0.20230314215832-73c496550087/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= +github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= +github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteplugins v1.0.44 h1:uKizng+i0vfXslyPBlrsfecInhvy71fTB4kRg7eiifE= +github.com/flyteorg/flyteplugins v1.0.44/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= From c73a6052422e68306bbd3139393be4fb567f5f54 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 30 Mar 2023 11:26:31 -0500 Subject: [PATCH 5/5] added unit tests Signed-off-by: Daniel Rammer --- .../nodes/task/k8s/plugin_manager_test.go | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 160dc335f..94b6b5524 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "testing" "k8s.io/client-go/kubernetes/scheme" @@ -715,6 +716,157 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { } } +func TestPluginManager_Handle_PluginState(t *testing.T) { + ctx := context.TODO() + tm := getMockTaskExecutionMetadata() + res := &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: tm.GetTaskExecutionID().GetGeneratedName(), + Namespace: tm.GetNamespace(), + }, + } + + pluginStateQueued := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseQueued, + PhaseVersion: 0, + Reason: "foo", + }, + } + pluginStateQueuedVersion1 := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseQueued, + PhaseVersion: 1, + Reason: "foo", + }, + } + pluginStateQueuedReasonBar := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseQueued, + PhaseVersion: 0, + Reason: "bar", + }, + } + pluginStateRunning := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseRunning, + PhaseVersion: 0, + Reason: "", + }, + } + + phaseInfoQueued := pluginsCore.PhaseInfoQueuedWithTaskInfo(pluginStateQueued.K8sPluginState.PhaseVersion, pluginStateQueued.K8sPluginState.Reason, nil) + phaseInfoQueuedVersion1 := pluginsCore.PhaseInfoQueuedWithTaskInfo( + pluginStateQueuedVersion1.K8sPluginState.PhaseVersion, + pluginStateQueuedVersion1.K8sPluginState.Reason, + nil, + ) + phaseInfoQueuedReasonBar := pluginsCore.PhaseInfoQueuedWithTaskInfo( + pluginStateQueuedReasonBar.K8sPluginState.PhaseVersion, + pluginStateQueuedReasonBar.K8sPluginState.Reason, + nil, + ) + phaseInfoRunning := pluginsCore.PhaseInfoRunning(0, nil) + + tests := []struct { + name string + startPluginState PluginState + reportedPhaseInfo pluginsCore.PhaseInfo + expectedPluginState PluginState + }{ + { + "NoChange", + pluginStateQueued, + phaseInfoQueued, + pluginStateQueued, + }, + { + "K8sPhaseChange", + pluginStateQueued, + phaseInfoRunning, + pluginStateRunning, + }, + { + "PhaseVersionChange", + pluginStateQueued, + phaseInfoQueuedVersion1, + pluginStateQueuedVersion1, + }, + { + "ReasonChange", + pluginStateQueued, + phaseInfoQueuedReasonBar, + pluginStateQueuedReasonBar, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // mock TaskExecutionContext + tCtx := &pluginsCoreMock.TaskExecutionContext{} + tCtx.OnTaskExecutionMetadata().Return(getMockTaskExecutionMetadata()) + + tReader := &pluginsCoreMock.TaskReader{} + tReader.OnReadMatch(mock.Anything).Return(&core.TaskTemplate{}, nil) + tCtx.OnTaskReader().Return(tReader) + + // mock state reader / writer to use local pluginState variable + pluginState := &tt.startPluginState + customStateReader := &pluginsCoreMock.PluginStateReader{} + customStateReader.OnGetMatch(mock.MatchedBy(func(i interface{}) bool { + ps, ok := i.(*PluginState) + if ok { + *ps = *pluginState + return true + } + return false + })).Return(uint8(0), nil) + tCtx.OnPluginStateReader().Return(customStateReader) + + customStateWriter := &pluginsCoreMock.PluginStateWriter{} + customStateWriter.OnPutMatch(mock.Anything, mock.MatchedBy(func(i interface{}) bool { + ps, ok := i.(*PluginState) + if ok { + *pluginState = *ps + } + return ok + })).Return(nil) + tCtx.OnPluginStateWriter().Return(customStateWriter) + tCtx.OnOutputWriter().Return(&dummyOutputWriter{}) + + fc := extendedFakeClient{Client: fake.NewFakeClient(res)} + + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) + mockResourceHandler.On("BuildIdentityResource", mock.Anything, tCtx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.On("GetTaskPhase", mock.Anything, mock.Anything, mock.Anything).Return(tt.reportedPhaseInfo, nil) + + // create new PluginManager + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }, NewResourceMonitorIndex()) + assert.NoError(t, err) + + // handle plugin + _, err = pluginManager.Handle(ctx, tCtx) + assert.NoError(t, err) + + // verify expected PluginState + newPluginState := PluginState{} + _, err = tCtx.PluginStateReader().Get(&newPluginState) + assert.NoError(t, err) + + assert.True(t, reflect.DeepEqual(newPluginState, tt.expectedPluginState)) + }) + } +} + func TestPluginManager_CustomKubeClient(t *testing.T) { ctx := context.TODO() tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)