From 705ca0d0382a6303cd00ed9f25cb1de053c4d3d4 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 15 May 2024 12:21:32 -0700 Subject: [PATCH] Add basic fast task service metrics (#268) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Overview Adds some basic metrics to the fast task service to monitor (1) the outcome paths of calls to assign a task via `OfferOnQueue` and (2) periodic counts of queues and workers. ## Test Plan - [x] Run locally and verify that metrics are scrapable ``` ❯ curl http://localhost:10254/metrics | rg -e "^fasttask" fasttask:queue 1 fasttask:task_assigned 4 fasttask:task_no_capacity_available 0 fasttask:task_no_workers_available 4 fasttask:workers 1 ``` ## Rollout Plan (if applicable) Pick up eventually in cloud ## Upstream Changes Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). 
- [ ] To be upstreamed --- fasttask/plugin/service.go | 48 +++++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go index 656850bbad..add8c63cef 100644 --- a/fasttask/plugin/service.go +++ b/fasttask/plugin/service.go @@ -8,10 +8,12 @@ import ( "math/rand" "sync" + "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/types" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/unionai/flyte/fasttask/plugin/pb" ) @@ -24,6 +26,7 @@ type FastTaskService struct { queues map[string]*Queue queuesLock sync.RWMutex taskStatusChannels sync.Map // map[string]chan *WorkerTaskStatus + metrics metrics } // Queue is a collection of Workers that are capable of executing similar tasks. @@ -45,6 +48,36 @@ type workerTaskStatus struct { taskStatus *pb.TaskStatus } +type metrics struct { + taskNoWorkersAvailable prometheus.Counter + taskNoCapacityAvailable prometheus.Counter + taskAssigned prometheus.Counter + + queues *prometheus.Desc + workers *prometheus.Desc +} + +func (f *FastTaskService) Describe(ch chan<- *prometheus.Desc) { + ch <- f.metrics.queues + ch <- f.metrics.workers +} + +func (f *FastTaskService) Collect(ch chan<- prometheus.Metric) { + f.queuesLock.RLock() + defer f.queuesLock.RUnlock() + + queues := len(f.queues) + workers := 0 + for _, queue := range f.queues { + queue.lock.RLock() + workers += len(queue.workers) + queue.lock.RUnlock() + } + + ch <- prometheus.MustNewConstMetric(f.metrics.queues, prometheus.GaugeValue, float64(queues)) + ch <- prometheus.MustNewConstMetric(f.metrics.workers, prometheus.GaugeValue, float64(workers)) +} + // Heartbeat is a gRPC stream that manages the heartbeat of a fasttask worker. This includes // receiving task status updates and sending task assignments. 
func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error { @@ -173,6 +206,7 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam queue, exists := f.queues[queueID] if !exists { + f.metrics.taskNoWorkersAvailable.Inc() return "", nil // no workers available } @@ -198,10 +232,12 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam worker = acceptedWorkers[rand.Intn(len(acceptedWorkers))] worker.capacity.BacklogCount++ } else { + f.metrics.taskNoCapacityAvailable.Inc() return "", nil // no workers available } // send assign message to worker + f.metrics.taskAssigned.Inc() worker.responseChan <- &pb.HeartbeatResponse{ TaskId: taskID, Namespace: namespace, @@ -294,8 +330,18 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID // NewFastTaskService creates a new FastTaskService. func NewFastTaskService(enqueueOwner core.EnqueueOwner) *FastTaskService { - return &FastTaskService{ + scope := promutils.NewScope("fasttask") + svc := &FastTaskService{ enqueueOwner: enqueueOwner, queues: make(map[string]*Queue), + metrics: metrics{ + taskNoWorkersAvailable: scope.MustNewCounter("task_no_workers_available", "Count of task assignment attempts with no workers available"), + taskNoCapacityAvailable: scope.MustNewCounter("task_no_capacity_available", "Count of task assignment attempts with no capacity available"), + taskAssigned: scope.MustNewCounter("task_assigned", "Count of task assignments"), + queues: prometheus.NewDesc(scope.NewScopedMetricName("queue"), "Current number of queues", nil, nil), + workers: prometheus.NewDesc(scope.NewScopedMetricName("workers"), "Current number of workers", nil, nil), + }, } + prometheus.MustRegister(svc) + return svc }