Skip to content

Commit

Permalink
prometheus: check return value from CheckpointSet.ForEach (#622)
Browse files Browse the repository at this point in the history
This PR modifies prometheus.Collect to reflect the change introduced
by #557.

The `export{Counter,Histogram,LastValue,Summary}` methods now all return
an error instead of calling the error callback directly.
The callback is now only called on the returned error from `ForEach`.

fixes #563

Co-authored-by: Rahul Patel <[email protected]>
  • Loading branch information
oncilla and rghetia authored Apr 7, 2020
1 parent c8ec530 commit 6005d01
Showing 1 changed file with 26 additions and 29 deletions.
55 changes: 26 additions & 29 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,14 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
return
}

_ = c.exp.snapshot.ForEach(func(record export.Record) error {
err := c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
labels := labelValues(record.Labels())
desc := c.toDesc(&record)

if hist, ok := agg.(aggregator.Histogram); ok {
c.exportHistogram(ch, hist, numberKind, desc, labels)
return c.exportHistogram(ch, hist, numberKind, desc, labels)
} else if dist, ok := agg.(aggregator.Distribution); ok {
// TODO: summaries values are never being resetted.
// As measures are recorded, new records starts to have less impact on these summaries.
Expand All @@ -221,60 +221,59 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
// References:
// https://www.robustperception.io/how-does-a-prometheus-summary-work
// https://github.com/prometheus/client_golang/blob/fa4aa9000d2863904891d193dea354d23f3d712a/prometheus/summary.go#L135
c.exportSummary(ch, dist, numberKind, desc, labels)
return c.exportSummary(ch, dist, numberKind, desc, labels)
} else if sum, ok := agg.(aggregator.Sum); ok {
c.exportCounter(ch, sum, numberKind, desc, labels)
return c.exportCounter(ch, sum, numberKind, desc, labels)
} else if lastValue, ok := agg.(aggregator.LastValue); ok {
c.exportLastValue(ch, lastValue, numberKind, desc, labels)
return c.exportLastValue(ch, lastValue, numberKind, desc, labels)
}
return nil
})
if err != nil {
c.exp.onError(err)
}
}

func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregator.LastValue, kind core.NumberKind, desc *prometheus.Desc, labels []string) {
func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregator.LastValue, kind core.NumberKind, desc *prometheus.Desc, labels []string) error {
lv, _, err := lvagg.LastValue()
if err != nil {
c.exp.onError(err)
return
return err
}

m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, lv.CoerceToFloat64(kind), labels...)
if err != nil {
c.exp.onError(err)
return
return err
}

ch <- m
return nil
}

func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Sum, kind core.NumberKind, desc *prometheus.Desc, labels []string) {
func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Sum, kind core.NumberKind, desc *prometheus.Desc, labels []string) error {
v, err := sum.Sum()
if err != nil {
c.exp.onError(err)
return
return err
}

m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, v.CoerceToFloat64(kind), labels...)
if err != nil {
c.exp.onError(err)
return
return err
}

ch <- m
return nil
}

func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.Distribution, kind core.NumberKind, desc *prometheus.Desc, labels []string) {
func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.Distribution, kind core.NumberKind, desc *prometheus.Desc, labels []string) error {
count, err := dist.Count()
if err != nil {
c.exp.onError(err)
return
return err
}

var sum core.Number
sum, err = dist.Sum()
if err != nil {
c.exp.onError(err)
return
return err
}

quantiles := make(map[float64]float64)
Expand All @@ -285,23 +284,21 @@ func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.D

m, err := prometheus.NewConstSummary(desc, uint64(count), sum.CoerceToFloat64(kind), quantiles, labels...)
if err != nil {
c.exp.onError(err)
return
return err
}

ch <- m
return nil
}

func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator.Histogram, kind core.NumberKind, desc *prometheus.Desc, labels []string) {
func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator.Histogram, kind core.NumberKind, desc *prometheus.Desc, labels []string) error {
buckets, err := hist.Histogram()
if err != nil {
c.exp.onError(err)
return
return err
}
sum, err := hist.Sum()
if err != nil {
c.exp.onError(err)
return
return err
}

var totalCount uint64
Expand All @@ -318,11 +315,11 @@ func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator

m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, labels...)
if err != nil {
c.exp.onError(err)
return
return err
}

ch <- m
return nil
}

func (c *collector) toDesc(record *export.Record) *prometheus.Desc {
Expand Down

0 comments on commit 6005d01

Please sign in to comment.