Skip to content

Commit

Permalink
adding cleanupOnFailure to PhaseInfo (flyteorg#333)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 20, 2023
1 parent 46796b6 commit 84030bd
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
49 changes: 33 additions & 16 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ type PhaseInfo struct {
err *core.ExecutionError
// reason why the current phase exists.
reason string
// cleanupOnFailure indicates that this task should be cleaned up even though the phase indicates a failure. This
// applies to situations where a task is marked as a failure but is still running. For example, a k8s Pod in
// ImagePullBackOff whose image does not exist will continually reattempt the pull even though it will never succeed.
cleanupOnFailure bool
}

func (p PhaseInfo) Phase() Phase {
Expand All @@ -139,6 +143,10 @@ func (p PhaseInfo) Err() *core.ExecutionError {
return p.err
}

// CleanupOnFailure reports the value of the cleanupOnFailure flag stored on this PhaseInfo.
func (p PhaseInfo) CleanupOnFailure() bool {
	return p.cleanupOnFailure
}

func (p PhaseInfo) WithVersion(version uint32) PhaseInfo {
return PhaseInfo{
phase: p.phase,
Expand All @@ -159,7 +167,7 @@ func (p PhaseInfo) String() string {
// PhaseInfoUndefined should be used when the Phase is unknown, which is usually associated with an error.
var PhaseInfoUndefined = PhaseInfo{phase: PhaseUndefined}

func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo, cleanupOnFailure bool) PhaseInfo {
if info == nil {
info = &TaskInfo{}
}
Expand All @@ -168,69 +176,74 @@ func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) Phas
info.OccurredAt = &t
}
return PhaseInfo{
phase: p,
version: v,
info: info,
err: err,
phase: p,
version: v,
info: info,
err: err,
cleanupOnFailure: cleanupOnFailure,
}
}

// PhaseInfoNotReady returns a PhaseInfo in the PhaseNotReady phase, for use when the plugin is not ready to start.
// The attached TaskInfo has OccurredAt set to t, and reason is stored as the reason for the phase.
func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t})
pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
return pi
}

// PhaseInfoWaitingForResources returns a PhaseInfo in the PhaseWaitingForResources phase. The attached TaskInfo has
// OccurredAt set to t, and reason is stored as the reason for the phase.
//
// Deprecated: Please use PhaseInfoWaitingForResourcesInfo instead
func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, &TaskInfo{OccurredAt: &t})
pi := phaseInfo(PhaseWaitingForResources, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
return pi
}

// PhaseInfoWaitingForResourcesInfo returns a PhaseInfo in the PhaseWaitingForResources phase that carries the
// caller-supplied TaskInfo, with reason stored as the reason for the phase.
// NOTE(review): the t parameter is not referenced in this body; presumably the timestamp is expected to come from
// info instead — confirm against callers.
func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, info)
pi := phaseInfo(PhaseWaitingForResources, version, nil, info, false)
pi.reason = reason
return pi
}

// PhaseInfoQueued returns a PhaseInfo in the PhaseQueued phase. The attached TaskInfo has OccurredAt set to t, and
// reason is stored as the reason for the phase.
func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseQueued, version, nil, &TaskInfo{OccurredAt: &t})
pi := phaseInfo(PhaseQueued, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
return pi
}

// PhaseInfoQueuedWithTaskInfo returns a PhaseInfo in the PhaseQueued phase that carries the caller-supplied TaskInfo,
// with reason stored as the reason for the phase.
func PhaseInfoQueuedWithTaskInfo(version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseQueued, version, nil, info)
pi := phaseInfo(PhaseQueued, version, nil, info, false)
pi.reason = reason
return pi
}

// PhaseInfoInitializing returns a PhaseInfo in the PhaseInitializing phase that carries the caller-supplied TaskInfo,
// with reason stored as the reason for the phase.
// NOTE(review): the t parameter is not referenced in this body — confirm whether callers expect it to be used.
func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {

pi := phaseInfo(PhaseInitializing, version, nil, info)
pi := phaseInfo(PhaseInitializing, version, nil, info, false)
pi.reason = reason
return pi
}

func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
// phaseInfoFailed builds a PhaseInfo for phase p at DefaultPhaseVersion, forwarding cleanupOnFailure to phaseInfo.
// A nil err is replaced with a placeholder ExecutionError before the PhaseInfo is built.
func phaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo, cleanupOnFailure bool) PhaseInfo {
// Substitute a generic error so the resulting PhaseInfo does not carry a nil error.
if err == nil {
err = &core.ExecutionError{
Code: "Unknown",
Message: "Unknown error message",
}
}
return phaseInfo(p, DefaultPhaseVersion, err, info)
return phaseInfo(p, DefaultPhaseVersion, err, info, cleanupOnFailure)
}

// PhaseInfoFailed returns a PhaseInfo for the failure phase p at DefaultPhaseVersion, without requesting cleanup.
//
// It delegates to phaseInfoFailed rather than calling phaseInfo directly so that a nil err is still replaced with the
// placeholder "Unknown" ExecutionError; otherwise a failed PhaseInfo could be returned with no error attached.
func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
	return phaseInfoFailed(p, err, info, false)
}

// PhaseInfoRunning returns a PhaseInfo in the PhaseRunning phase with the given version and TaskInfo, and no error.
func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseRunning, version, nil, info)
return phaseInfo(PhaseRunning, version, nil, info, false)
}

// PhaseInfoSuccess returns a PhaseInfo in the PhaseSuccess phase at DefaultPhaseVersion with the given TaskInfo and
// no error.
func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info)
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info, false)
}

func PhaseInfoSystemFailure(code, reason string, info *TaskInfo) PhaseInfo {
Expand All @@ -245,11 +258,15 @@ func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info)
}

// PhaseInfoRetryableFailureWithCleanup returns a PhaseInfo in the PhaseRetryableFailure phase whose error has the
// given code and message and is of kind USER, with the cleanupOnFailure flag set to true.
func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo {
	execErr := &core.ExecutionError{
		Code:    code,
		Message: reason,
		Kind:    core.ExecutionError_USER,
	}
	return phaseInfoFailed(PhaseRetryableFailure, execErr, info, true)
}

// PhaseInfoSystemRetryableFailure returns a PhaseInfo in the PhaseRetryableFailure phase whose error has the given
// code and message and is of kind SYSTEM.
func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
	execErr := &core.ExecutionError{
		Code:    code,
		Message: reason,
		Kind:    core.ExecutionError_SYSTEM,
	}
	return PhaseInfoFailed(PhaseRetryableFailure, execErr, info)
}

// PhaseInfoWaitingForCache creates a new PhaseInfo with phase set to PhaseWaitingForCache, using the given version
// and TaskInfo and no error.
func PhaseInfoWaitingForCache(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseWaitingForCache, version, nil, info)
return phaseInfo(PhaseWaitingForCache, version, nil, info, false)
}
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {

case "ImagePullBackOff":
t := c.LastTransitionTime.Time
return pluginsCore.PhaseInfoRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
default:
Expand Down

0 comments on commit 84030bd

Please sign in to comment.