Skip to content

Commit

Permalink
Add metrics for # of interruptible nodes running (flyteorg#166)
Browse files (browse the repository at this point in the history)
  • Loading branch information
katrogan authored Jul 24, 2020
1 parent 562969e commit 51338b6
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -47,7 +47,9 @@ type nodeMetrics struct {
InputsWriteFailure labeled.Counter
TimedOutFailure labeled.Counter

InterruptedThresholdHit labeled.Counter
InterruptedThresholdHit labeled.Counter
InterruptibleNodesRunning labeled.Counter
InterruptibleNodesTerminated labeled.Counter

// Measures the latency between the last parent node stoppedAt time and current node's queued time.
TransitionLatency labeled.StopWatch
Expand Down Expand Up @@ -312,6 +314,9 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor
}

if np == v1alpha1.NodePhaseQueued {
if nCtx.md.IsInterruptible() {
c.metrics.InterruptibleNodesRunning.Inc(ctx)
}
return executors.NodeStatusQueued, nil
} else if np == v1alpha1.NodePhaseSkipped {
return executors.NodeStatusSuccess, nil
Expand Down Expand Up @@ -464,6 +469,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur
}
nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError())
c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time)
if nCtx.md.IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
}
return executors.NodeStatusFailed(nodeStatus.GetExecutionError()), nil
}

Expand All @@ -476,6 +484,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur
nodeStatus.ClearSubNodeStatus()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError())
c.metrics.TimedOutFailure.Inc(ctx)
if nCtx.md.IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
}
return executors.NodeStatusTimedOut, nil
}

Expand All @@ -488,6 +499,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur
nodeStatus.ClearSubNodeStatus()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, v1.Now(), "completed successfully", nil)
c.metrics.SuccessDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time)
if nCtx.md.IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
}
return executors.NodeStatusSuccess, nil
}

Expand Down Expand Up @@ -868,6 +882,8 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
InputsWriteFailure: labeled.NewCounter("inputs_write_fail", "Indicates failure in writing node inputs to metastore", nodeScope),
TimedOutFailure: labeled.NewCounter("timeout_fail", "Indicates failure due to timeout", nodeScope),
InterruptedThresholdHit: labeled.NewCounter("interrupted_threshold", "Indicates the node interruptible disabled because it hit max failure count", nodeScope),
InterruptibleNodesRunning: labeled.NewCounter("interruptible_nodes_running", "number of interruptible nodes running", nodeScope),
InterruptibleNodesTerminated: labeled.NewCounter("interruptible_nodes_terminated", "number of interruptible nodes finished running", nodeScope),
ResolutionFailure: labeled.NewCounter("input_resolve_fail", "Indicates failure in resolving node inputs", nodeScope),
TransitionLatency: labeled.NewStopWatch("transition_latency", "Measures the latency between the last parent node stoppedAt time and current node's queued time.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
QueuingLatency: labeled.NewStopWatch("queueing_latency", "Measures the latency between the time a node's been queued to the time the handler reported the executable moved to running state", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
Expand Down

0 comments on commit 51338b6

Please sign in to comment.