diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index a58ed4410..91ae00193 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -54,10 +54,9 @@ const ( alreadyInTerminalStatus ) -const addIsParentFilter = true - var isParent = common.NewMapFilter(map[string]interface{}{ shared.ParentTaskExecutionID: nil, + shared.ParentID: nil, }) func getNodeExecutionContext(ctx context.Context, identifier *core.NodeExecutionIdentifier) context.Context { @@ -77,9 +76,23 @@ func (m *NodeExecutionManager) createNodeExecutionWithEvent( } parentTaskExecutionID = taskExecutionModel.ID } + var parentID *uint + if request.Event.ParentNodeMetadata != nil { + parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, &core.NodeExecutionIdentifier{ + ExecutionId: request.Event.Id.ExecutionId, + NodeId: request.Event.ParentNodeMetadata.NodeId, + }) + if err != nil { + logger.Errorf(ctx, "failed to fetch node execution for the parent node: %v %s with err", + request.Event.Id.ExecutionId, request.Event.ParentNodeMetadata.NodeId, err) + return err + } + parentID = &parentNodeExecutionModel.ID + } nodeExecutionModel, err := transformers.CreateNodeExecutionModel(transformers.ToNodeExecutionModelInput{ Request: request, ParentTaskExecutionID: parentTaskExecutionID, + ParentID: parentID, }) if err != nil { logger.Debugf(ctx, "failed to create node execution model for event request: %s with err: %v", @@ -237,7 +250,7 @@ func (m *NodeExecutionManager) GetNodeExecution( func (m *NodeExecutionManager) listNodeExecutions( ctx context.Context, identifierFilters []common.InlineFilter, - requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, addIsParentFilter bool) ( + requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) ( *admin.NodeExecutionList, error) { filters, err := util.AddRequestFilters(requestFilters, common.NodeExecution, identifierFilters) @@ -262,11 +275,8 @@ func (m *NodeExecutionManager) listNodeExecutions( InlineFilters: filters, SortParameter: sortParameter, } - if addIsParentFilter { - listInput.MapFilters = []common.MapFilter{ - isParent, - } - } + + listInput.MapFilters = mapFilters output, err := m.db.NodeExecutionRepo().List(ctx, listInput) if err != nil { logger.Debugf(ctx, "Failed to list node executions for request with err %v", err) @@ -301,8 +311,28 @@ func (m *NodeExecutionManager) ListNodeExecutions( if err != nil { return nil, err } + var mapFilters []common.MapFilter + if request.UniqueParentId != "" { + parentNodeExecution, err := util.GetNodeExecutionModel(ctx, m.db, &core.NodeExecutionIdentifier{ + ExecutionId: request.WorkflowExecutionId, + NodeId: request.UniqueParentId, + }) + if err != nil { + return nil, err + } + parentIDFilter, err := common.NewSingleValueFilter( + common.NodeExecution, common.Equal, shared.ParentID, parentNodeExecution.ID) + if err != nil { + return nil, err + } + identifierFilters = append(identifierFilters, parentIDFilter) + } else { + mapFilters = []common.MapFilter{ + isParent, + } + } return m.listNodeExecutions( - ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, addIsParentFilter) + ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, mapFilters) } // Filters on node executions matching the execution parameters (execution project, domain, and name) as well as the @@ -330,7 +360,7 @@ func (m *NodeExecutionManager) ListNodeExecutionsForTask( } identifierFilters = append(identifierFilters, nodeIDFilter) return m.listNodeExecutions( - ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, !addIsParentFilter) + ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, nil) } func (m *NodeExecutionManager) GetNodeExecutionData( diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index 23dc7a7b4..3e8d89d1f 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -123,6 +123,7 @@ func TestCreateNodeEvent(t *testing.T) { InputURI: "input uri", StartedAt: &occurredAt, Closure: closureBytes, + NodeExecutionMetadata: []byte{}, NodeExecutionCreatedAt: &occurredAt, NodeExecutionUpdatedAt: &occurredAt, }, *input) @@ -356,6 +357,11 @@ func TestGetNodeExecution(t *testing.T) { expectedClosure := admin.NodeExecutionClosure{ Phase: core.NodeExecution_SUCCEEDED, } + expectedMetadata := admin.NodeExecutionMetaData{ + SpecNodeId: "spec_node_id", + RetryGroup: "retry_group", + } + metadataBytes, _ := proto.Marshal(&expectedMetadata) closureBytes, _ := proto.Marshal(&expectedClosure) repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetCallback( func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { @@ -377,10 +383,11 @@ func TestGetNodeExecution(t *testing.T) { Name: "name", }, }, - Phase: core.NodeExecution_SUCCEEDED.String(), - InputURI: "input uri", - StartedAt: &occurredAt, - Closure: closureBytes, + Phase: core.NodeExecution_SUCCEEDED.String(), + InputURI: "input uri", + StartedAt: &occurredAt, + Closure: closureBytes, + NodeExecutionMetadata: metadataBytes, }, nil }) nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) @@ -392,6 +399,71 @@ func TestGetNodeExecution(t *testing.T) { Id: &nodeExecutionIdentifier, InputUri: "input uri", Closure: &expectedClosure, + Metadata: &expectedMetadata, + }, nodeExecution)) +} + +func TestGetNodeExecutionParentNode(t *testing.T) { + repository := repositoryMocks.NewMockRepository() + expectedClosure := admin.NodeExecutionClosure{ + Phase: core.NodeExecution_SUCCEEDED, + } + expectedMetadata := admin.NodeExecutionMetaData{ + SpecNodeId: "spec_node_id", + RetryGroup: "retry_group", + } + metadataBytes, _ := proto.Marshal(&expectedMetadata) + closureBytes, _ := proto.Marshal(&expectedClosure) + repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetCallback( + func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { + workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + } + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "node id", + ExecutionId: &workflowExecutionIdentifier, + }, &input.NodeExecutionIdentifier)) + return models.NodeExecution{ + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "node id", + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, + Phase: core.NodeExecution_SUCCEEDED.String(), + InputURI: "input uri", + StartedAt: &occurredAt, + Closure: closureBytes, + NodeExecutionMetadata: metadataBytes, + ChildNodeExecutions: []models.NodeExecution{ + { + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "node-child", + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, + }, + }, + }, nil + }) + nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ + Id: &nodeExecutionIdentifier, + }) + assert.Nil(t, err) + expectedMetadata.IsParentNode = true + assert.True(t, proto.Equal(&admin.NodeExecution{ + Id: &nodeExecutionIdentifier, + InputUri: "input uri", + Closure: &expectedClosure, + Metadata: &expectedMetadata, }, nodeExecution)) } @@ -437,12 +509,18 @@ func TestGetNodeExecution_TransformerError(t *testing.T) { assert.Equal(t, err.(flyteAdminErrors.FlyteAdminError).Code(), codes.Internal) } -func TestListNodeExecutions(t *testing.T) { +func TestListNodeExecutionsLevelZero(t *testing.T) { repository := repositoryMocks.NewMockRepository() expectedClosure := admin.NodeExecutionClosure{ Phase: core.NodeExecution_SUCCEEDED, } + expectedMetadata := admin.NodeExecutionMetaData{ + SpecNodeId: "spec_node_id", + RetryGroup: "retry_group", + } + metadataBytes, _ := proto.Marshal(&expectedMetadata) closureBytes, _ := proto.Marshal(&expectedClosure) + repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetListCallback( func(ctx context.Context, input interfaces.ListResourceInput) ( interfaces.NodeExecutionCollectionOutput, error) { @@ -467,6 +545,7 @@ func TestListNodeExecutions(t *testing.T) { assert.Len(t, input.MapFilters, 1) filter := input.MapFilters[0].GetFilter() assert.Equal(t, map[string]interface{}{ + "parent_id": nil, "parent_task_execution_id": nil, }, filter) @@ -482,10 +561,11 @@ func TestListNodeExecutions(t *testing.T) { Name: "name", }, }, - Phase: core.NodeExecution_SUCCEEDED.String(), - InputURI: "input uri", - StartedAt: &occurredAt, - Closure: closureBytes, + Phase: core.NodeExecution_SUCCEEDED.String(), + InputURI: "input uri", + StartedAt: &occurredAt, + Closure: closureBytes, + NodeExecutionMetadata: metadataBytes, }, }, }, nil @@ -517,6 +597,107 @@ func TestListNodeExecutions(t *testing.T) { }, InputUri: "input uri", Closure: &expectedClosure, + Metadata: &expectedMetadata, + }, nodeExecutions.NodeExecutions[0])) + assert.Equal(t, "3", nodeExecutions.Token) +} + +func TestListNodeExecutionsWithParent(t *testing.T) { + repository := repositoryMocks.NewMockRepository() + expectedClosure := admin.NodeExecutionClosure{ + Phase: core.NodeExecution_SUCCEEDED, + } + expectedMetadata := admin.NodeExecutionMetaData{ + SpecNodeId: "spec_node_id", + RetryGroup: "retry_group", + } + metadataBytes, _ := proto.Marshal(&expectedMetadata) + closureBytes, _ := proto.Marshal(&expectedClosure) + parentID := uint(12) + repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetCallback(func(ctx context.Context, input interfaces.GetNodeExecutionInput) (execution models.NodeExecution, e error) { + assert.Equal(t, "parent_1", input.NodeExecutionIdentifier.NodeId) + return models.NodeExecution{ + BaseModel: models.BaseModel{ + ID: parentID, + }, + }, nil + }) + repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetListCallback( + func(ctx context.Context, input interfaces.ListResourceInput) ( + interfaces.NodeExecutionCollectionOutput, error) { + assert.Equal(t, 1, input.Limit) + assert.Equal(t, 2, input.Offset) + assert.Len(t, input.InlineFilters, 4) + assert.Equal(t, common.Execution, input.InlineFilters[0].GetEntity()) + queryExpr, _ := input.InlineFilters[0].GetGormQueryExpr() + assert.Equal(t, "project", queryExpr.Args) + assert.Equal(t, "execution_project = ?", queryExpr.Query) + + assert.Equal(t, common.Execution, input.InlineFilters[1].GetEntity()) + queryExpr, _ = input.InlineFilters[1].GetGormQueryExpr() + assert.Equal(t, "domain", queryExpr.Args) + assert.Equal(t, "execution_domain = ?", queryExpr.Query) + + assert.Equal(t, common.Execution, input.InlineFilters[2].GetEntity()) + queryExpr, _ = input.InlineFilters[2].GetGormQueryExpr() + assert.Equal(t, "name", queryExpr.Args) + assert.Equal(t, "execution_name = ?", queryExpr.Query) + + assert.Equal(t, common.NodeExecution, input.InlineFilters[3].GetEntity()) + queryExpr, _ = input.InlineFilters[3].GetGormQueryExpr() + assert.Equal(t, parentID, queryExpr.Args) + assert.Equal(t, "parent_id = ?", queryExpr.Query) + + assert.Equal(t, "domain asc", input.SortParameter.GetGormOrderExpr()) + return interfaces.NodeExecutionCollectionOutput{ + NodeExecutions: []models.NodeExecution{ + { + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "node id", + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, + Phase: core.NodeExecution_SUCCEEDED.String(), + InputURI: "input uri", + StartedAt: &occurredAt, + Closure: closureBytes, + NodeExecutionMetadata: metadataBytes, + }, + }, + }, nil + }) + nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ + WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Limit: 1, + Token: "2", + SortBy: &admin.Sort{ + Direction: admin.Sort_ASCENDING, + Key: "domain", + }, + UniqueParentId: "parent_1", + }) + assert.Nil(t, err) + assert.Len(t, nodeExecutions.NodeExecutions, 1) + assert.True(t, proto.Equal(&admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "node id", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, + InputUri: "input uri", + Closure: &expectedClosure, + Metadata: &expectedMetadata, }, nodeExecutions.NodeExecutions[0])) assert.Equal(t, "3", nodeExecutions.Token) } @@ -637,7 +818,13 @@ func TestListNodeExecutionsForTask(t *testing.T) { expectedClosure := admin.NodeExecutionClosure{ Phase: core.NodeExecution_SUCCEEDED, } + execMetadata := admin.NodeExecutionMetaData{ + SpecNodeId: "spec-n1", + } + closureBytes, _ := proto.Marshal(&expectedClosure) + execMetadataBytes, _ := proto.Marshal(&execMetadata) + repository.TaskExecutionRepo().(*repositoryMocks.MockTaskExecutionRepo).SetGetCallback( func(ctx context.Context, input interfaces.GetTaskExecutionInput) (models.TaskExecution, error) { return models.TaskExecution{ @@ -684,10 +871,23 @@ func TestListNodeExecutionsForTask(t *testing.T) { Name: "name", }, }, - Phase: core.NodeExecution_SUCCEEDED.String(), - InputURI: "input uri", - StartedAt: &occurredAt, - Closure: closureBytes, + Phase: core.NodeExecution_SUCCEEDED.String(), + InputURI: "input uri", + StartedAt: &occurredAt, + Closure: closureBytes, + NodeExecutionMetadata: execMetadataBytes, + ChildNodeExecutions: []models.NodeExecution{ + { + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "node-c", + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, + }, + }, }, }, }, nil @@ -720,6 +920,10 @@ func TestListNodeExecutionsForTask(t *testing.T) { }) assert.Nil(t, err) assert.Len(t, nodeExecutions.NodeExecutions, 1) + expectedMetadata := admin.NodeExecutionMetaData{ + SpecNodeId: "spec-n1", + IsParentNode: true, + } assert.True(t, proto.Equal(&admin.NodeExecution{ Id: &core.NodeExecutionIdentifier{ NodeId: "node id", @@ -731,6 +935,7 @@ func TestListNodeExecutionsForTask(t *testing.T) { }, InputUri: "input uri", Closure: &expectedClosure, + Metadata: &expectedMetadata, }, nodeExecutions.NodeExecutions[0])) assert.Equal(t, "3", nodeExecutions.Token) } @@ -743,6 +948,7 @@ func TestGetNodeExecutionData(t *testing.T) { OutputUri: "output uri", }, } + closureBytes, _ := proto.Marshal(&expectedClosure) repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetCallback( func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { diff --git a/pkg/manager/impl/shared/constants.go b/pkg/manager/impl/shared/constants.go index 532ebdb30..8b0297d5c 100644 --- a/pkg/manager/impl/shared/constants.go +++ b/pkg/manager/impl/shared/constants.go @@ -35,4 +35,6 @@ const ( Attributes = "attributes" MatchingAttributes = "matching_attributes" Resourcetype = "resource_type" + // Parent of a node execution in the node executions table + ParentID = "parent_id" ) diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index 8c4e243ad..2debdd84c 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -233,4 +233,14 @@ var Migrations = []*gormigrate.Migration{ return tx.Model(&models.NodeExecution{}).DropColumn("cache_status").Error }, }, + + { + ID: "2020-07-31-node-execution", + Migrate: func(tx *gorm.DB) error { + return tx.AutoMigrate(&models.NodeExecution{}).Error + }, + Rollback: func(tx *gorm.DB) error { + return tx.Model(&models.NodeExecution{}).DropColumn("parent_id").DropColumn("node_execution_metadata").Error + }, + }, } diff --git a/pkg/repositories/gormimpl/node_execution_repo.go b/pkg/repositories/gormimpl/node_execution_repo.go index c74bfc456..802391d4c 100644 --- a/pkg/repositories/gormimpl/node_execution_repo.go +++ b/pkg/repositories/gormimpl/node_execution_repo.go @@ -56,7 +56,7 @@ func (r *NodeExecutionRepo) Get(ctx context.Context, input interfaces.GetNodeExe Name: input.NodeExecutionIdentifier.ExecutionId.Name, }, }, - }).First(&nodeExecution) + }).Preload("ChildNodeExecutions").First(&nodeExecution) timer.Stop() if tx.Error != nil { return models.NodeExecution{}, r.errorTransformer.ToFlyteAdminError(tx.Error) @@ -103,7 +103,7 @@ func (r *NodeExecutionRepo) List(ctx context.Context, input interfaces.ListResou return interfaces.NodeExecutionCollectionOutput{}, err } var nodeExecutions []models.NodeExecution - tx := r.db.Limit(input.Limit).Offset(input.Offset) + tx := r.db.Limit(input.Limit).Offset(input.Offset).Preload("ChildNodeExecutions") // And add join condition (joining multiple tables is fine even we only filter on a subset of table attributes). // (this query isn't called for deletes). tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.execution_project = %s.execution_project AND "+ diff --git a/pkg/repositories/gormimpl/node_execution_repo_test.go b/pkg/repositories/gormimpl/node_execution_repo_test.go index ddace1540..9ad8f81c7 100644 --- a/pkg/repositories/gormimpl/node_execution_repo_test.go +++ b/pkg/repositories/gormimpl/node_execution_repo_test.go @@ -32,8 +32,8 @@ func TestCreateNodeExecution(t *testing.T) { nodeExecutionQuery := GlobalMock.NewMock() nodeExecutionQuery.WithQuery(`INSERT INTO "node_executions" ("id","created_at","updated_at","deleted_at",` + `"execution_project","execution_domain","execution_name","node_id","phase","input_uri","closure","started_at",` + - `"node_execution_created_at","node_execution_updated_at","duration","error_kind","error_code","cache_status") VALUES ` + - `(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`) + `"node_execution_created_at","node_execution_updated_at","duration","node_execution_metadata","parent_id","error_kind","error_code","cache_status") VALUES ` + + `(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`) nodeExecutionEventQuery := GlobalMock.NewMock() nodeExecutionEventQuery.WithQuery(`INSERT INTO "node_execution_events" ("created_at","updated_at",` + @@ -53,7 +53,7 @@ func TestCreateNodeExecution(t *testing.T) { Phase: nodePhase, OccurredAt: nodeStartedAt, } - + parentID := uint(10) nodeExecution := models.NodeExecution{ BaseModel: models.BaseModel{ ID: 2, @@ -68,11 +68,13 @@ func TestCreateNodeExecution(t *testing.T) { }, Phase: nodePhase, Closure: []byte("closure"), + NodeExecutionMetadata: []byte("closure"), InputURI: "input uri", StartedAt: &nodeStartedAt, Duration: time.Hour, NodeExecutionCreatedAt: &nodeCreatedAt, NodeExecutionUpdatedAt: &nodeCreatedAt, + ParentID: &parentID, } err := nodeExecutionRepo.Create(context.Background(), &nodeExecutionEvent, &nodeExecution) assert.NoError(t, err) @@ -83,7 +85,6 @@ func TestCreateNodeExecution(t *testing.T) { func TestUpdateNodeExecution(t *testing.T) { nodeExecutionRepo := NewNodeExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope()) GlobalMock := mocket.Catcher.Reset() - // Only match on queries that append the name filter nodeExecutionEventQuery := GlobalMock.NewMock() nodeExecutionEventQuery.WithQuery(`INSERT INTO "node_execution_events" ("created_at","updated_at",` + @@ -146,11 +147,16 @@ func getMockNodeExecutionResponseFromDb(expected models.NodeExecution) map[strin nodeExecution["duration"] = expected.Duration nodeExecution["node_execution_created_at"] = expected.NodeExecutionCreatedAt nodeExecution["node_execution_updated_at"] = expected.NodeExecutionUpdatedAt + nodeExecution["parent_id"] = expected.ParentID + if expected.NodeExecutionMetadata != nil { + nodeExecution["node_execution_metadata"] = expected.NodeExecutionMetadata + } return nodeExecution } func TestGetNodeExecution(t *testing.T) { nodeExecutionRepo := NewNodeExecutionRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope()) + parentID := uint(10) expectedNodeExecution := models.NodeExecution{ NodeExecutionKey: models.NodeExecutionKey{ NodeID: "1", @@ -167,6 +173,8 @@ func TestGetNodeExecution(t *testing.T) { Duration: time.Hour, NodeExecutionCreatedAt: &nodeCreatedAt, NodeExecutionUpdatedAt: &nodePlanUpdatedAt, + NodeExecutionMetadata: []byte("NodeExecutionMetadata"), + ParentID: &parentID, } nodeExecutions := make([]map[string]interface{}, 0) @@ -174,7 +182,6 @@ func TestGetNodeExecution(t *testing.T) { nodeExecutions = append(nodeExecutions, nodeExecution) GlobalMock := mocket.Catcher.Reset() - GlobalMock.NewMock().WithQuery( `SELECT * FROM "node_executions" WHERE "node_executions"."deleted_at" IS NULL AND ` + `(("node_executions"."execution_project" = execution_project) AND ("node_executions"."execution_domain" ` + @@ -300,11 +307,12 @@ func TestListNodeExecutionsForExecution(t *testing.T) { Name: "1", }, }, - Phase: nodePhase, - Closure: []byte("closure"), - InputURI: "input uri", - StartedAt: &nodeStartedAt, - Duration: time.Hour, + Phase: nodePhase, + Closure: []byte("closure"), + InputURI: "input uri", + StartedAt: &nodeStartedAt, + Duration: time.Hour, + NodeExecutionMetadata: []byte("NodeExecutionMetadata"), }) nodeExecutions = append(nodeExecutions, nodeExecution) @@ -337,6 +345,9 @@ func TestListNodeExecutionsForExecution(t *testing.T) { assert.Equal(t, "input uri", nodeExecution.InputURI) assert.Equal(t, nodeStartedAt, *nodeExecution.StartedAt) assert.Equal(t, time.Hour, nodeExecution.Duration) + assert.Equal(t, []byte("NodeExecutionMetadata"), nodeExecution.NodeExecutionMetadata) + assert.Empty(t, nodeExecution.ChildNodeExecutions) + assert.Empty(t, nodeExecution.ParentID) } } diff --git a/pkg/repositories/models/node_execution.go b/pkg/repositories/models/node_execution.go index 7dae1c8f7..89ddb3b70 100644 --- a/pkg/repositories/models/node_execution.go +++ b/pkg/repositories/models/node_execution.go @@ -29,9 +29,17 @@ type NodeExecution struct { NodeExecutionUpdatedAt *time.Time Duration time.Duration NodeExecutionEvents []NodeExecutionEvent + // Metadata about the node execution. + NodeExecutionMetadata []byte + // Parent that spawned this node execution - value is empty for executions at level 0 + ParentID *uint `sql:"default:null"` + // List of child node executions - for cases like Dynamic task, sub workflow, etc + ChildNodeExecutions []NodeExecution `gorm:"foreignkey:ParentID"` // The task execution (if any) which launched this node execution. + // TO BE DEPRECATED - as we have now introduced ParentID ParentTaskExecutionID uint `sql:"default:null" gorm:"index"` // The workflow execution (if any) which this node execution launched + // NOTE: LaunchedExecution[foreignkey:ParentNodeExecutionID] refers to Workflow execution launched and is different from ParentID LaunchedExecution Execution `gorm:"foreignkey:ParentNodeExecutionID"` // Execution Error Kind. nullable, can be one of core.ExecutionError_ErrorKind ErrorKind *string `gorm:"index"` diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index 9a403be0e..9636b35be 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -22,6 +22,7 @@ import ( type ToNodeExecutionModelInput struct { Request *admin.NodeExecutionEventRequest ParentTaskExecutionID uint + ParentID *uint } func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecutionModel *models.NodeExecution, @@ -92,6 +93,11 @@ func CreateNodeExecutionModel(input ToNodeExecutionModelInput) (*models.NodeExec UpdatedAt: input.Request.Event.OccurredAt, } + nodeExecutionMetadata := admin.NodeExecutionMetaData{ + RetryGroup: input.Request.Event.RetryGroup, + SpecNodeId: input.Request.Event.SpecNodeId, + } + if input.Request.Event.Phase == core.NodeExecution_RUNNING { err := addNodeRunningState(input.Request, nodeExecution, &closure) if err != nil { @@ -109,7 +115,13 @@ func CreateNodeExecutionModel(input ToNodeExecutionModelInput) (*models.NodeExec return nil, errors.NewFlyteAdminErrorf( codes.Internal, "failed to marshal node execution closure with error: %v", err) } + marshaledNodeExecutionMetadata, err := proto.Marshal(&nodeExecutionMetadata) + if err != nil { + return nil, errors.NewFlyteAdminErrorf( + codes.Internal, "failed to marshal node execution metadata with error: %v", err) + } nodeExecution.Closure = marshaledClosure + nodeExecution.NodeExecutionMetadata = marshaledNodeExecutionMetadata nodeExecutionCreatedAt, err := ptypes.Timestamp(input.Request.Event.OccurredAt) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to read event timestamp") @@ -119,6 +131,7 @@ func CreateNodeExecutionModel(input ToNodeExecutionModelInput) (*models.NodeExec if input.Request.Event.ParentTaskMetadata != nil { nodeExecution.ParentTaskExecutionID = input.ParentTaskExecutionID } + nodeExecution.ParentID = input.ParentID return nodeExecution, nil } @@ -134,6 +147,7 @@ func UpdateNodeExecutionModel( nodeExecutionModel.Phase = request.Event.Phase.String() nodeExecutionClosure.Phase = request.Event.Phase nodeExecutionClosure.UpdatedAt = request.Event.OccurredAt + if request.Event.Phase == core.NodeExecution_RUNNING { err := addNodeRunningState(request, nodeExecutionModel, &nodeExecutionClosure) if err != nil { @@ -173,6 +187,7 @@ func UpdateNodeExecutionModel( return errors.NewFlyteAdminErrorf( codes.Internal, "failed to marshal node execution closure with error: %v", err) } + nodeExecutionModel.Closure = marshaledClosure updatedAt, err := ptypes.Timestamp(request.Event.OccurredAt) if err != nil { @@ -189,6 +204,14 @@ func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution) (*admin.Nod return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure") } + var nodeExecutionMetadata admin.NodeExecutionMetaData + err = proto.Unmarshal(nodeExecutionModel.NodeExecutionMetadata, &nodeExecutionMetadata) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal nodeExecutionMetadata") + } + if len(nodeExecutionModel.ChildNodeExecutions) > 0 { + nodeExecutionMetadata.IsParentNode = true + } return &admin.NodeExecution{ Id: &core.NodeExecutionIdentifier{ NodeId: nodeExecutionModel.NodeID, @@ -200,6 +223,7 @@ func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution) (*admin.Nod }, InputUri: nodeExecutionModel.InputURI, Closure: &closure, + Metadata: &nodeExecutionMetadata, }, nil } diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index 5bca4b72c..e94ee2efd 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -27,6 +27,12 @@ var closure = &admin.NodeExecutionClosure{ Duration: ptypes.DurationProto(duration), } var closureBytes, _ = proto.Marshal(closure) +var nodeExecutionMetadata = admin.NodeExecutionMetaData{ + IsParentNode: false, + RetryGroup: "r", + SpecNodeId: "sp", +} +var nodeExecutionMetadataBytes, _ = proto.Marshal(&nodeExecutionMetadata) var childExecutionID = &core.WorkflowExecutionIdentifier{ Project: "p", @@ -154,6 +160,7 @@ func TestCreateNodeExecutionModel(t *testing.T) { StartedAt: &occurredAt, NodeExecutionCreatedAt: &occurredAt, NodeExecutionUpdatedAt: &occurredAt, + NodeExecutionMetadata: []byte{}, ParentTaskExecutionID: 8, }, nodeExecutionModel) } @@ -260,8 +267,52 @@ func TestFromNodeExecutionModel(t *testing.T) { Name: "name", }, }, - Phase: "NodeExecutionPhase_NODE_PHASE_RUNNING", - Closure: closureBytes, + Phase: "NodeExecutionPhase_NODE_PHASE_RUNNING", + Closure: closureBytes, + NodeExecutionMetadata: nodeExecutionMetadataBytes, + InputURI: "input uri", + Duration: duration, + }) + assert.Nil(t, err) + assert.True(t, proto.Equal(&admin.NodeExecution{ + Id: &nodeExecutionIdentifier, + InputUri: "input uri", + Closure: closure, + Metadata: &nodeExecutionMetadata, + }, nodeExecution)) +} + +func TestFromNodeExecutionModelWithChildren(t *testing.T) { + nodeExecutionIdentifier := core.NodeExecutionIdentifier{ + NodeId: "nodey", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + }, + } + nodeExecution, err := FromNodeExecutionModel(models.NodeExecution{ + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "nodey", + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }, + Phase: "NodeExecutionPhase_NODE_PHASE_RUNNING", + Closure: closureBytes, + NodeExecutionMetadata: nodeExecutionMetadataBytes, + ChildNodeExecutions: []models.NodeExecution{ + {NodeExecutionKey: models.NodeExecutionKey{ + NodeID: "nodec1", + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + }}, + }, InputURI: "input uri", Duration: duration, }) @@ -270,6 +321,11 @@ func TestFromNodeExecutionModel(t *testing.T) { Id: &nodeExecutionIdentifier, InputUri: "input uri", Closure: closure, + Metadata: &admin.NodeExecutionMetaData{ + IsParentNode: true, + RetryGroup: "r", + SpecNodeId: "sp", + }, }, nodeExecution)) } @@ -284,10 +340,11 @@ func TestFromNodeExecutionModels(t *testing.T) { Name: "name", }, }, - Phase: "NodeExecutionPhase_NODE_PHASE_RUNNING", - Closure: closureBytes, - InputURI: "input uri", - Duration: duration, + Phase: "NodeExecutionPhase_NODE_PHASE_RUNNING", + Closure: closureBytes, + NodeExecutionMetadata: nodeExecutionMetadataBytes, + InputURI: "input uri", + Duration: duration, }, }) assert.Nil(t, err) @@ -303,5 +360,6 @@ func TestFromNodeExecutionModels(t *testing.T) { }, InputUri: "input uri", Closure: closure, + Metadata: &nodeExecutionMetadata, }, nodeExecutions[0])) } diff --git a/tests/node_execution_test.go b/tests/node_execution_test.go index 9fbe05487..13a0c8aea 100644 --- a/tests/node_execution_test.go +++ b/tests/node_execution_test.go @@ -59,6 +59,73 @@ func TestCreateNodeExecution(t *testing.T) { assert.True(t, proto.Equal(occurredAtProto, response.Closure.StartedAt)) } +func TestCreateNodeExecutionWithParent(t *testing.T) { + truncateAllTablesForTestingOnly() + populateWorkflowExecutionForTestingOnly(project, domain, name) + + ctx := context.Background() + client, conn := GetTestAdminServiceClient() + defer conn.Close() + + occurredAt := time.Now() + occurredAtProto, _ := ptypes.TimestampProto(occurredAt) + _, err := client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{ + RequestId: "request id", + Event: &event.NodeExecutionEvent{ + Id: nodeExecutionId, + Phase: core.NodeExecution_RUNNING, + InputUri: inputURI, + OccurredAt: occurredAtProto, + }, + }) + assert.Nil(t, err) + + response, err := client.GetNodeExecution(ctx, &admin.NodeExecutionGetRequest{ + Id: nodeExecutionId, + }) + assert.Nil(t, err) + assert.False(t, response.Metadata.IsParentNode) + assert.True(t, proto.Equal(nodeExecutionId, response.Id)) + + _, err = client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{ + RequestId: "request id", + Event: &event.NodeExecutionEvent{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "child", + ExecutionId: nodeExecutionId.ExecutionId, + }, + Phase: core.NodeExecution_RUNNING, + InputUri: inputURI, + OccurredAt: occurredAtProto, + SpecNodeId: "spec", + RetryGroup: "1", + ParentNodeMetadata: &event.ParentNodeExecutionMetadata{ + NodeId: nodeExecutionId.NodeId, + }, + }, + }) + response, err = client.GetNodeExecution(ctx, &admin.NodeExecutionGetRequest{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "child", + ExecutionId: nodeExecutionId.ExecutionId, + }, + }) + assert.Nil(t, err) + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "child", + ExecutionId: nodeExecutionId.ExecutionId, + }, response.Id)) + assert.Nil(t, err) + assert.False(t, response.Metadata.IsParentNode) + + response, err = client.GetNodeExecution(ctx, &admin.NodeExecutionGetRequest{ + Id: nodeExecutionId, + }) + assert.Nil(t, err) + assert.True(t, response.Metadata.IsParentNode) + assert.True(t, proto.Equal(nodeExecutionId, response.Id)) +} + func TestCreateAndUpdateNodeExecution(t *testing.T) { truncateAllTablesForTestingOnly() populateWorkflowExecutionForTestingOnly(project, domain, name) @@ -190,6 +257,113 @@ func TestCreateAndListNodeExecutions(t *testing.T) { assert.True(t, proto.Equal(occurredAtProto, nodeExecutionResponse.Closure.StartedAt)) } +func TestListNodeExecutionWithParent(t *testing.T) { + truncateAllTablesForTestingOnly() + populateWorkflowExecutionForTestingOnly(project, domain, name) + + ctx := context.Background() + client, conn := GetTestAdminServiceClient() + defer conn.Close() + + occurredAt := time.Now() + occurredAtProto, _ := ptypes.TimestampProto(occurredAt) + _, err := client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{ + RequestId: "request id", + Event: &event.NodeExecutionEvent{ + Id: nodeExecutionId, + Phase: core.NodeExecution_RUNNING, + InputUri: inputURI, + OccurredAt: occurredAtProto, + }, + }) + assert.Nil(t, err) + + _, err = client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{ + RequestId: "request id", + Event: &event.NodeExecutionEvent{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "child", + ExecutionId: nodeExecutionId.ExecutionId, + }, + Phase: core.NodeExecution_RUNNING, + InputUri: inputURI, + OccurredAt: occurredAtProto, + SpecNodeId: "spec", + RetryGroup: "1", + ParentNodeMetadata: &event.ParentNodeExecutionMetadata{ + NodeId: nodeExecutionId.NodeId, + }, + }, + }) + assert.Nil(t, err) + + _, err = client.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{ + RequestId: "request id", + Event: &event.NodeExecutionEvent{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "child2", + ExecutionId: nodeExecutionId.ExecutionId, + }, + Phase: core.NodeExecution_RUNNING, + InputUri: inputURI, + OccurredAt: occurredAtProto, + SpecNodeId: "spec", + RetryGroup: "1", + ParentNodeMetadata: &event.ParentNodeExecutionMetadata{ + NodeId: nodeExecutionId.NodeId, + }, + }, + }) + assert.Nil(t, err) + + response, err := client.ListNodeExecutions(ctx, &admin.NodeExecutionListRequest{ + WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ + Project: project, + Domain: domain, + Name: name, + }, + Limit: 10, + }) + + assert.Nil(t, err) + assert.Len(t, response.NodeExecutions, 1) + nodeExecutionResponse := response.NodeExecutions[0] + assert.True(t, proto.Equal(nodeExecutionId, nodeExecutionResponse.Id)) + assert.Equal(t, core.NodeExecution_RUNNING, nodeExecutionResponse.Closure.Phase) + assert.Equal(t, inputURI, nodeExecutionResponse.InputUri) + assert.True(t, proto.Equal(occurredAtProto, nodeExecutionResponse.Closure.StartedAt)) + + response, err = client.ListNodeExecutions(ctx, &admin.NodeExecutionListRequest{ + WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ + Project: project, + Domain: domain, + Name: name, + }, + UniqueParentId: nodeExecutionId.NodeId, + Limit: 10, + }) + + assert.Nil(t, err) + assert.Len(t, response.NodeExecutions, 2) + nodeExecutionResponse = response.NodeExecutions[0] + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "child", + ExecutionId: nodeExecutionId.ExecutionId, + }, nodeExecutionResponse.Id)) + assert.Equal(t, core.NodeExecution_RUNNING, nodeExecutionResponse.Closure.Phase) + assert.Equal(t, inputURI, nodeExecutionResponse.InputUri) + assert.True(t, proto.Equal(occurredAtProto, nodeExecutionResponse.Closure.StartedAt)) + + nodeExecutionResponse = response.NodeExecutions[1] + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "child2", + ExecutionId: nodeExecutionId.ExecutionId, + }, nodeExecutionResponse.Id)) + assert.Equal(t, core.NodeExecution_RUNNING, nodeExecutionResponse.Closure.Phase) + assert.Equal(t, inputURI, nodeExecutionResponse.InputUri) + assert.True(t, proto.Equal(occurredAtProto, nodeExecutionResponse.Closure.StartedAt)) +} + func TestCreateChildNodeExecutionForTaskExecution(t *testing.T) { truncateAllTablesForTestingOnly() populateWorkflowExecutionForTestingOnly(project, domain, name)