Skip to content

Commit

Permalink
Add metric for pending workloads, broken down by queue and cluster_queue
Browse files Browse the repository at this point in the history
Change-Id: I46a509a2612597004483cb8c90bb76dcfe95742d
  • Loading branch information
alculquicondor committed Apr 29, 2022
1 parent 7cb7d69 commit 04e0e93
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 7 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
k8s.io/api v0.23.4
k8s.io/apimachinery v0.23.4
k8s.io/client-go v0.23.4
k8s.io/component-base v0.23.3
k8s.io/component-helpers v0.23.4
k8s.io/klog/v2 v2.40.1
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
Expand All @@ -27,6 +28,7 @@ require (
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
Expand Down Expand Up @@ -66,7 +68,6 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/apiextensions-apiserver v0.23.3 // indirect
k8s.io/component-base v0.23.3 // indirect
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/workload/job"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/scheduler"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -94,6 +95,7 @@ func main() {
}
setupLog.Info("Successfully loaded config file", "config", cfgStr)
}
metrics.Register()

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options)
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,21 @@ var (
Help: "Latency of an admission attempt, broken down by result.",
}, []string{"result"},
)

PendingWorkloads = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: subsystemName,
Name: "pending_workloads",
Help: "Number of pending workloads, per queue and cluster_queue.",
}, []string{"queue", "cluster_queue"})
)

// AdmissionAttempt records the outcome of a single admission attempt.
//
// It increments the admission-attempts metric for the given result and
// observes the attempt's duration, expressed in seconds, on the admission
// latency metric. Both metrics are labeled with the string form of result.
func AdmissionAttempt(result AdmissionResult, duration time.Duration) {
	// Convert once; both metrics share the same "result" label value.
	resultLabel := string(result)
	elapsedSeconds := duration.Seconds()

	admissionAttempts.WithLabelValues(resultLabel).Inc()
	admissionAttemptLatency.WithLabelValues(resultLabel).Observe(elapsedSeconds)
}

