Skip to content

Commit

Permalink
[receiver/prometheusreceiver] Exemplar Support (open-telemetry#14132)
Browse files Browse the repository at this point in the history
 Add exemplar support to Prometheus Receiver

Co-authored-by: Anthony Mirabella <[email protected]>
  • Loading branch information
2 people authored and shalper2 committed Dec 6, 2022
1 parent e7e9561 commit d79e2c7
Show file tree
Hide file tree
Showing 5 changed files with 574 additions and 105 deletions.
9 changes: 9 additions & 0 deletions .chloggen/append-exemplars-to-prometheus-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
change_type: enhancement

component: receiver/prometheusreceiver

note: Append exemplars to the metrics received by prometheus receiver

issues: [8353]

subtext: Acknowledge exemplars coming from prometheus receiver and append it to otel format
6 changes: 6 additions & 0 deletions receiver/prometheusreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ receivers:
interval: 30s
collector_id: collector-1
```
## Exemplars
This receiver accepts exemplars coming in Prometheus format and converts it to OTLP format.
1. Value is expected to be received in `float64` format
2. Timestamp is expected to be received in `ms`
3. Labels with key `span_id` in prometheus exemplars are set as OTLP `span id` and labels with key `trace_id` are set as `trace id`
4. Rest of the labels are copied as it is to OTLP format

[sc]: https://github.com/prometheus/prometheus/blob/v2.28.1/docs/configuration/configuration.md#scrape_config

Expand Down
82 changes: 79 additions & 3 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"

import (
"encoding/hex"
"fmt"
"sort"
"strings"

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
Expand All @@ -27,6 +29,11 @@ import (
"go.uber.org/zap"
)

const (
traceIDKey = "trace_id"
spanIDKey = "span_id"
)

type metricFamily struct {
mtype pmetric.MetricType
// isMonotonic only applies to sums
Expand All @@ -50,6 +57,7 @@ type metricGroup struct {
hasSum bool
value float64
complexValue []*dataPoint
exemplars pmetric.ExemplarSlice
}

func newMetricFamily(metricName string, mc scrape.MetricMetadataStore, logger *zap.Logger) *metricFamily {
Expand Down Expand Up @@ -141,6 +149,16 @@ func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice)
point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
point.SetTimestamp(tsNanos)
populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
mg.setExemplars(point.Exemplars())
}

func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) {
if mg == nil {
return
}
if mg.exemplars.Len() > 0 {
mg.exemplars.MoveAndAppendTo(exemplars)
}
}

func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {
Expand Down Expand Up @@ -201,6 +219,7 @@ func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {
point.SetDoubleValue(mg.value)
}
populateAttributes(pmetric.MetricTypeGauge, mg.ls, point.Attributes())
mg.setExemplars(point.Exemplars())
}

func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon.Map) {
Expand All @@ -226,9 +245,10 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Label
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroup{
family: mf,
ts: ts,
ls: ls,
family: mf,
ts: ts,
ls: ls,
exemplars: pmetric.NewExemplarSlice(),
}
mf.groups[groupKey] = mg
// maintaining data insertion order is helpful to generate stable/reproducible metric output
Expand Down Expand Up @@ -319,3 +339,59 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice) {

metric.MoveTo(metrics.AppendEmpty())
}

func (mf *metricFamily) addExemplar(l labels.Labels, e exemplar.Exemplar) {
gk := mf.getGroupKey(l)
mg := mf.groups[gk]
if mg == nil {
return
}
es := mg.exemplars
convertExemplar(e, es.AppendEmpty())
}

func convertExemplar(pe exemplar.Exemplar, e pmetric.Exemplar) {
e.SetTimestamp(timestampFromMs(pe.Ts))
e.SetDoubleValue(pe.Value)
e.FilteredAttributes().EnsureCapacity(len(pe.Labels))
for _, lb := range pe.Labels {
switch strings.ToLower(lb.Name) {
case traceIDKey:
var tid [16]byte
err := decodeAndCopyToLowerBytes(tid[:], []byte(lb.Value))
if err == nil {
e.SetTraceID(tid)
} else {
e.FilteredAttributes().PutString(lb.Name, lb.Value)
}
case spanIDKey:
var sid [8]byte
err := decodeAndCopyToLowerBytes(sid[:], []byte(lb.Value))
if err == nil {
e.SetSpanID(sid)
} else {
e.FilteredAttributes().PutString(lb.Name, lb.Value)
}
default:
e.FilteredAttributes().PutString(lb.Name, lb.Value)
}
}
}

/*
decodeAndCopyToLowerBytes copies src to dst on lower bytes instead of higher
1. If len(src) > len(dst) -> copy first len(dst) bytes as it is. Example -> src = []byte{0xab,0xcd,0xef,0xgh,0xij}, dst = [2]byte, result dst = [2]byte{0xab, 0xcd}
2. If len(src) = len(dst) -> copy src to dst as it is
3. If len(src) < len(dst) -> prepend required 0s and then add src to dst. Example -> src = []byte{0xab, 0xcd}, dst = [8]byte, result dst = [8]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd}
*/
func decodeAndCopyToLowerBytes(dst []byte, src []byte) error {
var err error
decodedLen := hex.DecodedLen(len(src))
if decodedLen >= len(dst) {
_, err = hex.Decode(dst, src[:hex.EncodedLen(len(dst))])
} else {
_, err = hex.Decode(dst[len(dst)-decodedLen:], src)
}
return err
}
47 changes: 39 additions & 8 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,52 @@ func (t *transaction) Append(ref storage.SeriesRef, ls labels.Labels, atMs int64
return 0, t.AddTargetInfo(ls)
}

curMF, ok := t.families[metricName]
curMF := t.getOrCreateMetricFamily(metricName)

return 0, curMF.Add(metricName, ls, atMs, val)
}

func (t *transaction) getOrCreateMetricFamily(mn string) *metricFamily {
curMf, ok := t.families[mn]
if !ok {
familyName := normalizeMetricName(metricName)
if mf, ok := t.families[familyName]; ok && mf.includesMetric(metricName) {
curMF = mf
fn := normalizeMetricName(mn)
if mf, ok := t.families[fn]; ok && mf.includesMetric(mn) {
curMf = mf
} else {
curMF = newMetricFamily(metricName, t.mc, t.logger)
t.families[curMF.name] = curMF
curMf = newMetricFamily(mn, t.mc, t.logger)
t.families[curMf.name] = curMf
}
}

return 0, curMF.Add(metricName, ls, atMs, val)
return curMf
}

func (t *transaction) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
select {
case <-t.ctx.Done():
return 0, errTransactionAborted
default:
}

if t.isNew {
if err := t.initTransaction(l); err != nil {
return 0, err
}
}

l = l.WithoutEmpty()

if dupLabel, hasDup := l.HasDuplicateLabelNames(); hasDup {
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
}

mn := l.Get(model.MetricNameLabel)
if mn == "" {
return 0, errMetricNameNotFound
}

mf := t.getOrCreateMetricFamily(mn)
mf.addExemplar(l, e)

return 0, nil
}

Expand Down
Loading

0 comments on commit d79e2c7

Please sign in to comment.