Skip to content

Commit

Permalink
exporter/prometheusexporter: support Summary
Browse files Browse the repository at this point in the history
Allows Summary metrics to be exported to Prometheus.
To walk through this feature, I created an adaptation of a Brian Brazil
tutorial for DropWizard, to create a Java server at
https://github.com/odeke-em/bugs/tree/master/opentelemetry-collector/2661
and it uses DropWizard, and exports out JVM statistics with Prometheus where the
*gc_collection_seconds are of the summary kind when scraped by visiting
http://localhost:1234/metrics which produced

```
 # HELP jvm_gc_collection_seconds Time spent in a given JVM garbage collector in seconds.
 # TYPE jvm_gc_collection_seconds summary
 jvm_gc_collection_seconds_count{gc="G1 Young Generation",} 4.0
 jvm_gc_collection_seconds_sum{gc="G1 Young Generation",} 0.026
 jvm_gc_collection_seconds_count{gc="G1 Old Generation",} 0.0
```

and then roundtripped the collector to scrape those metrics and then
export them to Prometheus

    VS

making Prometheus directly scrape that endpoint and then comparing
results.

Fixes #2661
  • Loading branch information
odeke-em committed Apr 7, 2021
1 parent 8cac172 commit 5fa2c7d
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 0 deletions.
32 changes: 32 additions & 0 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,43 @@ func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.Instrumen
return a.accumulateIntHistogram(metric, il, now)
case pdata.MetricDataTypeHistogram:
return a.accumulateDoubleHistogram(metric, il, now)
case pdata.MetricDataTypeSummary:
return a.accumulateSummary(metric, il, now)
default:
a.logger.With(
zap.Any("data_type", metric.DataType()),
zap.Any("metric_name", metric.Name()),
).Error("failed to translate metric")
}

return 0
}

func (a *lastValueAccumulator) accumulateSummary(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())

v, ok := a.registeredMetrics.Load(signature)
stalePoint := ok &&
ip.Timestamp().AsTime().Before(v.(*accumulatedValue).value.Summary().DataPoints().At(0).Timestamp().AsTime())

if stalePoint {
// Only keep this datapoint if it has a later timestamp.
continue
}

mm := createMetric(metric)
mm.Summary().DataPoints().Append(ip)
a.registeredMetrics.Store(signature, &accumulatedValue{value: mm, instrumentationLibrary: il, updated: now})
n++
}

return n
}

func (a *lastValueAccumulator) accumulateIntGauge(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
dps := metric.IntGauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
Expand Down
36 changes: 36 additions & 0 deletions exporter/prometheusexporter/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func (c *collector) convertMetric(metric pdata.Metric) (prometheus.Metric, error
return c.convertIntHistogram(metric)
case pdata.MetricDataTypeHistogram:
return c.convertDoubleHistogram(metric)
case pdata.MetricDataTypeSummary:
return c.convertSummary(metric)
}

return nil, errUnknownMetricType
Expand Down Expand Up @@ -207,6 +209,40 @@ func (c *collector) convertIntHistogram(metric pdata.Metric) (prometheus.Metric,
return m, nil
}

func (c *collector) convertSummary(metric pdata.Metric) (prometheus.Metric, error) {
dataPoints := metric.Summary().DataPoints()

var cumSum float64
var cumCount uint64
quantiles := make(map[float64]float64)
labelsMap := pdata.NewStringMap()
for i := 0; i < dataPoints.Len(); i++ {
sdpi := dataPoints.At(i)
qv := sdpi.QuantileValues()
for j := 0; j < qv.Len(); j++ {
qvj := qv.At(j)
quantiles[qvj.Quantile()] += qvj.Value()
}
// Copy all the label map values.
sdpi.LabelsMap().ForEach(func(key, value string) {
labelsMap.Insert(key, value)
})
cumSum += sdpi.Sum()
cumCount += sdpi.Count()
}

desc, labelValues := c.getMetricMetadata(metric, labelsMap)
m, err := prometheus.NewConstSummary(desc, cumCount, cumSum, quantiles, labelValues...)
if err != nil {
return nil, err
}
if c.sendTimestamps {
ip := dataPoints.At(0)
return prometheus.NewMetricWithTimestamp(ip.Timestamp().AsTime(), m), nil
}
return m, nil
}

func (c *collector) convertDoubleHistogram(metric pdata.Metric) (prometheus.Metric, error) {
ip := metric.Histogram().DataPoints().At(0)
desc, labels := c.getMetricMetadata(metric, ip.LabelsMap())
Expand Down
114 changes: 114 additions & 0 deletions exporter/prometheusexporter/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,12 @@ func TestCollectMetrics(t *testing.T) {
require.Equal(t, tt.value, *pbMetric.Counter.Value)
require.Nil(t, pbMetric.Gauge)
require.Nil(t, pbMetric.Histogram)
require.Nil(t, pbMetric.Summary)
case prometheus.GaugeValue:
require.Equal(t, tt.value, *pbMetric.Gauge.Value)
require.Nil(t, pbMetric.Counter)
require.Nil(t, pbMetric.Histogram)
require.Nil(t, pbMetric.Summary)
}
}
require.Equal(t, 1, j)
Expand Down Expand Up @@ -488,3 +490,115 @@ func TestAccumulateHistograms(t *testing.T) {
}
}
}

