Skip to content

Commit

Permalink
Add error kind tag to ExecutionsTerminated metric (flyteorg#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexapdev authored Apr 20, 2022
1 parent 4d3ade8 commit 57f9ae2
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/cmd/entrypoints/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ func init() {
// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey,
contextutils.ExecIDKey, contextutils.WorkflowIDKey, contextutils.NodeIDKey, contextutils.TaskIDKey,
contextutils.TaskTypeKey, contextutils.PhaseKey, contextutils.LaunchPlanIDKey, common.RuntimeTypeKey,
contextutils.TaskTypeKey, contextutils.PhaseKey, contextutils.LaunchPlanIDKey, common.ErrorKindKey, common.RuntimeTypeKey,
common.RuntimeVersionKey)
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var RuntimeVersionKey = contextutils.Key("runtime_version")
const (
AuditFieldsContextKey contextutils.Key = "audit_fields"
PrincipalContextKey contextutils.Key = "principal"
ErrorKindKey contextutils.Key = "error_kind"
)

const MaxResponseStatusBytes = 32000
Expand Down
10 changes: 10 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,16 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
go m.emitScheduledWorkflowMetrics(ctx, executionModel, request.Event.OccurredAt)
}
} else if common.IsExecutionTerminal(request.Event.Phase) {
if request.Event.Phase == core.WorkflowExecution_FAILED {
// request.Event is expected to be of type WorkflowExecutionEvent_Error when workflow fails.
// if not, log the error and continue
if err := request.Event.GetError(); err != nil {
ctx = context.WithValue(ctx, common.ErrorKindKey, err.Kind.String())
} else {
logger.Warning(ctx, "Failed to parse error for FAILED request [%+v]", request)
}
}

m.systemMetrics.ActiveExecutions.Dec()
m.systemMetrics.ExecutionsTerminated.Inc(contextutils.WithPhase(ctx, request.Event.Phase.String()))
go m.emitOverallWorkflowExecutionTime(executionModel, request.Event.OccurredAt)
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var taskIdentifier = core.Identifier{
}

func init() {
labeled.SetMetricKeys(common.RuntimeTypeKey, common.RuntimeVersionKey)
labeled.SetMetricKeys(common.RuntimeTypeKey, common.RuntimeVersionKey, common.ErrorKindKey)
}

func getMockTaskCompiler() workflowengine.Compiler {
Expand Down

0 comments on commit 57f9ae2

Please sign in to comment.