Skip to content

Commit

Permalink
Add support for V1 series endpoint
Browse files Browse the repository at this point in the history
Co-authored-by: Jesus Vazquez <[email protected]>
  • Loading branch information
carrieedwards committed Jul 17, 2024
1 parent 4010573 commit 83dd84e
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 6 deletions.
100 changes: 97 additions & 3 deletions receiver/datadogreceiver/metrics_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
import (
datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/pdata/pmetric"
"sync"

"go.opentelemetry.io/collector/component"
Expand All @@ -13,12 +15,104 @@ import (

// MetricsTranslator converts Datadog-submitted metrics payloads into OTel
// metrics, remembering the last-seen timestamp per stream so that start
// timestamps can be set on subsequent data points. Safe for concurrent use.
type MetricsTranslator struct {
	sync.RWMutex
	buildInfo  component.BuildInfo
	lastTs     map[identity.Stream]pcommon.Timestamp
	stringPool *StringPool
}

// newMetricsTranslator returns a MetricsTranslator with an empty per-stream
// timestamp map and a fresh string pool. buildInfo is left zero-valued;
// presumably it is populated by the caller — TODO confirm against receiver setup.
func newMetricsTranslator() *MetricsTranslator {
	return &MetricsTranslator{
		lastTs:     make(map[identity.Stream]pcommon.Timestamp),
		stringPool: newStringPool(),
	}
}

// streamHasTimestamp returns the last timestamp recorded for the given
// stream and whether one was present. Takes a read lock only.
func (mt *MetricsTranslator) streamHasTimestamp(stream identity.Stream) (pcommon.Timestamp, bool) {
	mt.RLock()
	defer mt.RUnlock()

	lastSeen, found := mt.lastTs[stream]
	return lastSeen, found
}

// updateLastTsForStream records newTs as the most recent timestamp observed
// for the given stream, overwriting any previous entry.
func (mt *MetricsTranslator) updateLastTsForStream(stream identity.Stream, newTs pcommon.Timestamp) {
	mt.Lock()
	defer mt.Unlock()

	mt.lastTs[stream] = newTs
}

// Datadog v1 metric type strings as they appear in submitted series payloads.
// See https://docs.datadoghq.com/metrics/types/ for their semantics.
const (
TypeGauge string = "gauge"
TypeRate string = "rate"
TypeCount string = "count"
)

// SeriesList is the JSON body of a Datadog v1 series submission: a single
// "series" array of datadogV1.Series entries.
type SeriesList struct {
Series []datadogV1.Series `json:"series"`
}

// translateMetricsV1 converts a Datadog v1 series payload into OTel metrics.
// Series with an unset/unknown type are skipped, as are points missing a
// timestamp or value. Start timestamps are stitched from the translator's
// per-stream last-seen timestamps.
func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metrics {
	bt := newBatcher()
	bt.Metrics = pmetric.NewMetrics()

	for _, serie := range series.Series {
		var dps pmetric.NumberDataPointSlice

		// GetType is nil-safe; reuse the result everywhere below instead of
		// dereferencing serie.Type directly.
		metricType := serie.GetType()
		dimensions := parseSeriesProperties(serie.Metric, metricType, serie.GetTags(), serie.GetHost(), mt.buildInfo.Version, mt.stringPool)
		metric, metricID := bt.Lookup(dimensions)

		switch metricType {
		case TypeCount:
			metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			metric.Sum().SetIsMonotonic(false) // See https://docs.datadoghq.com/metrics/types/?tab=count#definition
			dps = metric.Sum().DataPoints()
		case TypeGauge:
			dps = metric.Gauge().DataPoints()
		case TypeRate:
			metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			dps = metric.Sum().DataPoints()
		default:
			// Type is unset/unspecified
			continue
		}

		dps.EnsureCapacity(len(serie.Points))

		// The Datadog API returns a slice of slices of points [][]*float64 which is a bit awkward to work with.
		// It looks like this:
		// points := [][]*float64{
		//     {&timestamp1, &value1},
		//     {&timestamp2, &value2},
		// }
		// We know that in each inner slice, the first element is the timestamp
		// and the second is the value.
		for _, points := range serie.Points {
			// Skip points missing a timestamp and/or value; the elements are
			// *float64, so a nil element must be rejected before dereferencing.
			if len(points) != 2 || points[0] == nil || points[1] == nil {
				continue
			}
			ts := uint64(*points[0])
			value := *points[1]

			dp := dps.AppendEmpty()
			dp.SetTimestamp(pcommon.Timestamp(ts * 1_000_000_000)) // OTel uses nanoseconds, while Datadog uses seconds

			// Rate values are reported per second; multiply by the reporting
			// interval to recover the delta over that interval.
			if metricType == TypeRate && serie.Interval.IsSet() {
				value *= float64(serie.GetInterval())
			}
			dp.SetDoubleValue(value)
			dimensions.dpAttrs.CopyTo(dp.Attributes())

			// Use the previous timestamp for this stream (if any) as the start
			// timestamp, then remember this point's timestamp for the next one.
			stream := identity.OfStream(metricID, dp)
			if lastTs, ok := mt.streamHasTimestamp(stream); ok {
				dp.SetStartTimestamp(lastTs)
			}
			mt.updateLastTsForStream(stream, dp.Timestamp())
		}
	}
	return bt.Metrics
}
33 changes: 30 additions & 3 deletions receiver/datadogreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package datadogreceiver // import "github.com/open-telemetry/opentelemetry-colle

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
Expand Down Expand Up @@ -143,9 +145,34 @@ func (ddr *datadogReceiver) handleV1Series(w http.ResponseWriter, req *http.Requ
ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", *metricsCount, err)
}(&metricsCount)

err = fmt.Errorf("series v1 endpoint not implemented")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
ddr.params.Logger.Warn("metrics consumer errored out", zap.Error(err))
buf := getBuffer()
defer putBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
ddr.params.Logger.Error(err.Error())
return
}

seriesList := SeriesList{}
err = json.Unmarshal(buf.Bytes(), &seriesList)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
ddr.params.Logger.Error(err.Error())
return
}

metrics := translateMetricsV1(seriesList, ddr.metricsTranslator)
metricsCount = metrics.DataPointCount()

err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err))
return
}

w.WriteHeader(http.StatusAccepted)
_, err = w.Write([]byte("OK"))
}

// handleV2Series handles the v2 series endpoint https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
Expand Down

0 comments on commit 83dd84e

Please sign in to comment.