Skip to content

Commit

Permalink
Using isDynamic flag for distinguishing subworkflows and dynamic work…
Browse files Browse the repository at this point in the history
…flows (flyteorg#351)
  • Loading branch information
pmahindrakar-oss authored Feb 23, 2022
1 parent c2331a3 commit 7a6916d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 17 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/benbjohnson/clock v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.22.1
github.com/flyteorg/flyteidl v0.22.2
github.com/flyteorg/flyteplugins v0.9.1
github.com/flyteorg/flytepropeller v0.16.14
github.com/flyteorg/flytestdlib v0.4.7
Expand Down
6 changes: 2 additions & 4 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.18/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.24 h1:e2wPBK4aiLE+fw2zmhUDNg39QoJk6Lf5lQRvj8XgtFk=
github.com/flyteorg/flyteidl v0.21.24/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.22.1 h1:9OYtiUIDTKsnNRoVGFcvUrIRbD3dxUJYgRTDnNnMRbw=
github.com/flyteorg/flyteidl v0.22.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.22.2 h1:cH1amuHV2AjUAJ7RuQOzrgeeRGEzhNV8Is3kTAIPS4U=
github.com/flyteorg/flyteidl v0.22.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.9.1 h1:Z0gxSvG7LeI+COfEmuzkhz9RnJ4E5wWUcjj5qh1uKuw=
github.com/flyteorg/flyteplugins v0.9.1/go.mod h1:OEGQztPFDJG4DV7tS9lDsRRd521iUINn5dcsBf6bW5k=
github.com/flyteorg/flytepropeller v0.16.14 h1:zG+UnfZLPCQdwh7ORm3BNwXlb6Sp2Wwa7I7NnZYcPDw=
Expand Down
3 changes: 3 additions & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution) (*admin.Nod
}
if len(nodeExecutionModel.ChildNodeExecutions) > 0 {
nodeExecutionMetadata.IsParentNode = true
if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
nodeExecutionMetadata.IsDynamic = true
}
}
return &admin.NodeExecution{
Id: &core.NodeExecutionIdentifier{
Expand Down
46 changes: 34 additions & 12 deletions flyteadmin/pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) {
Name: "name",
},
}
nodeExecution, err := FromNodeExecutionModel(models.NodeExecution{
nodeExecModel := models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "nodey",
ExecutionKey: models.ExecutionKey{
Expand All @@ -411,18 +411,40 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) {
},
InputURI: "input uri",
Duration: duration,
}
t.Run("dynamic workflow", func(t *testing.T) {
nodeExecModel.DynamicWorkflowRemoteClosureReference = "dummy_dynamic_worklfow_ref"
nodeExecution, err := FromNodeExecutionModel(nodeExecModel)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
InputUri: "input uri",
Closure: closure,
Metadata: &admin.NodeExecutionMetaData{
IsParentNode: true,
RetryGroup: "r",
SpecNodeId: "sp",
IsDynamic: true,
},
}, nodeExecution))
})
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
InputUri: "input uri",
Closure: closure,
Metadata: &admin.NodeExecutionMetaData{
IsParentNode: true,
RetryGroup: "r",
SpecNodeId: "sp",
},
}, nodeExecution))
t.Run("non dynamic workflow", func(t *testing.T) {
nodeExecModel.DynamicWorkflowRemoteClosureReference = ""
nodeExecution, err := FromNodeExecutionModel(nodeExecModel)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
InputUri: "input uri",
Closure: closure,
Metadata: &admin.NodeExecutionMetaData{
IsParentNode: true,
RetryGroup: "r",
SpecNodeId: "sp",
IsDynamic: false,
},
}, nodeExecution))
})

}

func TestFromNodeExecutionModels(t *testing.T) {
Expand Down

0 comments on commit 7a6916d

Please sign in to comment.