func TestAccumulateSummary(t *testing.T) {
quantileValue := func(pN, value float64) pdata.ValueAtQuantile {
vqpN := pdata.NewValueAtQuantile()
vqpN.SetQuantile(pN)
vqpN.SetValue(value)
return vqpN
}
quantilesFromMap := func(qf map[float64]float64) (qL []*io_prometheus_client.Quantile) {
f64Ptr := func(v float64) *float64 { return &v }
for quantile, value := range qf {
qL = append(qL, &io_prometheus_client.Quantile{
Quantile: f64Ptr(quantile), Value: f64Ptr(value),
})
}
return qL
}
tests := []struct {
name string
metric func(time.Time) pdata.Metric
wantSum float64
wantCount uint64
wantQuantiles []*io_prometheus_client.Quantile
}{
{
name: "Summary with single point",
wantSum: 0.012,
wantCount: 10,
wantQuantiles: quantilesFromMap(map[float64]float64{
0.50: 190,
0.99: 817,
}),
metric: func(ts time.Time) (metric pdata.Metric) {
sp := pdata.NewSummaryDataPoint()
sp.SetCount(10)
sp.SetSum(0.012)
sp.SetCount(10)
sp.LabelsMap().Insert("label_1", "1")
sp.LabelsMap().Insert("label_2", "2")
sp.SetTimestamp(pdata.TimestampFromTime(ts))

sp.QuantileValues().Append(quantileValue(0.50, 190))
sp.QuantileValues().Append(quantileValue(0.99, 817))

metric = pdata.NewMetric()
metric.SetName("test_metric")
metric.SetDataType(pdata.MetricDataTypeSummary)
metric.Summary().DataPoints().Append(sp)
metric.SetDescription("test description")

return
},
},
}

for _, tt := range tests {
for _, sendTimestamp := range []bool{true, false} {
name := tt.name
if sendTimestamp {
name += "/WithTimestamp"
}
t.Run(name, func(t *testing.T) {
ts := time.Now()
metric := tt.metric(ts)
c := collector{
accumulator: &mockAccumulator{
[]pdata.Metric{metric},
},
sendTimestamps: sendTimestamp,
logger: zap.NewNop(),
}

ch := make(chan prometheus.Metric, 1)
go func() {
c.Collect(ch)
close(ch)
}()

n := 0
for m := range ch {
n++
require.Contains(t, m.Desc().String(), "fqName: \"test_metric\"")
require.Contains(t, m.Desc().String(), "variableLabels: [label_1 label_2]")

pbMetric := io_prometheus_client.Metric{}
m.Write(&pbMetric)

labelsKeys := map[string]string{"label_1": "1", "label_2": "2"}
for _, l := range pbMetric.Label {
require.Equal(t, labelsKeys[*l.Name], *l.Value)
}

if sendTimestamp {
require.Equal(t, ts.UnixNano()/1e6, *(pbMetric.TimestampMs))
} else {
require.Nil(t, pbMetric.TimestampMs)
}

require.Nil(t, pbMetric.Gauge)
require.Nil(t, pbMetric.Counter)
require.Nil(t, pbMetric.Histogram)

s := *pbMetric.Summary
require.Equal(t, tt.wantCount, *s.SampleCount)
require.Equal(t, tt.wantSum, *s.SampleSum)
require.Equal(t, tt.wantQuantiles, s.Quantile)
}
require.Equal(t, 1, n)
})
}
}
}

0 comments on commit 5fa2c7d

Please sign in to comment.