Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] use deep copy of bit arrays when getting array node state #5681

Merged
merged 9 commits into from
Aug 23, 2024
21 changes: 21 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,27 @@
}
}

func (in *ArrayNodeStatus) DeepCopyInto(out *ArrayNodeStatus) {
*out = *in
out.MutableStruct = in.MutableStruct

Check warning on line 321 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L319-L321

Added lines #L319 - L321 were not covered by tests

if in.ExecutionError != nil {
in, out := &in.ExecutionError, &out.ExecutionError
*out = new(core.ExecutionError)
*out = *in

Check warning on line 326 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L323-L326

Added lines #L323 - L326 were not covered by tests
}
}

func (in *ArrayNodeStatus) DeepCopy() *ArrayNodeStatus {
if in == nil {
return nil

Check warning on line 332 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L330-L332

Added lines #L330 - L332 were not covered by tests
}

out := &ArrayNodeStatus{}
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
in.DeepCopyInto(out)
return out

Check warning on line 337 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L335-L337

Added lines #L335 - L337 were not covered by tests
}

type NodeStatus struct {
MutableStruct
Phase NodePhase `json:"phase,omitempty"`
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ const (
type EventConfig struct {
RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."`
// only meant to be overridden for certain node types that have different eventing behavior such as ArrayNode
ErrorOnAlreadyExists bool `json:"-"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the purpose of this now, thanks for the comment. We're adding a field to a central config object and assuming that this field will only be used in a particular situation. This feels hacky to me, but maybe there's a precedent and good reason to follow this pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup - it definitely is hacky. Looking forward to when/if we eventually re-work eventing in propeller

}

// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 59 additions & 11 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseRunning,
},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
Expand All @@ -559,7 +563,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseQueued,
},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
Expand All @@ -580,7 +588,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseRunning,
},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
Expand All @@ -601,7 +613,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseQueued,
},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
Expand All @@ -619,8 +635,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
core.PhaseUndefined,
core.PhaseUndefined,
},
subNodeTransitions: []handler.Transition{},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
subNodeTransitions: []handler.Transition{},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{},
Expand All @@ -642,7 +662,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseRunning,
},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
Expand All @@ -663,7 +687,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseSucceeded,
v1alpha1.NodePhaseSucceeded,
},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_SUCCEEDED},
Expand All @@ -684,7 +712,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseSucceeded,
v1alpha1.NodePhaseFailed,
},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_FAILED},
Expand All @@ -704,7 +736,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseFailed,
v1alpha1.NodePhaseSucceeded,
},
expectedTaskPhaseVersion: 0,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_FAILED, idlcore.TaskExecution_SUCCEEDED},
Expand All @@ -724,7 +760,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseRunning,
},
expectedTaskPhaseVersion: 2,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
Expand All @@ -749,6 +789,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
},
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
useFakeEventRecorder: true,
eventRecorderFailures: 5,
Expand All @@ -771,6 +815,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
},
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
useFakeEventRecorder: true,
eventRecorderError: fmt.Errorf("err"),
Expand Down
24 changes: 20 additions & 4 deletions flytepropeller/pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,27 @@
if an != nil {
as.Phase = an.GetArrayNodePhase()
as.Error = an.GetExecutionError()
as.SubNodePhases = an.GetSubNodePhases()
as.SubNodeTaskPhases = an.GetSubNodeTaskPhases()
as.SubNodeRetryAttempts = an.GetSubNodeRetryAttempts()
as.SubNodeSystemFailures = an.GetSubNodeSystemFailures()
as.TaskPhaseVersion = an.GetTaskPhaseVersion()

subNodePhases := an.GetSubNodePhases()
if subNodePhasesCopy := subNodePhases.DeepCopy(); subNodePhasesCopy != nil {
as.SubNodePhases = *subNodePhasesCopy

Check warning on line 167 in flytepropeller/pkg/controller/nodes/node_state_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/node_state_manager.go#L165-L167

Added lines #L165 - L167 were not covered by tests
}

subNodeTaskPhases := an.GetSubNodeTaskPhases()
if subNodeTaskPhasesCopy := subNodeTaskPhases.DeepCopy(); subNodeTaskPhasesCopy != nil {
as.SubNodeTaskPhases = *subNodeTaskPhasesCopy

Check warning on line 172 in flytepropeller/pkg/controller/nodes/node_state_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/node_state_manager.go#L170-L172

Added lines #L170 - L172 were not covered by tests
}

subNodeRetryAttempts := an.GetSubNodeRetryAttempts()
if subNodeRetryAttemptsCopy := subNodeRetryAttempts.DeepCopy(); subNodeRetryAttemptsCopy != nil {
as.SubNodeRetryAttempts = *subNodeRetryAttemptsCopy

Check warning on line 177 in flytepropeller/pkg/controller/nodes/node_state_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/node_state_manager.go#L175-L177

Added lines #L175 - L177 were not covered by tests
}

subNodeSystemFailures := an.GetSubNodeSystemFailures()
if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil {
as.SubNodeSystemFailures = *subNodeSystemFailuresCopy

Check warning on line 182 in flytepropeller/pkg/controller/nodes/node_state_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/node_state_manager.go#L180-L182

Added lines #L180 - L182 were not covered by tests
}
}
return as
}
Expand Down
Loading