From f2a8ac7d9a649a3b5d0da5acdb04fd42168c0074 Mon Sep 17 00:00:00 2001 From: Ketan Umare <16888709+kumare3@users.noreply.github.com> Date: Tue, 6 Apr 2021 21:36:24 -0700 Subject: [PATCH] #patch Bug Fixes - resiliency in Dynamic spec loading failures and Workqueue metrics name fix (#251) * Bug Fixes: - dynamic spec load failure should cause a system retry - Workqueue metrics name fixed - Better logging Signed-off-by: Ketan Umare * comments addressed Signed-off-by: Ketan Umare --- cmd/controller/cmd/root.go | 5 +++-- go.mod | 2 +- go.sum | 5 ++--- pkg/controller/composite_workqueue.go | 4 ++-- pkg/controller/nodes/dynamic/dynamic_workflow.go | 4 ++-- pkg/controller/nodes/dynamic/handler.go | 12 +++++------- pkg/controller/workqueue.go | 2 ++ 7 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cmd/controller/cmd/root.go b/cmd/controller/cmd/root.go index 11973771c7..e1d42ea200 100644 --- a/cmd/controller/cmd/root.go +++ b/cmd/controller/cmd/root.go @@ -4,8 +4,8 @@ package cmd import ( "context" "flag" - "fmt" "os" + "runtime" "runtime/pprof" "strings" @@ -69,8 +69,9 @@ var rootCmd = &cobra.Command{ // This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { version.LogBuildInformation(appName) + logger.Infof(context.TODO(), "Detected: %d CPU's\n", runtime.NumCPU()) if err := rootCmd.Execute(); err != nil { - fmt.Println(err) + logger.Error(context.TODO(), err) os.Exit(1) } } diff --git a/go.mod b/go.mod index 3c0ef7d0df..c3349789e8 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.31 github.com/flyteorg/flyteplugins v0.5.42 - github.com/flyteorg/flytestdlib v0.3.13 + github.com/flyteorg/flytestdlib v0.3.16 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible github.com/go-test/deep v1.0.7 diff --git a/go.sum b/go.sum index 457d9751ff..36831c66e7 100644 --- a/go.sum +++ b/go.sum @@ -230,16 +230,15 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= -github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8= github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= github.com/flyteorg/flyteidl v0.18.31 h1:aEu9HRT2GsiVKJsHYBEUA+zkPgCWFSokE0/kixYQiFY= github.com/flyteorg/flyteidl v0.18.31/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= -github.com/flyteorg/flyteplugins v0.5.41 h1:8n1Z55P59ICV4453Dk7fhaUbB944j3BMZ+ozywHczgU= -github.com/flyteorg/flyteplugins v0.5.41/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= github.com/flyteorg/flyteplugins v0.5.42 h1:G4DRR2r8LlmkV+orXloDi1ly+M5WuvAaNlWFgGGyy3A= github.com/flyteorg/flyteplugins v0.5.42/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= +github.com/flyteorg/flytestdlib v0.3.16 h1:wO/tk0KfLIpLS89GbJWG4nNADWnOse4nQbTDN/mEvzM= +github.com/flyteorg/flytestdlib v0.3.16/go.mod h1:VlbQuHTE+z2N5qusfwi+6WEkeJoqr8Q0E4NtBAsdwkU= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= diff --git a/pkg/controller/composite_workqueue.go b/pkg/controller/composite_workqueue.go index 08ecf38081..260b504bef 100644 --- a/pkg/controller/composite_workqueue.go +++ b/pkg/controller/composite_workqueue.go @@ -146,13 +146,13 @@ func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{}) { } func NewCompositeWorkQueue(ctx context.Context, cfg config.CompositeQueueConfig, scope promutils.Scope) (CompositeWorkQueue, error) { - workQ, err := NewWorkQueue(ctx, cfg.Queue, scope.CurrentScope()) + workQ, err := NewWorkQueue(ctx, cfg.Queue, scope.NewScopedMetricName("main")) if err != nil { return nil, errors.Wrapf(err, "failed to create WorkQueue in CompositeQueue type Batch") } switch cfg.Type { case config.CompositeQueueBatch: - subQ, err := NewWorkQueue(ctx, cfg.Sub, scope.NewSubScope("sub").CurrentScope()) + subQ, err := NewWorkQueue(ctx, cfg.Sub, scope.NewScopedMetricName("sub")) if err != nil { return nil, errors.Wrapf(err, "failed to create SubQueue in CompositeQueue type Batch") } diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index 852db85ec3..3d0e2dc078 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -122,7 +122,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C defer t.Stop() f, err := task.NewRemoteFutureFileReader(ctx, nCtx.NodeStatus().GetOutputDir(), nCtx.DataStore()) if err != nil { - return dynamicWorkflowContext{}, err + return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to open futures file for reading") } // TODO: This is a hack to set parent task execution id, we should move to node-node relationship. @@ -153,7 +153,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C // We know for sure that futures file was generated. Lets read it djSpec, err := f.Read(ctx) if err != nil { - return dynamicWorkflowContext{}, errors.Wrapf("DynamicJobSpecReadFailed", err, "unable to read futures file, maybe corrupted") + return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "unable to read futures file, maybe corrupted") } var closure *core.CompiledWorkflowClosure diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 0225c740e8..3299df1fbb 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -93,15 +93,13 @@ func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevSt func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) { dCtx, err := d.buildContextualDynamicWorkflow(ctx, nCtx) if err != nil { - kind := core.ExecutionError_UNKNOWN if stdErrors.IsCausedBy(err, utils.ErrorCodeUser) { - kind = core.ExecutionError_USER - } else if stdErrors.IsCausedBy(err, utils.ErrorCodeSystem) { - kind = core.ExecutionError_SYSTEM + return handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoFailure(core.ExecutionError_USER, "DynamicWorkflowBuildFailed", err.Error(), nil), + ), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil } - return handler.DoTransition(handler.TransitionTypeEphemeral, - handler.PhaseInfoFailure(kind, "DynamicWorkflowBuildFailed", err.Error(), nil), - ), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil + // Mostly a system error or unknnwn + return handler.Transition{}, handler.DynamicNodeState{}, err } trns, newState, err := d.progressDynamicWorkflow(ctx, dCtx.execContext, dCtx.subWorkflow, dCtx.nodeLookup, nCtx, prevState) diff --git a/pkg/controller/workqueue.go b/pkg/controller/workqueue.go index 38e766e5c2..fd30394c94 100644 --- a/pkg/controller/workqueue.go +++ b/pkg/controller/workqueue.go @@ -8,6 +8,8 @@ import ( "golang.org/x/time/rate" "github.com/flyteorg/flytestdlib/logger" + // Setup workqueue metrics + _ "github.com/flyteorg/flytestdlib/promutils" "k8s.io/client-go/util/workqueue" )