Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Admin changes for Parent child node execution relationship (#111)
Browse files Browse the repository at this point in the history
* Admin changes for Parent child node execution relationship
* Add migrations
  • Loading branch information
anandswaminathan authored Aug 3, 2020
1 parent 464028f commit efd98d4
Show file tree
Hide file tree
Showing 10 changed files with 564 additions and 41 deletions.
50 changes: 40 additions & 10 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ const (
alreadyInTerminalStatus
)

const addIsParentFilter = true

var isParent = common.NewMapFilter(map[string]interface{}{
shared.ParentTaskExecutionID: nil,
shared.ParentID: nil,
})

func getNodeExecutionContext(ctx context.Context, identifier *core.NodeExecutionIdentifier) context.Context {
Expand All @@ -77,9 +76,23 @@ func (m *NodeExecutionManager) createNodeExecutionWithEvent(
}
parentTaskExecutionID = taskExecutionModel.ID
}
var parentID *uint
if request.Event.ParentNodeMetadata != nil {
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, &core.NodeExecutionIdentifier{
ExecutionId: request.Event.Id.ExecutionId,
NodeId: request.Event.ParentNodeMetadata.NodeId,
})
if err != nil {
logger.Errorf(ctx, "failed to fetch node execution for the parent node: %v %s with err",
request.Event.Id.ExecutionId, request.Event.ParentNodeMetadata.NodeId, err)
return err
}
parentID = &parentNodeExecutionModel.ID
}
nodeExecutionModel, err := transformers.CreateNodeExecutionModel(transformers.ToNodeExecutionModelInput{
Request: request,
ParentTaskExecutionID: parentTaskExecutionID,
ParentID: parentID,
})
if err != nil {
logger.Debugf(ctx, "failed to create node execution model for event request: %s with err: %v",
Expand Down Expand Up @@ -237,7 +250,7 @@ func (m *NodeExecutionManager) GetNodeExecution(

func (m *NodeExecutionManager) listNodeExecutions(
ctx context.Context, identifierFilters []common.InlineFilter,
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, addIsParentFilter bool) (
requestFilters string, limit uint32, requestToken string, sortBy *admin.Sort, mapFilters []common.MapFilter) (
*admin.NodeExecutionList, error) {

filters, err := util.AddRequestFilters(requestFilters, common.NodeExecution, identifierFilters)
Expand All @@ -262,11 +275,8 @@ func (m *NodeExecutionManager) listNodeExecutions(
InlineFilters: filters,
SortParameter: sortParameter,
}
if addIsParentFilter {
listInput.MapFilters = []common.MapFilter{
isParent,
}
}

listInput.MapFilters = mapFilters
output, err := m.db.NodeExecutionRepo().List(ctx, listInput)
if err != nil {
logger.Debugf(ctx, "Failed to list node executions for request with err %v", err)
Expand Down Expand Up @@ -301,8 +311,28 @@ func (m *NodeExecutionManager) ListNodeExecutions(
if err != nil {
return nil, err
}
var mapFilters []common.MapFilter
if request.UniqueParentId != "" {
parentNodeExecution, err := util.GetNodeExecutionModel(ctx, m.db, &core.NodeExecutionIdentifier{
ExecutionId: request.WorkflowExecutionId,
NodeId: request.UniqueParentId,
})
if err != nil {
return nil, err
}
parentIDFilter, err := common.NewSingleValueFilter(
common.NodeExecution, common.Equal, shared.ParentID, parentNodeExecution.ID)
if err != nil {
return nil, err
}
identifierFilters = append(identifierFilters, parentIDFilter)
} else {
mapFilters = []common.MapFilter{
isParent,
}
}
return m.listNodeExecutions(
ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, addIsParentFilter)
ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, mapFilters)
}

// Filters on node executions matching the execution parameters (execution project, domain, and name) as well as the
Expand Down Expand Up @@ -330,7 +360,7 @@ func (m *NodeExecutionManager) ListNodeExecutionsForTask(
}
identifierFilters = append(identifierFilters, nodeIDFilter)
return m.listNodeExecutions(
ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, !addIsParentFilter)
ctx, identifierFilters, request.Filters, request.Limit, request.Token, request.SortBy, nil)
}

func (m *NodeExecutionManager) GetNodeExecutionData(
Expand Down
Loading

0 comments on commit efd98d4

Please sign in to comment.