[chore] [receiver/datadog] Refactor translation files into internal package #34160

Merged · 3 commits · Jul 19, 2024
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
Review comment (Contributor):

I think both Batcher and Dimensions could be unexported.
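For context, a small hypothetical sketch of the consumer side after this refactor: code outside internal/translator only needs the exported entry points, so batcher and dimensions can stay package-private. The example function below is illustrative, not code from this PR.

```go
package datadogreceiver

import (
	"go.opentelemetry.io/collector/component"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"
)

// Illustrative only: the receiver constructs the translator through its
// exported API; translator.batcher and translator.dimensions are not
// visible (or needed) from here.
func translateExample(buildInfo component.BuildInfo, series translator.SeriesList) {
	mt := translator.NewMetricsTranslator(buildInfo)
	_ = mt.TranslateSeriesV1(series)
}
```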

@@ -10,16 +10,16 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type Batcher struct {
type batcher struct {
pmetric.Metrics

resourceMetrics map[identity.Resource]pmetric.ResourceMetrics
scopeMetrics map[identity.Scope]pmetric.ScopeMetrics
metrics map[identity.Metric]pmetric.Metric
}

func newBatcher() Batcher {
return Batcher{
func newBatcher() batcher {
return batcher{
Metrics: pmetric.NewMetrics(),
resourceMetrics: make(map[identity.Resource]pmetric.ResourceMetrics),
scopeMetrics: make(map[identity.Scope]pmetric.ScopeMetrics),
@@ -30,7 +30,7 @@ func newBatcher() Batcher {
// Dimensions stores the properties of the series that are needed in order
// to unique identify the series. This is needed in order to batch metrics by
// resource, scope, and datapoint attributes
type Dimensions struct {
type dimensions struct {
name string
metricType pmetric.MetricType
resourceAttrs pcommon.Map
@@ -47,9 +47,9 @@ var metricTypeMap = map[string]pmetric.MetricType{
"sketch": pmetric.MetricTypeExponentialHistogram,
}

func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) Dimensions {
func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) dimensions {
resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(tags, host, stringPool)
return Dimensions{
return dimensions{
name: name,
metricType: metricTypeMap[metricType],
buildInfo: version,
@@ -59,7 +59,7 @@ func parseSeriesProperties(name string, metricType string, tags []string, host s
}
}

func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
func (b batcher) Lookup(dim dimensions) (pmetric.Metric, identity.Metric) {
resource := dim.Resource()
resourceID := identity.OfResource(resource)
resourceMetrics, ok := b.resourceMetrics[resourceID]
@@ -90,21 +90,21 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
return metric, metricID
}

func (d Dimensions) Resource() pcommon.Resource {
func (d dimensions) Resource() pcommon.Resource {
resource := pcommon.NewResource()
d.resourceAttrs.CopyTo(resource.Attributes()) // TODO(jesus.vazquez) review this copy
return resource
}

func (d Dimensions) Scope() pcommon.InstrumentationScope {
func (d dimensions) Scope() pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName("otelcol/datadogreceiver")
scope.SetVersion(d.buildInfo)
d.scopeAttrs.CopyTo(scope.Attributes())
return scope
}

func (d Dimensions) Metric() pmetric.Metric {
func (d dimensions) Metric() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(d.name)
switch d.metricType {
@@ -1,14 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver
package translator

import (
"testing"

"github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
)

@@ -275,13 +274,8 @@ func TestMetricBatcher(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mt := newMetricsTranslator()
mt.buildInfo = component.BuildInfo{
Command: "otelcol",
Description: "OpenTelemetry Collector",
Version: "latest",
}
result := mt.translateMetricsV1(tt.series)
mt := createMetricsTranslator()
result := mt.TranslateSeriesV1(tt.series)

tt.expect(t, result)
})
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type MetricsTranslator struct {
sync.RWMutex
buildInfo component.BuildInfo
lastTs map[identity.Stream]pcommon.Timestamp
stringPool *StringPool
}

func NewMetricsTranslator(buildInfo component.BuildInfo) *MetricsTranslator {
return &MetricsTranslator{
buildInfo: buildInfo,
lastTs: make(map[identity.Stream]pcommon.Timestamp),
stringPool: newStringPool(),
}
}

func (mt *MetricsTranslator) streamHasTimestamp(stream identity.Stream) (pcommon.Timestamp, bool) {
mt.RLock()
defer mt.RUnlock()
ts, ok := mt.lastTs[stream]
return ts, ok
}

func (mt *MetricsTranslator) updateLastTsForStream(stream identity.Stream, ts pcommon.Timestamp) {
mt.Lock()
defer mt.Unlock()
mt.lastTs[stream] = ts
}
@@ -1,47 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"sync"
"time"

datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type MetricsTranslator struct {
sync.RWMutex
buildInfo component.BuildInfo
lastTs map[identity.Stream]pcommon.Timestamp
stringPool *StringPool
}

func newMetricsTranslator() *MetricsTranslator {
return &MetricsTranslator{
lastTs: make(map[identity.Stream]pcommon.Timestamp),
stringPool: newStringPool(),
}
}

func (mt *MetricsTranslator) streamHasTimestamp(stream identity.Stream) (pcommon.Timestamp, bool) {
mt.RLock()
defer mt.RUnlock()
ts, ok := mt.lastTs[stream]
return ts, ok
}

func (mt *MetricsTranslator) updateLastTsForStream(stream identity.Stream, ts pcommon.Timestamp) {
mt.Lock()
defer mt.Unlock()
mt.lastTs[stream] = ts
}

const (
TypeGauge string = "gauge"
TypeRate string = "rate"
@@ -52,7 +23,7 @@ type SeriesList struct {
Series []datadogV1.Series `json:"series"`
}

func (mt *MetricsTranslator) translateMetricsV1(series SeriesList) pmetric.Metrics {
func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metrics {
bt := newBatcher()
Review comment (Contributor):

Over time the same batchers will be created again and again. This is not for this PR, but a possible improvement for the receiver would be to keep a pool of the maps we have already allocated and reuse them instead of reallocating them on every request.
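To illustrate the idea (not part of this PR): a minimal sketch of reusing batcher maps through a sync.Pool inside the translator package. The batcherPool, getBatcher, and putBatcher names are hypothetical, and clearing the maps in place relies on Go 1.21's clear builtin.

```go
package translator

import (
	"sync"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// Hypothetical sketch: keep batchers (and their maps) in a pool so each
// request clears and reuses the existing buckets instead of reallocating them.
var batcherPool = sync.Pool{
	New: func() any { b := newBatcher(); return &b },
}

func getBatcher() *batcher {
	bt := batcherPool.Get().(*batcher)
	bt.Metrics = pmetric.NewMetrics() // fresh output, reused lookup maps
	clear(bt.resourceMetrics)         // Go 1.21+: empties the map, keeps its capacity
	clear(bt.scopeMetrics)
	clear(bt.metrics)
	return bt
}

func putBatcher(bt *batcher) { batcherPool.Put(bt) }
```

TranslateSeriesV1 would then call getBatcher and putBatcher around its translation loop instead of newBatcher; whether the pooled maps actually stay warm across requests would need benchmarking.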


for _, serie := range series.Series {
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver
package translator

import (
"testing"
@@ -28,7 +28,7 @@ func testPointsToDatadogPoints(points []testPoint) [][]*float64 {

}

func TestTranslateMetricsV1(t *testing.T) {
func TestTranslateSeriesV1(t *testing.T) {
tests := []struct {
name string

@@ -152,7 +152,7 @@ func TestTranslateMetricsV1(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mt := createMetricsTranslator()
result := mt.translateMetricsV1(tt.series)
result := mt.TranslateSeriesV1(tt.series)

tt.expect(t, result)
})
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"fmt"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver
package translator

import (
"testing"
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"testing"
@@ -13,12 +13,11 @@ import (
)

func createMetricsTranslator() *MetricsTranslator {
mt := newMetricsTranslator()
mt.buildInfo = component.BuildInfo{
mt := NewMetricsTranslator(component.BuildInfo{
Command: "otelcol",
Description: "OpenTelemetry Collector",
Version: "latest",
}
})
return mt
}

@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"bytes"
@@ -51,7 +51,7 @@ func upsertHeadersAttributes(req *http.Request, attrs pcommon.Map) {
}
}

func toTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
func ToTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
var traces pb.Traces
for _, p := range payload.GetChunks() {
traces = append(traces, p.GetSpans())
@@ -161,17 +161,17 @@ var bufferPool = sync.Pool{
},
}

func getBuffer() *bytes.Buffer {
func GetBuffer() *bytes.Buffer {
buffer := bufferPool.Get().(*bytes.Buffer)
buffer.Reset()
return buffer
}

func putBuffer(buffer *bytes.Buffer) {
func PutBuffer(buffer *bytes.Buffer) {
bufferPool.Put(buffer)
}

func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error) {
func HandleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error) {
var tracerPayloads []*pb.TracerPayload

defer func() {
@@ -181,8 +181,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)

switch {
case strings.HasPrefix(req.URL.Path, "/v0.7"):
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}
@@ -193,8 +193,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)

tracerPayloads = append(tracerPayloads, &tracerPayload)
case strings.HasPrefix(req.URL.Path, "/v0.5"):
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}
@@ -229,8 +229,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)
}
tracerPayloads = append(tracerPayloads, tracerPayload)
case strings.HasPrefix(req.URL.Path, "/api/v0.2"):
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}
@@ -265,8 +265,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)
func decodeRequest(req *http.Request, dest *pb.Traces) (err error) {
switch mediaType := getMediaType(req); mediaType {
case "application/msgpack":
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
_, err = io.Copy(buf, req.Body)
if err != nil {
return err
@@ -283,8 +283,8 @@ func decodeRequest(req *http.Request, dest *pb.Traces) (err error)
default:
// do our best
if err1 := json.NewDecoder(req.Body).Decode(&dest); err1 != nil {
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
_, err2 := io.Copy(buf, req.Body)
if err2 != nil {
return err2