func init() {
func Register() {
metrics.Registry.MustRegister(
admissionAttempts,
admissionAttemptLatency,
Expand Down
8 changes: 8 additions & 0 deletions pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error {
if cq != nil && cq.AddFromQueue(qImpl) {
m.cond.Broadcast()
}
qImpl.reportPendingWorkloads()
return nil
}

Expand All @@ -189,6 +190,7 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error {
return errQueueDoesNotExist
}
if qImpl.ClusterQueue != string(q.Spec.ClusterQueue) {
qImpl.resetPendingWorkloads()
oldCQ := m.clusterQueues[qImpl.ClusterQueue]
if oldCQ != nil {
oldCQ.DeleteFromQueue(qImpl)
Expand All @@ -199,6 +201,7 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error {
}
}
qImpl.update(q)
qImpl.reportPendingWorkloads()
return nil
}

Expand All @@ -215,6 +218,7 @@ func (m *Manager) DeleteQueue(q *kueue.Queue) {
cq.DeleteFromQueue(qImpl)
}
delete(m.queues, key)
qImpl.resetPendingWorkloads()
}

func (m *Manager) PendingWorkloads(q *kueue.Queue) (int32, error) {
Expand Down Expand Up @@ -272,6 +276,7 @@ func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool {
return false
}
q.AddOrUpdate(w)
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
Expand Down Expand Up @@ -301,6 +306,7 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, imme
}

q.AddIfNotPresent(info)
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
Expand All @@ -325,6 +331,7 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey
return
}
delete(q.items, workload.Key(w))
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq != nil {
cq.Delete(w)
Expand Down Expand Up @@ -450,6 +457,7 @@ func (m *Manager) heads() []workload.Info {
workloads = append(workloads, wlCopy)
q := m.queues[queueKeyForWorkload(wl.Obj)]
delete(q.items, workload.Key(wl.Obj))
q.reportPendingWorkloads()
}
return workloads
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand All @@ -40,13 +41,15 @@ func queueKeyForWorkload(w *kueue.Workload) string {

// Queue is the internal implementation of kueue.Queue.
type Queue struct {
// Key identifies this queue; newQueue derives it from Key(q). It is also
// used as the "queue" label value when reporting the pending workloads
// metric.
Key string
// ClusterQueue is the name of the cluster queue this queue is associated
// with. It is used as the "cluster_queue" label value when reporting the
// pending workloads metric.
// NOTE(review): presumably kept in sync with q.Spec.ClusterQueue by
// update — confirm against that method.
ClusterQueue string

// items holds the workloads currently tracked by this queue, indexed by
// workload key. Its length is what gets reported as the number of pending
// workloads.
items map[string]*workload.Info
}

func newQueue(q *kueue.Queue) *Queue {
qImpl := &Queue{
Key: Key(q),
items: make(map[string]*workload.Info),
}
qImpl.update(q)
Expand All @@ -71,3 +74,11 @@ func (q *Queue) AddIfNotPresent(w *workload.Info) bool {
}
return false
}

// reportPendingWorkloads publishes the current number of items held by the
// queue to the PendingWorkloads gauge, labeled with the queue's key and its
// cluster queue name.
func (q *Queue) reportPendingWorkloads() {
	pendingCount := float64(len(q.items))
	gauge := metrics.PendingWorkloads.WithLabelValues(q.Key, q.ClusterQueue)
	gauge.Set(pendingCount)
}

// resetPendingWorkloads removes the PendingWorkloads gauge series that is
// labeled with the queue's current key and cluster queue name. The result of
// the deletion is intentionally not inspected.
func (q *Queue) resetPendingWorkloads() {
	queueLabel, clusterQueueLabel := q.Key, q.ClusterQueue
	metrics.PendingWorkloads.DeleteLabelValues(queueLabel, clusterQueueLabel)
}
4 changes: 3 additions & 1 deletion test/integration/controller/core/queue_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/test/integration/framework"
Expand Down Expand Up @@ -82,6 +82,7 @@ var _ = ginkgo.Describe("Queue controller", func() {
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.QueueStatus{PendingWorkloads: 3}))
framework.ExpendPendingWorkloadsMetric(queue, 3)

ginkgo.By("Admitting workloads")
for _, w := range workloads {
Expand All @@ -98,6 +99,7 @@ var _ = ginkgo.Describe("Queue controller", func() {
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.QueueStatus{PendingWorkloads: 0}))
framework.ExpendPendingWorkloadsMetric(queue, 0)

ginkgo.By("Finishing workloads")
for _, w := range workloads {
Expand Down
13 changes: 13 additions & 0 deletions test/integration/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/component-base/metrics/testutil"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/workload"
// +kubebuilder:scaffold:imports
)
Expand All @@ -56,6 +59,7 @@ type Framework struct {

func (f *Framework) Setup() (context.Context, *rest.Config, client.Client) {
ctrl.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true), zap.Level(zapcore.Level(-3))))
metrics.Register()

ginkgo.By("bootstrapping test environment")
f.testEnv = &envtest.Environment{
Expand Down Expand Up @@ -215,6 +219,15 @@ func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wl
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are pending")
}

// ExpendPendingWorkloadsMetric polls the PendingWorkloads gauge for queue q
// until it equals v, failing the calling test if that does not happen within
// Timeout (checked every Interval).
//
// The gauge is looked up by the queue's key and the cluster queue named in
// q.Spec.ClusterQueue. The gauge value is truncated to an int before the
// comparison.
//
// NOTE(review): the name looks like a typo of "Expect..."; it is kept as-is
// because existing callers reference it.
func ExpendPendingWorkloadsMetric(q *kueue.Queue, v int) {
	gauge := metrics.PendingWorkloads.WithLabelValues(queue.Key(q), string(q.Spec.ClusterQueue))

	// readGauge returns the gauge's current value; a read error fails the test.
	readGauge := func() int {
		current, err := testutil.GetGaugeMetricValue(gauge)
		gomega.Expect(err).ToNot(gomega.HaveOccurred())
		return int(current)
	}

	// Offset 1 attributes a failure to the caller of this helper.
	gomega.EventuallyWithOffset(1, readGauge, Timeout, Interval).Should(gomega.Equal(v))
}

func UpdateWorkloadStatus(ctx context.Context, k8sClient client.Client, wl *kueue.Workload, update func(*kueue.Workload)) {
gomega.EventuallyWithOffset(1, func() error {
var updatedWl kueue.Workload
Expand Down
Loading

0 comments on commit 04e0e93

Please sign in to comment.