diff --git a/flyteplugins/go/tasks/pluginmachinery/core/phase.go b/flyteplugins/go/tasks/pluginmachinery/core/phase.go index 7847b161fc..6c80cc4d24 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/phase.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/phase.go @@ -261,15 +261,15 @@ func PhaseInfoSuccess(info *TaskInfo) PhaseInfo { } func PhaseInfoSystemFailure(code, reason string, info *TaskInfo) PhaseInfo { - return PhaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info) + return phaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, false) } func PhaseInfoSystemFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo { - return phaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, true) + return phaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, true) } func PhaseInfoFailure(code, reason string, info *TaskInfo) PhaseInfo { - return PhaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info) + return phaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, false) } func PhaseInfoFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo { @@ -277,7 +277,7 @@ func PhaseInfoFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo } func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo { - return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info) + return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, false) } func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo { @@ -285,7 +285,11 @@ func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) P } func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo { - return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info) + return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, false) +} + +func PhaseInfoSystemRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo { + return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, true) } // Creates a new PhaseInfo with phase set to PhaseWaitingForCache diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 2f3447ad0e..2a17751c62 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -686,7 +686,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration if time.Since(t) >= gracePeriod { - return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ + return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, }), t } @@ -700,7 +700,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti case "CreateContainerConfigError": gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration if time.Since(t) >= gracePeriod { - return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ + return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, }), t } @@ -712,7 +712,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti ), t case "InvalidImageName": - return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ + return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, }), t @@ -737,7 +737,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti // by K8s and we get elusive 'pod not found' errors // So be default if the container is not waiting with the PodInitializing/ContainerCreating // reasons, then we will assume a failure reason, and fail instantly - return pluginsCore.PhaseInfoSystemRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ + return pluginsCore.PhaseInfoSystemRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, }), t } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index 925cb00186..b5a51323d2 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -1351,6 +1351,7 @@ func TestDemystifyPending(t *testing.T) { taskStatus, err := DemystifyPending(s2) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) }) t.Run("InvalidImageName", func(t *testing.T) { @@ -1368,6 +1369,7 @@ func TestDemystifyPending(t *testing.T) { taskStatus, err := DemystifyPending(s) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) }) t.Run("RegistryUnavailable", func(t *testing.T) { @@ -1385,6 +1387,7 @@ func TestDemystifyPending(t *testing.T) { taskStatus, err := DemystifyPending(s) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) }) t.Run("RandomError", func(t *testing.T) { @@ -1402,6 +1405,7 @@ func TestDemystifyPending(t *testing.T) { taskStatus, err := DemystifyPending(s) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) }) t.Run("CreateContainerConfigErrorWithinGracePeriod", func(t *testing.T) { @@ -1440,6 +1444,7 @@ func TestDemystifyPending(t *testing.T) { taskStatus, err := DemystifyPending(s2) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) }) t.Run("CreateContainerErrorWithinGracePeriod", func(t *testing.T) { @@ -1478,6 +1483,7 @@ func TestDemystifyPending(t *testing.T) { taskStatus, err := DemystifyPending(s2) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) }) } @@ -1510,6 +1516,7 @@ func TestDemystifyPendingTimeout(t *testing.T) { assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) assert.Equal(t, "PodPendingTimeout", taskStatus.Err().Code) + assert.True(t, taskStatus.CleanupOnFailure()) }) }