Skip to content

Commit

Permalink
Refactoring of batching and v1 series translation code
Browse files Browse the repository at this point in the history
  • Loading branch information
carrieedwards committed Jul 17, 2024
1 parent c8f8665 commit 9f9d651
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 33 deletions.
24 changes: 12 additions & 12 deletions receiver/datadogreceiver/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Batcher struct {

func newBatcher() Batcher {
return Batcher{
Metrics: pmetric.NewMetrics(),
resourceMetrics: make(map[identity.Resource]pmetric.ResourceMetrics),
scopeMetrics: make(map[identity.Scope]pmetric.ScopeMetrics),
metrics: make(map[identity.Metric]pmetric.Metric),
Expand Down Expand Up @@ -59,7 +60,7 @@ func parseSeriesProperties(name string, metricType string, tags []string, host s
}

func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
resource := getResource(dim.resourceAttrs)
resource := dim.Resource()
resourceID := identity.OfResource(resource)
resourceMetrics, ok := b.resourceMetrics[resourceID]
if !ok {
Expand All @@ -68,7 +69,7 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
b.resourceMetrics[resourceID] = resourceMetrics
}

scope := getScope(dim.scopeAttrs, dim.buildInfo)
scope := dim.Scope()
scopeID := identity.OfScope(resourceID, scope)
scopeMetrics, ok := b.scopeMetrics[scopeID]
if !ok {
Expand All @@ -77,7 +78,7 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
b.scopeMetrics[scopeID] = scopeMetrics
}

m := getMetric(dim)
m := dim.Metric()
metricID := identity.OfMetric(scopeID, m)
metric, ok := b.metrics[metricID]
if !ok {
Expand All @@ -89,25 +90,24 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
return metric, metricID
}

func getResource(attrs pcommon.Map) pcommon.Resource {
func (d Dimensions) Resource() pcommon.Resource {
resource := pcommon.NewResource()
attrs.CopyTo(resource.Attributes()) // TODO(jesus.vazquez) review this copy
d.resourceAttrs.CopyTo(resource.Attributes()) // TODO(jesus.vazquez) review this copy
return resource
}

func getScope(attrs pcommon.Map, version string) pcommon.InstrumentationScope {
func (d Dimensions) Scope() pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName("otelcol/datadogreceiver")
scope.SetVersion(version)
attrs.CopyTo(scope.Attributes())
scope.SetVersion(d.buildInfo)
d.scopeAttrs.CopyTo(scope.Attributes())
return scope
}

func getMetric(dim Dimensions) pmetric.Metric {
func (d Dimensions) Metric() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(dim.name)
metric.Type()
switch dim.metricType {
metric.SetName(d.name)
switch d.metricType {
case pmetric.MetricTypeSum:
metric.SetEmptySum()
case pmetric.MetricTypeGauge:
Expand Down
2 changes: 1 addition & 1 deletion receiver/datadogreceiver/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestMetricBatcher(t *testing.T) {
Description: "OpenTelemetry Collector",
Version: "latest",
}
result := translateMetricsV1(tt.series, mt)
result := mt.translateMetricsV1(tt.series)

tt.expect(t, result)
})
Expand Down
18 changes: 9 additions & 9 deletions receiver/datadogreceiver/metrics_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"

import (
datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/pdata/pmetric"
"sync"
"time"

datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)
Expand Down Expand Up @@ -50,9 +52,8 @@ type SeriesList struct {
Series []datadogV1.Series `json:"series"`
}

func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metrics {
func (mt *MetricsTranslator) translateMetricsV1(series SeriesList) pmetric.Metrics {
bt := newBatcher()
bt.Metrics = pmetric.NewMetrics()

for _, serie := range series.Series {
var dps pmetric.NumberDataPointSlice
Expand All @@ -78,7 +79,7 @@ func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metric
dps.EnsureCapacity(len(serie.Points))

var dp pmetric.NumberDataPoint
var ts uint64
var ts int64
var value float64
// The Datadog API returns a slice of slices of points [][]*float64 which is a bit awkward to work with.
// It looks like this:
Expand All @@ -92,11 +93,11 @@ func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metric
if len(points) != 2 {
continue // The datapoint is missing a timestamp and/or value, so this point should be skipped
}
ts = uint64(*points[0])
ts = int64(*points[0])
value = *points[1]

dp = dps.AppendEmpty()
dp.SetTimestamp(pcommon.Timestamp(ts * 1_000_000_000)) // OTel uses nanoseconds, while Datadog uses seconds
dp.SetTimestamp(pcommon.Timestamp(ts * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds

if *serie.Type == TypeRate {
if serie.Interval.IsSet() {
Expand All @@ -107,8 +108,7 @@ func translateMetricsV1(series SeriesList, mt *MetricsTranslator) pmetric.Metric
dimensions.dpAttrs.CopyTo(dp.Attributes())

stream := identity.OfStream(metricID, dp)
ts, ok := mt.streamHasTimestamp(stream)
if ok {
if ts, ok := mt.streamHasTimestamp(stream); ok {
dp.SetStartTimestamp(ts)
}
mt.updateLastTsForStream(stream, dp.Timestamp())
Expand Down
2 changes: 1 addition & 1 deletion receiver/datadogreceiver/metrics_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestTranslateMetricsV1(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mt := createMetricsTranslator()
result := translateMetricsV1(tt.series, mt)
result := mt.translateMetricsV1(tt.series)

tt.expect(t, result)
})
Expand Down
2 changes: 1 addition & 1 deletion receiver/datadogreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (ddr *datadogReceiver) handleV1Series(w http.ResponseWriter, req *http.Requ
return
}

metrics := translateMetricsV1(seriesList, ddr.metricsTranslator)
metrics := ddr.metricsTranslator.translateMetricsV1(seriesList)
metricsCount = metrics.DataPointCount()

err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics)
Expand Down
9 changes: 5 additions & 4 deletions receiver/datadogreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
"io"
"net/http"
"strings"
"testing"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -114,7 +115,7 @@ func TestDatadogMetricsV1_EndToEnd(t *testing.T) {

dd, err := newDataDogReceiver(
cfg,
receivertest.NewNopCreateSettings(),
receivertest.NewNopSettings(),
)
require.NoError(t, err, "Must not error when creating receiver")
dd.(*datadogReceiver).nextMetricsConsumer = sink
Expand Down
5 changes: 0 additions & 5 deletions receiver/datadogreceiver/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ func requireScope(t *testing.T, result pmetric.Metrics, expectedAttrs pcommon.Ma
require.Equal(t, expectedAttrs, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes())
}

func requireMetricAndDataPointCounts(t *testing.T, result pmetric.Metrics, expectedMetricCount, expectedDpCount int) {
require.Equal(t, expectedMetricCount, result.MetricCount())
require.Equal(t, expectedDpCount, result.DataPointCount())
}

func requireSum(t *testing.T, metric pmetric.Metric, expectedName string, expectedAggregationTemporality pmetric.AggregationTemporality, expectedDpsLen int) {
require.Equal(t, expectedName, metric.Name())
require.Equal(t, pmetric.MetricTypeSum, metric.Type())
Expand Down

0 comments on commit 9f9d651

Please sign in to comment.