Skip to content

Commit

Permalink
Added metrics to fasttask builder and plugin (#303)
Browse files Browse the repository at this point in the history
## Overview
This PR adds prometheus metrics to the fasttask builder and plugin to improve observability of the execution path.

## Test Plan
These metrics were exported and tested locally using a simple `curl` command to the single binary endpoint.

## Rollout Plan (if applicable)
Will need to update the cloud repo to ensure we are correctly pulling these updates.

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed to OSS

## Issue
https://linear.app/unionai/issue/EXO-96/fill-out-fasttask-metrics

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
hamersaw authored May 29, 2024
1 parent 63ede5b commit a411adb
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 39 deletions.
32 changes: 31 additions & 1 deletion fasttask/plugin/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

_struct "github.com/golang/protobuf/ptypes/struct"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"

"github.com/unionai/flyte/fasttask/plugin/pb"
)
Expand All @@ -41,6 +43,24 @@ const (
TTL_SECONDS = "ttl-seconds"
)

// builderMetrics is a collection of metrics for the InMemoryEnvBuilder.
type builderMetrics struct {
environmentsCreated prometheus.Counter
environmentsGCed prometheus.Counter
environmentOrphansDetected prometheus.Counter
environmentsRepaired prometheus.Counter
}

// newBuilderMetrics creates a new builderMetrics with the given scope.
func newBuilderMetrics(scope promutils.Scope) builderMetrics {
return builderMetrics{
environmentsCreated: scope.MustNewCounter("env_created", "The number of environments created"),
environmentsGCed: scope.MustNewCounter("env_gced", "The number of environments garbage collected"),
environmentOrphansDetected: scope.MustNewCounter("env_orphans_detected", "The number of orphaned environments detected"),
environmentsRepaired: scope.MustNewCounter("env_repaired", "The number of environments repaired"),
}
}

