Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Implement workflow execution recovery #290

Merged
merged 16 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.19.5
github.com/flyteorg/flyteidl v0.19.14
github.com/flyteorg/flyteplugins v0.5.59
github.com/flyteorg/flytestdlib v0.3.27
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.5 h1:qNhNK6mhCTuOms7zJmBtog6bLQJhBj+iScf1IlHdqeg=
github.com/flyteorg/flyteidl v0.19.5/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.14 h1:OLg2eT9uYllcfMMjEZJoXQ+2WXcrNbUxD+yaCrz2AlI=
github.com/flyteorg/flyteidl v0.19.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.59 h1:Uw1xlrlx5rSTpdTMwJTo7mbqHI7X7p7CFVm3473iRjo=
github.com/flyteorg/flyteplugins v0.5.59/go.mod h1:nesnW7pJhXEysFQg9TnSp36ao33ie0oA/TI4sYPaeyw=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
Expand Down
6 changes: 5 additions & 1 deletion pkg/apis/flyteworkflow/v1alpha1/execution_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package v1alpha1

import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

// This contains an OutputLocationPrefix. When running against AWS, this should be something of the form
// s3://my-bucket, or s3://my-bucket/. A sharding string will automatically be appended to this prefix before
Expand All @@ -20,6 +22,8 @@ type ExecutionConfig struct {
TaskPluginImpls map[string]TaskPluginOverride
// Can be used to control the number of parallel nodes to run within the workflow. This is useful to achieve fairness.
MaxParallelism uint32
// Identifies the prior workflow execution that this execution recovers from, if any.
RecoveryExecution WorkflowExecutionIdentifier
}

type TaskPluginOverride struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
NodePhaseTimingOut
NodePhaseTimedOut
NodePhaseDynamicRunning
NodePhaseRecovered
)

func (p NodePhase) String() string {
Expand Down Expand Up @@ -92,6 +93,8 @@ func (p NodePhase) String() string {
return "RetryableFailure"
case NodePhaseDynamicRunning:
return "DynamicRunning"
case NodePhaseRecovered:
return "NodePhaseRecovered"
}

return "Unknown"
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (in *NodeStatus) GetMessage() string {
}

func IsPhaseTerminal(phase NodePhase) bool {
return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut
return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut || phase == NodePhaseRecovered
}

func (in *NodeStatus) GetOrCreateTaskStatus() MutableTaskNodeStatus {
Expand Down Expand Up @@ -576,7 +576,7 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe
}

func (in *NodeStatus) IsTerminated() bool {
return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded
return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded || in.GetPhase() == NodePhaseRecovered
}

func (in *NodeStatus) GetDataDir() DataReference {
Expand Down
68 changes: 34 additions & 34 deletions pkg/compiler/common/mocks/workflow_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ tasks:
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
image: myflytecontainer:abc123
resources: {}
id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_3.json
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_4.json
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_5.json
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
3 changes: 2 additions & 1 deletion pkg/compiler/test/testdata/branch/k8s/success_6.json
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@
"rawOutputDataConfig": {},
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0
"MaxParallelism": 0,
"RecoveryExecution": {}
}
}
9 changes: 9 additions & 0 deletions pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading