diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index ba43d1ba77..b69f098ea5 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -10,6 +10,7 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/events" eventsErr "github.com/flyteorg/flytepropeller/events/errors" @@ -286,10 +287,25 @@ func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionC s := nl.GetNodeExecutionStatus(ctx, currentNodeID) - // a node is not considered interruptible if the system failures have exceeded the configured threshold - if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold { - interruptible = false - c.metrics.InterruptedThresholdHit.Inc(ctx) + if config.GetConfig().NodeConfig.IgnoreRetryCause { + // For the unified retry behavior we execute the last interruptibleFailureThreshold attempts on a non + // interruptible machine + currentAttempt := s.GetAttempts() + 1 + s.GetSystemFailures() + maxAttempts := uint32(config.GetConfig().NodeConfig.DefaultMaxAttempts) + if n.GetRetryStrategy() != nil && n.GetRetryStrategy().MinAttempts != nil && *n.GetRetryStrategy().MinAttempts != 0 { + maxAttempts = uint32(*n.GetRetryStrategy().MinAttempts) + } + + if interruptible && currentAttempt >= maxAttempts-c.interruptibleFailureThreshold { + interruptible = false + c.metrics.InterruptedThresholdHit.Inc(ctx) + } + } else { + // Else a node is not considered interruptible if the system failures have exceeded the configured threshold + if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold { + interruptible = false + c.metrics.InterruptedThresholdHit.Inc(ctx) + } } rawOutputPrefix := c.defaultDataSandbox