// environment represents a managed fast task environment, including it's definition and current
// state
type environment struct {
Expand All @@ -57,6 +77,7 @@ type InMemoryEnvBuilder struct {
environments map[string]*environment
kubeClient core.KubeClient
lock sync.Mutex
metrics builderMetrics
randSource *rand.Rand
}

Expand Down Expand Up @@ -140,6 +161,8 @@ func (i *InMemoryEnvBuilder) Create(ctx context.Context, executionEnvID string,
}

i.environments[executionEnvID] = env
i.metrics.environmentsCreated.Inc()

i.lock.Unlock()

// create replicas
Expand Down Expand Up @@ -385,6 +408,8 @@ func (i *InMemoryEnvBuilder) gcEnvironments(ctx context.Context) error {
i.lock.Lock()
for _, environmentID := range deletedEnvironments {
logger.Infof(ctx, "garbage collected environment '%s'", environmentID)
i.metrics.environmentsGCed.Inc()

delete(i.environments, environmentID)
}
i.lock.Unlock()
Expand Down Expand Up @@ -462,6 +487,8 @@ func (i *InMemoryEnvBuilder) repairEnvironments(ctx context.Context) error {
}

logger.Infof(ctx, "repaired environment '%s'", environmentID)
i.metrics.environmentsRepaired.Inc()

environment.state = HEALTHY
}
i.lock.Unlock()
Expand Down Expand Up @@ -550,17 +577,20 @@ func (i *InMemoryEnvBuilder) detectOrphanedEnvironments(ctx context.Context, k8s
// copy orphaned environments to env builder
for environmentID, orphanedEnvironment := range orphanedEnvironments {
logger.Infof(ctx, "detected orphaned environment '%s'", environmentID)
i.metrics.environmentOrphansDetected.Inc()

i.environments[environmentID] = orphanedEnvironment
}

return nil
}

// NewEnvironmentBuilder creates a new InMemoryEnvBuilder with the given kube client.
func NewEnvironmentBuilder(kubeClient core.KubeClient) *InMemoryEnvBuilder {
func NewEnvironmentBuilder(kubeClient core.KubeClient, scope promutils.Scope) *InMemoryEnvBuilder {
return &InMemoryEnvBuilder{
environments: make(map[string]*environment),
kubeClient: kubeClient,
metrics: newBuilderMetrics(scope),
randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
21 changes: 16 additions & 5 deletions fasttask/plugin/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

coremocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/flyteorg/flyte/flytestdlib/promutils"

"github.com/unionai/flyte/fasttask/plugin/pb"
)
Expand Down Expand Up @@ -136,6 +137,8 @@ func TestCreate(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

fastTaskEnvSpecStruct := &_struct.Struct{}
err := utils.MarshalStruct(test.environmentSpec, fastTaskEnvSpecStruct)
assert.Nil(t, err)
Expand All @@ -148,7 +151,7 @@ func TestCreate(t *testing.T) {
kubeClientImpl.OnGetClient().Return(kubeClient)
kubeClientImpl.OnGetCache().Return(kubeCache)

builder := NewEnvironmentBuilder(kubeClientImpl)
builder := NewEnvironmentBuilder(kubeClientImpl, scope)
builder.environments = test.environments

// call `Create`
Expand Down Expand Up @@ -214,6 +217,8 @@ func TestDetectOrphanedEnvironments(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

// initialize InMemoryBuilder
kubeClient := &kubeClient{}
kubeCache := &kubeCache{
Expand All @@ -224,7 +229,7 @@ func TestDetectOrphanedEnvironments(t *testing.T) {
kubeClientImpl.OnGetClient().Return(kubeClient)
kubeClientImpl.OnGetCache().Return(kubeCache)

builder := NewEnvironmentBuilder(kubeClientImpl)
builder := NewEnvironmentBuilder(kubeClientImpl, scope)
builder.environments = test.environments

// call `Create`
Expand Down Expand Up @@ -297,6 +302,8 @@ func TestGCEnvironments(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

// initialize InMemoryBuilder
kubeClient := &kubeClient{}
kubeCache := &kubeCache{}
Expand All @@ -305,7 +312,7 @@ func TestGCEnvironments(t *testing.T) {
kubeClientImpl.OnGetClient().Return(kubeClient)
kubeClientImpl.OnGetCache().Return(kubeCache)

builder := NewEnvironmentBuilder(kubeClientImpl)
builder := NewEnvironmentBuilder(kubeClientImpl, scope)
builder.environments = test.environments

// call `Create`
Expand Down Expand Up @@ -360,6 +367,8 @@ func TestGet(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

// initialize InMemoryBuilder
kubeClient := &kubeClient{}
kubeCache := &kubeCache{}
Expand All @@ -368,7 +377,7 @@ func TestGet(t *testing.T) {
kubeClientImpl.OnGetClient().Return(kubeClient)
kubeClientImpl.OnGetCache().Return(kubeCache)

builder := NewEnvironmentBuilder(kubeClientImpl)
builder := NewEnvironmentBuilder(kubeClientImpl, scope)
builder.environments = test.environments

// call `Get`
Expand Down Expand Up @@ -463,6 +472,8 @@ func TestRepairEnvironments(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

// initialize InMemoryBuilder
kubeClient := &kubeClient{}
kubeCache := &kubeCache{
Expand All @@ -473,7 +484,7 @@ func TestRepairEnvironments(t *testing.T) {
kubeClientImpl.OnGetClient().Return(kubeClient)
kubeClientImpl.OnGetCache().Return(kubeCache)

builder := NewEnvironmentBuilder(kubeClientImpl)
builder := NewEnvironmentBuilder(kubeClientImpl, scope)
builder.environments = test.environments

// call `Create`
Expand Down
22 changes: 22 additions & 0 deletions fasttask/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"time"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"

Expand All @@ -20,6 +21,7 @@ import (
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"

"github.com/unionai/flyte/fasttask/plugin/pb"
)
Expand All @@ -35,6 +37,20 @@ const (
Submitted
)

// pluginMetrics is a collection of metrics for the plugin.
type pluginMetrics struct {
workersUnavailableTimeout prometheus.Counter
statusUpdateNotFoundTimeout prometheus.Counter
}

// newPluginMetrics creates a new pluginMetrics with the given scope.
func newPluginMetrics(scope promutils.Scope) pluginMetrics {
return pluginMetrics{
workersUnavailableTimeout: scope.MustNewCounter("workers_unavailable_timeout", "Count of tasks that timed out waiting for workers to become available"),
statusUpdateNotFoundTimeout: scope.MustNewCounter("status_update_not_found_timeout", "Count of tasks that timed out waiting for status update from worker"),
}
}

// State maintains the current status of the task execution.
type State struct {
SubmissionPhase SubmissionPhase
Expand All @@ -45,6 +61,7 @@ type State struct {
// Plugin is a fast task plugin that offers task execution to a worker pool.
type Plugin struct {
fastTaskService FastTaskService
metrics pluginMetrics
}

// GetID returns the unique identifier for the plugin.
Expand Down Expand Up @@ -212,6 +229,8 @@ func (p *Plugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (co
// fail if no worker available within grace period
if time.Since(pluginState.LastUpdated) > GetConfig().GracePeriodWorkersUnavailable.Duration {
logger.Infof(ctx, "Timed out waiting for available worker for queue %s", queueID)
p.metrics.workersUnavailableTimeout.Inc()

phaseInfo = core.PhaseInfoSystemFailure("unknown", fmt.Sprintf("timed out waiting for available worker for queue %s", queueID), nil)
} else {
phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "no workers available", nil)
Expand All @@ -227,6 +246,8 @@ func (p *Plugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (co
} else if errors.Is(err, statusUpdateNotFoundError) && now.Sub(pluginState.LastUpdated) > GetConfig().GracePeriodStatusNotFound.Duration {
// if task has not been updated within the grace period we should abort
logger.Infof(ctx, "Task status update not reported within grace period for queue %s and worker %s", queueID, pluginState.WorkerID)
p.metrics.statusUpdateNotFoundTimeout.Inc()

return core.DoTransition(core.PhaseInfoSystemRetryableFailure("unknown", fmt.Sprintf("task status update not reported within grace period for queue %s and worker %s", queueID, pluginState.WorkerID), nil)), nil
} else if phase == core.PhaseSuccess {
taskTemplate, err := tCtx.TaskReader().Read(ctx)
Expand Down Expand Up @@ -315,6 +336,7 @@ func init() {

return &Plugin{
fastTaskService: fastTaskService,
metrics: newPluginMetrics(iCtx.MetricsScope()),
}, nil
},
IsDefault: false,
Expand Down
15 changes: 14 additions & 1 deletion fasttask/plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
iomocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io/mocks"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/flyteorg/flyte/flytestdlib/promutils"

"github.com/unionai/flyte/fasttask/plugin/mocks"
"github.com/unionai/flyte/fasttask/plugin/pb"
Expand Down Expand Up @@ -76,6 +77,7 @@ func getBaseFasttaskTaskTemplate(t *testing.T) *idlcore.TaskTemplate {

func TestFinalize(t *testing.T) {
ctx := context.TODO()
scope := promutils.NewTestScope()

// initialize fasttask TaskTemplate
taskTemplate := getBaseFasttaskTaskTemplate(t)
Expand Down Expand Up @@ -114,6 +116,7 @@ func TestFinalize(t *testing.T) {
// initialize plugin
plugin := &Plugin{
fastTaskService: fastTaskService,
metrics: newPluginMetrics(scope),
}

// call handle
Expand Down Expand Up @@ -229,6 +232,8 @@ func TestGetExecutionEnv(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

// initialize fasttask TaskTemplate
executionEnvStruct := buildFasttaskEnvironment(t, test.fastTaskExtant, test.fastTaskSpec)
taskTemplate := &idlcore.TaskTemplate{
Expand Down Expand Up @@ -266,7 +271,9 @@ func TestGetExecutionEnv(t *testing.T) {
tCtx.OnGetExecutionEnvClient().Return(executionEnvClient)

// initialize plugin
plugin := &Plugin{}
plugin := &Plugin{
metrics: newPluginMetrics(scope),
}

// call handle
fastTaskEnvironment, err := plugin.getExecutionEnv(ctx, tCtx)
Expand Down Expand Up @@ -339,6 +346,8 @@ func TestHandleNotYetStarted(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

// create TaskExecutionContext
tCtx := &coremocks.TaskExecutionContext{}
tCtx.OnInputReader().Return(inputReader)
Expand Down Expand Up @@ -377,6 +386,7 @@ func TestHandleNotYetStarted(t *testing.T) {
// initialize plugin
plugin := &Plugin{
fastTaskService: fastTaskService,
metrics: newPluginMetrics(scope),
}

// call handle
Expand Down Expand Up @@ -462,6 +472,8 @@ func TestHandleRunning(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scope := promutils.NewTestScope()

// create TaskExecutionContext
tCtx := &coremocks.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(taskMetadata)
Expand Down Expand Up @@ -499,6 +511,7 @@ func TestHandleRunning(t *testing.T) {
// initialize plugin
plugin := &Plugin{
fastTaskService: fastTaskService,
metrics: newPluginMetrics(scope),
}

// call handle
Expand Down
Loading

0 comments on commit a411adb

Please sign in to comment.