Skip to content

Commit

Permalink
Implement workflow execution recovery (flyteorg#290)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Jul 20, 2021
1 parent dc5b435 commit 59069d2
Show file tree
Hide file tree
Showing 61 changed files with 1,269 additions and 760 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.19.5
github.com/flyteorg/flyteidl v0.19.14
github.com/flyteorg/flyteplugins v0.5.59
github.com/flyteorg/flytestdlib v0.3.27
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.5 h1:qNhNK6mhCTuOms7zJmBtog6bLQJhBj+iScf1IlHdqeg=
github.com/flyteorg/flyteidl v0.19.5/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.14 h1:OLg2eT9uYllcfMMjEZJoXQ+2WXcrNbUxD+yaCrz2AlI=
github.com/flyteorg/flyteidl v0.19.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.59 h1:Uw1xlrlx5rSTpdTMwJTo7mbqHI7X7p7CFVm3473iRjo=
github.com/flyteorg/flyteplugins v0.5.59/go.mod h1:nesnW7pJhXEysFQg9TnSp36ao33ie0oA/TI4sYPaeyw=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
Expand Down
6 changes: 5 additions & 1 deletion pkg/apis/flyteworkflow/v1alpha1/execution_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package v1alpha1

import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

// This contains an OutputLocationPrefix. When running against AWS, this should be something of the form
// s3://my-bucket, or s3://my-bucket/ A sharding string will automatically be appended to this prefix before
Expand All @@ -20,6 +22,8 @@ type ExecutionConfig struct {
TaskPluginImpls map[string]TaskPluginOverride
// Can be used to control the number of parallel nodes to run within the workflow. This is useful to achieve fairness.
MaxParallelism uint32
// Defines execution behavior for processing nodes.
RecoveryExecution WorkflowExecutionIdentifier
}

type TaskPluginOverride struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
NodePhaseTimingOut
NodePhaseTimedOut
NodePhaseDynamicRunning
NodePhaseRecovered
)

func (p NodePhase) String() string {
Expand Down Expand Up @@ -92,6 +93,8 @@ func (p NodePhase) String() string {
return "RetryableFailure"
case NodePhaseDynamicRunning:
return "DynamicRunning"
case NodePhaseRecovered:
return "NodePhaseRecovered"
}

return "Unknown"
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (in *NodeStatus) GetMessage() string {
}

func IsPhaseTerminal(phase NodePhase) bool {
return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut
return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut || phase == NodePhaseRecovered
}

func (in *NodeStatus) GetOrCreateTaskStatus() MutableTaskNodeStatus {
Expand Down Expand Up @@ -576,7 +576,7 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe
}

func (in *NodeStatus) IsTerminated() bool {
return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded
return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded || in.GetPhase() == NodePhaseRecovered
}

func (in *NodeStatus) GetDataDir() DataReference {
Expand Down
68 changes: 34 additions & 34 deletions pkg/compiler/common/mocks/workflow_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ tasks:
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
image: myflytecontainer:abc123
resources: {}
id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_10_simple.json
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_3.json
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_4.json
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_5.json
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_6.json
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_7_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_8_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_9_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
9 changes: 9 additions & 0 deletions pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 59069d2

Please sign in to comment.