Skip to content

Commit

Permalink
Adding metrics to capture pod delay in pending state (flyteorg#47)
Browse files Browse the repository at this point in the history
* adding metrics to capture pod delay in pending state

* typo

* .

* fix
  • Loading branch information
surindersinghp authored Jan 10, 2020
1 parent 7b32cbb commit 7dfd3b5
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 7 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,14 @@ type ExecutableTaskNodeStatus interface {
GetPluginState() []byte
GetPluginStateVersion() uint32
GetBarrierClockTick() uint32
GetLastPhaseUpdatedAt() time.Time
}

type MutableTaskNodeStatus interface {
Mutable
ExecutableTaskNodeStatus
SetPhase(phase int)
SetLastPhaseUpdatedAt(updatedAt time.Time)
SetPhaseVersion(version uint32)
SetPluginState([]byte)
SetPluginStateVersion(uint32)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 42 additions & 1 deletion pkg/apis/flyteworkflow/v1alpha1/mocks/MutableTaskNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 15 additions & 5 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"reflect"
"strconv"
"time"

"github.com/lyft/flytestdlib/storage"

Expand Down Expand Up @@ -633,11 +634,12 @@ func (in *CustomState) DeepCopy() *CustomState {

// TaskNodeStatus is the serialized status of a task node. It embeds
// MutableStruct and carries the plugin phase bookkeeping, the opaque plugin
// state blob and the barrier clock tick.
type TaskNodeStatus struct {
	MutableStruct
	// Phase is the plugin phase stored as a plain int.
	Phase int `json:"phase,omitempty"`
	// PhaseVersion is the version counter associated with Phase.
	PhaseVersion uint32 `json:"phaseVersion,omitempty"`
	// PluginState is the opaque, plugin-owned state blob.
	PluginState []byte `json:"pState,omitempty"`
	// PluginStateVersion is the version of the PluginState encoding.
	PluginStateVersion uint32 `json:"psv,omitempty"`
	// BarrierClockTick is the last recorded barrier clock tick.
	BarrierClockTick uint32 `json:"tick,omitempty"`
	// LastPhaseUpdatedAt is the timestamp recorded alongside the phase.
	// NOTE: encoding/json's omitempty never omits a struct value such as
	// time.Time, so a zero timestamp is still written out as
	// "0001-01-01T00:00:00Z". Readers should rely on IsZero() rather than on
	// the key being absent.
	LastPhaseUpdatedAt time.Time `json:"updAt,omitempty"`
}

func (in *TaskNodeStatus) GetBarrierClockTick() uint32 {
Expand All @@ -654,6 +656,10 @@ func (in *TaskNodeStatus) SetPluginState(s []byte) {
in.SetDirty()
}

// SetLastPhaseUpdatedAt stores updatedAt as the last phase-update timestamp
// and marks the status dirty, like the other setters on this type.
//
// The receiver must be a pointer: with a value receiver the assignment is
// applied to a copy of the struct and silently discarded, so the timestamp
// would never be persisted.
func (in *TaskNodeStatus) SetLastPhaseUpdatedAt(updatedAt time.Time) {
	in.LastPhaseUpdatedAt = updatedAt
	in.SetDirty()
}

func (in *TaskNodeStatus) SetPluginStateVersion(v uint32) {
in.PluginStateVersion = v
in.SetDirty()
Expand Down Expand Up @@ -681,6 +687,10 @@ func (in TaskNodeStatus) GetPhase() int {
return in.Phase
}

// GetLastPhaseUpdatedAt returns the timestamp held in LastPhaseUpdatedAt.
// The zero time.Time is returned when the field was never set.
func (in TaskNodeStatus) GetLastPhaseUpdatedAt() time.Time {
	updatedAt := in.LastPhaseUpdatedAt
	return updatedAt
}

// GetPhaseVersion returns the version counter held in PhaseVersion.
func (in TaskNodeStatus) GetPhaseVersion() uint32 {
	version := in.PhaseVersion
	return version
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"time"

pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
Expand All @@ -15,6 +17,7 @@ type TaskNodeState struct {
PluginState []byte
PluginStateVersion uint32
BarrierClockTick uint32
LastPhaseUpdatedAt time.Time
}

type BranchNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginStateVersion: tn.GetPluginStateVersion(),
PluginState: tn.GetPluginState(),
BarrierClockTick: tn.GetBarrierClockTick(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
}
}
return handler.TaskNodeState{}
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type metrics struct {
catalogMissCount labeled.Counter
catalogHitCount labeled.Counter
pluginExecutionLatency labeled.StopWatch
pluginQueueLatency labeled.StopWatch

// TODO We should have a metric to capture custom state size
scope promutils.Scope
Expand Down Expand Up @@ -269,6 +270,14 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
}
pluginTrns.ObservedTransitionAndState(trns, v, b)

// Emit the queue latency if the task has just left the Queued phase, i.e. transitioned from Queued to Initializing or Running.
if ts.PluginPhase == pluginCore.PhaseQueued &&
(pluginTrns.pInfo.Phase() == pluginCore.PhaseInitializing || pluginTrns.pInfo.Phase() == pluginCore.PhaseRunning) {
if !ts.LastPhaseUpdatedAt.IsZero() {
t.metrics.pluginQueueLatency.Observe(ctx, ts.LastPhaseUpdatedAt, time.Now())
}
}

if pluginTrns.pInfo.Phase() == ts.PluginPhase {
if pluginTrns.pInfo.Version() == ts.PluginPhaseVersion {
logger.Debugf(ctx, "p+Version previously seen .. no event will be sent")
Expand Down Expand Up @@ -478,6 +487,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginPhase: pluginTrns.pInfo.Phase(),
PluginPhaseVersion: pluginTrns.pInfo.Version(),
BarrierClockTick: barrierTick,
LastPhaseUpdatedAt: time.Now(),
})
if err != nil {
logger.Errorf(ctx, "Failed to store TaskNode state, err :%s", err.Error())
Expand Down Expand Up @@ -598,6 +608,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client
catalogPutFailureCount: labeled.NewCounter("discovery_put_failure_count", "Discovery Put failure count", scope),
catalogGetFailureCount: labeled.NewCounter("discovery_get_failure_count", "Discovery Get faillure count", scope),
pluginExecutionLatency: labeled.NewStopWatch("plugin_exec_latecny", "Time taken to invoke plugin for one round", time.Microsecond, scope),
pluginQueueLatency: labeled.NewStopWatch("plugin_queue_latecny", "Time spent by plugin in queued phase", time.Microsecond, scope),
scope: scope,
},
kubeClient: kubeClient,
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t := s.GetOrCreateTaskStatus()
t.SetPhaseVersion(n.t.PluginPhaseVersion)
t.SetPhase(int(n.t.PluginPhase))
t.SetLastPhaseUpdatedAt(n.t.LastPhaseUpdatedAt)
t.SetPluginState(n.t.PluginState)
t.SetPluginStateVersion(n.t.PluginStateVersion)
t.SetBarrierClockTick(n.t.BarrierClockTick)
Expand Down

0 comments on commit 7dfd3b5

Please sign in to comment.