From 790727496cb9d856361da04da8ce880ab361a870 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 27 Nov 2024 14:21:51 -0600 Subject: [PATCH 01/11] storing delta timestamps to set subnode lastattemptstartedat Signed-off-by: Daniel Rammer --- .../pkg/apis/flyteworkflow/v1alpha1/iface.go | 2 ++ .../flyteworkflow/v1alpha1/node_status.go | 26 ++++++++++++++----- .../pkg/controller/nodes/array/handler.go | 24 +++++++++++++++++ .../pkg/controller/nodes/handler/state.go | 15 ++++++----- .../controller/nodes/node_state_manager.go | 7 ++++- .../pkg/controller/nodes/transformers.go | 1 + 6 files changed, 60 insertions(+), 15 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index 486ac35a16..c2022dea25 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -290,6 +290,7 @@ type ExecutableArrayNodeStatus interface { GetSubNodeTaskPhases() bitarray.CompactArray GetSubNodeRetryAttempts() bitarray.CompactArray GetSubNodeSystemFailures() bitarray.CompactArray + GetSubNodeDeltaTimestamps() bitarray.CompactArray GetTaskPhaseVersion() uint32 } @@ -302,6 +303,7 @@ type MutableArrayNodeStatus interface { SetSubNodeTaskPhases(subNodeTaskPhases bitarray.CompactArray) SetSubNodeRetryAttempts(subNodeRetryAttempts bitarray.CompactArray) SetSubNodeSystemFailures(subNodeSystemFailures bitarray.CompactArray) + SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) SetTaskPhaseVersion(taskPhaseVersion uint32) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 218b045588..2e6fde25d3 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -230,13 +230,14 @@ const ( type ArrayNodeStatus struct { MutableStruct - Phase ArrayNodePhase `json:"phase,omitempty"` - ExecutionError *core.ExecutionError `json:"executionError,omitempty"` - SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"` - SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` - SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` - SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"` - TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"` + Phase ArrayNodePhase `json:"phase,omitempty"` + ExecutionError *core.ExecutionError `json:"executionError,omitempty"` + SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"` + SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` + SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` + SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"` + SubNodeDeltaTimestamps bitarray.CompactArray `json: "subtimestamps",omitempty"` + TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"` } func (in *ArrayNodeStatus) GetArrayNodePhase() ArrayNodePhase { @@ -305,6 +306,17 @@ func (in *ArrayNodeStatus) SetSubNodeSystemFailures(subNodeSystemFailures bitarr } } +func (in *ArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { + return in.SubNodeDeltaTimestamps +} + +func (in *ArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) { + if in.SubNodeDeltaTimestamps != subNodeDeltaTimestamps { + in.SetDirty() + in.SubNodeDeltaTimestamps = subNodeDeltaTimestamps + } +} + func (in *ArrayNodeStatus) GetTaskPhaseVersion() uint32 { return in.TaskPhaseVersion } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 17f49adcd3..db31f0def4 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -5,6 +5,9 @@ import ( "fmt" "math" "strconv" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" @@ -254,6 +257,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 259200}, // max value is 3 days of seconds which is coverd by 18 bits (262144) } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 @@ -380,6 +384,18 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures())) + startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt() + subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt() + if subNodeStartedAt == nil { + // subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has + // been reset (ex. retryable failure). in both cases we set the delta timestamp to 0 + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0) + } else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 { + // otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it + deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds()) + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration) + } + // increment task phase version if subNode phase or task phase changed if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase { incrementTaskPhaseVersion = true @@ -767,6 +783,13 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter return nil, nil, nil, nil, nil, nil, err } + // compute start time for subNode using delta timestamp from ArrayNode NodeStatus + var startedAt *v1.Time + fmt.Printf("HAMERSAW - retrieving subNodeIndex %d/%d\n", subNodeIndex, arrayNodeState.SubNodeDeltaTimestamps.ItemsCount) + if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { + startedAt = &v1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} + } + subNodeStatus := &v1alpha1.NodeStatus{ Phase: nodePhase, DataDir: subDataDir, @@ -777,6 +800,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter Phase: taskPhase, PluginState: pluginStateBytes, }, + LastAttemptStartedAt: startedAt, } // initialize mocks diff --git a/flytepropeller/pkg/controller/nodes/handler/state.go b/flytepropeller/pkg/controller/nodes/handler/state.go index a7fa7bdf87..c3e35e67d7 100644 --- a/flytepropeller/pkg/controller/nodes/handler/state.go +++ b/flytepropeller/pkg/controller/nodes/handler/state.go @@ -48,11 +48,12 @@ type GateNodeState struct { } type ArrayNodeState struct { - Phase v1alpha1.ArrayNodePhase - TaskPhaseVersion uint32 - Error *core.ExecutionError - SubNodePhases bitarray.CompactArray - SubNodeTaskPhases bitarray.CompactArray - SubNodeRetryAttempts bitarray.CompactArray - SubNodeSystemFailures bitarray.CompactArray + Phase v1alpha1.ArrayNodePhase + TaskPhaseVersion uint32 + Error *core.ExecutionError + SubNodePhases bitarray.CompactArray + SubNodeTaskPhases bitarray.CompactArray + SubNodeRetryAttempts bitarray.CompactArray + SubNodeSystemFailures bitarray.CompactArray + SubNodeDeltaTimestamps bitarray.CompactArray } diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index fd74d107a0..6e58fe2783 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -65,7 +65,7 @@ func (n *nodeStateManager) HasWorkflowNodeState() bool { } func (n *nodeStateManager) HasGateNodeState() bool { - return n.g != nil +return n.g != nil } func (n *nodeStateManager) HasArrayNodeState() bool { @@ -181,6 +181,11 @@ func (n nodeStateManager) GetArrayNodeState() handler.ArrayNodeState { if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil { as.SubNodeSystemFailures = *subNodeSystemFailuresCopy } + + subNodeDeltaTimestamps := an.GetSubNodeDeltaTimestamps() + if subNodeDeltaTimestampsCopy := subNodeDeltaTimestamps.DeepCopy(); subNodeDeltaTimestampsCopy != nil { + as.SubNodeDeltaTimestamps = *subNodeDeltaTimestampsCopy + } } return as } diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 9d19081cc4..ceeaf5aaec 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -314,6 +314,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n interfaces.N t.SetSubNodeTaskPhases(na.SubNodeTaskPhases) t.SetSubNodeRetryAttempts(na.SubNodeRetryAttempts) t.SetSubNodeSystemFailures(na.SubNodeSystemFailures) + t.SetSubNodeDeltaTimestamps(na.SubNodeDeltaTimestamps) t.SetTaskPhaseVersion(na.TaskPhaseVersion) } } From d9ba7c42cfffbefa6e8c6a8eea9cf883e45f81df Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 27 Nov 2024 14:24:16 -0600 Subject: [PATCH 02/11] remove unnecessary print Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index db31f0def4..e282b4e7ad 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -785,7 +785,6 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter // compute start time for subNode using delta timestamp from ArrayNode NodeStatus var startedAt *v1.Time - fmt.Printf("HAMERSAW - retrieving subNodeIndex %d/%d\n", subNodeIndex, arrayNodeState.SubNodeDeltaTimestamps.ItemsCount) if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { startedAt = &v1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} } From f34fb8f976aa4dbe2c11b793bbfb093cde483c08 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 27 Nov 2024 14:41:13 -0600 Subject: [PATCH 03/11] tests passing - still need to add validation Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/handler.go | 7 ++++++- flytepropeller/pkg/controller/nodes/array/handler_test.go | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index e282b4e7ad..f2021006a5 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -31,6 +31,11 @@ import ( "github.com/flyteorg/flyte/flytestdlib/storage" ) +const ( + // value is 3 days of seconds which is covered by 18 bits (262144) + MAX_DELTA_TIMESTAMP = 259200 +) + var ( nilLiteral = &idlcore.Literal{ Value: &idlcore.Literal_Scalar{ @@ -257,7 +262,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue}, - {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 259200}, // max value is 3 days of seconds which is coverd by 18 bits (262144) + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: MAX_DELTA_TIMESTAMP}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index a759327423..268a4a6fdd 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -252,6 +252,7 @@ func TestAbort(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 @@ -348,6 +349,7 @@ func TestFinalize(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) @@ -858,6 +860,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) @@ -1301,6 +1304,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) { {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1}, {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1}, + {arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(len(test.subNodePhases)), bitarray.Item(item.maxValue)) // #nosec G115 assert.NoError(t, err) From 22a83372e74a0d95d70ea6b34b7943f9404ed5cb Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 3 Dec 2024 11:01:36 -0600 Subject: [PATCH 04/11] added unit test Signed-off-by: Daniel Rammer --- .../controller/nodes/array/handler_test.go | 85 ++++++++++++++----- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 268a4a6fdd..bd54764d07 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -184,9 +186,15 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte nCtx.OnNodeStateWriter().Return(nodeStateWriter) // NodeStatus + nowMinus := time.Now().Add(time.Duration(-5) * time.Second) + metav1NowMinus := metav1.Time{ + Time: nowMinus, + } nCtx.OnNodeStatus().Return(&v1alpha1.NodeStatus{ DataDir: storage.DataReference("s3://bucket/data"), OutputDir: storage.DataReference("s3://bucket/output"), + LastAttemptStartedAt: &metav1NowMinus, + StartedAt: &metav1NowMinus, }) return nCtx @@ -508,25 +516,27 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } tests := []struct { - name string - parallelism *uint32 - minSuccessRatio *float32 - subNodePhases []v1alpha1.NodePhase - subNodeTaskPhases []core.Phase - subNodeTransitions []handler.Transition - expectedArrayNodePhase v1alpha1.ArrayNodePhase - expectedArrayNodeSubPhases []v1alpha1.NodePhase - expectedTransitionPhase handler.EPhase - expectedExternalResourcePhases []idlcore.TaskExecution_Phase - currentWfParallelism uint32 - maxWfParallelism uint32 - incrementParallelismCount uint32 - useFakeEventRecorder bool - eventRecorderFailures uint32 - eventRecorderError error - expectedTaskPhaseVersion uint32 - expectHandleError bool - expectedEventingCalls int + name string + parallelism *uint32 + minSuccessRatio *float32 + subNodePhases []v1alpha1.NodePhase + subNodeTaskPhases []core.Phase + subNodeDeltaTimestamps []uint64 + subNodeTransitions []handler.Transition + expectedArrayNodePhase v1alpha1.ArrayNodePhase + expectedArrayNodeSubPhases []v1alpha1.NodePhase + expectedDiffArrayNodeSubDeltaTimestamps []bool + expectedTransitionPhase handler.EPhase + expectedExternalResourcePhases []idlcore.TaskExecution_Phase + currentWfParallelism uint32 + maxWfParallelism uint32 + incrementParallelismCount uint32 + useFakeEventRecorder bool + eventRecorderFailures uint32 + eventRecorderError error + expectedTaskPhaseVersion uint32 + expectHandleError bool + expectedEventingCalls int }{ { name: "StartAllSubNodes", @@ -829,6 +839,31 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectHandleError: true, expectedEventingCalls: 1, }, + { + name: "DeltaTimestampUpdates", + parallelism: uint32Ptr(0), + subNodePhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseRunning, + }, + subNodeTaskPhases: []core.Phase{ + core.PhaseUndefined, + core.PhaseUndefined, + }, + subNodeTransitions: []handler.Transition{ + handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), + handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})), + }, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRetryableFailure, + }, + expectedTaskPhaseVersion: 1, + expectedTransitionPhase: handler.EPhaseRunning, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_FAILED}, + incrementParallelismCount: 1, + }, } for _, test := range tests { @@ -870,6 +905,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { arrayNodeState.SubNodePhases.SetItem(i, bitarray.Item(nodePhase)) // #nosec G115 } + for i, deltaTimestmap := range test.subNodeDeltaTimestamps { + arrayNodeState.SubNodeDeltaTimestamps.SetItem(i, bitarray.Item(deltaTimestmap)) // #nosec G115 + } + nodeSpec := arrayNodeSpec nodeSpec.ArrayNode.Parallelism = test.parallelism nodeSpec.ArrayNode.MinSuccessRatio = test.minSuccessRatio @@ -925,6 +964,14 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { assert.Equal(t, expectedPhase, v1alpha1.NodePhase(arrayNodeState.SubNodePhases.GetItem(i))) // #nosec G115 } + for i, expectedDiffDeltaTimestamps := range test.expectedDiffArrayNodeSubDeltaTimestamps { + if expectedDiffDeltaTimestamps { + assert.NotEqual(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i]) + } else { + assert.Equal(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i]) + } + } + bufferedEventRecorder, ok := eventRecorder.(*bufferedEventRecorder) if ok { if len(test.expectedExternalResourcePhases) > 0 { From 6cec74c3c4ed1ee2e9f7e624a3e4cf0e18b49233 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 3 Dec 2024 11:21:50 -0600 Subject: [PATCH 05/11] fixed linter Signed-off-by: Daniel Rammer --- .../flyteworkflow/v1alpha1/node_status.go | 4 +- .../pkg/controller/nodes/array/handler.go | 11 +++-- .../controller/nodes/array/handler_test.go | 48 +++++++++---------- .../controller/nodes/node_state_manager.go | 2 +- 4 files changed, 34 insertions(+), 31 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 2e6fde25d3..c27a8560fc 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -236,7 +236,7 @@ type ArrayNodeStatus struct { SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"` SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"` SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"` - SubNodeDeltaTimestamps bitarray.CompactArray `json: "subtimestamps",omitempty"` + SubNodeDeltaTimestamps bitarray.CompactArray `json:"subtimestamps,omitempty"` TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"` } @@ -311,7 +311,7 @@ func (in *ArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { } func (in *ArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) { - if in.SubNodeDeltaTimestamps != subNodeDeltaTimestamps { + if in.SubNodeDeltaTimestamps != subNodeDeltaTimestamps { in.SetDirty() in.SubNodeDeltaTimestamps = subNodeDeltaTimestamps } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index f2021006a5..0918ab6035 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -7,7 +7,7 @@ import ( "strconv" "time" - "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" @@ -789,9 +789,12 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter } // compute start time for subNode using delta timestamp from ArrayNode NodeStatus - var startedAt *v1.Time - if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { - startedAt = &v1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} + var startedAt *metav1.Time + subNodeIndexUint := uint(subNodeIndex) // #nosec G115 + if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount >= subNodeIndexUint { + if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { + startedAt = &metav1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} // #nosec G115 + } } subNodeStatus := &v1alpha1.NodeStatus{ diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index bd54764d07..ac0e4b45ad 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -191,10 +191,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte Time: nowMinus, } nCtx.OnNodeStatus().Return(&v1alpha1.NodeStatus{ - DataDir: storage.DataReference("s3://bucket/data"), - OutputDir: storage.DataReference("s3://bucket/output"), + DataDir: storage.DataReference("s3://bucket/data"), + OutputDir: storage.DataReference("s3://bucket/output"), LastAttemptStartedAt: &metav1NowMinus, - StartedAt: &metav1NowMinus, + StartedAt: &metav1NowMinus, }) return nCtx @@ -516,27 +516,27 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } tests := []struct { - name string - parallelism *uint32 - minSuccessRatio *float32 - subNodePhases []v1alpha1.NodePhase - subNodeTaskPhases []core.Phase - subNodeDeltaTimestamps []uint64 - subNodeTransitions []handler.Transition - expectedArrayNodePhase v1alpha1.ArrayNodePhase - expectedArrayNodeSubPhases []v1alpha1.NodePhase + name string + parallelism *uint32 + minSuccessRatio *float32 + subNodePhases []v1alpha1.NodePhase + subNodeTaskPhases []core.Phase + subNodeDeltaTimestamps []uint64 + subNodeTransitions []handler.Transition + expectedArrayNodePhase v1alpha1.ArrayNodePhase + expectedArrayNodeSubPhases []v1alpha1.NodePhase expectedDiffArrayNodeSubDeltaTimestamps []bool - expectedTransitionPhase handler.EPhase - expectedExternalResourcePhases []idlcore.TaskExecution_Phase - currentWfParallelism uint32 - maxWfParallelism uint32 - incrementParallelismCount uint32 - useFakeEventRecorder bool - eventRecorderFailures uint32 - eventRecorderError error - expectedTaskPhaseVersion uint32 - expectHandleError bool - expectedEventingCalls int + expectedTransitionPhase handler.EPhase + expectedExternalResourcePhases []idlcore.TaskExecution_Phase + currentWfParallelism uint32 + maxWfParallelism uint32 + incrementParallelismCount uint32 + useFakeEventRecorder bool + eventRecorderFailures uint32 + eventRecorderError error + expectedTaskPhaseVersion uint32 + expectHandleError bool + expectedEventingCalls int }{ { name: "StartAllSubNodes", @@ -906,7 +906,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } for i, deltaTimestmap := range test.subNodeDeltaTimestamps { - arrayNodeState.SubNodeDeltaTimestamps.SetItem(i, bitarray.Item(deltaTimestmap)) // #nosec G115 + arrayNodeState.SubNodeDeltaTimestamps.SetItem(i, deltaTimestmap) // #nosec G115 } nodeSpec := arrayNodeSpec diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index 6e58fe2783..25b0bc55df 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -65,7 +65,7 @@ func (n *nodeStateManager) HasWorkflowNodeState() bool { } func (n *nodeStateManager) HasGateNodeState() bool { -return n.g != nil + return n.g != nil } func (n *nodeStateManager) HasArrayNodeState() bool { From c2fbd2e6933aff86af96dcf0a543c653b874cd89 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 3 Dec 2024 11:22:47 -0600 Subject: [PATCH 06/11] make generate Signed-off-by: Daniel Rammer --- .../mocks/ExecutableArrayNodeStatus.go | 32 ++++++++++++++++ .../v1alpha1/mocks/MutableArrayNodeStatus.go | 37 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go index f4cce3e643..4aee51f044 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableArrayNodeStatus.go @@ -82,6 +82,38 @@ func (_m *ExecutableArrayNodeStatus) GetExecutionError() *core.ExecutionError { return r0 } +type ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps struct { + *mock.Call +} + +func (_m ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps) Return(_a0 bitarray.CompactArray) *ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + return &ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableArrayNodeStatus) OnGetSubNodeDeltaTimestamps() *ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps") + return &ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +func (_m *ExecutableArrayNodeStatus) OnGetSubNodeDeltaTimestampsMatch(matchers ...interface{}) *ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps", matchers...) + return &ExecutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +// GetSubNodeDeltaTimestamps provides a mock function with given fields: +func (_m *ExecutableArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { + ret := _m.Called() + + var r0 bitarray.CompactArray + if rf, ok := ret.Get(0).(func() bitarray.CompactArray); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bitarray.CompactArray) + } + + return r0 +} + type ExecutableArrayNodeStatus_GetSubNodePhases struct { *mock.Call } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go index c20f80e349..1e081e20ba 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MutableArrayNodeStatus.go @@ -82,6 +82,38 @@ func (_m *MutableArrayNodeStatus) GetExecutionError() *core.ExecutionError { return r0 } +type MutableArrayNodeStatus_GetSubNodeDeltaTimestamps struct { + *mock.Call +} + +func (_m MutableArrayNodeStatus_GetSubNodeDeltaTimestamps) Return(_a0 bitarray.CompactArray) *MutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + return &MutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: _m.Call.Return(_a0)} +} + +func (_m *MutableArrayNodeStatus) OnGetSubNodeDeltaTimestamps() *MutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps") + return &MutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +func (_m *MutableArrayNodeStatus) OnGetSubNodeDeltaTimestampsMatch(matchers ...interface{}) *MutableArrayNodeStatus_GetSubNodeDeltaTimestamps { + c_call := _m.On("GetSubNodeDeltaTimestamps", matchers...) + return &MutableArrayNodeStatus_GetSubNodeDeltaTimestamps{Call: c_call} +} + +// GetSubNodeDeltaTimestamps provides a mock function with given fields: +func (_m *MutableArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray { + ret := _m.Called() + + var r0 bitarray.CompactArray + if rf, ok := ret.Get(0).(func() bitarray.CompactArray); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bitarray.CompactArray) + } + + return r0 +} + type MutableArrayNodeStatus_GetSubNodePhases struct { *mock.Call } @@ -284,6 +316,11 @@ func (_m *MutableArrayNodeStatus) SetExecutionError(executionError *core.Executi _m.Called(executionError) } +// SetSubNodeDeltaTimestamps provides a mock function with given fields: subNodeDeltaTimestamps +func (_m *MutableArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) { + _m.Called(subNodeDeltaTimestamps) +} + // SetSubNodePhases provides a mock function with given fields: subNodePhases func (_m *MutableArrayNodeStatus) SetSubNodePhases(subNodePhases bitarray.CompactArray) { _m.Called(subNodePhases) From c0acaa9f76f46b6bccc9e4e00d3425617b43d4ee Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 4 Dec 2024 12:59:18 -0600 Subject: [PATCH 07/11] only setting timestamps if SubNodeDeltaTimestamps has been initialized Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/handler.go | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 0918ab6035..4f98e66796 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -389,16 +389,19 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures())) - startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt() - subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt() - if subNodeStartedAt == nil { - // subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has - // been reset (ex. retryable failure). in both cases we set the delta timestamp to 0 - arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0) - } else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 { - // otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it - deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds()) - arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration) + indexUint := uint(index) // #nosec G115 + if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount >= indexUint { + startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt() + subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt() + if subNodeStartedAt == nil { + // subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has + // been reset (ex. retryable failure). in both cases we set the delta timestamp to 0 + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0) + } else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 { + // otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it + deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds()) + arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration) + } } // increment task phase version if subNode phase or task phase changed From 6893c6382a1a91396fcfa06727f695464454b117 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 4 Dec 2024 13:05:47 -0600 Subject: [PATCH 08/11] correctly checking ItemsCount Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 4f98e66796..b286e9ba36 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -390,7 +390,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures())) indexUint := uint(index) // #nosec G115 - if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount >= indexUint { + if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount > indexUint { startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt() subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt() if subNodeStartedAt == nil { @@ -794,7 +794,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter // compute start time for subNode using delta timestamp from ArrayNode NodeStatus var startedAt *metav1.Time subNodeIndexUint := uint(subNodeIndex) // #nosec G115 - if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount >= subNodeIndexUint { + if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount > subNodeIndexUint { if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { startedAt = &metav1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} // #nosec G115 } From 7f00d114580de5920f6023ea826772b80b0761d1 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 4 Dec 2024 15:07:43 -0600 Subject: [PATCH 09/11] validating SubNodeDeltaTimestamps based on underlying BitSet Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/handler.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index b286e9ba36..51d3105a0a 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -389,8 +389,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures())) - indexUint := uint(index) // #nosec G115 - if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount > indexUint { + if arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil { startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt() subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt() if subNodeStartedAt == nil { @@ -793,8 +792,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter // compute start time for subNode using delta timestamp from ArrayNode NodeStatus var startedAt *metav1.Time - subNodeIndexUint := uint(subNodeIndex) // #nosec G115 - if arrayNodeState.SubNodeDeltaTimestamps.ItemsCount > subNodeIndexUint { + if nCtx.NodeStatus().GetLastAttemptStartedAt() != nil && arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil { if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 { startedAt = &metav1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} // #nosec G115 } From 3aa283d75f26e7b4bb986628698489986b576ccd Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 4 Dec 2024 15:21:49 -0600 Subject: [PATCH 10/11] tmp update to strip timeouts and retries for ArrayNode Signed-off-by: Daniel Rammer --- flytepropeller/pkg/compiler/transformers/k8s/node.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/node.go b/flytepropeller/pkg/compiler/transformers/k8s/node.go index 18ec1ba02f..7ca72b69c2 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/node.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/node.go @@ -197,6 +197,11 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile case *core.ArrayNode_MinSuccessRatio: nodeSpec.ArrayNode.MinSuccessRatio = &successCriteria.MinSuccessRatio } + + // TODO @hamersaw - tmp + nodeSpec.ActiveDeadline = nil + nodeSpec.ExecutionDeadline = nil + nodeSpec.RetryStrategy = nil default: if n.GetId() == v1alpha1.StartNodeID { nodeSpec.Kind = v1alpha1.NodeKindStart From 36601df43924efb35f1c33a3165af7e2e3571f5b Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 5 Dec 2024 08:59:30 -0600 Subject: [PATCH 11/11] removed tmp ArrayNode NodeSpec field overrides Signed-off-by: Daniel Rammer --- flytepropeller/pkg/compiler/transformers/k8s/node.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/node.go b/flytepropeller/pkg/compiler/transformers/k8s/node.go index 7ca72b69c2..18ec1ba02f 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/node.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/node.go @@ -197,11 +197,6 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile case *core.ArrayNode_MinSuccessRatio: nodeSpec.ArrayNode.MinSuccessRatio = &successCriteria.MinSuccessRatio } - - // TODO @hamersaw - tmp - nodeSpec.ActiveDeadline = nil - nodeSpec.ExecutionDeadline = nil - nodeSpec.RetryStrategy = nil default: if n.GetId() == v1alpha1.StartNodeID { nodeSpec.Kind = v1alpha1.NodeKindStart