Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Fix cases where we miss finalizing tasks (#135)
Browse files Browse the repository at this point in the history
* Wip

* Actually start the informers

* Generate

* fixes

* build break

* Add goroutine label

* nit

* op_code_generaet

* PR Comments

* PR Comments

* PR Comments

* Unit tests

* Update plugins dep
  • Loading branch information
EngHabu authored May 27, 2020
1 parent 02098c6 commit 5e69329
Show file tree
Hide file tree
Showing 21 changed files with 461 additions and 134 deletions.
19 changes: 18 additions & 1 deletion cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"flag"
"fmt"
"os"
"runtime/pprof"
"strings"

"github.com/lyft/flytestdlib/contextutils"

"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/cache"

Expand Down Expand Up @@ -215,10 +218,24 @@ func executeRootCmd(cfg *config2.Config) {
logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err)
}

c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope)
// Start controller runtime manager to start listening to resource changes.
// K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer
// EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect
// workflow changes faster than the default sync interval for workflow CRDs.
go func(ctx context.Context) {
ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager")
pprof.SetGoroutineLabels(ctx)
logger.Infof(ctx, "Starting controller-runtime manager")
err := mgr.Start(ctx.Done())
if err != nil {
logger.Fatalf(ctx, "Failed to start manager. Error: %v", err)
}
}(ctx)

c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope)
if err != nil {
logger.Fatalf(ctx, "Failed to start Controller - [%v]", err.Error())
return
} else if c == nil {
logger.Fatalf(ctx, "Failed to start Controller, nil controller received.")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.24
github.com/lyft/flyteplugins v0.3.23
github.com/lyft/flyteplugins v0.3.28
github.com/lyft/flytestdlib v0.3.3
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.6 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0
github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.23 h1:cN6d6f1ZkoHw+HD4wFCSVFVv+sCSeyx13E+hXIYEDzo=
github.com/lyft/flyteplugins v0.3.23/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flyteplugins v0.3.28 h1:4YSjJyQUHFtVoQio4X3wYtS7WRIGdJQf9Wtcl75e+1w=
github.com/lyft/flyteplugins v0.3.28/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
Expand Down Expand Up @@ -629,6 +631,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand Down Expand Up @@ -802,6 +805,7 @@ gomodules.xyz/jsonpatch/v2 v2.1.0/go.mod h1:IhYNNY4jnS53ZnfE4PAmpKtDpTCj1JFXc+3m
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485 h1:OB/uP/Puiu5vS5QMRPrXCDWUPb+kt8f1KW8oQzFejQw=
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e h1:jRyg0XfpwWlhEV8mDfdNGBeSJM2fuyh9Yjrnd8kF2Ts=
gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e/go.mod h1:kS+toOQn6AQKjmKJ7gzohV1XkqsFehRA2FbsbkopSuQ=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
Expand Down
2 changes: 1 addition & 1 deletion hack/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
package tools

// Uncomment this to make code-generator work
//import _ "k8s.io/code-generator"
// import _ "k8s.io/code-generator"
13 changes: 13 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ const (
WorkflowPhaseFailing
WorkflowPhaseFailed
WorkflowPhaseAborted
// WorkflowPhaseHandlingFailureNode is the phase the workflow will enter when a failure is detected in the workflow,
// the workflow has finished cleaning up (aborted running nodes... etc.) and a failure node is declared in the
// workflow spec. We enter this explicit phase so as to ensure we do not attempt to repeatedly clean up old nodes
// when handling a workflow event which might yield to seemingly random failures. This phase ensure we are handling,
// and only so, the failure node until it's done executing or it fails itself.
// If a failure node fails to execute (a real possibility), the final failure output of the workflow will only include
// its failure reason. In other words, its failure will mask the original failure for the workflow. It's imperative
// failure nodes should be very simple, very resilient and very well tested.
WorkflowPhaseHandlingFailureNode
)

func (p WorkflowPhase) String() string {
Expand All @@ -117,6 +126,8 @@ func (p WorkflowPhase) String() string {
return "Succeeding"
case WorkflowPhaseAborted:
return "Aborted"
case WorkflowPhaseHandlingFailureNode:
return "HandlingFailureNode"
}
return "Unknown"
}
Expand Down Expand Up @@ -198,12 +209,14 @@ type ExecutableBranchNode interface {

type ExecutableWorkflowNodeStatus interface {
GetWorkflowNodePhase() WorkflowNodePhase
GetExecutionError() *core.ExecutionError
}

type MutableWorkflowNodeStatus interface {
Mutable
ExecutableWorkflowNodeStatus
SetWorkflowNodePhase(phase WorkflowNodePhase)
SetExecutionError(executionError *core.ExecutionError)
}

type Mutable interface {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,19 @@ const (

type WorkflowNodeStatus struct {
MutableStruct
Phase WorkflowNodePhase `json:"phase"`
Phase WorkflowNodePhase `json:"phase"`
ExecutionError *core.ExecutionError `json:"executionError"`
}

func (in *WorkflowNodeStatus) SetExecutionError(executionError *core.ExecutionError) {
if in.ExecutionError != executionError {
in.SetDirty()
in.ExecutionError = executionError
}
}

func (in *WorkflowNodeStatus) GetExecutionError() *core.ExecutionError {
return in.ExecutionError
}

func (in *WorkflowNodeStatus) GetWorkflowNodePhase() WorkflowNodePhase {
Expand Down
Loading

0 comments on commit 5e69329

Please sign in to comment.