From 76c7f764f45767f394c049cec3e439945bc6866f Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 5 Dec 2024 13:22:11 -0600 Subject: [PATCH] Support ArrayNode subNode timeouts (#6054) * storing delta timestamps to set subnode lastattemptstartedat Signed-off-by: Daniel Rammer * remove unnecessary print Signed-off-by: Daniel Rammer * tests passing - still need to add validation Signed-off-by: Daniel Rammer * added unit test Signed-off-by: Daniel Rammer * fixed linter Signed-off-by: Daniel Rammer * make generate Signed-off-by: Daniel Rammer * only setting timestamps if SubNodeDeltaTimestamps has been initialized Signed-off-by: Daniel Rammer * correctly checking ItemsCount Signed-off-by: Daniel Rammer * validating SubNodeDeltaTimestamps based on underlying BitSet Signed-off-by: Daniel Rammer * tmp update to strip timeouts and retries for ArrayNode Signed-off-by: Daniel Rammer * removed tmp ArrayNode NodeSpec field overrides Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- .../pkg/apis/flyteworkflow/v1alpha1/iface.go | 2 + .../mocks/ExecutableArrayNodeStatus.go | 32 +++++++ .../v1alpha1/mocks/MutableArrayNodeStatus.go | 37 ++++++++ .../flyteworkflow/v1alpha1/node_status.go | 26 ++++-- .../pkg/controller/nodes/array/handler.go | 32 +++++++ .../controller/nodes/array/handler_test.go | 93 ++++++++++++++----- .../pkg/controller/nodes/handler/state.go | 15 +-- .../controller/nodes/node_state_manager.go | 5 + .../pkg/controller/nodes/transformers.go | 1 + 9 files changed, 208 insertions(+), 35 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index 486ac35a16..c2022dea25 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -290,6 +290,7 @@ type ExecutableArrayNodeStatus interface { GetSubNodeTaskPhases() bitarray.CompactArray GetSubNodeRetryAttempts() bitarray.CompactArray GetSubNodeSystemFailures() bitarray.CompactArray + GetSubNodeDeltaTimestamps() bitarray.CompactArray GetTaskPhaseVersion() uint32 } @@ -302,6 +303,7 @@ type MutableArrayNodeStatus interface { SetSubNodeTaskPhases(subNodeTaskPhases bitarray.CompactArray) SetSubNodeRetryAttempts(subNodeRetryAttempts bitarray.CompactArray) SetSubNodeSystemFailures(subNodeSystemFailures bitarray.CompactArray) + SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) SetTaskPhaseVersion(taskPhaseVersion uint32) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go index f4cce3e643..4aee51f044 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go @@ -82,6 +82,38 @@ func (_m *ExecutableArrayNodeStatus) GetExecutionError() *core.ExecutionError { return r0 } +type ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps struct { + *mock.Call +} + +func (_m ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps) Return(_a0 bitarray.CompactArray) *ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + return &ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableArrayNodeStatus) OnGetSubNodeDeltaTimestamps() *ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps") + return &ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +func (_m *ExecutableArrayNodeStatus) OnGetSubNodeDeltaTimestampsMatch(matchers ...interface{}) *ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps", matchers...) + return &ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +// GetSubNodeDeltaTimestamps provides a mock function with given fields: +func (_m *ExecutableArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { + ret := _m.Called() + + var r0 bitarray.CompactArray + if rf, ok := ret.Get(0).(func() bitarray.CompactArray); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bitarray.CompactArray) + } + + return r0 +} + type ExecutableArrayNodeStatus_GetSubNodePhases struct { *mock.Call } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go index c20f80e349..1e081e20ba 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go @@ -82,6 +82,38 @@ func (_m *MutableArrayNodeStatus) GetExecutionError() *core.ExecutionError { return r0 } +type MutableArrayNodeStatus_GetSubNodeDeltaTimestamps struct { + *mock.Call +} + +func (_m MutableArrayNodeStatus_GetSubNodeDeltaTimestamps) Return(_a0 bitarray.CompactArray) *MutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + return &MutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: _m.Call.Return(_a0)} +} + +func (_m *MutableArrayNodeStatus) OnGetSubNodeDeltaTimestamps() *MutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps") + return &MutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +func (_m *MutableArrayNodeStatus) OnGetSubNodeDeltaTimestampsMatch(matchers ...interface{}) *MutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps", matchers...) + return &MutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +// GetSubNodeDeltaTimestamps provides a mock function with given fields: +func (_m *MutableArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { + ret := _m.Called() + + var r0 bitarray.CompactArray + if rf, ok := ret.Get(0).(func() bitarray.CompactArray); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bitarray.CompactArray) + } + + return r0 +} + type MutableArrayNodeStatus_GetSubNodePhases struct { *mock.Call } @@ -284,6 +316,11 @@ func (_m *MutableArrayNodeStatus) SetExecutionError(executionError *core.Executi _m.Called(executionError) } +// SetSubNodeDeltaTimestamps provides a mock function with given fields: subNodeDeltaTimestamps +func (_m *MutableArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) { + _m.Called(subNodeDeltaTimestamps) +} + // SetSubNodePhases provides a mock function with given fields: subNodePhases func (_m *MutableArrayNodeStatus) SetSubNodePhases(subNodePhases bitarray.CompactArray) { _m.Called(subNodePhases) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 218b045588..c27a8560fc 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -230,13 +230,14 @@ const ( type ArrayNodeStatus struct { MutableStruct - Phase ArrayNodePhase `json:"phase,omitempty"` - ExecutionError *core.ExecutionError `json:"executionError,omitempty"` - SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"` - SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` - SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` - SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"` - TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"` + Phase ArrayNodePhase `json:"phase,omitempty"` + ExecutionError *core.ExecutionError `json:"executionError,omitempty"` + SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"` + SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` + SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` + SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"` + SubNodeDeltaTimestamps bitarray.CompactArray `json:"subtimestamps,omitempty"` + TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"` } func (in *ArrayNodeStatus) GetArrayNodePhase() ArrayNodePhase { @@ -305,6 +306,17 @@ func (in *ArrayNodeStatus) SetSubNodeSystemFailures(subNodeSystemFailures bitarr } } +func (in *ArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { + return in.SubNodeDeltaTimestamps +} + +func (in *ArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) { + if in.SubNodeDeltaTimestamps != subNodeDeltaTimestamps { + in.SetDirty() + in.SubNodeDeltaTimestamps = subNodeDeltaTimestamps + } +} + func (in *ArrayNodeStatus) GetTaskPhaseVersion() uint32 { return in.TaskPhaseVersion } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 17f49adcd3..51d3105a0a 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -5,6 +5,9 @@ import ( "fmt" "math" "strconv" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" @@ -28,6 +31,11 @@ import ( "github.com/flyteorg/flyte/flytestdlib/storage" ) +const ( + // value is 3 days of seconds which is covered by 18 bits (262144) + MAX_DELTA_TIMESTAMP = 259200 +) + var ( nilLiteral = &idlcore.Literal{ Value: &idlcore.Literal_Scalar{ @@ -254,6 +262,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: MAX_DELTA_TIMESTAMP}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 @@ -380,6 +389,20 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures())) + if arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil { + startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt() + subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt() + if subNodeStartedAt == nil { + // subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has + // been reset (ex. retryable failure). in both cases we set the delta timestamp to 0 + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0) + } else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 { + // otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it + deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds()) + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration) + } + } + // increment task phase version if subNode phase or task phase changed if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase { incrementTaskPhaseVersion = true @@ -767,6 +790,14 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter return nil, nil, nil, nil, nil, nil, err } + // compute start time for subNode using delta timestamp from ArrayNode NodeStatus + var startedAt *metav1.Time + if nCtx.NodeStatus().GetLastAttemptStartedAt() != nil && arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil { + if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { + startedAt = &metav1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} // #nosec G115 + } + } + subNodeStatus := &v1alpha1.NodeStatus{ Phase: nodePhase, DataDir: subDataDir, @@ -777,6 +808,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter Phase: taskPhase, PluginState: pluginStateBytes, }, + LastAttemptStartedAt: startedAt, } // initialize mocks diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index a759327423..ac0e4b45ad 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -184,9 +186,15 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte nCtx.OnNodeStateWriter().Return(nodeStateWriter) // NodeStatus + nowMinus := time.Now().Add(time.Duration(-5) * time.Second) + metav1NowMinus := metav1.Time{ + Time: nowMinus, + } nCtx.OnNodeStatus().Return(&v1alpha1.NodeStatus{ - DataDir: storage.DataReference("s3://bucket/data"), - OutputDir: storage.DataReference("s3://bucket/output"), + DataDir: storage.DataReference("s3://bucket/data"), + OutputDir: storage.DataReference("s3://bucket/output"), + LastAttemptStartedAt: &metav1NowMinus, + StartedAt: &metav1NowMinus, }) return nCtx @@ -252,6 +260,7 @@ func TestAbort(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 @@ -348,6 +357,7 @@ func TestFinalize(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) @@ -506,25 +516,27 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } tests := []struct { - name string - parallelism *uint32 - minSuccessRatio *float32 - subNodePhases []v1alpha1.NodePhase - subNodeTaskPhases []core.Phase - subNodeTransitions []handler.Transition - expectedArrayNodePhase v1alpha1.ArrayNodePhase - expectedArrayNodeSubPhases []v1alpha1.NodePhase - expectedTransitionPhase handler.EPhase - expectedExternalResourcePhases []idlcore.TaskExecution_Phase - currentWfParallelism uint32 - maxWfParallelism uint32 - incrementParallelismCount uint32 - useFakeEventRecorder bool - eventRecorderFailures uint32 - eventRecorderError error - expectedTaskPhaseVersion uint32 - expectHandleError bool - expectedEventingCalls int + name string + parallelism *uint32 + minSuccessRatio *float32 + subNodePhases []v1alpha1.NodePhase + subNodeTaskPhases []core.Phase + subNodeDeltaTimestamps []uint64 + subNodeTransitions []handler.Transition + expectedArrayNodePhase v1alpha1.ArrayNodePhase + expectedArrayNodeSubPhases []v1alpha1.NodePhase + expectedDiffArrayNodeSubDeltaTimestamps []bool + expectedTransitionPhase handler.EPhase + expectedExternalResourcePhases []idlcore.TaskExecution_Phase + currentWfParallelism uint32 + maxWfParallelism uint32 + incrementParallelismCount uint32 + useFakeEventRecorder bool + eventRecorderFailures uint32 + eventRecorderError error + expectedTaskPhaseVersion uint32 + expectHandleError bool + expectedEventingCalls int }{ { name: "StartAllSubNodes", @@ -827,6 +839,31 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectHandleError: true, expectedEventingCalls: 1, }, + { + name: "DeltaTimestampUpdates", + parallelism: uint32Ptr(0), + subNodePhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseRunning, + }, + subNodeTaskPhases: []core.Phase{ + core.PhaseUndefined, + core.PhaseUndefined, + }, + subNodeTransitions: []handler.Transition{ + handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), + handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})), + }, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRetryableFailure, + }, + expectedTaskPhaseVersion: 1, + expectedTransitionPhase: handler.EPhaseRunning, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_FAILED}, + incrementParallelismCount: 1, + }, } for _, test := range tests { @@ -858,6 +895,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) @@ -867,6 +905,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { arrayNodeState.SubNodePhases.SetItem(i, bitarray.Item(nodePhase)) // #nosec G115 } + for i, deltaTimestmap := range test.subNodeDeltaTimestamps { + arrayNodeState.SubNodeDeltaTimestamps.SetItem(i, deltaTimestmap) // #nosec G115 + } + nodeSpec := arrayNodeSpec nodeSpec.ArrayNode.Parallelism = test.parallelism nodeSpec.ArrayNode.MinSuccessRatio = test.minSuccessRatio @@ -922,6 +964,14 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { assert.Equal(t, expectedPhase, v1alpha1.NodePhase(arrayNodeState.SubNodePhases.GetItem(i))) // #nosec G115 } + for i, expectedDiffDeltaTimestamps := range test.expectedDiffArrayNodeSubDeltaTimestamps { + if expectedDiffDeltaTimestamps { + assert.NotEqual(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i]) + } else { + assert.Equal(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i]) + } + } + bufferedEventRecorder, ok := eventRecorder.(*bufferedEventRecorder) if ok { if len(test.expectedExternalResourcePhases) > 0 { @@ -1301,6 +1351,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(len(test.subNodePhases)), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) diff --git a/flytepropeller/pkg/controller/nodes/handler/state.go b/flytepropeller/pkg/controller/nodes/handler/state.go index a7fa7bdf87..c3e35e67d7 100644 --- a/flytepropeller/pkg/controller/nodes/handler/state.go +++ b/flytepropeller/pkg/controller/nodes/handler/state.go @@ -48,11 +48,12 @@ type GateNodeState struct { } type ArrayNodeState struct { - Phase v1alpha1.ArrayNodePhase - TaskPhaseVersion uint32 - Error *core.ExecutionError - SubNodePhases bitarray.CompactArray - SubNodeTaskPhases bitarray.CompactArray - SubNodeRetryAttempts bitarray.CompactArray - SubNodeSystemFailures bitarray.CompactArray + Phase v1alpha1.ArrayNodePhase + TaskPhaseVersion uint32 + Error *core.ExecutionError + SubNodePhases bitarray.CompactArray + SubNodeTaskPhases bitarray.CompactArray + SubNodeRetryAttempts bitarray.CompactArray + SubNodeSystemFailures bitarray.CompactArray + SubNodeDeltaTimestamps bitarray.CompactArray } diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index fd74d107a0..25b0bc55df 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -181,6 +181,11 @@ func (n nodeStateManager) GetArrayNodeState() handler.ArrayNodeState { if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil { as.SubNodeSystemFailures = *subNodeSystemFailuresCopy } + + subNodeDeltaTimestamps := an.GetSubNodeDeltaTimestamps() + if subNodeDeltaTimestampsCopy := subNodeDeltaTimestamps.DeepCopy(); subNodeDeltaTimestampsCopy != nil { + as.SubNodeDeltaTimestamps = *subNodeDeltaTimestampsCopy + } } return as } diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 9d19081cc4..ceeaf5aaec 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -314,6 +314,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.N t.SetSubNodeTaskPhases(na.SubNodeTaskPhases) t.SetSubNodeRetryAttempts(na.SubNodeRetryAttempts) t.SetSubNodeSystemFailures(na.SubNodeSystemFailures) + t.SetSubNodeDeltaTimestamps(na.SubNodeDeltaTimestamps) t.SetTaskPhaseVersion(na.TaskPhaseVersion) } }