Skip to content

Commit

Permalink
Track lineage for child workflows (flyteorg#98)
Browse files Browse the repository at this point in the history
katrogan authored May 19, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 0ce1932 commit 026c40f
Showing 4 changed files with 44 additions and 2 deletions.
12 changes: 11 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
@@ -346,8 +346,9 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
}
ctx = getExecutionContext(ctx, &workflowExecutionID)

// Get the node execution (if any) that launched this execution
// Get the node and parent execution (if any) that launched this execution
var parentNodeExecutionID uint
var sourceExecutionID uint
if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil {
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution)
if err != nil {
@@ -357,6 +358,14 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
}

parentNodeExecutionID = parentNodeExecutionModel.ID

sourceExecutionModel, err := util.GetExecutionModel(ctx, m.db, *request.Spec.Metadata.ParentNodeExecution.ExecutionId)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return nil, nil, err
}
sourceExecutionID = sourceExecutionModel.ID
}

// Dynamically assign task resource defaults.
@@ -424,6 +433,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
SourceExecutionID: sourceExecutionID,
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
29 changes: 28 additions & 1 deletion pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
@@ -264,9 +264,33 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
}, nil
},
)
getExecutionCalled := false
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error) {
assert.EqualValues(t, input.Project, parentNodeExecutionID.ExecutionId.Project)
assert.EqualValues(t, input.Domain, parentNodeExecutionID.ExecutionId.Domain)
assert.EqualValues(t, input.Name, parentNodeExecutionID.ExecutionId.Name)
getExecutionCalled = true
return models.Execution{
BaseModel: models.BaseModel{
ID: 2,
},
}, nil
},
)

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
assert.Equal(t, input.ParentNodeExecutionID, uint(1))
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, admin.ExecutionMetadata_CHILD_WORKFLOW, spec.Metadata.Mode)
assert.Equal(t, "feeny", spec.Metadata.Principal)
assert.EqualValues(t, 1, spec.Metadata.Nesting)
assert.True(t, proto.Equal(&parentNodeExecutionID, spec.Metadata.ParentNodeExecution))
assert.EqualValues(t, input.ParentNodeExecutionID, 1)
assert.EqualValues(t, input.SourceExecutionID, 2)
return nil
},
)
@@ -276,12 +300,15 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL)
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Mode: admin.ExecutionMetadata_SYSTEM,
Mode: admin.ExecutionMetadata_CHILD_WORKFLOW,
Nesting: 1,
ParentNodeExecution: &parentNodeExecutionID,
Principal: "feeny",
}
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
assert.True(t, getNodeExecutionCalled)
assert.True(t, getExecutionCalled)
expectedResponse := &admin.ExecutionCreateResponse{
Id: &executionIdentifier,
}
2 changes: 2 additions & 0 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ type CreateExecutionModelInput struct {
Notifications []*admin.Notification
WorkflowIdentifier *core.Identifier
ParentNodeExecutionID uint
SourceExecutionID uint
Cluster string
InputsURI storage.DataReference
UserInputsURI storage.DataReference
@@ -85,6 +86,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
ExecutionCreatedAt: &input.CreatedAt,
ExecutionUpdatedAt: &input.CreatedAt,
ParentNodeExecutionID: input.ParentNodeExecutionID,
SourceExecutionID: input.SourceExecutionID,
Cluster: input.Cluster,
InputsURI: input.InputsURI,
UserInputsURI: input.UserInputsURI,
3 changes: 3 additions & 0 deletions pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ func TestCreateExecutionModel(t *testing.T) {
lpID := uint(33)
wfID := uint(23)
nodeID := uint(11)
sourceID := uint(12)
createdAt := time.Now()
workflowIdentifier := &core.Identifier{
Project: "project",
@@ -67,6 +68,7 @@ func TestCreateExecutionModel(t *testing.T) {
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
SourceExecutionID: sourceID,
Principal: principal,
Cluster: cluster,
})
@@ -80,6 +82,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.EqualValues(t, createdAt, *execution.ExecutionUpdatedAt)
assert.Equal(t, int32(admin.ExecutionMetadata_SYSTEM), execution.Mode)
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{

0 comments on commit 026c40f

Please sign in to comment.