From cbc932ff44afd9a509f8c0f692631b73ebfc7ffd Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sat, 30 Jan 2021 21:44:37 -0800 Subject: [PATCH 1/9] Propeller Fast follow - Turbo mode in FlytePropeller --- config.yaml | 3 +- pkg/controller/config/config.go | 1 + pkg/controller/handler.go | 72 ++++++++++++++++++++------------- 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/config.yaml b/config.yaml index d9318d4d9..5cbbd2894 100644 --- a/config.yaml +++ b/config.yaml @@ -32,7 +32,6 @@ tasks: - container - sidecar - K8S-ARRAY - - qubole-hive-executor # Uncomment to enable sagemaker plugin # - sagemaker_training # - sagemaker_hyperparameter_tuning @@ -95,7 +94,7 @@ event: rate: 500 capacity: 1000 admin: - endpoint: localhost:30081 + endpoint: localhost:80 insecure: true catalog-cache: type: noop diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 400e7ede9..38cc31174 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -74,6 +74,7 @@ type Config struct { MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` + EnableFastFollow bool `json:"enable-fast-follow" pflag:",Boolean flag that enables Fast Follow mode, this makes Propeller proceed to another round on successful write to etcD."` } type KubeClientConfig struct { diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 0800029fe..d414337d4 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -181,39 +181,53 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { } } - mutatedWf, err := p.TryMutateWorkflow(ctx, w) - if err != nil { - // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations - // We only want to increase failed attempts and discard any other partial changes to the CRD. - mutatedWf = RecordSystemError(w, err) - p.metrics.SystemError.Inc(ctx) - } else if mutatedWf == nil { - return nil - } else { - if !w.GetExecutionStatus().IsTerminated() { - // No updates in the status we detected, we will skip writing to KubeAPI - if mutatedWf.Status.Equals(&w.Status) { - logger.Info(ctx, "WF hasn't been updated in this round.") - return nil + for ; ; { + mutatedWf, err := p.TryMutateWorkflow(ctx, w) + if err != nil { + // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations + // We only want to increase failed attempts and discard any other partial changes to the CRD. + mutatedWf = RecordSystemError(w, err) + p.metrics.SystemError.Inc(ctx) + } else if mutatedWf == nil { + return nil + } else { + if !w.GetExecutionStatus().IsTerminated() { + // No updates in the status we detected, we will skip writing to KubeAPI + if mutatedWf.Status.Equals(&w.Status) { + logger.Info(ctx, "WF hasn't been updated in this round.") + return nil + } + } + if mutatedWf.GetExecutionStatus().IsTerminated() { + // If the end result is a terminated workflow, we remove the labels + // We add a completed label so that we can avoid polling for this workflow + SetCompletedLabel(mutatedWf, time.Now()) + ResetFinalizers(mutatedWf) } } - if mutatedWf.GetExecutionStatus().IsTerminated() { - // If the end result is a terminated workflow, we remove the labels - // We add a completed label so that we can avoid polling for this workflow - SetCompletedLabel(mutatedWf, time.Now()) - ResetFinalizers(mutatedWf) + // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update + + // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) + if updateErr != nil { + return updateErr } + if err != nil { + // An error was encountered during the round. Let us return, so that we can back-off gracefully + return err + } + if !p.cfg.EnableFastFollow || mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion { + // Workflow is terminated (no need to continue) or no status was changed, we can wait + logger.Infof(ctx, "Will not fast follow, Reason: Enabled? %v, Wf terminated? %v, Version matched? %v", + p.cfg.EnableFastFollow, mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) + return nil + } + logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round") + w = newWf } - // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update - - // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not - // allow changes to the Spec of the resource, which is ideal for ensuring - // nothing other than resource status has been updated. - _, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) - if updateErr != nil { - return updateErr - } - return err + return nil } func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller { From a00e94d4965a1f1edbf842ff409deb84dd7d6a81 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sun, 31 Jan 2021 12:56:15 -0800 Subject: [PATCH 2/9] Streaks: and limits for it --- pkg/controller/config/config.go | 4 ++++ pkg/controller/handler.go | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 38cc31174..dcdb5bc6e 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -47,6 +47,8 @@ var ( MaxNodeRetriesOnSystemFailures: 3, InterruptibleFailureThreshold: 1, }, + EnableFastFollow: true, + MaxStreakLength: 5, } ) @@ -75,6 +77,8 @@ type Config struct { KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` EnableFastFollow bool `json:"enable-fast-follow" pflag:",Boolean flag that enables Fast Follow mode, this makes Propeller proceed to another round on successful write to etcD."` + MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow if fast follow mode is enabled."` + } type KubeClientConfig struct { diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index d414337d4..86e3d708f 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -32,6 +32,7 @@ type propellerMetrics struct { PanicObserved labeled.Counter RoundSkipped prometheus.Counter WorkflowNotFound prometheus.Counter + StreakLength labeled.Counter } func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { @@ -45,6 +46,7 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { PanicObserved: labeled.NewCounter("panic", "Panic during handling or aborting workflow", roundScope, labeled.EmitUnlabeledMetric), RoundSkipped: roundScope.MustNewCounter("skipped", "Round Skipped because of stale workflow"), WorkflowNotFound: roundScope.MustNewCounter("not_found", "workflow not found in the cache"), + StreakLength: labeled.NewCounter("streak_length", "Number of consecutive rounds used in fast follow mode", roundScope, labeled.EmitUnlabeledMetric), } } @@ -180,8 +182,10 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { return nil } } + streak := 0 + defer p.metrics.StreakLength.Add(ctx, float64(streak)) - for ; ; { + for streak = 0; streak < p.cfg.MaxTTLInHours; streak++ { mutatedWf, err := p.TryMutateWorkflow(ctx, w) if err != nil { // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations @@ -224,7 +228,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { p.cfg.EnableFastFollow, mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) return nil } - logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round") + logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak) w = newWf } return nil From 6671fcedc479fa72bf0467e3a4e56d58363f536c Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sun, 31 Jan 2021 13:14:54 -0800 Subject: [PATCH 3/9] Linter fixes --- cmd/kubectl-flyte/cmd/get.go | 6 ++++-- pkg/compiler/errors/compiler_errors.go | 2 +- pkg/compiler/validators/interface.go | 14 ++++++++------ pkg/controller/config/config.go | 5 ++--- pkg/controller/nodes/dynamic/dynamic_workflow.go | 3 ++- .../task/fakeplugins/next_phase_state_plugin.go | 2 +- pkg/controller/workflow/executor.go | 2 +- 7 files changed, 19 insertions(+), 15 deletions(-) diff --git a/cmd/kubectl-flyte/cmd/get.go b/cmd/kubectl-flyte/cmd/get.go index ff0afdd0d..73e18b4ea 100644 --- a/cmd/kubectl-flyte/cmd/get.go +++ b/cmd/kubectl-flyte/cmd/get.go @@ -92,7 +92,8 @@ func (g *GetOpts) iterateOverWorkflows(f func(*v1alpha1.FlyteWorkflow) error, ba return err } for _, w := range wList.Items { - if err := f(&w); err != nil { + _w := w + if err := f(&_w); err != nil { return err } counter++ @@ -127,7 +128,8 @@ func (g *GetOpts) iterateOverQuotas(f func(quota *v12.ResourceQuota) error, batc return err } for _, r := range rq.Items { - if err := f(&r); err != nil { + _r := r + if err := f(&_r); err != nil { return err } counter++ diff --git a/pkg/compiler/errors/compiler_errors.go b/pkg/compiler/errors/compiler_errors.go index 14f804bf8..d9fe600e9 100755 --- a/pkg/compiler/errors/compiler_errors.go +++ b/pkg/compiler/errors/compiler_errors.go @@ -83,7 +83,7 @@ const ( func NewBranchNodeNotSpecified(branchNodeID string) *CompileError { return newError( BranchNodeIDNotFound, - fmt.Sprintf("BranchNode not assigned"), + "BranchNode not assigned", branchNodeID, ) } diff --git a/pkg/compiler/validators/interface.go b/pkg/compiler/validators/interface.go index febd66350..c89a47246 100644 --- a/pkg/compiler/validators/interface.go +++ b/pkg/compiler/validators/interface.go @@ -70,13 +70,15 @@ func ValidateUnderlyingInterface(w c.WorkflowBuilder, node c.NodeBuilder, errs e // Compute exposed inputs as the union of all required inputs and any input overwritten by the node. exposedInputs := map[string]*core.Variable{} - for name, p := range inputs.Parameters { - if p.GetRequired() { - exposedInputs[name] = p.Var - } else if containsBindingByVariableName(node.GetInputs(), name) { - exposedInputs[name] = p.Var + if inputs != nil && inputs.Parameters != nil { + for name, p := range inputs.Parameters { + if p.GetRequired() { + exposedInputs[name] = p.Var + } else if containsBindingByVariableName(node.GetInputs(), name) { + exposedInputs[name] = p.Var + } + // else, the param has a default value and is not being overwritten by the node } - // else, the param has a default value and is not being overwritten by the node } iface = &core.TypedInterface{ diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index dcdb5bc6e..8b1d878d4 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -48,7 +48,7 @@ var ( InterruptibleFailureThreshold: 1, }, EnableFastFollow: true, - MaxStreakLength: 5, + MaxStreakLength: 5, } ) @@ -76,9 +76,8 @@ type Config struct { MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` - EnableFastFollow bool `json:"enable-fast-follow" pflag:",Boolean flag that enables Fast Follow mode, this makes Propeller proceed to another round on successful write to etcD."` + EnableFastFollow bool `json:"enable-fast-follow" pflag:",Boolean flag that enables Fast Follow mode, this makes Propeller proceed to another round on successful write to etcD."` MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow if fast follow mode is enabled."` - } type KubeClientConfig struct { diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index c7d3a9ca6..407da1a68 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -286,7 +286,8 @@ func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, var launchPlanInterfaces = make([]common.InterfaceProvider, len(launchPlanIDs)) for idx, id := range launchPlanIDs { - lp, err := d.lpReader.GetLaunchPlan(ctx, &id) + idVal := id + lp, err := d.lpReader.GetLaunchPlan(ctx, &idVal) if err != nil { logger.Debugf(ctx, "Error fetching launch plan definition from admin") if launchplan.IsNotFound(err) || launchplan.IsUserError(err) { diff --git a/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go b/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go index ec1781d26..6f5cb30e4 100644 --- a/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go +++ b/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go @@ -84,7 +84,7 @@ func (n NextPhaseStatePlugin) Finalize(ctx context.Context, tCtx pluginCore.Task func NewPhaseBasedPlugin() NextPhaseStatePlugin { return NextPhaseStatePlugin{ - id: fmt.Sprintf("next_phase_plugin"), + id: "next_phase_plugin", props: pluginCore.PluginProperties{}, } } diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index 35531d643..d885664fb 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -275,7 +275,7 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W return nil case v1alpha1.WorkflowPhaseRunning: wfEvent.Phase = core.WorkflowExecution_RUNNING - wStatus.UpdatePhase(v1alpha1.WorkflowPhaseRunning, fmt.Sprintf("Workflow Started"), nil) + wStatus.UpdatePhase(v1alpha1.WorkflowPhaseRunning, "Workflow Started", nil) wfEvent.OccurredAt = utils.GetProtoTime(wStatus.GetStartedAt()) case v1alpha1.WorkflowPhaseHandlingFailureNode: fallthrough From 7475e77409e7901b0a8e18b29eb459f34ea83b84 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sun, 31 Jan 2021 13:50:29 -0800 Subject: [PATCH 4/9] Max streaklength bug --- pkg/controller/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 86e3d708f..d325b6d82 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -185,7 +185,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { streak := 0 defer p.metrics.StreakLength.Add(ctx, float64(streak)) - for streak = 0; streak < p.cfg.MaxTTLInHours; streak++ { + for streak = 0; streak < p.cfg.MaxStreakLength; streak++ { mutatedWf, err := p.TryMutateWorkflow(ctx, w) if err != nil { // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations From 855d6835b0922f2d6c185d08877e4319dda31d95 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sun, 31 Jan 2021 14:27:54 -0800 Subject: [PATCH 5/9] Handler improved --- pkg/controller/handler.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index d325b6d82..7b8a6bd88 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -185,7 +185,12 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { streak := 0 defer p.metrics.StreakLength.Add(ctx, float64(streak)) - for streak = 0; streak < p.cfg.MaxStreakLength; streak++ { + maxLength := p.cfg.MaxStreakLength + if maxLength <= 0 { + maxLength = 1 + } + + for streak = 0; streak < ; streak++ { mutatedWf, err := p.TryMutateWorkflow(ctx, w) if err != nil { // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations From 64fc4ece64ae24b970bf4d221e689f5df2fe09ea Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sun, 31 Jan 2021 14:30:49 -0800 Subject: [PATCH 6/9] missed a typo --- pkg/controller/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 7b8a6bd88..dbcd8fe8b 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -190,7 +190,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { maxLength = 1 } - for streak = 0; streak < ; streak++ { + for streak = 0; streak < maxLength; streak++ { mutatedWf, err := p.TryMutateWorkflow(ctx, w) if err != nil { // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations From 14a38af3e9af9e8450ddd12f76919acd382c1fc4 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sun, 31 Jan 2021 22:07:59 -0800 Subject: [PATCH 7/9] More unit tests --- pkg/controller/config/config.go | 8 +- pkg/controller/handler.go | 9 +- pkg/controller/handler_test.go | 216 +++++++++++++++++++++++ pkg/controller/workflowstore/inmemory.go | 6 +- 4 files changed, 231 insertions(+), 8 deletions(-) diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 8b1d878d4..f2459fd5a 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -47,8 +47,8 @@ var ( MaxNodeRetriesOnSystemFailures: 3, InterruptibleFailureThreshold: 1, }, - EnableFastFollow: true, - MaxStreakLength: 5, + EnableTurboMode: true, + MaxStreakLength: 5, } ) @@ -76,8 +76,8 @@ type Config struct { MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` - EnableFastFollow bool `json:"enable-fast-follow" pflag:",Boolean flag that enables Fast Follow mode, this makes Propeller proceed to another round on successful write to etcD."` - MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow if fast follow mode is enabled."` + EnableTurboMode bool `json:"enable-turbo-mode" pflag:",Boolean flag that enables Turbo-mode, this makes Propeller proceed to another round on successful write to etcD."` + MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow if turbo-mode is enabled."` } type KubeClientConfig struct { diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index dbcd8fe8b..3b542aa43 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -132,6 +132,10 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F return mutableW, nil } +func ShouldExitCurrentEvaluationLoop(turboModeEnabled bool, isWorkflowTerminated bool, newResVer, oldResVer string) bool { + return !(turboModeEnabled && !isWorkflowTerminated && newResVer != oldResVer) +} + // reconciler compares the actual state with the desired, and attempts to // converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource // with the current status of the resource. @@ -227,10 +231,11 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { // An error was encountered during the round. Let us return, so that we can back-off gracefully return err } - if !p.cfg.EnableFastFollow || mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion { + if ShouldExitCurrentEvaluationLoop(p.cfg.EnableTurboMode, mutatedWf.GetExecutionStatus().IsTerminated(), + newWf.ResourceVersion, mutatedWf.ResourceVersion) { // Workflow is terminated (no need to continue) or no status was changed, we can wait logger.Infof(ctx, "Will not fast follow, Reason: Enabled? %v, Wf terminated? %v, Version matched? %v", - p.cfg.EnableFastFollow, mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) + p.cfg.EnableTurboMode, mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) return nil } logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak) diff --git a/pkg/controller/handler_test.go b/pkg/controller/handler_test.go index 2c93a5c22..754c97074 100644 --- a/pkg/controller/handler_test.go +++ b/pkg/controller/handler_test.go @@ -449,6 +449,214 @@ func TestPropeller_Handle(t *testing.T) { }) } +func TestPropeller_Handle_TurboMode(t *testing.T) { + // TODO unit tests need to fixed + scope := promutils.NewTestScope() + ctx := context.TODO() + s := workflowstore.NewInMemoryWorkflowStore() + exec := &mockExecutor{} + cfg := &config.Config{ + MaxWorkflowRetries: 0, + EnableTurboMode: true, + MaxStreakLength: 5, + } + + const namespace = "test" + const name = "123" + + p := NewPropellerHandler(ctx, cfg, s, exec, scope) + + t.Run("error", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + })) + called := false + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + if called { + return fmt.Errorf("already called once") + } + called = true + return fmt.Errorf("failed") + } + assert.Error(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseReady, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 0, len(r.Finalizers)) + assert.Equal(t, uint32(1), r.Status.FailedAttempts) + assert.False(t, HasCompletedLabel(r)) + }) + + t.Run("abort", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + Status: v1alpha1.WorkflowStatus{ + FailedAttempts: 1, + }, + })) + + called := false + exec.HandleAbortedCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow, maxRetries uint32) error { + if called { + return fmt.Errorf("already called once") + } + called = true + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseFailed, "done", nil) + return nil + } + assert.NoError(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseFailed, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 0, len(r.Finalizers)) + assert.True(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(1), r.Status.FailedAttempts) + }) + + t.Run("abort-error", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + Status: v1alpha1.WorkflowStatus{ + FailedAttempts: 1, + Phase: v1alpha1.WorkflowPhaseRunning, + }, + })) + + called := false + exec.HandleAbortedCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow, maxRetries uint32) error { + if called { + return fmt.Errorf("already called once") + } + called = true + return fmt.Errorf("abort error") + } + assert.Error(t, p.Handle(ctx, namespace, name)) + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseRunning, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 0, len(r.Finalizers)) + assert.False(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(2), r.Status.FailedAttempts) + }) + + t.Run("noUpdate", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + Finalizers: []string{"f1"}, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + Status: v1alpha1.WorkflowStatus{ + Phase: v1alpha1.WorkflowPhaseSucceeding, + }, + })) + called := false + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + if called { + return fmt.Errorf("already called once") + } + called = true + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseSucceeding, "", nil) + return nil + } + assert.NoError(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) + assert.False(t, HasCompletedLabel(r)) + assert.Equal(t, 1, len(r.Finalizers)) + }) + + t.Run("happy-nochange", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + })) + called := 0 + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + if called >= 2 { + return fmt.Errorf("already called once") + } + called++ + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseSucceeding, "done", nil) + return nil + } + assert.NoError(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 1, len(r.Finalizers)) + assert.False(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(0), r.Status.FailedAttempts) + }) + + t.Run("happy-success", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + })) + called := 0 + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + if called >= 2 { + return fmt.Errorf("already called once") + } + if called == 0 { + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseSucceeding, "done", nil) + } else { + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseSuccess, "done", nil) + } + called++ + return nil + } + assert.NoError(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseSuccess.String(), r.GetExecutionStatus().GetPhase().String()) + assert.Equal(t, 0, len(r.Finalizers)) + assert.True(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(0), r.Status.FailedAttempts) + assert.Equal(t, 2, called) + }) + +} + func TestPropellerHandler_Initialize(t *testing.T) { scope := promutils.NewTestScope() ctx := context.TODO() @@ -462,3 +670,11 @@ func TestPropellerHandler_Initialize(t *testing.T) { assert.NoError(t, p.Initialize(ctx)) } + +func TestShouldExitCurrentEvaluationLoop(t *testing.T) { + assert.True(t, ShouldExitCurrentEvaluationLoop(false, false, "x", "y")) + assert.True(t, ShouldExitCurrentEvaluationLoop(true, true, "x", "y")) + assert.True(t, ShouldExitCurrentEvaluationLoop(true, false, "x", "x")) + assert.True(t, ShouldExitCurrentEvaluationLoop(false, true, "x", "x")) + assert.False(t, ShouldExitCurrentEvaluationLoop(true, false, "x", "y")) +} diff --git a/pkg/controller/workflowstore/inmemory.go b/pkg/controller/workflowstore/inmemory.go index 50ed58b4f..46277eff8 100644 --- a/pkg/controller/workflowstore/inmemory.go +++ b/pkg/controller/workflowstore/inmemory.go @@ -50,8 +50,10 @@ func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.Fl if w.Name != "" && w.Namespace != "" { if m, ok := i.store[w.Namespace]; ok { if _, ok := m[w.Name]; ok { - m[w.Name] = w - return w, nil + newW := w.DeepCopy() + newW.ResourceVersion = w.ResourceVersion + "x" + m[w.Name] = newW + return newW, nil } } From b6fe696d8edfa985c714e169f9cbfdcf681b296c Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 1 Feb 2021 21:55:05 -0800 Subject: [PATCH 8/9] Fast follow - comments addressed --- pkg/controller/config/config.go | 6 ++---- pkg/controller/handler.go | 18 ++++++++++-------- pkg/controller/handler_test.go | 9 --------- 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index f2459fd5a..162a9b7cb 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -47,8 +47,7 @@ var ( MaxNodeRetriesOnSystemFailures: 3, InterruptibleFailureThreshold: 1, }, - EnableTurboMode: true, - MaxStreakLength: 5, + MaxStreakLength: 5, // Turbo mode is enabled by default } ) @@ -76,8 +75,7 @@ type Config struct { MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` - EnableTurboMode bool `json:"enable-turbo-mode" pflag:",Boolean flag that enables Turbo-mode, this makes Propeller proceed to another round on successful write to etcD."` - MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow if turbo-mode is enabled."` + MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."` } type KubeClientConfig struct { diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 3b542aa43..b43924e3c 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -33,6 +33,7 @@ type propellerMetrics struct { RoundSkipped prometheus.Counter WorkflowNotFound prometheus.Counter StreakLength labeled.Counter + RoundTime labeled.StopWatch } func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { @@ -47,6 +48,7 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { RoundSkipped: roundScope.MustNewCounter("skipped", "Round Skipped because of stale workflow"), WorkflowNotFound: roundScope.MustNewCounter("not_found", "workflow not found in the cache"), StreakLength: labeled.NewCounter("streak_length", "Number of consecutive rounds used in fast follow mode", roundScope, labeled.EmitUnlabeledMetric), + RoundTime: labeled.NewStopWatch("round_time", "Total time taken by one round traversing, copying and storing a workflow", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric), } } @@ -132,10 +134,6 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F return mutableW, nil } -func ShouldExitCurrentEvaluationLoop(turboModeEnabled bool, isWorkflowTerminated bool, newResVer, oldResVer string) bool { - return !(turboModeEnabled && !isWorkflowTerminated && newResVer != oldResVer) -} - // reconciler compares the actual state with the desired, and attempts to // converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource // with the current status of the resource. @@ -195,6 +193,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { } for streak = 0; streak < maxLength; streak++ { + t := p.metrics.RoundTime.Start(ctx) mutatedWf, err := p.TryMutateWorkflow(ctx, w) if err != nil { // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations @@ -208,6 +207,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { // No updates in the status we detected, we will skip writing to KubeAPI if mutatedWf.Status.Equals(&w.Status) { logger.Info(ctx, "WF hasn't been updated in this round.") + t.Stop() return nil } } @@ -231,16 +231,18 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { // An error was encountered during the round. Let us return, so that we can back-off gracefully return err } - if ShouldExitCurrentEvaluationLoop(p.cfg.EnableTurboMode, mutatedWf.GetExecutionStatus().IsTerminated(), - newWf.ResourceVersion, mutatedWf.ResourceVersion) { + if mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion { // Workflow is terminated (no need to continue) or no status was changed, we can wait - logger.Infof(ctx, "Will not fast follow, Reason: Enabled? %v, Wf terminated? %v, Version matched? %v", - p.cfg.EnableTurboMode, mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) + logger.Infof(ctx, "Will not fast follow, Reason: Wf terminated? %v, Version matched? %v", + mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) + t.Stop() return nil } logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak) w = newWf + t.Stop() } + logger.Infof(ctx, "Streak ended at [%d]/Max: [%d]", streak, maxLength) return nil } diff --git a/pkg/controller/handler_test.go b/pkg/controller/handler_test.go index 754c97074..380ac2014 100644 --- a/pkg/controller/handler_test.go +++ b/pkg/controller/handler_test.go @@ -457,7 +457,6 @@ func TestPropeller_Handle_TurboMode(t *testing.T) { exec := &mockExecutor{} cfg := &config.Config{ MaxWorkflowRetries: 0, - EnableTurboMode: true, MaxStreakLength: 5, } @@ -670,11 +669,3 @@ func TestPropellerHandler_Initialize(t *testing.T) { assert.NoError(t, p.Initialize(ctx)) } - -func TestShouldExitCurrentEvaluationLoop(t *testing.T) { - assert.True(t, ShouldExitCurrentEvaluationLoop(false, false, "x", "y")) - assert.True(t, ShouldExitCurrentEvaluationLoop(true, true, "x", "y")) - assert.True(t, ShouldExitCurrentEvaluationLoop(true, false, "x", "x")) - assert.True(t, ShouldExitCurrentEvaluationLoop(false, true, "x", "x")) - assert.False(t, ShouldExitCurrentEvaluationLoop(true, false, "x", "y")) -} From 1ce9c12e0e8494395ff6b80245d0a92e911ae614 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 3 Feb 2021 16:28:41 -0800 Subject: [PATCH 9/9] Address comments --- pkg/controller/handler.go | 3 +++ pkg/controller/workflowstore/inmemory.go | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index b43924e3c..46b04791d 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -201,6 +201,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { mutatedWf = RecordSystemError(w, err) p.metrics.SystemError.Inc(ctx) } else if mutatedWf == nil { + logger.Errorf(ctx, "Should not happen! Mutation resulted in a nil workflow!") return nil } else { if !w.GetExecutionStatus().IsTerminated() { @@ -225,9 +226,11 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { // nothing other than resource status has been updated. newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) if updateErr != nil { + t.Stop() return updateErr } if err != nil { + t.Stop() // An error was encountered during the round. Let us return, so that we can back-off gracefully return err } diff --git a/pkg/controller/workflowstore/inmemory.go b/pkg/controller/workflowstore/inmemory.go index 46277eff8..0a7423c17 100644 --- a/pkg/controller/workflowstore/inmemory.go +++ b/pkg/controller/workflowstore/inmemory.go @@ -51,6 +51,7 @@ func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.Fl if m, ok := i.store[w.Namespace]; ok { if _, ok := m[w.Name]; ok { newW := w.DeepCopy() + // Appends to the resource version to ensure that resource version has been updated newW.ResourceVersion = w.ResourceVersion + "x" m[w.Name] = newW return newW, nil @@ -69,6 +70,9 @@ func (i *InmemoryWorkflowStore) Update(ctx context.Context, w *v1alpha1.FlyteWor return i.UpdateStatus(ctx, w, priorityClass) } +// Returns an inmemory store, that will update the resource version for every update automatically. This is a good +// idea to test that resource version checks work, but does not represent that the object was actually updated or not +// The resource version is ALWAYS updated. func NewInMemoryWorkflowStore() *InmemoryWorkflowStore { return &InmemoryWorkflowStore{ store: map[string]map[string]*v1alpha1.FlyteWorkflow{},