Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Added support for aborting task nodes reported as failures (#541)
Browse files Browse the repository at this point in the history
* added CleanupOnFailure support for TaskNodeStatus to support aborting failed task nodes

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins and generated

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 28, 2023
1 parent dd49c9d commit 0025cbe
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteplugins v1.0.49
github.com/flyteorg/flyteplugins v1.0.52
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.49 h1:lUmT4kqYamkJY2tO6nCWRCnVv2M2QNLIap5bFYAol7s=
github.com/flyteorg/flyteplugins v1.0.49/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flyteplugins v1.0.52 h1:AWNrRYgm0bCzOws+bIfJDfPBZqBmTdABxW78r8q3kP4=
github.com/flyteorg/flyteplugins v1.0.52/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ type ExecutableTaskNodeStatus interface {
GetBarrierClockTick() uint32
GetLastPhaseUpdatedAt() time.Time
GetPreviousNodeExecutionCheckpointPath() DataReference
GetCleanupOnFailure() bool
}

type MutableTaskNodeStatus interface {
Expand All @@ -371,6 +372,7 @@ type MutableTaskNodeStatus interface {
SetPluginStateVersion(uint32)
SetBarrierClockTick(tick uint32)
SetPreviousNodeExecutionCheckpointPath(DataReference)
SetCleanupOnFailure(bool)
}

// ExecutableWorkflowNode is an interface for a Child Workflow Node
Expand Down
32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableTaskNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MutableTaskNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ type TaskNodeStatus struct {
BarrierClockTick uint32 `json:"tick,omitempty"`
LastPhaseUpdatedAt time.Time `json:"updAt,omitempty"`
PreviousNodeExecutionCheckpointPath DataReference `json:"checkpointPath,omitempty"`
CleanupOnFailure bool `json:"clean,omitempty"`
}

func (in *TaskNodeStatus) GetBarrierClockTick() uint32 {
Expand Down Expand Up @@ -795,6 +796,11 @@ func (in *TaskNodeStatus) SetPluginStateVersion(v uint32) {
in.SetDirty()
}

func (in *TaskNodeStatus) SetCleanupOnFailure(cleanupOnFailure bool) {
in.CleanupOnFailure = cleanupOnFailure
in.SetDirty()
}

func (in *TaskNodeStatus) GetPluginState() []byte {
return in.PluginState
}
Expand Down Expand Up @@ -829,6 +835,10 @@ func (in TaskNodeStatus) GetPhaseVersion() uint32 {
return in.PhaseVersion
}

func (in TaskNodeStatus) GetCleanupOnFailure() bool {
return in.CleanupOnFailure
}

func (in *TaskNodeStatus) UpdatePhase(phase int, phaseVersion uint32) {
if in.Phase != phase || in.PhaseVersion != phaseVersion {
in.SetDirty()
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type TaskNodeState struct {
PluginStateVersion uint32
LastPhaseUpdatedAt time.Time
PreviousNodeExecutionCheckpointURI storage.DataReference
CleanupOnFailure bool
}

type BranchNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginState: tn.GetPluginState(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
PreviousNodeExecutionCheckpointURI: tn.GetPreviousNodeExecutionCheckpointPath(),
CleanupOnFailure: tn.GetCleanupOnFailure(),
}
}
return handler.TaskNodeState{}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginPhaseVersion: pluginTrns.pInfo.Version(),
LastPhaseUpdatedAt: time.Now(),
PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
CleanupOnFailure: ts.CleanupOnFailure || pluginTrns.pInfo.CleanupOnFailure(),
})
if err != nil {
logger.Errorf(ctx, "Failed to store TaskNode state, err :%s", err.Error())
Expand All @@ -761,10 +762,11 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
}

func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, reason string) error {
currentPhase := nCtx.NodeStateReader().GetTaskNodeState().PluginPhase
taskNodeState := nCtx.NodeStateReader().GetTaskNodeState()
currentPhase := taskNodeState.PluginPhase
logger.Debugf(ctx, "Abort invoked with phase [%v]", currentPhase)

if currentPhase.IsTerminal() {
if currentPhase.IsTerminal() && !(currentPhase.IsFailure() && taskNodeState.CleanupOnFailure) {
logger.Debugf(ctx, "Returning immediately from Abort since task is already in terminal phase.", currentPhase)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t.SetPluginState(n.t.PluginState)
t.SetPluginStateVersion(n.t.PluginStateVersion)
t.SetPreviousNodeExecutionCheckpointPath(n.t.PreviousNodeExecutionCheckpointURI)
t.SetCleanupOnFailure(n.t.CleanupOnFailure)
}

// Update dynamic node status
Expand Down

0 comments on commit 0025cbe

Please sign in to comment.