diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index f1a33444260..6590aaa04a9 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -318,7 +318,7 @@ type MutableNodeStatus interface { SetOutputDir(d DataReference) SetParentNodeID(n *NodeID) SetParentTaskID(t *core.TaskExecutionIdentifier) - UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) + UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) IncrementAttempts() uint32 IncrementSystemFailures() uint32 SetCached() diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go index e6d2309df16..cdf3f1b6ab2 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go @@ -1187,9 +1187,9 @@ func (_m *ExecutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnAnyTermination, err -func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, clearStateOnAnyTermination, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, enableCRDebugMetadata, err +func (_m *ExecutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) { + _m.Called(phase, occurredAt, reason, enableCRDebugMetadata, err) } // VisitNodeStatuses provides a mock function with given fields: visitor diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go index d6bdd0510ea..3f103bc2ec5 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go @@ -587,7 +587,7 @@ func (_m *MutableNodeStatus) SetParentTaskID(t *core.TaskExecutionIdentifier) { _m.Called(t) } -// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, clearStateOnAnyTermination, err -func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) { - _m.Called(phase, occurredAt, reason, clearStateOnAnyTermination, err) +// UpdatePhase provides a mock function with given fields: phase, occurredAt, reason, enableCRDebugMetadata, err +func (_m *MutableNodeStatus) UpdatePhase(phase v1alpha1.NodePhase, occurredAt v1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) { + _m.Called(phase, occurredAt, reason, enableCRDebugMetadata, err) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index e4fc8bc41a1..aab034224d7 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -594,7 +594,7 @@ func (in *NodeStatus) GetOrCreateArrayNodeStatus() MutableArrayNodeStatus { return in.ArrayNodeStatus } -func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, clearStateOnAnyTermination bool, err *core.ExecutionError) { +func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) { if in.Phase == p && in.Message == reason { // We will not update the phase multiple times. This prevents the comparison from returning false positive return @@ -629,8 +629,8 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st if in.StoppedAt == nil { in.StoppedAt = &n } - if p == NodePhaseSucceeded || p == NodePhaseSkipped || clearStateOnAnyTermination { - // Clear most status related fields after reaching a terminal state. This keeps the CRD state small to avoid + if p == NodePhaseSucceeded || p == NodePhaseSkipped || !enableCRDebugMetadata { + // Clear most status related fields after reaching a terminal state. This keeps the CR state small to avoid // etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further. in.Message = "" in.QueuedAt = nil diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index 63d8b056e79..0278d62f55a 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -260,7 +260,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { const queued = "queued" const success = "success" - for _, clearStateOnAnyTermination := range []bool{false, true} { + for _, enableCRDebugMetadata := range []bool{false, true} { t.Run("identical-phase", func(t *testing.T) { p := NodePhaseQueued ns := NodeStatus{ @@ -268,7 +268,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { Message: queued, } msg := queued - ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) assert.Nil(t, ns.QueuedAt) }) @@ -276,7 +276,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseQueued ns := NodeStatus{} msg := queued - ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, metav1.NewTime(time.Time{}), msg, enableCRDebugMetadata, nil) assert.NotNil(t, ns.QueuedAt) }) @@ -284,7 +284,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseQueued msg := queued - ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Equal(t, *ns.QueuedAt, n) @@ -300,7 +300,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseRunning msg := "running" - ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) @@ -316,7 +316,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseTimingOut msg := "timing-out" - ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) @@ -332,7 +332,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -348,7 +348,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { ns := NodeStatus{} p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -374,7 +374,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { } p := NodePhaseSucceeded msg := success - ns.UpdatePhase(p, n, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n, msg, enableCRDebugMetadata, nil) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -406,7 +406,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { n2 := metav1.NewTime(time.Now()) p := NodePhaseRunning msg := "running" - ns.UpdatePhase(p, n2, msg, clearStateOnAnyTermination, nil) + ns.UpdatePhase(p, n2, msg, enableCRDebugMetadata, nil) assert.Equal(t, *ns.LastUpdatedAt, n2) assert.Equal(t, *ns.QueuedAt, n) @@ -429,7 +429,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseFailed msg := "failed" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, false, err) + ns.UpdatePhase(p, n, msg, true, err) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) @@ -446,7 +446,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseFailed msg := "failed" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, true, err) + ns.UpdatePhase(p, n, msg, false, err) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) @@ -463,7 +463,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseTimedOut msg := "tm" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, false, err) + ns.UpdatePhase(p, n, msg, true, err) assert.Equal(t, *ns.LastUpdatedAt, n) assert.Nil(t, ns.QueuedAt) @@ -480,7 +480,7 @@ func TestNodeStatus_UpdatePhase(t *testing.T) { p := NodePhaseTimedOut msg := "tm" err := &core.ExecutionError{} - ns.UpdatePhase(p, n, msg, true, err) + ns.UpdatePhase(p, n, msg, false, err) assert.Nil(t, ns.LastUpdatedAt) assert.Nil(t, ns.QueuedAt) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index dc4d135e738..d1afa7f6513 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -98,7 +98,7 @@ var ( InterruptibleFailureThreshold: -1, DefaultMaxAttempts: 1, IgnoreRetryCause: false, - ClearStateOnAnyTermination: false, + EnableCRDebugMetadata: false, }, MaxStreakLength: 8, // Turbo mode is enabled by default ProfilerPort: config.Port{ @@ -212,7 +212,7 @@ type NodeConfig struct { InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'"` DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` - ClearStateOnAnyTermination bool `json:"clear-state-on-any-termination" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."` + EnableCRDebugMetadata bool `json:"clear-state-on-any-termination" pflag:",Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd."` } // DefaultDeadlines contains default values for timeouts diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index e8b03cf3767..1e34b5a9278 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -96,7 +96,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'") cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.default-max-attempts"), defaultConfig.NodeConfig.DefaultMaxAttempts, "Default maximum number of attempts for a node") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.ignore-retry-cause"), defaultConfig.NodeConfig.IgnoreRetryCause, "Ignore retry cause and count all attempts toward a node's max attempts") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.clear-state-on-any-termination"), defaultConfig.NodeConfig.ClearStateOnAnyTermination, "Collapse node on any terminal state") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.clear-state-on-any-termination"), defaultConfig.NodeConfig.EnableCRDebugMetadata, "Collapse node on any terminal state, not just successful terminations. This is useful to reduce the size of workflow state in etcd.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index d083ce02228..9c1075ad83a 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -750,7 +750,7 @@ func TestConfig_SetFlags(t *testing.T) { cmdFlags.Set("node-config.clear-state-on-any-termination", testValue) if vBool, err := cmdFlags.GetBool("node-config.clear-state-on-any-termination"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.NodeConfig.ClearStateOnAnyTermination) + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.NodeConfig.EnableCRDebugMetadata) } else { assert.FailNow(t, err.Error()) diff --git a/flytepropeller/pkg/controller/nodes/branch/evaluator.go b/flytepropeller/pkg/controller/nodes/branch/evaluator.go index ee58570d97e..4bc16767455 100644 --- a/flytepropeller/pkg/controller/nodes/branch/evaluator.go +++ b/flytepropeller/pkg/controller/nodes/branch/evaluator.go @@ -129,9 +129,9 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1. } nStatus := nl.GetNodeExecutionStatus(ctx, n.GetID()) logger.Infof(ctx, "Branch Setting Node[%v] status to Skipped!", skippedNodeID) - // We hard code clearStateOnAnyTermination=false because it has no effect when setting phase to - // NodePhaseSkipped. This saves us passing the config all the way down from the nodeExecutor - nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", false, nil) + // We hard code enableCRDebugMetadata=true because it has no effect when setting phase to + // NodePhaseSkipped. This saves us passing the config all the way down from the nodeExecutor. + nStatus.UpdatePhase(v1alpha1.NodePhaseSkipped, v1.Now(), "Branch evaluated to false", true, nil) } if selectedNodeID == nil { diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index f897ea8693a..53506cb9411 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -475,7 +475,7 @@ func (c *recursiveNodeExecutor) WithNodeExecutionContextBuilder(nCtxBuilder inte type nodeExecutor struct { catalog catalog.Client clusterID string - clearStateOnAnyTermination bool + enableCRDebugMetadata bool defaultActiveDeadline time.Duration defaultDataSandbox storage.DataReference defaultExecutionDeadline time.Duration @@ -1006,7 +1006,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error()) return interfaces.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnAnyTermination) + UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.enableCRDebugMetadata) c.RecordTransitionLatency(ctx, dag, nCtx.ContextualNodeLookup(), nCtx.Node(), nodeStatus) } @@ -1272,7 +1272,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter } } - UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.clearStateOnAnyTermination) + UpdateNodeStatus(np, p, nCtx.NodeStateReader(), nodeStatus, c.enableCRDebugMetadata) return finalStatus, nil } @@ -1286,7 +1286,7 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfac // NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state // Attempt is used throughout the system to determine the idempotent resource version. nodeStatus.IncrementAttempts() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.clearStateOnAnyTermination, nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", c.enableCRDebugMetadata, nil) // We are going to retry in the next round, so we should clear all current state nodeStatus.ClearSubNodeStatus() nodeStatus.ClearTaskStatus() @@ -1331,7 +1331,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur if startedAt == nil { startedAt = &t } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.clearStateOnAnyTermination, nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -1345,7 +1345,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur return interfaces.NodeStatusUndefined, err } - nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.clearStateOnAnyTermination, nodeStatus.GetExecutionError()) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError()) c.metrics.TimedOutFailure.Inc(ctx) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) @@ -1369,7 +1369,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur stopped = &t } c.metrics.SuccessDuration.Observe(ctx, started.Time, stopped.Time) - nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.clearStateOnAnyTermination, nil) + nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, t, "completed successfully", c.enableCRDebugMetadata, nil) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } @@ -1436,7 +1436,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora nodeExecutor := &nodeExecutor{ catalog: catalogClient, clusterID: clusterID, - clearStateOnAnyTermination: nodeConfig.ClearStateOnAnyTermination, + enableCRDebugMetadata: nodeConfig.EnableCRDebugMetadata, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, defaultDataSandbox: defaultRawOutputPrefix, defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 70ab7fbc883..2bc552bab03 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -659,21 +659,21 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { } tests := []struct { - name string - currentNodePhase v1alpha1.NodePhase - parentNodePhase v1alpha1.NodePhase - clearStateOnAnyTermination bool - expectedNodePhase v1alpha1.NodePhase - expectedPhase interfaces.NodePhase - expectedError bool - updateCalled bool + name string + currentNodePhase v1alpha1.NodePhase + parentNodePhase v1alpha1.NodePhase + enableCRDebugMetadata bool + expectedNodePhase v1alpha1.NodePhase + expectedPhase interfaces.NodePhase + expectedError bool + updateCalled bool }{ {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, false, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, {"notYetStarted->skipped", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, false, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, {"notYetStarted->queued", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, false, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, - {"notYetStarted->skipped clearStateOnAnyTermination", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, - {"notYetStarted->skipped clearStateOnAnyTermination", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, - {"notYetStarted->queued clearStateOnAnyTermination", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, true, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, + {"notYetStarted->skipped enableCRDebugMetadata", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseFailed, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseFailed, false, false}, + {"notYetStarted->skipped enableCRDebugMetadata", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSkipped, true, v1alpha1.NodePhaseSkipped, interfaces.NodePhaseSuccess, false, true}, + {"notYetStarted->queued enableCRDebugMetadata", v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseSucceeded, true, v1alpha1.NodePhaseQueued, interfaces.NodePhasePending, false, true}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -688,13 +688,13 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { h.OnFinalizeRequired().Return(false) hf.OnGetHandler(v1alpha1.NodeKindTask).Return(h, nil) - mockWf, _ := setupNodePhase(test.parentNodePhase, test.currentNodePhase, test.expectedNodePhase, test.clearStateOnAnyTermination) + mockWf, _ := setupNodePhase(test.parentNodePhase, test.currentNodePhase, test.expectedNodePhase, test.enableCRDebugMetadata) startNode := mockWf.StartNode() store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() nodeConfig := config.GetConfig().NodeConfig - nodeConfig.ClearStateOnAnyTermination = test.clearStateOnAnyTermination + nodeConfig.EnableCRDebugMetadata = test.enableCRDebugMetadata execIface, err := NewExecutor(ctx, nodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index d4e8922c560..e341a0d2a14 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -228,10 +228,10 @@ func ToK8sTime(t time.Time) v1.Time { return v1.Time{Time: t} } -func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus, clearStateOnAnyTermination bool) { +func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.NodeStateReader, s v1alpha1.ExecutableNodeStatus, enableCRDebugMetadata bool) { // We update the phase and / or reason only if they are not already updated if np != s.GetPhase() || p.GetReason() != s.GetMessage() { - s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), clearStateOnAnyTermination, p.GetErr()) + s.UpdatePhase(np, ToK8sTime(p.GetOccurredAt()), p.GetReason(), enableCRDebugMetadata, p.GetErr()) } // Update TaskStatus if n.HasTaskNodeState() {