diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index e2d3d46592..c1e33c950d 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -47,7 +47,9 @@ type nodeMetrics struct { InputsWriteFailure labeled.Counter TimedOutFailure labeled.Counter - InterruptedThresholdHit labeled.Counter + InterruptedThresholdHit labeled.Counter + InterruptibleNodesRunning labeled.Counter + InterruptibleNodesTerminated labeled.Counter // Measures the latency between the last parent node stoppedAt time and current node's queued time. TransitionLatency labeled.StopWatch @@ -312,6 +314,9 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor } if np == v1alpha1.NodePhaseQueued { + if nCtx.md.IsInterruptible() { + c.metrics.InterruptibleNodesRunning.Inc(ctx) + } return executors.NodeStatusQueued, nil } else if np == v1alpha1.NodePhaseSkipped { return executors.NodeStatusSuccess, nil @@ -464,6 +469,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur } nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time) + if nCtx.md.IsInterruptible() { + c.metrics.InterruptibleNodesTerminated.Inc(ctx) + } return executors.NodeStatusFailed(nodeStatus.GetExecutionError()), nil } @@ -476,6 +484,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur nodeStatus.ClearSubNodeStatus() nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.TimedOutFailure.Inc(ctx) + if nCtx.md.IsInterruptible() { + c.metrics.InterruptibleNodesTerminated.Inc(ctx) + } return executors.NodeStatusTimedOut, nil } @@ -488,6 +499,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur nodeStatus.ClearSubNodeStatus() nodeStatus.UpdatePhase(v1alpha1.NodePhaseSucceeded, v1.Now(), "completed successfully", nil) c.metrics.SuccessDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time) + if nCtx.md.IsInterruptible() { + c.metrics.InterruptibleNodesTerminated.Inc(ctx) + } return executors.NodeStatusSuccess, nil } @@ -868,6 +882,8 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora InputsWriteFailure: labeled.NewCounter("inputs_write_fail", "Indicates failure in writing node inputs to metastore", nodeScope), TimedOutFailure: labeled.NewCounter("timeout_fail", "Indicates failure due to timeout", nodeScope), InterruptedThresholdHit: labeled.NewCounter("interrupted_threshold", "Indicates the node interruptible disabled because it hit max failure count", nodeScope), + InterruptibleNodesRunning: labeled.NewCounter("interruptible_nodes_running", "number of interruptible nodes running", nodeScope), + InterruptibleNodesTerminated: labeled.NewCounter("interruptible_nodes_terminated", "number of interruptible nodes finished running", nodeScope), ResolutionFailure: labeled.NewCounter("input_resolve_fail", "Indicates failure in resolving node inputs", nodeScope), TransitionLatency: labeled.NewStopWatch("transition_latency", "Measures the latency between the last parent node stoppedAt time and current node's queued time.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric), QueuingLatency: labeled.NewStopWatch("queueing_latency", "Measures the latency between the time a node's been queued to the time the handler reported the executable moved to running state", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),