diff --git a/flyteadmin/cmd/entrypoints/serve.go b/flyteadmin/cmd/entrypoints/serve.go index 38444400c1..32e6ca8f26 100644 --- a/flyteadmin/cmd/entrypoints/serve.go +++ b/flyteadmin/cmd/entrypoints/serve.go @@ -49,6 +49,6 @@ func init() { // Set Keys labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey, contextutils.ExecIDKey, contextutils.WorkflowIDKey, contextutils.NodeIDKey, contextutils.TaskIDKey, - contextutils.TaskTypeKey, contextutils.PhaseKey, contextutils.LaunchPlanIDKey, common.RuntimeTypeKey, + contextutils.TaskTypeKey, contextutils.PhaseKey, contextutils.LaunchPlanIDKey, common.ErrorKindKey, common.RuntimeTypeKey, common.RuntimeVersionKey) } diff --git a/flyteadmin/pkg/common/constants.go b/flyteadmin/pkg/common/constants.go index 6f95e5cdfd..f67ac583e0 100644 --- a/flyteadmin/pkg/common/constants.go +++ b/flyteadmin/pkg/common/constants.go @@ -8,6 +8,7 @@ var RuntimeVersionKey = contextutils.Key("runtime_version") const ( AuditFieldsContextKey contextutils.Key = "audit_fields" PrincipalContextKey contextutils.Key = "principal" + ErrorKindKey contextutils.Key = "error_kind" ) const MaxResponseStatusBytes = 32000 diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 80ed9c7ee7..a7a780be77 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1343,6 +1343,16 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi go m.emitScheduledWorkflowMetrics(ctx, executionModel, request.Event.OccurredAt) } } else if common.IsExecutionTerminal(request.Event.Phase) { + if request.Event.Phase == core.WorkflowExecution_FAILED { + // request.Event is expected to be of type WorkflowExecutionEvent_Error when workflow fails. + // if not, log the error and continue + if err := request.Event.GetError(); err != nil { + ctx = context.WithValue(ctx, common.ErrorKindKey, err.Kind.String()) + } else { + logger.Warning(ctx, "Failed to parse error for FAILED request [%+v]", request) + } + } + m.systemMetrics.ActiveExecutions.Dec() m.systemMetrics.ExecutionsTerminated.Inc(contextutils.WithPhase(ctx, request.Event.Phase.String())) go m.emitOverallWorkflowExecutionTime(executionModel, request.Event.OccurredAt) diff --git a/flyteadmin/pkg/manager/impl/task_manager_test.go b/flyteadmin/pkg/manager/impl/task_manager_test.go index 4e933142e4..a7d78bf9c1 100644 --- a/flyteadmin/pkg/manager/impl/task_manager_test.go +++ b/flyteadmin/pkg/manager/impl/task_manager_test.go @@ -48,7 +48,7 @@ var taskIdentifier = core.Identifier{ } func init() { - labeled.SetMetricKeys(common.RuntimeTypeKey, common.RuntimeVersionKey) + labeled.SetMetricKeys(common.RuntimeTypeKey, common.RuntimeVersionKey, common.ErrorKindKey) } func getMockTaskCompiler() workflowengine.Compiler {