From cca9e1702bbc3ee774f7214b172c1e5bfff4716f Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Mon, 27 Mar 2023 15:01:14 -0500 Subject: [PATCH] Add endpoint exposing runtime metrics (#524) * metrics manager working for task nodes Signed-off-by: Daniel Rammer * dynamic tasks working Signed-off-by: Daniel Rammer * subworkflow node working Signed-off-by: Daniel Rammer * refactored to allow branch and gate node parsing Signed-off-by: Daniel Rammer * added node transition times Signed-off-by: Daniel Rammer * sorting node and task executions Signed-off-by: Daniel Rammer * added metrics metrics ... inception Signed-off-by: Daniel Rammer * fixed duplicate metrics Signed-off-by: Daniel Rammer * branch node working? Signed-off-by: Daniel Rammer * implemented gate node Signed-off-by: Daniel Rammer * working with partial completions and failures and cache hits Signed-off-by: Daniel Rammer * added unit tests Signed-off-by: Daniel Rammer * fixed most lint issues Signed-off-by: Daniel Rammer * fixed lint issues Signed-off-by: Daniel Rammer * added docs Signed-off-by: Daniel Rammer * updated flyteidl dependency Signed-off-by: Daniel Rammer * fixed unit tests Signed-off-by: Daniel Rammer * chnaged to local flyteidl for testing Signed-off-by: Daniel Rammer * using task event reported_at timestamps for updated_at in task execution models Signed-off-by: Daniel Rammer * updated flyteidl Signed-off-by: Daniel Rammer * fixed lint issues Signed-off-by: Daniel Rammer * using reported_at for node executions Signed-off-by: Daniel Rammer * updated flyteidl Signed-off-by: Daniel Rammer * fixes unit tests and lint issues Signed-off-by: Daniel Rammer * updated flyteidl Signed-off-by: Daniel Rammer * bumped flyteidl deps Signed-off-by: Daniel Rammer * using consts for start-node and end-node Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 4 +- pkg/manager/impl/metrics_manager.go | 684 +++++++++++ pkg/manager/impl/metrics_manager_test.go | 1082 +++++++++++++++++ pkg/manager/interfaces/metrics.go | 15 + pkg/manager/mocks/metrics_interface.go | 57 + pkg/manager/mocks/workflow.go | 9 + .../transformers/node_execution.go | 21 +- .../transformers/task_execution.go | 21 +- pkg/rpc/adminservice/base.go | 20 +- pkg/rpc/adminservice/execution.go | 20 +- pkg/rpc/adminservice/metrics.go | 4 + 12 files changed, 1920 insertions(+), 19 deletions(-) create mode 100644 pkg/manager/impl/metrics_manager.go create mode 100644 pkg/manager/impl/metrics_manager_test.go create mode 100644 pkg/manager/interfaces/metrics.go create mode 100644 pkg/manager/mocks/metrics_interface.go diff --git a/go.mod b/go.mod index 4efdaed1ec..6710d33bbe 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.3.13 + github.com/flyteorg/flyteidl v1.3.14 github.com/flyteorg/flyteplugins v1.0.40 github.com/flyteorg/flytepropeller v1.1.70 github.com/flyteorg/flytestdlib v1.0.15 diff --git a/go.sum b/go.sum index 638d2b84d4..f2fe7c70b8 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.13 h1:jOjiHl6jmSCOGC094QaRdSjjhThhzYPm0jHSxwAZ6UM= -github.com/flyteorg/flyteidl v1.3.13/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= +github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s= github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w= diff --git a/pkg/manager/impl/metrics_manager.go b/pkg/manager/impl/metrics_manager.go new file mode 100644 index 0000000000..a6d010b1e2 --- /dev/null +++ b/pkg/manager/impl/metrics_manager.go @@ -0,0 +1,684 @@ +package impl + +import ( + "context" + "fmt" + "reflect" + "sort" + "time" + + "github.com/flyteorg/flyteadmin/pkg/manager/interfaces" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + + "github.com/flyteorg/flytestdlib/promutils" + + "github.com/golang/protobuf/ptypes/duration" + "github.com/golang/protobuf/ptypes/timestamp" + + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + RequestLimit uint32 = 50 + + nodeIdle = "NODE_IDLE" + nodeReset = "NODE_RESET" + nodeSetup = "NODE_SETUP" + nodeTeardown = "NODE_TEARDOWN" + nodeTransition = "NODE_TRANSITION" + taskRuntime = "TASK_RUNTIME" + taskSetup = "TASK_SETUP" + taskTeardown = "TASK_TEARDOWN" + workflowSetup = "WORKFLOW_SETUP" + workflowTeardown = "WORKFLOW_TEARDOWN" +) + +var ( + emptyDuration *duration.Duration = &duration.Duration{ + Seconds: 0, + Nanos: 0, + } + emptyTimestamp *timestamp.Timestamp = ×tamp.Timestamp{ + Seconds: 0, + Nanos: 0, + } +) + +type metrics struct { + Scope promutils.Scope +} + +// MetricsManager handles computation of workflow, node, and task execution metrics. +type MetricsManager struct { + workflowManager interfaces.WorkflowInterface + executionManager interfaces.ExecutionInterface + nodeExecutionManager interfaces.NodeExecutionInterface + taskExecutionManager interfaces.TaskExecutionInterface + metrics metrics +} + +// createOperationSpan returns a Span defined by the provided arguments. +func createOperationSpan(startTime, endTime *timestamp.Timestamp, operation string) *core.Span { + return &core.Span{ + StartTime: startTime, + EndTime: endTime, + Id: &core.Span_OperationId{ + OperationId: operation, + }, + } +} + +// getBranchNode searches the provided BranchNode definition for the Node identified by nodeID. +func getBranchNode(nodeID string, branchNode *core.BranchNode) *core.Node { + if branchNode.IfElse.Case.ThenNode.Id == nodeID { + return branchNode.IfElse.Case.ThenNode + } + + for _, other := range branchNode.IfElse.Other { + if other.ThenNode.Id == nodeID { + return other.ThenNode + } + } + + if elseNode, ok := branchNode.IfElse.Default.(*core.IfElseBlock_ElseNode); ok { + if elseNode.ElseNode.Id == nodeID { + return elseNode.ElseNode + } + } + + return nil +} + +// getLatestUpstreamNodeExecution returns the NodeExecution with the latest UpdatedAt timestamp that is an upstream +// dependency of the provided nodeID. This is useful for computing the duration between when a node is first available +// for scheduling and when it is actually scheduled. +func (m *MetricsManager) getLatestUpstreamNodeExecution(nodeID string, upstreamNodeIds map[string]*core.ConnectionSet_IdList, + nodeExecutions map[string]*admin.NodeExecution) *admin.NodeExecution { + + var nodeExecution *admin.NodeExecution + var latestUpstreamUpdatedAt = time.Unix(0, 0) + if connectionSet, exists := upstreamNodeIds[nodeID]; exists { + for _, upstreamNodeID := range connectionSet.Ids { + upstreamNodeExecution, exists := nodeExecutions[upstreamNodeID] + if !exists { + continue + } + + t := upstreamNodeExecution.Closure.UpdatedAt.AsTime() + if t.After(latestUpstreamUpdatedAt) { + nodeExecution = upstreamNodeExecution + latestUpstreamUpdatedAt = t + } + } + } + + return nodeExecution +} + +// getNodeExecutions queries the nodeExecutionManager for NodeExecutions adhering to the specified request. +func (m *MetricsManager) getNodeExecutions(ctx context.Context, request admin.NodeExecutionListRequest) (map[string]*admin.NodeExecution, error) { + nodeExecutions := make(map[string]*admin.NodeExecution) + for { + response, err := m.nodeExecutionManager.ListNodeExecutions(ctx, request) + if err != nil { + return nil, err + } + + for _, nodeExecution := range response.NodeExecutions { + nodeExecutions[nodeExecution.Metadata.SpecNodeId] = nodeExecution + } + + if len(response.NodeExecutions) < int(request.Limit) { + break + } + + request.Token = response.Token + } + + return nodeExecutions, nil +} + +// getTaskExecutions queries the taskExecutionManager for TaskExecutions adhering to the specified request. +func (m *MetricsManager) getTaskExecutions(ctx context.Context, request admin.TaskExecutionListRequest) ([]*admin.TaskExecution, error) { + taskExecutions := make([]*admin.TaskExecution, 0) + for { + response, err := m.taskExecutionManager.ListTaskExecutions(ctx, request) + if err != nil { + return nil, err + } + + taskExecutions = append(taskExecutions, response.TaskExecutions...) + + if len(response.TaskExecutions) < int(request.Limit) { + break + } + + request.Token = response.Token + } + + return taskExecutions, nil +} + +// parseBranchNodeExecution partitions the BranchNode execution into a collection of Categorical and Reference Spans +// which are appended to the provided spans argument. +func (m *MetricsManager) parseBranchNodeExecution(ctx context.Context, + nodeExecution *admin.NodeExecution, branchNode *core.BranchNode, spans *[]*core.Span, depth int) error { + + // retrieve node execution(s) + nodeExecutions, err := m.getNodeExecutions(ctx, admin.NodeExecutionListRequest{ + WorkflowExecutionId: nodeExecution.Id.ExecutionId, + Limit: RequestLimit, + UniqueParentId: nodeExecution.Id.NodeId, + }) + if err != nil { + return err + } + + // check if the node started + if len(nodeExecutions) == 0 { + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup)) + } else { + // parse branchNode + if len(nodeExecutions) != 1 { + return fmt.Errorf("invalid branch node execution: expected 1 but found %d node execution(s)", len(nodeExecutions)) + } + + var branchNodeExecution *admin.NodeExecution + for _, e := range nodeExecutions { + branchNodeExecution = e + } + + node := getBranchNode(branchNodeExecution.Metadata.SpecNodeId, branchNode) + if node == nil { + return fmt.Errorf("failed to identify branch node final node definition for nodeID '%s' and branchNode '%+v'", + branchNodeExecution.Metadata.SpecNodeId, branchNode) + } + + // frontend overhead + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, branchNodeExecution.Closure.CreatedAt, nodeSetup)) + + // node execution + nodeExecutionSpan, err := m.parseNodeExecution(ctx, branchNodeExecution, node, depth) + if err != nil { + return err + } + + *spans = append(*spans, nodeExecutionSpan) + + // backened overhead + if !nodeExecution.Closure.UpdatedAt.AsTime().Before(branchNodeExecution.Closure.UpdatedAt.AsTime()) { + *spans = append(*spans, createOperationSpan(branchNodeExecution.Closure.UpdatedAt, + nodeExecution.Closure.UpdatedAt, nodeTeardown)) + } + } + + return nil +} + +// parseDynamicNodeExecution partitions the DynamicNode execution into a collection of Categorical and Reference Spans +// which are appended to the provided spans argument. +func (m *MetricsManager) parseDynamicNodeExecution(ctx context.Context, nodeExecution *admin.NodeExecution, spans *[]*core.Span, depth int) error { + taskExecutions, err := m.getTaskExecutions(ctx, admin.TaskExecutionListRequest{ + NodeExecutionId: nodeExecution.Id, + Limit: RequestLimit, + }) + if err != nil { + return err + } + + // if no task executions then everything is execution overhead + if len(taskExecutions) == 0 { + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup)) + } else { + // frontend overhead + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, taskExecutions[0].Closure.CreatedAt, nodeSetup)) + + // task execution(s) + parseTaskExecutions(taskExecutions, spans, depth) + + nodeExecutions, err := m.getNodeExecutions(ctx, admin.NodeExecutionListRequest{ + WorkflowExecutionId: nodeExecution.Id.ExecutionId, + Limit: RequestLimit, + UniqueParentId: nodeExecution.Id.NodeId, + }) + if err != nil { + return err + } + + lastTask := taskExecutions[len(taskExecutions)-1] + if len(nodeExecutions) == 0 { + if !nodeExecution.Closure.UpdatedAt.AsTime().Before(lastTask.Closure.UpdatedAt.AsTime()) { + *spans = append(*spans, createOperationSpan(lastTask.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeReset)) + } + } else { + // between task execution(s) and node execution(s) overhead + startNode := nodeExecutions[v1alpha1.StartNodeID] + *spans = append(*spans, createOperationSpan(taskExecutions[len(taskExecutions)-1].Closure.UpdatedAt, + startNode.Closure.UpdatedAt, nodeReset)) + + // node execution(s) + getDataRequest := admin.NodeExecutionGetDataRequest{Id: nodeExecution.Id} + nodeExecutionData, err := m.nodeExecutionManager.GetNodeExecutionData(ctx, getDataRequest) + if err != nil { + return err + } + + if err := m.parseNodeExecutions(ctx, nodeExecutions, nodeExecutionData.DynamicWorkflow.CompiledWorkflow, spans, depth); err != nil { + return err + } + + // backened overhead + latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID, + nodeExecutionData.DynamicWorkflow.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions) + if latestUpstreamNode != nil && !nodeExecution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) { + *spans = append(*spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeTeardown)) + } + } + } + + return nil +} + +// parseExecution partitions the workflow execution into a collection of Categorical and Reference Spans which are +// returned as a hierarchical breakdown of the workflow execution. +func (m *MetricsManager) parseExecution(ctx context.Context, execution *admin.Execution, depth int) (*core.Span, error) { + spans := make([]*core.Span, 0) + if depth != 0 { + // retrieve workflow and node executions + workflowRequest := admin.ObjectGetRequest{Id: execution.Closure.WorkflowId} + workflow, err := m.workflowManager.GetWorkflow(ctx, workflowRequest) + if err != nil { + return nil, err + } + + nodeExecutions, err := m.getNodeExecutions(ctx, admin.NodeExecutionListRequest{ + WorkflowExecutionId: execution.Id, + Limit: RequestLimit, + }) + if err != nil { + return nil, err + } + + // check if workflow has started + startNode := nodeExecutions[v1alpha1.StartNodeID] + if startNode.Closure.UpdatedAt == nil || reflect.DeepEqual(startNode.Closure.UpdatedAt, emptyTimestamp) { + spans = append(spans, createOperationSpan(execution.Closure.CreatedAt, execution.Closure.UpdatedAt, workflowSetup)) + } else { + // compute frontend overhead + spans = append(spans, createOperationSpan(execution.Closure.CreatedAt, startNode.Closure.UpdatedAt, workflowSetup)) + + // iterate over nodes and compute overhead + if err := m.parseNodeExecutions(ctx, nodeExecutions, workflow.Closure.CompiledWorkflow, &spans, depth-1); err != nil { + return nil, err + } + + // compute backend overhead + latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID, + workflow.Closure.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions) + if latestUpstreamNode != nil && !execution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) { + spans = append(spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, + execution.Closure.UpdatedAt, workflowTeardown)) + } + } + } + + return &core.Span{ + StartTime: execution.Closure.CreatedAt, + EndTime: execution.Closure.UpdatedAt, + Id: &core.Span_WorkflowId{ + WorkflowId: execution.Id, + }, + Spans: spans, + }, nil +} + +// parseGateNodeExecution partitions the GateNode execution into a collection of Categorical and Reference Spans +// which are appended to the provided spans argument. +func (m *MetricsManager) parseGateNodeExecution(_ context.Context, nodeExecution *admin.NodeExecution, spans *[]*core.Span) { + // check if node has started yet + if nodeExecution.Closure.StartedAt == nil || reflect.DeepEqual(nodeExecution.Closure.StartedAt, emptyTimestamp) { + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup)) + } else { + // frontend overhead + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.StartedAt, nodeSetup)) + + // check if plugin has completed yet + if nodeExecution.Closure.Duration == nil || reflect.DeepEqual(nodeExecution.Closure.Duration, emptyDuration) { + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.StartedAt, + nodeExecution.Closure.UpdatedAt, nodeIdle)) + } else { + // idle time + nodeEndTime := timestamppb.New(nodeExecution.Closure.StartedAt.AsTime().Add(nodeExecution.Closure.Duration.AsDuration())) + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.StartedAt, nodeEndTime, nodeIdle)) + + // backend overhead + *spans = append(*spans, createOperationSpan(nodeEndTime, nodeExecution.Closure.UpdatedAt, nodeTeardown)) + } + } +} + +// parseLaunchPlanNodeExecution partitions the LaunchPlanNode execution into a collection of Categorical and Reference +// Spans which are appended to the provided spans argument. +func (m *MetricsManager) parseLaunchPlanNodeExecution(ctx context.Context, nodeExecution *admin.NodeExecution, spans *[]*core.Span, depth int) error { + // check if workflow started yet + workflowNode := nodeExecution.Closure.GetWorkflowNodeMetadata() + if workflowNode == nil { + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup)) + } else { + // retrieve execution + executionRequest := admin.WorkflowExecutionGetRequest{Id: workflowNode.ExecutionId} + execution, err := m.executionManager.GetExecution(ctx, executionRequest) + if err != nil { + return err + } + + // frontend overhead + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, execution.Closure.CreatedAt, nodeSetup)) + + // execution + span, err := m.parseExecution(ctx, execution, depth) + if err != nil { + return err + } + + *spans = append(*spans, span) + + // backend overhead + if !nodeExecution.Closure.UpdatedAt.AsTime().Before(execution.Closure.UpdatedAt.AsTime()) { + *spans = append(*spans, createOperationSpan(execution.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeTeardown)) + } + } + + return nil +} + +// parseNodeExecution partitions the node execution into a collection of Categorical and Reference Spans which are +// returned as a hierarchical breakdown of the node execution. +func (m *MetricsManager) parseNodeExecution(ctx context.Context, nodeExecution *admin.NodeExecution, node *core.Node, depth int) (*core.Span, error) { + spans := make([]*core.Span, 0) + if depth != 0 { + + // parse node + var err error + switch target := node.Target.(type) { + case *core.Node_BranchNode: + // handle branch node + err = m.parseBranchNodeExecution(ctx, nodeExecution, target.BranchNode, &spans, depth-1) + case *core.Node_GateNode: + // handle gate node + m.parseGateNodeExecution(ctx, nodeExecution, &spans) + case *core.Node_TaskNode: + if nodeExecution.Metadata.IsParentNode { + // handle dynamic node + err = m.parseDynamicNodeExecution(ctx, nodeExecution, &spans, depth-1) + } else { + // handle task node + err = m.parseTaskNodeExecution(ctx, nodeExecution, &spans, depth-1) + } + case *core.Node_WorkflowNode: + switch workflow := target.WorkflowNode.Reference.(type) { + case *core.WorkflowNode_LaunchplanRef: + // handle launch plan + err = m.parseLaunchPlanNodeExecution(ctx, nodeExecution, &spans, depth-1) + case *core.WorkflowNode_SubWorkflowRef: + // handle subworkflow + err = m.parseSubworkflowNodeExecution(ctx, nodeExecution, workflow.SubWorkflowRef, &spans, depth-1) + default: + err = fmt.Errorf("failed to identify workflow node type for node: %+v", target) + } + default: + err = fmt.Errorf("failed to identify node type for node: %+v", target) + } + + if err != nil { + return nil, err + } + } + + return &core.Span{ + StartTime: nodeExecution.Closure.CreatedAt, + EndTime: nodeExecution.Closure.UpdatedAt, + Id: &core.Span_NodeId{ + NodeId: nodeExecution.Id, + }, + Spans: spans, + }, nil +} + +// parseNodeExecutions partitions the node executions into a collection of Categorical and Reference Spans which are +// appended to the provided spans argument. +func (m *MetricsManager) parseNodeExecutions(ctx context.Context, nodeExecutions map[string]*admin.NodeExecution, + compiledWorkflowClosure *core.CompiledWorkflowClosure, spans *[]*core.Span, depth int) error { + + // sort node executions + sortedNodeExecutions := make([]*admin.NodeExecution, 0, len(nodeExecutions)) + for _, nodeExecution := range nodeExecutions { + sortedNodeExecutions = append(sortedNodeExecutions, nodeExecution) + } + sort.Slice(sortedNodeExecutions, func(i, j int) bool { + x := sortedNodeExecutions[i].Closure.CreatedAt.AsTime() + y := sortedNodeExecutions[j].Closure.CreatedAt.AsTime() + return x.Before(y) + }) + + // iterate over sorted node executions + for _, nodeExecution := range sortedNodeExecutions { + specNodeID := nodeExecution.Metadata.SpecNodeId + if specNodeID == v1alpha1.StartNodeID || specNodeID == v1alpha1.EndNodeID { + continue + } + + // get node definition from workflow + var node *core.Node + for _, n := range compiledWorkflowClosure.Primary.Template.Nodes { + if n.Id == specNodeID { + node = n + } + } + + if node == nil { + return fmt.Errorf("failed to discover workflow node '%s' in workflow '%+v'", + specNodeID, compiledWorkflowClosure.Primary.Template.Id) + } + + // parse node execution + nodeExecutionSpan, err := m.parseNodeExecution(ctx, nodeExecution, node, depth) + if err != nil { + return err + } + + // prepend nodeExecution spans with node transition time + latestUpstreamNode := m.getLatestUpstreamNodeExecution(specNodeID, + compiledWorkflowClosure.Primary.Connections.Upstream, nodeExecutions) + if latestUpstreamNode != nil { + nodeExecutionSpan.Spans = append([]*core.Span{createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, + nodeExecution.Closure.CreatedAt, nodeTransition)}, nodeExecutionSpan.Spans...) + } + + *spans = append(*spans, nodeExecutionSpan) + } + + return nil +} + +// parseSubworkflowNodeExecutions partitions the SubworkflowNode execution into a collection of Categorical and +// Reference Spans which are appended to the provided spans argument. +func (m *MetricsManager) parseSubworkflowNodeExecution(ctx context.Context, + nodeExecution *admin.NodeExecution, identifier *core.Identifier, spans *[]*core.Span, depth int) error { + + // retrieve node execution(s) + nodeExecutions, err := m.getNodeExecutions(ctx, admin.NodeExecutionListRequest{ + WorkflowExecutionId: nodeExecution.Id.ExecutionId, + Limit: RequestLimit, + UniqueParentId: nodeExecution.Id.NodeId, + }) + if err != nil { + return err + } + + // check if the subworkflow started + if len(nodeExecutions) == 0 { + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup)) + } else { + // frontend overhead + startNode := nodeExecutions[v1alpha1.StartNodeID] + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, startNode.Closure.UpdatedAt, nodeSetup)) + + // retrieve workflow + workflowRequest := admin.ObjectGetRequest{Id: identifier} + workflow, err := m.workflowManager.GetWorkflow(ctx, workflowRequest) + if err != nil { + return err + } + + // node execution(s) + if err := m.parseNodeExecutions(ctx, nodeExecutions, workflow.Closure.CompiledWorkflow, spans, depth); err != nil { + return err + } + + // backened overhead + latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID, + workflow.Closure.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions) + if latestUpstreamNode != nil && !nodeExecution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) { + *spans = append(*spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeTeardown)) + } + } + + return nil +} + +// parseTaskExecution partitions the task execution into a collection of Categorical and Reference Spans which are +// returned as a hierarchical breakdown of the task execution. +func parseTaskExecution(taskExecution *admin.TaskExecution) *core.Span { + spans := make([]*core.Span, 0) + + // check if plugin has started yet + if taskExecution.Closure.StartedAt == nil || reflect.DeepEqual(taskExecution.Closure.StartedAt, emptyTimestamp) { + spans = append(spans, createOperationSpan(taskExecution.Closure.CreatedAt, taskExecution.Closure.UpdatedAt, taskSetup)) + } else { + // frontend overhead + spans = append(spans, createOperationSpan(taskExecution.Closure.CreatedAt, taskExecution.Closure.StartedAt, taskSetup)) + + // check if plugin has completed yet + if taskExecution.Closure.Duration == nil || reflect.DeepEqual(taskExecution.Closure.Duration, emptyDuration) { + spans = append(spans, createOperationSpan(taskExecution.Closure.StartedAt, taskExecution.Closure.UpdatedAt, taskRuntime)) + } else { + // plugin execution + taskEndTime := timestamppb.New(taskExecution.Closure.StartedAt.AsTime().Add(taskExecution.Closure.Duration.AsDuration())) + spans = append(spans, createOperationSpan(taskExecution.Closure.StartedAt, taskEndTime, taskRuntime)) + + // backend overhead + if !taskExecution.Closure.UpdatedAt.AsTime().Before(taskEndTime.AsTime()) { + spans = append(spans, createOperationSpan(taskEndTime, taskExecution.Closure.UpdatedAt, taskTeardown)) + } + } + } + + return &core.Span{ + StartTime: taskExecution.Closure.CreatedAt, + EndTime: taskExecution.Closure.UpdatedAt, + Id: &core.Span_TaskId{ + TaskId: taskExecution.Id, + }, + Spans: spans, + } +} + +// parseTaskExecutions partitions the task executions into a collection of Categorical and Reference Spans which are +// appended to the provided spans argument. +func parseTaskExecutions(taskExecutions []*admin.TaskExecution, spans *[]*core.Span, depth int) { + // sort task executions + sort.Slice(taskExecutions, func(i, j int) bool { + x := taskExecutions[i].Closure.CreatedAt.AsTime() + y := taskExecutions[j].Closure.CreatedAt.AsTime() + return x.Before(y) + }) + + // iterate over task executions + for index, taskExecution := range taskExecutions { + if index > 0 { + *spans = append(*spans, createOperationSpan(taskExecutions[index-1].Closure.UpdatedAt, taskExecution.Closure.CreatedAt, nodeReset)) + } + + if depth != 0 { + *spans = append(*spans, parseTaskExecution(taskExecution)) + } + } +} + +// parseTaskNodeExecutions partitions the TaskNode execution into a collection of Categorical and Reference Spans which +// are appended to the provided spans argument. +func (m *MetricsManager) parseTaskNodeExecution(ctx context.Context, nodeExecution *admin.NodeExecution, spans *[]*core.Span, depth int) error { + // retrieve task executions + taskExecutions, err := m.getTaskExecutions(ctx, admin.TaskExecutionListRequest{ + NodeExecutionId: nodeExecution.Id, + Limit: RequestLimit, + }) + if err != nil { + return err + } + + // if no task executions then everything is execution overhead + if len(taskExecutions) == 0 { + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup)) + } else { + // frontend overhead + *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, taskExecutions[0].Closure.CreatedAt, nodeSetup)) + + // parse task executions + parseTaskExecutions(taskExecutions, spans, depth) + + // backend overhead + lastTask := taskExecutions[len(taskExecutions)-1] + if !nodeExecution.Closure.UpdatedAt.AsTime().Before(lastTask.Closure.UpdatedAt.AsTime()) { + *spans = append(*spans, createOperationSpan(taskExecutions[len(taskExecutions)-1].Closure.UpdatedAt, + nodeExecution.Closure.UpdatedAt, nodeTeardown)) + } + } + + return nil +} + +// GetExecutionMetrics returns a Span hierarchically breaking down the workflow execution into a collection of +// Categorical and Reference Spans. +func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, + request admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { + + // retrieve workflow execution + executionRequest := admin.WorkflowExecutionGetRequest{Id: request.Id} + execution, err := m.executionManager.GetExecution(ctx, executionRequest) + if err != nil { + return nil, err + } + + span, err := m.parseExecution(ctx, execution, int(request.Depth)) + if err != nil { + return nil, err + } + + return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil +} + +// NewMetricsManager returns a new MetricsManager constructed with the provided arguments. +func NewMetricsManager( + workflowManager interfaces.WorkflowInterface, + executionManager interfaces.ExecutionInterface, + nodeExecutionManager interfaces.NodeExecutionInterface, + taskExecutionManager interfaces.TaskExecutionInterface, + scope promutils.Scope) interfaces.MetricsInterface { + metrics := metrics{ + Scope: scope, + } + + return &MetricsManager{ + workflowManager: workflowManager, + executionManager: executionManager, + nodeExecutionManager: nodeExecutionManager, + taskExecutionManager: taskExecutionManager, + metrics: metrics, + } +} diff --git a/pkg/manager/impl/metrics_manager_test.go b/pkg/manager/impl/metrics_manager_test.go new file mode 100644 index 0000000000..2958285b8b --- /dev/null +++ b/pkg/manager/impl/metrics_manager_test.go @@ -0,0 +1,1082 @@ +package impl + +import ( + "context" + "reflect" + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + + "github.com/flyteorg/flyteadmin/pkg/manager/interfaces" + "github.com/flyteorg/flyteadmin/pkg/manager/mocks" + + "github.com/golang/protobuf/ptypes/duration" + "github.com/golang/protobuf/ptypes/timestamp" + + "github.com/stretchr/testify/assert" +) + +var ( + baseDuration = &duration.Duration{ + Seconds: 400, + Nanos: 0, + } + baseTimestamp = ×tamp.Timestamp{ + Seconds: 643852800, + Nanos: 0, + } +) + +func addTimestamp(ts *timestamp.Timestamp, seconds int64) *timestamp.Timestamp { + return ×tamp.Timestamp{ + Seconds: ts.Seconds + seconds, + Nanos: ts.Nanos, + } +} + +func getMockExecutionManager(execution *admin.Execution) interfaces.ExecutionInterface { + mockExecutionManager := mocks.MockExecutionManager{} + mockExecutionManager.SetGetCallback( + func(ctx context.Context, request admin.WorkflowExecutionGetRequest) (*admin.Execution, error) { + return execution, nil + }) + + return &mockExecutionManager +} + +func getMockNodeExecutionManager(nodeExecutions []*admin.NodeExecution, + dynamicWorkflow *admin.DynamicWorkflowNodeMetadata) interfaces.NodeExecutionInterface { + + mockNodeExecutionManager := mocks.MockNodeExecutionManager{} + mockNodeExecutionManager.SetListNodeExecutionsFunc( + func(ctx context.Context, request admin.NodeExecutionListRequest) (*admin.NodeExecutionList, error) { + return &admin.NodeExecutionList{ + NodeExecutions: nodeExecutions, + }, nil + }) + mockNodeExecutionManager.SetGetNodeExecutionDataFunc( + func(ctx context.Context, request admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error) { + return &admin.NodeExecutionGetDataResponse{ + DynamicWorkflow: dynamicWorkflow, + }, nil + }) + + return &mockNodeExecutionManager +} + +func getMockTaskExecutionManager(taskExecutions []*admin.TaskExecution) interfaces.TaskExecutionInterface { + mockTaskExecutionManager := mocks.MockTaskExecutionManager{} + mockTaskExecutionManager.SetListTaskExecutionsCallback( + func(ctx context.Context, request admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) { + return &admin.TaskExecutionList{ + TaskExecutions: taskExecutions, + }, nil + }) + + return &mockTaskExecutionManager +} + +func getMockWorkflowManager(workflow *admin.Workflow) interfaces.WorkflowInterface { + mockWorkflowManager := mocks.MockWorkflowManager{} + mockWorkflowManager.SetGetCallback( + func(ctx context.Context, request admin.ObjectGetRequest) (*admin.Workflow, error) { + return workflow, nil + }) + + return &mockWorkflowManager +} + +func parseSpans(spans []*core.Span) (map[string][]int64, int) { + operationDurations := make(map[string][]int64) + referenceCount := 0 + for _, span := range spans { + switch id := span.Id.(type) { + case *core.Span_OperationId: + operationID := id.OperationId + duration := span.EndTime.Seconds - span.StartTime.Seconds + if array, exists := operationDurations[operationID]; exists { + operationDurations[operationID] = append(array, duration) + } else { + operationDurations[operationID] = []int64{duration} + } + default: + referenceCount++ + } + } + + return operationDurations, referenceCount +} + +func TestParseBranchNodeExecution(t *testing.T) { + tests := []struct { + name string + nodeExecution *admin.NodeExecution + nodeExecutions []*admin.NodeExecution + operationDurations map[string][]int64 + referenceCount int + }{ + { + "NotStarted", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 5), + }, + }, + nil, + map[string][]int64{ + nodeSetup: []int64{5}, + }, + 0, + }, + { + "Running", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: baseTimestamp, + }, + }, + []*admin.NodeExecution{ + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "foo", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 430), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + }, + 1, + }, + { + "Completed", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 450), + }, + }, + []*admin.NodeExecution{ + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "foo", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 430), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeTeardown: []int64{20}, + }, + 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // initialize mocks + mockNodeExecutionManager := getMockNodeExecutionManager(test.nodeExecutions, nil) + mockTaskExecutionManager := getMockTaskExecutionManager([]*admin.TaskExecution{}) + metricsManager := MetricsManager{ + nodeExecutionManager: mockNodeExecutionManager, + taskExecutionManager: mockTaskExecutionManager, + } + + // parse node execution + branchNode := &core.BranchNode{ + IfElse: &core.IfElseBlock{ + Case: &core.IfBlock{ + ThenNode: &core.Node{ + Id: "bar", + }, + }, + Other: []*core.IfBlock{ + &core.IfBlock{ + ThenNode: &core.Node{ + Id: "baz", + }, + }, + }, + Default: &core.IfElseBlock_ElseNode{ + ElseNode: &core.Node{ + Id: "foo", + Target: &core.Node_TaskNode{}, + }, + }, + }, + } + + spans := make([]*core.Span, 0) + err := metricsManager.parseBranchNodeExecution(context.TODO(), test.nodeExecution, branchNode, &spans, -1) + assert.Nil(t, err) + + // validate spans + operationDurations, referenceCount := parseSpans(spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + assert.Equal(t, test.referenceCount, referenceCount) + }) + } +} + +func TestParseDynamicNodeExecution(t *testing.T) { + tests := []struct { + name string + nodeExecution *admin.NodeExecution + taskExecutions []*admin.TaskExecution + nodeExecutions []*admin.NodeExecution + operationDurations map[string][]int64 + referenceCount int + }{ + { + "NotStarted", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 5), + }, + }, + nil, + nil, + map[string][]int64{ + nodeSetup: []int64{5}, + }, + 0, + }, + { + "TaskRunning", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: baseTimestamp, + }, + }, + []*admin.TaskExecution{ + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 420), + }, + }, + }, + nil, + map[string][]int64{ + nodeSetup: []int64{10}, + }, + 1, + }, + { + "NodesRunning", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: baseTimestamp, + }, + }, + []*admin.TaskExecution{ + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 420), + }, + }, + }, + []*admin.NodeExecution{ + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "start-node", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 435), + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 435), + }, + }, + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "foo", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 445), + StartedAt: addTimestamp(baseTimestamp, 460), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 880), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeReset: []int64{15}, + }, + 2, + }, + { + "Completed", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 900), + }, + }, + []*admin.TaskExecution{ + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 420), + }, + }, + }, + []*admin.NodeExecution{ + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "start-node", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 435), + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 435), + }, + }, + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "foo", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 445), + StartedAt: addTimestamp(baseTimestamp, 460), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 880), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeReset: []int64{15}, + nodeTeardown: []int64{20}, + }, + 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // initialize mocks + mockNodeExecutionManager := getMockNodeExecutionManager( + test.nodeExecutions, + &admin.DynamicWorkflowNodeMetadata{ + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Connections: &core.ConnectionSet{ + Upstream: map[string]*core.ConnectionSet_IdList{ + "foo": &core.ConnectionSet_IdList{ + Ids: []string{"start-node"}, + }, + "end-node": &core.ConnectionSet_IdList{ + Ids: []string{"foo"}, + }, + }, + }, + Template: &core.WorkflowTemplate{ + Nodes: []*core.Node{ + &core.Node{ + Id: "foo", + Target: &core.Node_TaskNode{}, + }, + }, + }, + }, + }, + }) + mockTaskExecutionManager := getMockTaskExecutionManager(test.taskExecutions) + metricsManager := MetricsManager{ + nodeExecutionManager: mockNodeExecutionManager, + taskExecutionManager: mockTaskExecutionManager, + } + + // parse node execution + spans := make([]*core.Span, 0) + err := metricsManager.parseDynamicNodeExecution(context.TODO(), test.nodeExecution, &spans, -1) + assert.Nil(t, err) + + // validate spans + operationDurations, referenceCount := parseSpans(spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + assert.Equal(t, test.referenceCount, referenceCount) + }) + } +} + +func TestParseGateNodeExecution(t *testing.T) { + tests := []struct { + name string + nodeExecution *admin.NodeExecution + operationDurations map[string][]int64 + }{ + { + "NotStarted", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 5), + }, + }, + map[string][]int64{ + nodeSetup: []int64{5}, + }, + }, + { + "Running", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: addTimestamp(baseTimestamp, 10), + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 15), + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeIdle: []int64{5}, + }, + }, + { + "Completed", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: addTimestamp(baseTimestamp, 10), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 425), + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeIdle: []int64{400}, + nodeTeardown: []int64{15}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // initialize mocks + metricsManager := MetricsManager{} + + // parse node execution + spans := make([]*core.Span, 0) + metricsManager.parseGateNodeExecution(context.TODO(), test.nodeExecution, &spans) + + // validate spans + operationDurations, _ := parseSpans(spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + }) + } +} + +func TestParseLaunchPlanNodeExecution(t *testing.T) { + tests := []struct { + name string + nodeExecution *admin.NodeExecution + execution *admin.Execution + operationDurations map[string][]int64 + referenceCount int + }{ + { + "NotStarted", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 5), + }, + }, + nil, + map[string][]int64{ + nodeSetup: []int64{5}, + }, + 0, + }, + { + "Running", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: baseTimestamp, + TargetMetadata: &admin.NodeExecutionClosure_WorkflowNodeMetadata{ + WorkflowNodeMetadata: &admin.WorkflowNodeMetadata{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + }, + }, + }, + &admin.Execution{ + Closure: &admin.ExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 15), + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + }, + 1, + }, + { + "Completed", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 440), + TargetMetadata: &admin.NodeExecutionClosure_WorkflowNodeMetadata{ + WorkflowNodeMetadata: &admin.WorkflowNodeMetadata{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + }, + }, + }, + &admin.Execution{ + Closure: &admin.ExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 425), + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeTeardown: []int64{15}, + }, + 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // initialize mocks + mockExecutionManager := getMockExecutionManager(test.execution) + mockNodeExecutionManager := getMockNodeExecutionManager( + []*admin.NodeExecution{ + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "start-node", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 10), + }, + }, + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "foo", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 15), + StartedAt: addTimestamp(baseTimestamp, 20), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 435), + }, + }, + }, nil) + mockTaskExecutionManager := getMockTaskExecutionManager([]*admin.TaskExecution{}) + mockWorkflowManager := getMockWorkflowManager( + &admin.Workflow{ + Closure: &admin.WorkflowClosure{ + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Connections: &core.ConnectionSet{ + Upstream: map[string]*core.ConnectionSet_IdList{ + "foo": &core.ConnectionSet_IdList{ + Ids: []string{"start-node"}, + }, + "end-node": &core.ConnectionSet_IdList{ + Ids: []string{"foo"}, + }, + }, + }, + Template: &core.WorkflowTemplate{ + Nodes: []*core.Node{ + &core.Node{ + Id: "foo", + Target: &core.Node_TaskNode{}, + }, + }, + }, + }, + }, + }, + }) + metricsManager := MetricsManager{ + executionManager: mockExecutionManager, + nodeExecutionManager: mockNodeExecutionManager, + taskExecutionManager: mockTaskExecutionManager, + workflowManager: mockWorkflowManager, + } + + // parse node execution + spans := make([]*core.Span, 0) + err := metricsManager.parseLaunchPlanNodeExecution(context.TODO(), test.nodeExecution, &spans, -1) + assert.Nil(t, err) + + // validate spans + operationDurations, referenceCount := parseSpans(spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + assert.Equal(t, test.referenceCount, referenceCount) + }) + } +} + +func TestParseSubworkflowNodeExecution(t *testing.T) { + tests := []struct { + name string + nodeExecution *admin.NodeExecution + nodeExecutions []*admin.NodeExecution + operationDurations map[string][]int64 + referenceCount int + }{ + { + "NotStarted", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 5), + }, + }, + nil, + map[string][]int64{ + nodeSetup: []int64{5}, + }, + 0, + }, + { + "Running", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: baseTimestamp, + }, + }, + []*admin.NodeExecution{ + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "start-node", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 10), + }, + }, + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "foo", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 15), + StartedAt: addTimestamp(baseTimestamp, 20), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 435), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + }, + 1, + }, + { + "Completed", + &admin.NodeExecution{ + Id: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{}, + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 455), + }, + }, + []*admin.NodeExecution{ + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "start-node", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 10), + }, + }, + &admin.NodeExecution{ + Metadata: &admin.NodeExecutionMetaData{ + SpecNodeId: "foo", + }, + Closure: &admin.NodeExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 15), + StartedAt: addTimestamp(baseTimestamp, 20), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 435), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeTeardown: []int64{20}, + }, + 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // initialize mocks + mockNodeExecutionManager := getMockNodeExecutionManager(test.nodeExecutions, nil) + mockTaskExecutionManager := getMockTaskExecutionManager([]*admin.TaskExecution{}) + mockWorkflowManager := getMockWorkflowManager( + &admin.Workflow{ + Closure: &admin.WorkflowClosure{ + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Connections: &core.ConnectionSet{ + Upstream: map[string]*core.ConnectionSet_IdList{ + "foo": &core.ConnectionSet_IdList{ + Ids: []string{"start-node"}, + }, + "end-node": &core.ConnectionSet_IdList{ + Ids: []string{"foo"}, + }, + }, + }, + Template: &core.WorkflowTemplate{ + Nodes: []*core.Node{ + &core.Node{ + Id: "foo", + Target: &core.Node_TaskNode{}, + }, + }, + }, + }, + }, + }, + }) + metricsManager := MetricsManager{ + nodeExecutionManager: mockNodeExecutionManager, + taskExecutionManager: mockTaskExecutionManager, + workflowManager: mockWorkflowManager, + } + + // parse node execution + spans := make([]*core.Span, 0) + err := metricsManager.parseSubworkflowNodeExecution(context.TODO(), test.nodeExecution, &core.Identifier{}, &spans, -1) + assert.Nil(t, err) + + // validate spans + operationDurations, referenceCount := parseSpans(spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + assert.Equal(t, test.referenceCount, referenceCount) + }) + } +} + +func TestParseTaskExecution(t *testing.T) { + tests := []struct { + name string + taskExecution *admin.TaskExecution + operationDurations map[string][]int64 + }{ + { + "NotStarted", + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 5), + }, + }, + map[string][]int64{ + taskSetup: []int64{5}, + }, + }, + { + "Running", + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: addTimestamp(baseTimestamp, 5), + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 605), + }, + }, + map[string][]int64{ + taskSetup: []int64{5}, + taskRuntime: []int64{600}, + }, + }, + { + "Completed", + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: addTimestamp(baseTimestamp, 5), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 415), + }, + }, + map[string][]int64{ + taskSetup: []int64{5}, + taskRuntime: []int64{400}, + taskTeardown: []int64{10}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // parse task execution + span := parseTaskExecution(test.taskExecution) + _, ok := span.Id.(*core.Span_TaskId) + assert.True(t, ok) + + // validate spans + operationDurations, referenceCount := parseSpans(span.Spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + assert.Equal(t, 0, referenceCount) + }) + } +} + +func TestParseTaskExecutions(t *testing.T) { + tests := []struct { + name string + taskExecutions []*admin.TaskExecution + operationDurations map[string][]int64 + referenceCount int + }{ + { + "SingleAttempt", + []*admin.TaskExecution{ + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: addTimestamp(baseTimestamp, 5), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 415), + }, + }, + }, + map[string][]int64{}, + 1, + }, + { + "MultipleAttempts", + []*admin.TaskExecution{ + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: addTimestamp(baseTimestamp, 5), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 605), + }, + }, + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 625), + StartedAt: addTimestamp(baseTimestamp, 630), + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 630), + }, + }, + }, + map[string][]int64{ + nodeReset: []int64{20}, + }, + 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // parse task executions + spans := make([]*core.Span, 0) + parseTaskExecutions(test.taskExecutions, &spans, -1) + + // validate spans + operationDurations, referenceCount := parseSpans(spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + assert.Equal(t, test.referenceCount, referenceCount) + }) + } +} + +func TestParseTaskNodeExecution(t *testing.T) { + tests := []struct { + name string + nodeExecution *admin.NodeExecution + taskExecutions []*admin.TaskExecution + operationDurations map[string][]int64 + referenceCount int + }{ + { + "NotStarted", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 5), + }, + }, + nil, + map[string][]int64{ + nodeSetup: []int64{5}, + }, + 0, + }, + { + "Running", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 10), + }, + }, + []*admin.TaskExecution{ + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 420), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + }, + 1, + }, + { + "Completed", + &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + CreatedAt: baseTimestamp, + StartedAt: emptyTimestamp, + Duration: emptyDuration, + UpdatedAt: addTimestamp(baseTimestamp, 435), + }, + }, + []*admin.TaskExecution{ + &admin.TaskExecution{ + Closure: &admin.TaskExecutionClosure{ + CreatedAt: addTimestamp(baseTimestamp, 10), + StartedAt: addTimestamp(baseTimestamp, 15), + Duration: baseDuration, + UpdatedAt: addTimestamp(baseTimestamp, 420), + }, + }, + }, + map[string][]int64{ + nodeSetup: []int64{10}, + nodeTeardown: []int64{15}, + }, + 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // initialize mocks + mockTaskExecutionManager := getMockTaskExecutionManager(test.taskExecutions) + metricsManager := MetricsManager{ + taskExecutionManager: mockTaskExecutionManager, + } + + // parse node execution + spans := make([]*core.Span, 0) + err := metricsManager.parseTaskNodeExecution(context.TODO(), test.nodeExecution, &spans, -1) + assert.Nil(t, err) + + // validate spans + operationDurations, referenceCount := parseSpans(spans) + assert.True(t, reflect.DeepEqual(test.operationDurations, operationDurations)) + assert.Equal(t, test.referenceCount, referenceCount) + }) + } +} diff --git a/pkg/manager/interfaces/metrics.go b/pkg/manager/interfaces/metrics.go new file mode 100644 index 0000000000..d726cdc997 --- /dev/null +++ b/pkg/manager/interfaces/metrics.go @@ -0,0 +1,15 @@ +package interfaces + +import ( + "context" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" +) + +//go:generate mockery -name=MetricsInterface -output=../mocks -case=underscore + +// Interface for managing Flyte execution metrics +type MetricsInterface interface { + GetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) ( + *admin.WorkflowExecutionGetMetricsResponse, error) +} diff --git a/pkg/manager/mocks/metrics_interface.go b/pkg/manager/mocks/metrics_interface.go new file mode 100644 index 0000000000..2e292593ed --- /dev/null +++ b/pkg/manager/mocks/metrics_interface.go @@ -0,0 +1,57 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + admin "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + + mock "github.com/stretchr/testify/mock" +) + +// MetricsInterface is an autogenerated mock type for the MetricsInterface type +type MetricsInterface struct { + mock.Mock +} + +type MetricsInterface_GetExecutionMetrics struct { + *mock.Call +} + +func (_m MetricsInterface_GetExecutionMetrics) Return(_a0 *admin.WorkflowExecutionGetMetricsResponse, _a1 error) *MetricsInterface_GetExecutionMetrics { + return &MetricsInterface_GetExecutionMetrics{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *MetricsInterface) OnGetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) *MetricsInterface_GetExecutionMetrics { + c_call := _m.On("GetExecutionMetrics", ctx, request) + return &MetricsInterface_GetExecutionMetrics{Call: c_call} +} + +func (_m *MetricsInterface) OnGetExecutionMetricsMatch(matchers ...interface{}) *MetricsInterface_GetExecutionMetrics { + c_call := _m.On("GetExecutionMetrics", matchers...) + return &MetricsInterface_GetExecutionMetrics{Call: c_call} +} + +// GetExecutionMetrics provides a mock function with given fields: ctx, request +func (_m *MetricsInterface) GetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { + ret := _m.Called(ctx, request) + + var r0 *admin.WorkflowExecutionGetMetricsResponse + if rf, ok := ret.Get(0).(func(context.Context, admin.WorkflowExecutionGetMetricsRequest) *admin.WorkflowExecutionGetMetricsResponse); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*admin.WorkflowExecutionGetMetricsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, admin.WorkflowExecutionGetMetricsRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/manager/mocks/workflow.go b/pkg/manager/mocks/workflow.go index d1f55750eb..5055b2b4c7 100644 --- a/pkg/manager/mocks/workflow.go +++ b/pkg/manager/mocks/workflow.go @@ -7,9 +7,11 @@ import ( ) type CreateWorkflowFunc func(ctx context.Context, request admin.WorkflowCreateRequest) (*admin.WorkflowCreateResponse, error) +type GetWorkflowFunc func(ctx context.Context, request admin.ObjectGetRequest) (*admin.Workflow, error) type MockWorkflowManager struct { createWorkflowFunc CreateWorkflowFunc + getWorkflowFunc GetWorkflowFunc } func (r *MockWorkflowManager) SetCreateCallback(createFunction CreateWorkflowFunc) { @@ -30,8 +32,15 @@ func (r *MockWorkflowManager) ListWorkflows(ctx context.Context, return nil, nil } +func (r *MockWorkflowManager) SetGetCallback(getFunction GetWorkflowFunc) { + r.getWorkflowFunc = getFunction +} + func (r *MockWorkflowManager) GetWorkflow( ctx context.Context, request admin.ObjectGetRequest) (*admin.Workflow, error) { + if r.getWorkflowFunc != nil { + return r.getWorkflowFunc(ctx, request) + } return nil, nil } diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index c3cc62213a..a427412e39 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -115,10 +115,15 @@ func CreateNodeExecutionModel(ctx context.Context, input ToNodeExecutionModelInp Phase: input.Request.Event.Phase.String(), } + reportedAt := input.Request.Event.ReportedAt + if reportedAt == nil || (reportedAt.Seconds == 0 && reportedAt.Nanos == 0) { + reportedAt = input.Request.Event.OccurredAt + } + closure := admin.NodeExecutionClosure{ Phase: input.Request.Event.Phase, CreatedAt: input.Request.Event.OccurredAt, - UpdatedAt: input.Request.Event.OccurredAt, + UpdatedAt: reportedAt, } nodeExecutionMetadata := admin.NodeExecutionMetaData{ @@ -161,7 +166,11 @@ func CreateNodeExecutionModel(ctx context.Context, input ToNodeExecutionModelInp return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to read event timestamp") } nodeExecution.NodeExecutionCreatedAt = &nodeExecutionCreatedAt - nodeExecution.NodeExecutionUpdatedAt = &nodeExecutionCreatedAt + nodeExecutionUpdatedAt, err := ptypes.Timestamp(reportedAt) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to read event reported_at timestamp") + } + nodeExecution.NodeExecutionUpdatedAt = &nodeExecutionUpdatedAt if input.Request.Event.ParentTaskMetadata != nil { nodeExecution.ParentTaskExecutionID = input.ParentTaskExecutionID } @@ -195,7 +204,11 @@ func UpdateNodeExecutionModel( } nodeExecutionModel.Phase = request.Event.Phase.String() nodeExecutionClosure.Phase = request.Event.Phase - nodeExecutionClosure.UpdatedAt = request.Event.OccurredAt + reportedAt := request.Event.ReportedAt + if reportedAt == nil || (reportedAt.Seconds == 0 && reportedAt.Nanos == 0) { + reportedAt = request.Event.OccurredAt + } + nodeExecutionClosure.UpdatedAt = reportedAt if request.Event.Phase == core.NodeExecution_RUNNING { err := addNodeRunningState(request, nodeExecutionModel, &nodeExecutionClosure) @@ -248,7 +261,7 @@ func UpdateNodeExecutionModel( } nodeExecutionModel.Closure = marshaledClosure - updatedAt, err := ptypes.Timestamp(request.Event.OccurredAt) + updatedAt, err := ptypes.Timestamp(reportedAt) if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to parse updated at timestamp") } diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index f335cc7ea5..e3eca3884f 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -139,9 +139,14 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode }) } + reportedAt := input.Request.Event.ReportedAt + if reportedAt == nil || (reportedAt.Seconds == 0 && reportedAt.Nanos == 0) { + reportedAt = input.Request.Event.OccurredAt + } + closure := &admin.TaskExecutionClosure{ Phase: input.Request.Event.Phase, - UpdatedAt: input.Request.Event.OccurredAt, + UpdatedAt: reportedAt, CreatedAt: input.Request.Event.OccurredAt, Logs: input.Request.Event.Logs, CustomInfo: input.Request.Event.CustomInfo, @@ -190,7 +195,11 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to read event timestamp") } taskExecution.TaskExecutionCreatedAt = &taskExecutionCreatedAt - taskExecution.TaskExecutionUpdatedAt = &taskExecutionCreatedAt + taskExecutionUpdatedAt, err := ptypes.Timestamp(reportedAt) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to read event reported_at timestamp") + } + taskExecution.TaskExecutionUpdatedAt = &taskExecutionUpdatedAt return taskExecution, nil } @@ -368,7 +377,11 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE taskExecutionModel.Phase = request.Event.Phase.String() taskExecutionModel.PhaseVersion = request.Event.PhaseVersion taskExecutionClosure.Phase = request.Event.Phase - taskExecutionClosure.UpdatedAt = request.Event.OccurredAt + reportedAt := request.Event.ReportedAt + if reportedAt == nil || (reportedAt.Seconds == 0 && reportedAt.Nanos == 0) { + reportedAt = request.Event.OccurredAt + } + taskExecutionClosure.UpdatedAt = reportedAt taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) if len(request.Event.Reason) > 0 { if taskExecutionClosure.Reason != request.Event.Reason { @@ -412,7 +425,7 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE codes.Internal, "failed to marshal task execution closure with error: %v", err) } taskExecutionModel.Closure = marshaledClosure - updatedAt, err := ptypes.Timestamp(request.Event.OccurredAt) + updatedAt, err := ptypes.Timestamp(reportedAt) if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to parse updated at timestamp") } diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 12b9f4ce54..77c78f480e 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -45,6 +45,7 @@ type AdminService struct { NamedEntityManager interfaces.NamedEntityInterface VersionManager interfaces.VersionInterface DescriptionEntityManager interfaces.DescriptionEntityInterface + MetricsManager interfaces.MetricsInterface Metrics AdminMetrics } @@ -157,6 +158,11 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi nodeExecutionEventWriter.Run() }() + nodeExecutionManager := manager.NewNodeExecutionManager(repo, configuration, applicationConfiguration.GetMetadataStoragePrefix(), dataStorageClient, + adminScope.NewSubScope("node_execution_manager"), urlData, eventPublisher, cloudEventPublisher, nodeExecutionEventWriter) + taskExecutionManager := manager.NewTaskExecutionManager(repo, configuration, dataStorageClient, + adminScope.NewSubScope("task_execution_manager"), urlData, eventPublisher, cloudEventPublisher) + logger.Info(ctx, "Initializing a new AdminService") return &AdminService{ TaskManager: manager.NewTaskManager(repo, configuration, workflowengineImpl.NewCompiler(), @@ -167,12 +173,12 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi NamedEntityManager: namedEntityManager, DescriptionEntityManager: descriptionEntityManager, VersionManager: versionManager, - NodeExecutionManager: manager.NewNodeExecutionManager(repo, configuration, applicationConfiguration.GetMetadataStoragePrefix(), dataStorageClient, - adminScope.NewSubScope("node_execution_manager"), urlData, eventPublisher, cloudEventPublisher, nodeExecutionEventWriter), - TaskExecutionManager: manager.NewTaskExecutionManager(repo, configuration, dataStorageClient, - adminScope.NewSubScope("task_execution_manager"), urlData, eventPublisher, cloudEventPublisher), - ProjectManager: manager.NewProjectManager(repo, configuration), - ResourceManager: resources.NewResourceManager(repo, configuration.ApplicationConfiguration()), - Metrics: InitMetrics(adminScope), + NodeExecutionManager: nodeExecutionManager, + TaskExecutionManager: taskExecutionManager, + ProjectManager: manager.NewProjectManager(repo, configuration), + ResourceManager: resources.NewResourceManager(repo, configuration.ApplicationConfiguration()), + MetricsManager: manager.NewMetricsManager(workflowManager, executionManager, nodeExecutionManager, + taskExecutionManager, adminScope.NewSubScope("metrics_manager")), + Metrics: InitMetrics(adminScope), } } diff --git a/pkg/rpc/adminservice/execution.go b/pkg/rpc/adminservice/execution.go index 57680a58f5..87e2fe4873 100644 --- a/pkg/rpc/adminservice/execution.go +++ b/pkg/rpc/adminservice/execution.go @@ -131,7 +131,7 @@ func (m *AdminService) GetExecutionData( } var response *admin.WorkflowExecutionGetDataResponse var err error - m.Metrics.executionEndpointMetrics.get.Time(func() { + m.Metrics.executionEndpointMetrics.getData.Time(func() { response, err = m.ExecutionManager.GetExecutionData(ctx, *request) }) if err != nil { @@ -141,6 +141,24 @@ func (m *AdminService) GetExecutionData( return response, nil } +func (m *AdminService) GetExecutionMetrics( + ctx context.Context, request *admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { + defer m.interceptPanic(ctx, request) + if request == nil { + return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") + } + var response *admin.WorkflowExecutionGetMetricsResponse + var err error + m.Metrics.executionEndpointMetrics.getMetrics.Time(func() { + response, err = m.MetricsManager.GetExecutionMetrics(ctx, *request) + }) + if err != nil { + return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.getMetrics) + } + m.Metrics.executionEndpointMetrics.getMetrics.Success() + return response, nil +} + func (m *AdminService) ListExecutions( ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) { defer m.interceptPanic(ctx, request) diff --git a/pkg/rpc/adminservice/metrics.go b/pkg/rpc/adminservice/metrics.go index f5c02c21ee..b2bab4514c 100644 --- a/pkg/rpc/adminservice/metrics.go +++ b/pkg/rpc/adminservice/metrics.go @@ -17,6 +17,7 @@ type executionEndpointMetrics struct { get util.RequestMetrics update util.RequestMetrics getData util.RequestMetrics + getMetrics util.RequestMetrics list util.RequestMetrics terminate util.RequestMetrics } @@ -47,6 +48,7 @@ type nodeExecutionEndpointMetrics struct { createEvent util.RequestMetrics get util.RequestMetrics getData util.RequestMetrics + getMetrics util.RequestMetrics list util.RequestMetrics listChildren util.RequestMetrics } @@ -137,6 +139,7 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics { get: util.NewRequestMetrics(adminScope, "get_execution"), update: util.NewRequestMetrics(adminScope, "update_execution"), getData: util.NewRequestMetrics(adminScope, "get_execution_data"), + getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"), list: util.NewRequestMetrics(adminScope, "list_execution"), terminate: util.NewRequestMetrics(adminScope, "terminate_execution"), }, @@ -161,6 +164,7 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics { createEvent: util.NewRequestMetrics(adminScope, "create_node_execution_event"), get: util.NewRequestMetrics(adminScope, "get_node_execution"), getData: util.NewRequestMetrics(adminScope, "get_node_execution_data"), + getMetrics: util.NewRequestMetrics(adminScope, "get_node_execution_metrics"), list: util.NewRequestMetrics(adminScope, "list_node_execution"), listChildren: util.NewRequestMetrics(adminScope, "list_children_node_executions"), },