Skip to content

Commit

Permalink
Add basic fast task service metrics (#268)
Browse files Browse the repository at this point in the history
## Overview
Adds some basic metrics to fast task service to monitor (1) paths from calls to assign a task via `OfferOnQueue` and (2) period counts of queues and workers

## Test Plan
- [x] Run locally and verify that metrics are scrapable

```
❯ curl http://localhost:10254/metrics | rg -e "^fasttask"
fasttask:queue 1
fasttask:task_assigned 4
fasttask:task_no_capacity_available 0
fasttask:task_no_workers_available 4
fasttask:workers 1
```

## Rollout Plan (if applicable)
Pick up eventually in cloud

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed
  • Loading branch information
andrewwdye authored May 15, 2024
1 parent 2fada83 commit 705ca0d
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion fasttask/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"math/rand"
"sync"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/types"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"

"github.com/unionai/flyte/fasttask/plugin/pb"
)
Expand All @@ -24,6 +26,7 @@ type FastTaskService struct {
queues map[string]*Queue
queuesLock sync.RWMutex
taskStatusChannels sync.Map // map[string]chan *WorkerTaskStatus
metrics metrics
}

// Queue is a collection of Workers that are capable of executing similar tasks.
Expand All @@ -45,6 +48,36 @@ type workerTaskStatus struct {
taskStatus *pb.TaskStatus
}

type metrics struct {
taskNoWorkersAvailable prometheus.Counter
taskNoCapacityAvailable prometheus.Counter
taskAssigned prometheus.Counter

queues *prometheus.Desc
workers *prometheus.Desc
}

func (f *FastTaskService) Describe(ch chan<- *prometheus.Desc) {
ch <- f.metrics.queues
ch <- f.metrics.workers
}

func (f *FastTaskService) Collect(ch chan<- prometheus.Metric) {
f.queuesLock.RLock()
defer f.queuesLock.RUnlock()

queues := len(f.queues)
workers := 0
for _, queue := range f.queues {
queue.lock.RLock()
workers += len(queue.workers)
queue.lock.RUnlock()
}

ch <- prometheus.MustNewConstMetric(f.metrics.queues, prometheus.GaugeValue, float64(queues))
ch <- prometheus.MustNewConstMetric(f.metrics.workers, prometheus.GaugeValue, float64(workers))
}

// Heartbeat is a gRPC stream that manages the heartbeat of a fasttask worker. This includes
// receiving task status updates and sending task assignments.
func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
Expand Down Expand Up @@ -173,6 +206,7 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam

queue, exists := f.queues[queueID]
if !exists {
f.metrics.taskNoWorkersAvailable.Inc()
return "", nil // no workers available
}

Expand All @@ -198,10 +232,12 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam
worker = acceptedWorkers[rand.Intn(len(acceptedWorkers))]
worker.capacity.BacklogCount++
} else {
f.metrics.taskNoCapacityAvailable.Inc()
return "", nil // no workers available
}

// send assign message to worker
f.metrics.taskAssigned.Inc()
worker.responseChan <- &pb.HeartbeatResponse{
TaskId: taskID,
Namespace: namespace,
Expand Down Expand Up @@ -294,8 +330,18 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID

// NewFastTaskService creates a new FastTaskService.
func NewFastTaskService(enqueueOwner core.EnqueueOwner) *FastTaskService {
return &FastTaskService{
scope := promutils.NewScope("fasttask")
svc := &FastTaskService{
enqueueOwner: enqueueOwner,
queues: make(map[string]*Queue),
metrics: metrics{
taskNoWorkersAvailable: scope.MustNewCounter("task_no_workers_available", "Count of task assignment attempts with no workers available"),
taskNoCapacityAvailable: scope.MustNewCounter("task_no_capacity_available", "Count of task assignment attempts with no capacity available"),
taskAssigned: scope.MustNewCounter("task_assigned", "Count of task assignments"),
queues: prometheus.NewDesc(scope.NewScopedMetricName("queue"), "Current number of queues", nil, nil),
workers: prometheus.NewDesc(scope.NewScopedMetricName("workers"), "Current number of workers", nil, nil),
},
}
prometheus.MustRegister(svc)
return svc
}

0 comments on commit 705ca0d

Please sign in to comment.