diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go
index 656850bbad..add8c63cef 100644
--- a/fasttask/plugin/service.go
+++ b/fasttask/plugin/service.go
@@ -8,10 +8,12 @@ import (
 	"math/rand"
 	"sync"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"k8s.io/apimachinery/pkg/types"
 
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
 	"github.com/flyteorg/flyte/flytestdlib/logger"
+	"github.com/flyteorg/flyte/flytestdlib/promutils"
 	"github.com/unionai/flyte/fasttask/plugin/pb"
 )
 
@@ -24,6 +26,7 @@ type FastTaskService struct {
 	queues             map[string]*Queue
 	queuesLock         sync.RWMutex
 	taskStatusChannels sync.Map // map[string]chan *WorkerTaskStatus
+	metrics            metrics
 }
 
 // Queue is a collection of Workers that are capable of executing similar tasks.
@@ -45,6 +48,53 @@ type workerTaskStatus struct {
 	taskStatus *pb.TaskStatus
 }
 
+// metrics holds the Prometheus instrumentation of a FastTaskService. The
+// counters are incremented by OfferOnQueue; the descriptors identify the
+// gauges that Collect computes on demand.
+type metrics struct {
+	taskNoWorkersAvailable  prometheus.Counter
+	taskNoCapacityAvailable prometheus.Counter
+	taskAssigned            prometheus.Counter
+
+	queues  *prometheus.Desc
+	workers *prometheus.Desc
+}
+
+// Compile-time check that FastTaskService satisfies prometheus.Collector.
+var _ prometheus.Collector = (*FastTaskService)(nil)
+
+// Describe implements prometheus.Collector by sending the descriptors of the
+// two gauges that Collect emits.
+func (f *FastTaskService) Describe(ch chan<- *prometheus.Desc) {
+	ch <- f.metrics.queues
+	ch <- f.metrics.workers
+}
+
+// Collect implements prometheus.Collector by emitting the current number of
+// queues and the total number of workers across all queues as gauges.
+func (f *FastTaskService) Collect(ch chan<- prometheus.Metric) {
+	// Snapshot the counts first so that no service lock is held while sending
+	// on ch, which may block.
+	queues, workers := f.queueAndWorkerCounts()
+
+	ch <- prometheus.MustNewConstMetric(f.metrics.queues, prometheus.GaugeValue, float64(queues))
+	ch <- prometheus.MustNewConstMetric(f.metrics.workers, prometheus.GaugeValue, float64(workers))
+}
+
+// queueAndWorkerCounts returns the number of queues and the total number of
+// workers across them, read under the queue-map lock and each queue's lock.
+func (f *FastTaskService) queueAndWorkerCounts() (queues, workers int) {
+	f.queuesLock.RLock()
+	defer f.queuesLock.RUnlock()
+
+	for _, queue := range f.queues {
+		queue.lock.RLock()
+		workers += len(queue.workers)
+		queue.lock.RUnlock()
+	}
+	return len(f.queues), workers
+}
+
 // Heartbeat is a gRPC stream that manages the heartbeat of a fasttask worker. This includes
 // receiving task status updates and sending task assignments.
 func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
@@ -173,6 +223,7 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam
 
 	queue, exists := f.queues[queueID]
 	if !exists {
+		f.metrics.taskNoWorkersAvailable.Inc()
 		return "", nil // no workers available
 	}
 
@@ -198,10 +249,12 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam
 		worker = acceptedWorkers[rand.Intn(len(acceptedWorkers))]
 		worker.capacity.BacklogCount++
 	} else {
+		f.metrics.taskNoCapacityAvailable.Inc()
 		return "", nil // no workers available
 	}
 
 	// send assign message to worker
+	f.metrics.taskAssigned.Inc()
 	worker.responseChan <- &pb.HeartbeatResponse{
 		TaskId:    taskID,
 		Namespace: namespace,
@@ -294,8 +347,21 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID
 
 // NewFastTaskService creates a new FastTaskService.
 func NewFastTaskService(enqueueOwner core.EnqueueOwner) *FastTaskService {
-	return &FastTaskService{
+	scope := promutils.NewScope("fasttask")
+	svc := &FastTaskService{
 		enqueueOwner: enqueueOwner,
 		queues:       make(map[string]*Queue),
+		metrics: metrics{
+			taskNoWorkersAvailable:  scope.MustNewCounter("task_no_workers_available", "Count of task assignment attempts with no workers available"),
+			taskNoCapacityAvailable: scope.MustNewCounter("task_no_capacity_available", "Count of task assignment attempts with no capacity available"),
+			taskAssigned:            scope.MustNewCounter("task_assigned", "Count of task assignments"),
+			queues:                  prometheus.NewDesc(scope.NewScopedMetricName("queue"), "Current number of queues", nil, nil),
+			workers:                 prometheus.NewDesc(scope.NewScopedMetricName("workers"), "Current number of workers", nil, nil),
+		},
 	}
+	// Register the service itself as a collector so that the queue and worker
+	// gauges are computed on demand by Collect. MustRegister panics if
+	// registration fails, e.g. when a service has already been registered.
+	prometheus.MustRegister(svc)
+	return svc
 }