Skip to content

Commit

Permalink
Don't send inputURI for start-node (#400)
Browse files Browse the repository at this point in the history
* send empty `inputUri` for `start-node` in node execution event to flyteadmin and therefore, GetNodeExecutionData will not attempt to download non-existing inputUri as it was doing before this change.
* add DB migration to clear `input_uri` in existing `node_executions` table for start nodes.

*TODO: Summarize tests added, integration tests run, or other steps you took to validate this change. Include (or link to) relevant test output or screenshots.*

*TODO: Describe any deployment or compatibility considerations for rolling out this change.*

Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed to OSS

* fixes COR-1134 by sending empty `inputUri` in node execution event to flyteadmin and therefore, `GetNodeExecutionData` will not attempt to download non-existing `inputUri` as it was doing before this change.

* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
iaroslav-ciupin committed Sep 26, 2024
1 parent 0d1583e commit 95c91dd
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 20 deletions.
12 changes: 10 additions & 2 deletions flyteadmin/pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,20 +1263,28 @@ var ContinuedMigrations = []*gormigrate.Migration{
return tx.Migrator().DropTable("execution_tags")
},
},

{
ID: "2024-06-06-drop-execution_admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("execution_admin_tags")
},
},

{
ID: "2024-06-06-drop-admin-tags",
Migrate: func(tx *gorm.DB) error {
return tx.Migrator().DropTable("admin_tags")
},
},
{
ID: "2024-08-08-remove-input-uri-for-start-nodes",
Migrate: func(db *gorm.DB) error {
return db.Exec("UPDATE node_executions SET input_uri = '' WHERE node_id = 'start-node'").Error
},
Rollback: func(db *gorm.DB) error {

Check warning on line 1283 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1280-L1283

Added lines #L1280 - L1283 were not covered by tests
// can't rollback missing data
return nil
},

Check warning on line 1286 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1285-L1286

Added lines #L1285 - L1286 were not covered by tests
},
}

var m = append(LegacyMigrations, NoopMigrations...)
Expand Down
15 changes: 11 additions & 4 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,10 +1248,17 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter

targetEntity := common.GetTargetEntity(ctx, nCtx)

nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p, nCtx.InputReader().GetInputPath().String(), nCtx.NodeStatus(), nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(), c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase,
c.eventConfig, targetEntity)
nev, err := ToNodeExecutionEvent(
nCtx.NodeExecutionMetadata().GetNodeExecutionID(),
p,
nCtx.InputReader().GetInputPath().String(),
nCtx.NodeStatus(),
nCtx.ExecutionContext().GetEventVersion(),
nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(),
c.clusterID,
nCtx.NodeStateReader().GetDynamicNodeState().Phase,
c.eventConfig,
targetEntity)
if err != nil {
return interfaces.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event")
}
Expand Down
8 changes: 8 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,7 @@ func TestNodeExecutor_FinalizeHandler(t *testing.T) {
assert.NoError(t, exec.FinalizeHandler(ctx, nil, nil, nl, n))
})
}

func TestNodeExecutionEventStartNode(t *testing.T) {
execID := &core.WorkflowExecutionIdentifier{
Name: "e1",
Expand Down Expand Up @@ -1763,9 +1764,11 @@ func TestNodeExecutionEventStartNode(t *testing.T) {
ns.OnGetParentTaskID().Return(tID)
ns.OnGetOutputDirMatch(mock.Anything).Return("dummy://dummyOutUrl")
ns.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{})

ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{
RawOutputPolicy: config.RawOutputPolicyReference,
}, subWfID)

assert.NoError(t, err)
assert.Equal(t, "start-node", ev.Id.NodeId)
assert.Equal(t, execID, ev.Id.ExecutionId)
Expand All @@ -1778,6 +1781,7 @@ func TestNodeExecutionEventStartNode(t *testing.T) {
ev.OutputResult.(*event.NodeExecutionEvent_OutputUri).OutputUri)
assert.Equal(t, ev.ProducerId, testClusterID)
assert.Equal(t, subWfID, ev.GetTargetEntity())
assert.Nil(t, ev.InputValue)
}

func TestNodeExecutionEventV0(t *testing.T) {
Expand Down Expand Up @@ -1821,6 +1825,7 @@ func TestNodeExecutionEventV0(t *testing.T) {
assert.Empty(t, ev.NodeName)
assert.Empty(t, ev.RetryGroup)
assert.Empty(t, ev.TargetEntity)
assert.Equal(t, "reference", ev.GetInputUri())
}

func TestNodeExecutionEventV1(t *testing.T) {
Expand Down Expand Up @@ -1859,9 +1864,11 @@ func TestNodeExecutionEventV1(t *testing.T) {
ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted)
nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns)
ns.OnGetParentTaskID().Return(tID)

eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{
RawOutputPolicy: config.RawOutputPolicyInline,
}, nil)

assert.NoError(t, err)
assert.Equal(t, "np1-2-n1", eventOpt.Id.NodeId)
assert.Equal(t, execID, eventOpt.Id.ExecutionId)
Expand All @@ -1875,6 +1882,7 @@ func TestNodeExecutionEventV1(t *testing.T) {
assert.Equal(t, "2", eventOpt.RetryGroup)
assert.True(t, proto.Equal(eventOpt.GetInputData(), inputs))
assert.Empty(t, eventOpt.TargetEntity)
assert.Equal(t, inputs, eventOpt.GetInputData())
}

func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) {
Expand Down
30 changes: 16 additions & 14 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase {
}
}

func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
func ToNodeExecutionEvent(
nodeExecID *core.NodeExecutionIdentifier,
info handler.PhaseInfo,
inputPath string,
status v1alpha1.ExecutableNodeStatus,
Expand Down Expand Up @@ -109,9 +110,11 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
dynamicChain = true
}

eInfo := info.GetInfo()
var nev *event.NodeExecutionEvent
// Start node is special case where the Inputs and Outputs are the same and hence here we copy the Output file
// Start node is special case where the Outputs are the same and hence here we copy the Output file
// into the OutputResult and in admin we copy it over into input as well.
// Start node doesn't have inputs.
if nodeExecID.NodeId == v1alpha1.StartNodeID {
outputsFile := v1alpha1.GetOutputsFile(status.GetOutputDir())
nev = &event.NodeExecutionEvent{
Expand Down Expand Up @@ -139,6 +142,17 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
TargetEntity: targetEntity,
IsInDynamicChain: dynamicChain,
}
if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline {
if eInfo != nil {
nev.InputValue = &event.NodeExecutionEvent_InputData{
InputData: eInfo.Inputs,
}
}
} else {
nev.InputValue = &event.NodeExecutionEvent_InputUri{
InputUri: inputPath,
}
}
}

if eventVersion == v1alpha1.EventVersion0 && status.GetParentTaskID() != nil {
Expand All @@ -163,7 +177,6 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
nev.NodeName = node.GetName()
}

eInfo := info.GetInfo()
if eInfo != nil {
if eInfo.WorkflowNodeInfo != nil {
v := ToNodeExecWorkflowNodeMetadata(eInfo.WorkflowNodeInfo)
Expand Down Expand Up @@ -201,17 +214,6 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
nev.IsParent = true
}
}
if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline {
if eInfo != nil {
nev.InputValue = &event.NodeExecutionEvent_InputData{
InputData: eInfo.Inputs,
}
}
} else {
nev.InputValue = &event.NodeExecutionEvent_InputUri{
InputUri: inputPath,
}
}

return nev, nil
}
Expand Down

0 comments on commit 95c91dd

Please sign in to comment.