Skip to content

Commit

Permalink
[ws-manager] Provide subscription queue metric
Browse files Browse the repository at this point in the history
A gauge vector reporting the fill levels of all subscriber queues.
  • Loading branch information
Christian Weichel committed Feb 4, 2021
1 parent fd46a83 commit 4c03861
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions components/ws-manager/pkg/manager/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (m *metrics) Register(reg prometheus.Registerer) error {
newPhaseTotalVec(m.manager),
newWorkspaceActivityVec(m.manager),
newTimeoutSettingsVec(m.manager),
newSubscriberQueueLevelVec(m.manager),
m.totalStartsCounterVec,
m.totalStopsCounterVec,
}
Expand Down Expand Up @@ -360,3 +361,47 @@ func (vec *timeoutSettingsVec) Collect(ch chan<- prometheus.Metric) {
ch <- metric
}
}

// subscriberQueueLevelVec provides a gauge of the current subscriber queue fill levels.
type subscriberQueueLevelVec struct {
name string
manager *Manager
desc *prometheus.Desc
}

func newSubscriberQueueLevelVec(m *Manager) *timeoutSettingsVec {
name := prometheus.BuildFQName(metricsNamespace, metricsWorkspaceSubsystem, "subscriber_queue_level")
desc := prometheus.NewDesc(
name,
"Current fill levels of the subscriber queues",
[]string{"client"},
prometheus.Labels(map[string]string{}),
)
return &timeoutSettingsVec{
name: name,
manager: m,
desc: desc,
}
}

// Describe implements Collector. It will send exactly one Desc to the provided channel.
func (vec *subscriberQueueLevelVec) Describe(ch chan<- *prometheus.Desc) {
ch <- vec.desc
}

// Collect implements Collector.
func (vec *subscriberQueueLevelVec) Collect(ch chan<- prometheus.Metric) {
vec.manager.subscriberLock.RLock()
defer vec.manager.subscriberLock.RUnlock()

for key, queue := range vec.manager.subscribers {
// metrics cannot be re-used, we have to create them every single time
metric, err := prometheus.NewConstMetric(vec.desc, prometheus.GaugeValue, float64(len(queue)), key)
if err != nil {
log.WithError(err).Warnf("cannot create workspace metric - %s will be inaccurate", vec.name)
continue
}

ch <- metric
}
}

0 comments on commit 4c03861

Please sign in to comment.