Skip to content

Commit

Permalink
Inline input data for execution events (flyteorg#521)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Katrina Rogan <[email protected]>

* recovery

Signed-off-by: Katrina Rogan <[email protected]>

* test compile

Signed-off-by: Katrina Rogan <[email protected]>

* GH ur my fave diff tool

Signed-off-by: Katrina Rogan <[email protected]>

* revert styling changes

Signed-off-by: Katrina Rogan <[email protected]>

* more test

Signed-off-by: Katrina Rogan <[email protected]>

* goimports

Signed-off-by: Katrina Rogan <[email protected]>

* rm log line

Signed-off-by: Katrina Rogan <[email protected]>

* changes

Signed-off-by: Katrina Rogan <[email protected]>

* lint

Signed-off-by: Katrina Rogan <[email protected]>

* review comment

Signed-off-by: Katrina Rogan <[email protected]>

* go.mod

Signed-off-by: Katrina Rogan <[email protected]>

* review comment

Signed-off-by: Katrina Rogan <[email protected]>

---------

Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Co-authored-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan and Katrina Rogan authored Feb 9, 2023
1 parent 92cd1b7 commit 95a4791
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 57 deletions.
20 changes: 12 additions & 8 deletions events/admin_eventsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ var (
Name: "n",
},
},
Phase: core.NodeExecution_FAILED,
OccurredAt: ptypes.TimestampNow(),
ProducerId: "",
InputUri: "input-uri",
Phase: core.NodeExecution_FAILED,
OccurredAt: ptypes.TimestampNow(),
ProducerId: "",
InputValue: &event.NodeExecutionEvent_InputUri{
InputUri: "input-uri",
},
DeckUri: deckURI,
OutputResult: &event.NodeExecutionEvent_OutputUri{OutputUri: ""},
}
Expand Down Expand Up @@ -200,10 +202,12 @@ func TestIDFromMessage(t *testing.T) {
Name: "n",
},
},
Phase: core.NodeExecution_FAILED,
OccurredAt: ptypes.TimestampNow(),
ProducerId: "",
InputUri: "input-uri",
Phase: core.NodeExecution_FAILED,
OccurredAt: ptypes.TimestampNow(),
ProducerId: "",
InputValue: &event.NodeExecutionEvent_InputUri{
InputUri: "input-uri",
},
OutputResult: &event.NodeExecutionEvent_OutputUri{OutputUri: ""},
RetryGroup: "1",
}
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.6
github.com/flyteorg/flyteidl v1.3.7
github.com/flyteorg/flyteplugins v1.0.34
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
Expand Down Expand Up @@ -123,11 +123,11 @@ require (
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/term v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/api v0.76.0 // indirect
Expand Down
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.6 h1:PI846AdnrQZ84pxRVAzA3WGihv+xXmjQHO91nj/kV9g=
github.com/flyteorg/flyteidl v1.3.6/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.7 h1:MA7kOqMr/TmPlYPvJZwfsl+CYneuDOJ+kEKx2DocLhE=
github.com/flyteorg/flyteidl v1.3.7/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.34 h1:fgwC4oq4/UKpPX1S4puhejAz0J+CnT/Rpj5qw/A1tII=
github.com/flyteorg/flyteplugins v1.0.34/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
Expand Down Expand Up @@ -946,8 +946,8 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -1069,11 +1069,12 @@ golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1083,8 +1084,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,16 @@ func (c *Controller) run(ctx context.Context) error {
}

// Called from leader elector -if configured- to start running as the leader.
func (c *Controller) onStartedLeading(_ context.Context) {
ctx, cancelNow := context.WithCancel(context.Background())
func (c *Controller) onStartedLeading(ctx context.Context) {
backgroundCtx, cancelNow := context.WithCancel(ctx)
logger.Infof(ctx, "Acquired leader lease.")
go func() {
if err := c.run(ctx); err != nil {
logger.Panic(ctx, err)
if err := c.run(backgroundCtx); err != nil {
logger.Panic(backgroundCtx, err)
}
}()

<-ctx.Done()
<-backgroundCtx.Done()
logger.Infof(ctx, "Lost leader lease.")
cancelNow()
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,18 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe
logger.Warnf(ctx, "call to attemptRecovery node [%+v] data returned no error but also no data", nCtx.NodeExecutionMetadata().GetNodeExecutionID())
return handler.PhaseInfoUndefined, nil
}
nodeInputs := recoveredData.GetFullInputs()
// Copy inputs to this node's expected location
if recoveredData.FullInputs != nil {
if err := c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, recoveredData.FullInputs); err != nil {
if nodeInputs != nil {
if err = c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, recoveredData.FullInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath())
return handler.PhaseInfoUndefined, errors.Wrapf(
errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath())
}
} else if len(recovered.InputUri) > 0 {
// If the inputs are too large they won't be returned inline in the RecoverData call. We must fetch them before copying them.
nodeInputs := &core.LiteralMap{}
nodeInputs = &core.LiteralMap{}
if recoveredData.FullInputs == nil {
if err := c.store.ReadProtobuf(ctx, storage.DataReference(recovered.InputUri), nodeInputs); err != nil {
return handler.PhaseInfoUndefined, errors.Wrapf(errors.InputsNotFoundError, nCtx.NodeID(), err, "failed to read data from dataDir [%v].", recovered.InputUri)
Expand Down Expand Up @@ -283,6 +284,7 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe
}

info := &handler.ExecutionInfo{
Inputs: nodeInputs,
OutputInfo: oi,
}

Expand Down Expand Up @@ -312,7 +314,8 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe
// In this method we check if the queue is ready to be processed and if so, we prime it in Admin as queued
// Before we start the node execution, we need to transition this Node status to Queued.
// This is because a node execution has to exist before task/wf executions can start.
func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructure, nCtx handler.NodeExecutionContext) (handler.PhaseInfo, error) {
func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructure, nCtx handler.NodeExecutionContext) (
handler.PhaseInfo, error) {
logger.Debugf(ctx, "Node not yet started")
// Query the nodes information to figure out if it can be executed.
predicatePhase, err := CanExecute(ctx, dag, nCtx.ContextualNodeLookup(), nCtx.Node())
Expand Down Expand Up @@ -360,7 +363,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
logger.Debugf(ctx, "Node Data Directory [%s].", nodeStatus.GetDataDir())
}

return handler.PhaseInfoQueued("node queued"), nil
return handler.PhaseInfoQueued("node queued", nodeInputs), nil
}

// Now that we have resolved the inputs, we can record as a transition latency. This is because we have completed
Expand Down Expand Up @@ -497,7 +500,8 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor

nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p, nCtx.InputReader().GetInputPath().String(), nodeStatus, nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase)
nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase,
c.eventConfig)
if err != nil {
return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event")
}
Expand Down Expand Up @@ -612,7 +616,8 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node

nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p, nCtx.InputReader().GetInputPath().String(), nCtx.NodeStatus(), nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase)
nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase,
c.eventConfig)
if err != nil {
return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event")
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
}, true, false, true, core.NodeExecution_RUNNING},

{"queued->queued", v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseQueued, executors.NodePhasePending, func() (handler.Transition, error) {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoQueued("reason")), nil
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoQueued("reason", &core.LiteralMap{})), nil
}, true, false, false, core.NodeExecution_QUEUED},

{"queued->failing", v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseFailing, executors.NodePhasePending, func() (handler.Transition, error) {
Expand Down Expand Up @@ -1706,7 +1706,7 @@ func TestNodeExecutionEventStartNode(t *testing.T) {
tID := &core.TaskExecutionIdentifier{
NodeExecutionId: nID,
}
p := handler.PhaseInfoQueued("r")
p := handler.PhaseInfoQueued("r", &core.LiteralMap{})
inputReader := &mocks3.InputReader{}
inputReader.OnGetInputPath().Return("reference")
parentInfo := &mocks4.ImmutableParentInfo{}
Expand All @@ -1725,7 +1725,9 @@ func TestNodeExecutionEventStartNode(t *testing.T) {
ns.OnGetParentTaskID().Return(tID)
ns.OnGetOutputDirMatch(mock.Anything).Return("dummy://dummyOutUrl")
ns.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{})
ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone)
ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{
RawOutputPolicy: config.RawOutputPolicyReference,
})
assert.NoError(t, err)
assert.Equal(t, "start-node", ev.Id.NodeId)
assert.Equal(t, execID, ev.Id.ExecutionId)
Expand All @@ -1752,7 +1754,7 @@ func TestNodeExecutionEventV0(t *testing.T) {
tID := &core.TaskExecutionIdentifier{
NodeExecutionId: nID,
}
p := handler.PhaseInfoQueued("r")
p := handler.PhaseInfoQueued("r", &core.LiteralMap{})
parentInfo := &mocks4.ImmutableParentInfo{}
parentInfo.OnGetUniqueID().Return("np1")
parentInfo.OnCurrentAttempt().Return(uint32(2))
Expand All @@ -1767,7 +1769,9 @@ func TestNodeExecutionEventV0(t *testing.T) {
ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted)
nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns)
ns.OnGetParentTaskID().Return(tID)
ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone)
ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{
RawOutputPolicy: config.RawOutputPolicyReference,
})
assert.NoError(t, err)
assert.Equal(t, "n1", ev.Id.NodeId)
assert.Equal(t, execID, ev.Id.ExecutionId)
Expand All @@ -1791,7 +1795,12 @@ func TestNodeExecutionEventV1(t *testing.T) {
tID := &core.TaskExecutionIdentifier{
NodeExecutionId: nID,
}
p := handler.PhaseInfoQueued("r")
inputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": coreutils.MustMakeLiteral("bar"),
},
}
p := handler.PhaseInfoQueued("r", inputs)
//inputReader := &mocks3.InputReader{}
//inputReader.OnGetInputPath().Return("reference")
parentInfo := &mocks4.ImmutableParentInfo{}
Expand All @@ -1808,7 +1817,9 @@ func TestNodeExecutionEventV1(t *testing.T) {
ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted)
nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns)
ns.OnGetParentTaskID().Return(tID)
eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone)
eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{
RawOutputPolicy: config.RawOutputPolicyInline,
})
assert.NoError(t, err)
assert.Equal(t, "np1-2-n1", eventOpt.Id.NodeId)
assert.Equal(t, execID, eventOpt.Id.ExecutionId)
Expand All @@ -1820,6 +1831,7 @@ func TestNodeExecutionEventV1(t *testing.T) {
assert.Nil(t, eventOpt.ParentTaskMetadata)
assert.Equal(t, "name", eventOpt.NodeName)
assert.Equal(t, "2", eventOpt.RetryGroup)
assert.True(t, proto.Equal(eventOpt.GetInputData(), inputs))
}

