-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[connector/datadog] Use the native OTel ingest API in APM stats (#33297)
**Description:** Add a feature gate `connector.datadogconnector.NativeIngest` that enables datadog connector to use the new native OTel API in APM stats computation. It is disabled by default. Follow-up of DataDog/datadog-agent#23503. --------- Co-authored-by: Pablo Baeyens <[email protected]>
- Loading branch information
Showing
7 changed files
with
354 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: datadogconnector | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: "Add a feature gate `connector.datadogconnector.NativeIngest` that enables datadog connector to use the new native OTel API in APM stats computation." | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [33297] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: "The feature gate `connector.datadogconnector.NativeIngest` is disabled by default." | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector" | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" | ||
"github.com/DataDog/datadog-agent/pkg/trace/config" | ||
"github.com/DataDog/datadog-agent/pkg/trace/stats" | ||
"github.com/DataDog/datadog-go/v5/statsd" | ||
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" | ||
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
"go.opentelemetry.io/collector/pdata/ptrace" | ||
"go.opentelemetry.io/otel/metric/noop" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// traceToMetricConnectorNative is the schema for connector | ||
type traceToMetricConnectorNative struct { | ||
metricsConsumer consumer.Metrics // the next component in the pipeline to ingest metrics after connector | ||
logger *zap.Logger | ||
|
||
// concentrator ingests spans and produces APM stats | ||
concentrator *stats.Concentrator | ||
|
||
// tcfg is the trace agent config | ||
tcfg *config.AgentConfig | ||
|
||
// ctagKeys are container tag keys | ||
ctagKeys []string | ||
|
||
// translator specifies the translator used to transform APM Stats Payloads | ||
// from the agent to OTLP Metrics. | ||
translator *metrics.Translator | ||
|
||
// statsout specifies the channel through which the agent will output Stats Payloads | ||
// resulting from ingested traces. | ||
statsout chan *pb.StatsPayload | ||
|
||
// exit specifies the exit channel, which will be closed upon shutdown. | ||
exit chan struct{} | ||
|
||
// isStarted tracks whether Start() has been called. | ||
isStarted bool | ||
} | ||
|
||
var _ component.Component = (*traceToMetricConnectorNative)(nil) // testing that the connectorImp properly implements the type Component interface | ||
|
||
// newTraceToMetricConnectorNative creates a new connector with native OTel span ingestion | ||
func newTraceToMetricConnectorNative(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics, metricsClient statsd.ClientInterface) (*traceToMetricConnectorNative, error) { | ||
set.Logger.Info("Building datadog connector for traces to metrics") | ||
statsout := make(chan *pb.StatsPayload, 100) | ||
set.MeterProvider = noop.NewMeterProvider() // disable metrics for the connector | ||
attributesTranslator, err := attributes.NewTranslator(set) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create attributes translator: %w", err) | ||
} | ||
trans, err := metrics.NewTranslator(set, attributesTranslator) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create metrics translator: %w", err) | ||
} | ||
|
||
tcfg := getTraceAgentCfg(set.Logger, cfg.(*Config).Traces, attributesTranslator) | ||
return &traceToMetricConnectorNative{ | ||
logger: set.Logger, | ||
translator: trans, | ||
tcfg: tcfg, | ||
ctagKeys: cfg.(*Config).Traces.ResourceAttributesAsContainerTags, | ||
concentrator: stats.NewConcentrator(tcfg, statsout, time.Now(), metricsClient), | ||
statsout: statsout, | ||
metricsConsumer: metricsConsumer, | ||
exit: make(chan struct{}), | ||
}, nil | ||
} | ||
|
||
// Start implements the component.Component interface. | ||
func (c *traceToMetricConnectorNative) Start(_ context.Context, _ component.Host) error { | ||
c.logger.Info("Starting datadogconnector") | ||
c.concentrator.Start() | ||
go c.run() | ||
c.isStarted = true | ||
return nil | ||
} | ||
|
||
// Shutdown implements the component.Component interface. | ||
func (c *traceToMetricConnectorNative) Shutdown(context.Context) error { | ||
if !c.isStarted { | ||
// Note: it is not necessary to manually close c.exit, c.in and c.concentrator.exit channels as these are unused. | ||
c.logger.Info("Requested shutdown, but not started, ignoring.") | ||
return nil | ||
} | ||
c.logger.Info("Shutting down datadog connector") | ||
c.logger.Info("Stopping concentrator") | ||
// stop the concentrator and wait for the run loop to exit | ||
c.concentrator.Stop() | ||
c.exit <- struct{}{} // signal exit | ||
<-c.exit // wait for close | ||
return nil | ||
} | ||
|
||
// Capabilities implements the consumer interface. | ||
// tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data | ||
func (c *traceToMetricConnectorNative) Capabilities() consumer.Capabilities { | ||
return consumer.Capabilities{MutatesData: false} | ||
} | ||
|
||
func (c *traceToMetricConnectorNative) ConsumeTraces(_ context.Context, traces ptrace.Traces) error { | ||
inputs := stats.OTLPTracesToConcentratorInputs(traces, c.tcfg, c.ctagKeys) | ||
for _, input := range inputs { | ||
c.concentrator.Add(input) | ||
} | ||
return nil | ||
} | ||
|
||
// run awaits incoming stats resulting from the agent's ingestion, converts them | ||
// to metrics and flushes them using the configured metrics exporter. | ||
func (c *traceToMetricConnectorNative) run() { | ||
defer close(c.exit) | ||
for { | ||
select { | ||
case stats := <-c.statsout: | ||
if len(stats.Stats) == 0 { | ||
continue | ||
} | ||
var mx pmetric.Metrics | ||
var err error | ||
|
||
c.logger.Debug("Received stats payload", zap.Any("stats", stats)) | ||
|
||
mx, err = c.translator.StatsToMetrics(stats) | ||
if err != nil { | ||
c.logger.Error("Failed to convert stats to metrics", zap.Error(err)) | ||
continue | ||
} | ||
// APM stats as metrics | ||
ctx := context.TODO() | ||
|
||
// send metrics to the consumer or next component in pipeline | ||
if err := c.metricsConsumer.ConsumeMetrics(ctx, mx); err != nil { | ||
c.logger.Error("Failed ConsumeMetrics", zap.Error(err)) | ||
return | ||
} | ||
case <-c.exit: | ||
return | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package datadogconnector | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/connector/connectortest" | ||
"go.opentelemetry.io/collector/consumer/consumertest" | ||
"go.opentelemetry.io/collector/featuregate" | ||
semconv "go.opentelemetry.io/collector/semconv/v1.5.0" | ||
"go.uber.org/zap" | ||
"google.golang.org/protobuf/proto" | ||
) | ||
|
||
var _ component.Component = (*traceToMetricConnectorNative)(nil) // testing that the connectorImp properly implements the type Component interface | ||
|
||
// create test to create a connector, check that basic code compiles | ||
func TestNewConnectorNative(t *testing.T) { | ||
err := featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), true) | ||
assert.NoError(t, err) | ||
defer func() { | ||
_ = featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), false) | ||
}() | ||
|
||
factory := NewFactory() | ||
|
||
creationParams := connectortest.NewNopSettings() | ||
cfg := factory.CreateDefaultConfig().(*Config) | ||
|
||
tconn, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop()) | ||
assert.NoError(t, err) | ||
|
||
_, ok := tconn.(*traceToMetricConnectorNative) | ||
assert.True(t, ok) // checks if the created connector implements the connectorImp struct | ||
} | ||
|
||
func TestTraceToTraceConnectorNative(t *testing.T) { | ||
err := featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), true) | ||
assert.NoError(t, err) | ||
defer func() { | ||
_ = featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), false) | ||
}() | ||
|
||
factory := NewFactory() | ||
|
||
creationParams := connectortest.NewNopSettings() | ||
cfg := factory.CreateDefaultConfig().(*Config) | ||
|
||
tconn, err := factory.CreateTracesToTraces(context.Background(), creationParams, cfg, consumertest.NewNop()) | ||
assert.NoError(t, err) | ||
|
||
_, ok := tconn.(*traceToTraceConnector) | ||
assert.True(t, ok) // checks if the created connector implements the connectorImp struct | ||
} | ||
|
||
func creteConnectorNative(t *testing.T) (*traceToMetricConnectorNative, *consumertest.MetricsSink) { | ||
err := featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), true) | ||
assert.NoError(t, err) | ||
defer func() { | ||
_ = featuregate.GlobalRegistry().Set(NativeIngestFeatureGate.ID(), false) | ||
}() | ||
|
||
factory := NewFactory() | ||
|
||
creationParams := connectortest.NewNopSettings() | ||
cfg := factory.CreateDefaultConfig().(*Config) | ||
cfg.Traces.ResourceAttributesAsContainerTags = []string{semconv.AttributeCloudAvailabilityZone, semconv.AttributeCloudRegion, "az"} | ||
|
||
metricsSink := &consumertest.MetricsSink{} | ||
|
||
tconn, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, metricsSink) | ||
assert.NoError(t, err) | ||
|
||
connector, ok := tconn.(*traceToMetricConnectorNative) | ||
require.True(t, ok) | ||
return connector, metricsSink | ||
} | ||
|
||
func TestContainerTagsNative(t *testing.T) { | ||
connector, metricsSink := creteConnectorNative(t) | ||
err := connector.Start(context.Background(), componenttest.NewNopHost()) | ||
if err != nil { | ||
t.Errorf("Error starting connector: %v", err) | ||
return | ||
} | ||
defer func() { | ||
_ = connector.Shutdown(context.Background()) | ||
}() | ||
|
||
trace1 := generateTrace() | ||
|
||
err = connector.ConsumeTraces(context.Background(), trace1) | ||
assert.NoError(t, err) | ||
|
||
// Send two traces to ensure unique container tags are added to the cache | ||
trace2 := generateTrace() | ||
err = connector.ConsumeTraces(context.Background(), trace2) | ||
assert.NoError(t, err) | ||
|
||
for { | ||
if len(metricsSink.AllMetrics()) > 0 { | ||
break | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
|
||
// check if the container tags are added to the metrics | ||
metrics := metricsSink.AllMetrics() | ||
assert.Equal(t, 1, len(metrics)) | ||
|
||
ch := make(chan []byte, 100) | ||
tr := newTranslatorWithStatsChannel(t, zap.NewNop(), ch) | ||
_, err = tr.MapMetrics(context.Background(), metrics[0], nil) | ||
require.NoError(t, err) | ||
msg := <-ch | ||
sp := &pb.StatsPayload{} | ||
|
||
err = proto.Unmarshal(msg, sp) | ||
require.NoError(t, err) | ||
|
||
tags := sp.Stats[0].Tags | ||
assert.Equal(t, 3, len(tags)) | ||
assert.ElementsMatch(t, []string{"region:my-region", "zone:my-zone", "az:my-az"}, tags) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.