Skip to content

Commit

Permalink
Emit error metrics for all failures (flyteorg#98)
Browse files Browse the repository at this point in the history
* Emit error metrics for all kind of failures
Co-authored-by: Anand Swaminathan <[email protected]>
  • Loading branch information
surindersinghp authored Mar 25, 2020
1 parent 0c0d5cc commit 0c4673b
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
nodeStatus.ClearTaskStatus()
nodeStatus.ClearWorkflowStatus()
nodeStatus.ClearDynamicNodeStatus()
nodeStatus.ClearLastAttemptStartedAt()
return executors.NodeStatusPending, nil
}

Expand All @@ -398,23 +397,37 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
return executors.NodeStatusFailed(fmt.Errorf(nodeStatus.GetMessage())), nil
}

// case v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseRetryableFailure:
// Since we reset node status inside execute for retryable failure, we use lastAttemptStartTime to carry that information
// across execute which is used to emit metrics
lastAttemptStartTime := nodeStatus.GetLastAttemptStartedAt()

// case v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseRunning:
logger.Debugf(ctx, "node executing, current phase [%s]", currentPhase)
defer logger.Debugf(ctx, "node execution completed")
p, err := c.execute(ctx, h, nCtx, nodeStatus)
if err != nil {
logger.Errorf(ctx, "failed Execute for node. Error: %s", err.Error())
return executors.NodeStatusUndefined, err
}

// execErr in phase-info 'p' is only available if node has failed to execute, and the current phase at that time
// will be v1alpha1.NodePhaseRunning
execErr := p.GetErr()
if execErr != nil && p.GetPhase() == handler.EPhaseRetryableFailure && nodeStatus.GetLastAttemptStartedAt() != nil {
if execErr != nil && currentPhase == v1alpha1.NodePhaseRunning {

endTime := time.Now()
startTime := endTime
if lastAttemptStartTime != nil {
startTime = lastAttemptStartTime.Time
}

if execErr.GetKind() == core.ExecutionError_SYSTEM {
nodeStatus.IncrementSystemFailures()
c.metrics.SystemErrorDuration.Observe(ctx, nodeStatus.GetLastAttemptStartedAt().Time, time.Now())
c.metrics.SystemErrorDuration.Observe(ctx, startTime, endTime)
} else if execErr.GetKind() == core.ExecutionError_USER {
c.metrics.UserErrorDuration.Observe(ctx, nodeStatus.GetLastAttemptStartedAt().Time, time.Now())
c.metrics.UserErrorDuration.Observe(ctx, startTime, endTime)
} else {
c.metrics.UnknownErrorDuration.Observe(ctx, nodeStatus.GetLastAttemptStartedAt().Time, time.Now())
c.metrics.UnknownErrorDuration.Observe(ctx, startTime, endTime)
}
}

Expand Down Expand Up @@ -461,9 +474,10 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, node.GetID(), err, "failed to record node event")
}

// We reach here only when transitioning from Queued to Running. In this case, the startedAt is not set.
if np == v1alpha1.NodePhaseRunning {
if nodeStatus.GetQueuedAt() != nil && nodeStatus.GetStartedAt() != nil {
c.metrics.QueuingLatency.Observe(ctx, nodeStatus.GetQueuedAt().Time, nodeStatus.GetStartedAt().Time)
if nodeStatus.GetQueuedAt() != nil {
c.metrics.QueuingLatency.Observe(ctx, nodeStatus.GetQueuedAt().Time, time.Now())
}
}
}
Expand Down

0 comments on commit 0c4673b

Please sign in to comment.