func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/nodes/handler/transition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type ExecutionInfo struct {
DynamicNodeInfo *DynamicNodeInfo
WorkflowNodeInfo *WorkflowNodeInfo
BranchNodeInfo *BranchNodeInfo
Inputs *core.LiteralMap
OutputInfo *OutputInfo
TaskNodeInfo *TaskNodeInfo
GateNodeInfo *GateNodeInfo
Expand Down Expand Up @@ -120,8 +121,10 @@ func PhaseInfoNotReady(reason string) PhaseInfo {
return phaseInfo(EPhaseNotReady, nil, nil, reason)
}

func PhaseInfoQueued(reason string) PhaseInfo {
return phaseInfo(EPhaseQueued, nil, nil, reason)
func PhaseInfoQueued(reason string, inputs *core.LiteralMap) PhaseInfo {
return phaseInfo(EPhaseQueued, nil, &ExecutionInfo{
Inputs: inputs,
}, reason)
}

func PhaseInfoRunning(info *ExecutionInfo) PhaseInfo {
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/nodes/handler/transition_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package handler
import (
"testing"

"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/golang/protobuf/proto"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
)

func TestPhaseInfoQueued(t *testing.T) {
p := PhaseInfoQueued("Queued")
p := PhaseInfoQueued("Queued", &core.LiteralMap{})
assert.Equal(t, EPhaseQueued, p.p)
}

Expand Down Expand Up @@ -59,11 +62,17 @@ func TestPhaseInfo(t *testing.T) {
})

t.Run("queued", func(t *testing.T) {
p := PhaseInfoQueued("reason")
inputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": coreutils.MustMakeLiteral("bar"),
},
}
p := PhaseInfoQueued("reason", inputs)
assert.Equal(t, EPhaseQueued, p.GetPhase())
assert.Nil(t, p.GetErr())
assert.NotNil(t, p.GetOccurredAt())
assert.Equal(t, "reason", p.GetReason())
assert.True(t, proto.Equal(p.info.Inputs, inputs))
})

t.Run("running", func(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/nodes/handler/transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package handler
import (
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/storage"
"github.com/stretchr/testify/assert"
)
Expand All @@ -13,7 +15,7 @@ func AsPointer[T any](val T) *T {

func TestDoTransition(t *testing.T) {
t.Run("ephemeral", func(t *testing.T) {
tr := DoTransition(TransitionTypeEphemeral, PhaseInfoQueued("queued"))
tr := DoTransition(TransitionTypeEphemeral, PhaseInfoQueued("queued", &core.LiteralMap{}))
assert.Equal(t, TransitionTypeEphemeral, tr.Type())
assert.Equal(t, EPhaseQueued, tr.Info().p)
})
Expand All @@ -30,7 +32,7 @@ func TestDoTransition(t *testing.T) {
}

func TestTransition_WithInfo(t *testing.T) {
tr := DoTransition(TransitionTypeEphemeral, PhaseInfoQueued("queued"))
tr := DoTransition(TransitionTypeEphemeral, PhaseInfoQueued("queued", &core.LiteralMap{}))
assert.Equal(t, EPhaseQueued, tr.info.p)
tr = tr.WithInfo(PhaseInfoSuccess(&ExecutionInfo{}))
assert.Equal(t, EPhaseSuccess, tr.info.p)
Expand Down
Loading

0 comments on commit 95a4791

Please sign in to comment.