Skip to content

Commit

Permalink
Adding metrics to capture pod delay in pending state (flyteorg#47)
Browse files Browse the repository at this point in the history
* adding metrics to capture pod delay in pending state

* typo

* .

* fix
  • Loading branch information
surindersinghp authored Jan 10, 2020
1 parent 78b1164 commit c03dbbd
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 7 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,14 @@ type ExecutableTaskNodeStatus interface {
GetPluginState() []byte
GetPluginStateVersion() uint32
GetBarrierClockTick() uint32
GetLastPhaseUpdatedAt() time.Time
}

type MutableTaskNodeStatus interface {
Mutable
ExecutableTaskNodeStatus
SetPhase(phase int)
SetLastPhaseUpdatedAt(updatedAt time.Time)
SetPhaseVersion(version uint32)
SetPluginState([]byte)
SetPluginStateVersion(uint32)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 15 additions & 5 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"reflect"
"strconv"
"time"

"github.com/lyft/flytestdlib/storage"

Expand Down Expand Up @@ -633,11 +634,12 @@ func (in *CustomState) DeepCopy() *CustomState {

type TaskNodeStatus struct {
MutableStruct
Phase int `json:"phase,omitempty"`
PhaseVersion uint32 `json:"phaseVersion,omitempty"`
PluginState []byte `json:"pState,omitempty"`
PluginStateVersion uint32 `json:"psv,omitempty"`
BarrierClockTick uint32 `json:"tick,omitempty"`
Phase int `json:"phase,omitempty"`
PhaseVersion uint32 `json:"phaseVersion,omitempty"`
PluginState []byte `json:"pState,omitempty"`
PluginStateVersion uint32 `json:"psv,omitempty"`
BarrierClockTick uint32 `json:"tick,omitempty"`
LastPhaseUpdatedAt time.Time `json:"updAt,omitempty"`
}

func (in *TaskNodeStatus) GetBarrierClockTick() uint32 {
Expand All @@ -654,6 +656,10 @@ func (in *TaskNodeStatus) SetPluginState(s []byte) {
in.SetDirty()
}

func (in TaskNodeStatus) SetLastPhaseUpdatedAt(updatedAt time.Time) {
in.LastPhaseUpdatedAt = updatedAt
}

func (in *TaskNodeStatus) SetPluginStateVersion(v uint32) {
in.PluginStateVersion = v
in.SetDirty()
Expand Down Expand Up @@ -681,6 +687,10 @@ func (in TaskNodeStatus) GetPhase() int {
return in.Phase
}

func (in TaskNodeStatus) GetLastPhaseUpdatedAt() time.Time {
return in.LastPhaseUpdatedAt
}

func (in TaskNodeStatus) GetPhaseVersion() uint32 {
return in.PhaseVersion
}
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"time"

pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
Expand All @@ -15,6 +17,7 @@ type TaskNodeState struct {
PluginState []byte
PluginStateVersion uint32
BarrierClockTick uint32
LastPhaseUpdatedAt time.Time
}

type BranchNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginStateVersion: tn.GetPluginStateVersion(),
PluginState: tn.GetPluginState(),
BarrierClockTick: tn.GetBarrierClockTick(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
}
}
return handler.TaskNodeState{}
Expand Down
11 changes: 11 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type metrics struct {
catalogMissCount labeled.Counter
catalogHitCount labeled.Counter
pluginExecutionLatency labeled.StopWatch
pluginQueueLatency labeled.StopWatch

// TODO We should have a metric to capture custom state size
scope promutils.Scope
Expand Down Expand Up @@ -269,6 +270,14 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
}
pluginTrns.ObservedTransitionAndState(trns, v, b)

// Emit the queue latency if the task has just transitioned from Queued to Running.
if ts.PluginPhase == pluginCore.PhaseQueued &&
(pluginTrns.pInfo.Phase() == pluginCore.PhaseInitializing || pluginTrns.pInfo.Phase() == pluginCore.PhaseRunning) {
if !ts.LastPhaseUpdatedAt.IsZero() {
t.metrics.pluginQueueLatency.Observe(ctx, ts.LastPhaseUpdatedAt, time.Now())
}
}

if pluginTrns.pInfo.Phase() == ts.PluginPhase {
if pluginTrns.pInfo.Version() == ts.PluginPhaseVersion {
logger.Debugf(ctx, "p+Version previously seen .. no event will be sent")
Expand Down Expand Up @@ -478,6 +487,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginPhase: pluginTrns.pInfo.Phase(),
PluginPhaseVersion: pluginTrns.pInfo.Version(),
BarrierClockTick: barrierTick,
LastPhaseUpdatedAt: time.Now(),
})
if err != nil {
logger.Errorf(ctx, "Failed to store TaskNode state, err :%s", err.Error())
Expand Down Expand Up @@ -598,6 +608,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client
catalogPutFailureCount: labeled.NewCounter("discovery_put_failure_count", "Discovery Put failure count", scope),
catalogGetFailureCount: labeled.NewCounter("discovery_get_failure_count", "Discovery Get faillure count", scope),
pluginExecutionLatency: labeled.NewStopWatch("plugin_exec_latecny", "Time taken to invoke plugin for one round", time.Microsecond, scope),
pluginQueueLatency: labeled.NewStopWatch("plugin_queue_latecny", "Time spent by plugin in queued phase", time.Microsecond, scope),
scope: scope,
},
kubeClient: kubeClient,
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t := s.GetOrCreateTaskStatus()
t.SetPhaseVersion(n.t.PluginPhaseVersion)
t.SetPhase(int(n.t.PluginPhase))
t.SetLastPhaseUpdatedAt(n.t.LastPhaseUpdatedAt)
t.SetPluginState(n.t.PluginState)
t.SetPluginStateVersion(n.t.PluginStateVersion)
t.SetBarrierClockTick(n.t.BarrierClockTick)
Expand Down

0 comments on commit c03dbbd

Please sign in to comment.