Skip to content

Commit

Permalink
[connector/datadog] Clean up feature gate
Browse files Browse the repository at this point in the history
  • Loading branch information
songy23 committed Feb 26, 2024
1 parent ea1632e commit 2801df5
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 120 deletions.
27 changes: 27 additions & 0 deletions .chloggen/datadogconnector-feature-gate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Move feature gate `connector.datadogconnector.performance` to stable stage."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31414]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: "`connector.datadogconnector.performance` will be removed in the next release"

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/datadogconnector-mutates-data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: datadogconnector no longer mutates the input traces in trace-to-trace pipelines.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31414]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
14 changes: 5 additions & 9 deletions connector/datadogconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,11 @@ func (c *traceToMetricConnector) run() {
}
var mx pmetric.Metrics
var err error
if datadog.ConnectorPerformanceFeatureGate.IsEnabled() {
c.logger.Debug("Received stats payload", zap.Any("stats", stats))
mx, err = c.translator.StatsToMetrics(stats)
if err != nil {
c.logger.Error("Failed to convert stats to metrics", zap.Error(err))
continue
}
} else {
mx = c.translator.StatsPayloadToMetrics(stats)
c.logger.Debug("Received stats payload", zap.Any("stats", stats))
mx, err = c.translator.StatsToMetrics(stats)
if err != nil {
c.logger.Error("Failed to convert stats to metrics", zap.Error(err))
continue
}
// APM stats as metrics
ctx := context.TODO()
Expand Down
13 changes: 1 addition & 12 deletions connector/datadogconnector/traces_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ import (
"go.uber.org/zap"
)

// keyStatsComputed specifies the resource attribute key which indicates if stats have been
// computed for the resource spans.
const keyStatsComputed = "_dd.stats_computed"

type traceToTraceConnector struct {
logger *zap.Logger
tracesConsumer consumer.Traces // the next component in the pipeline to ingest traces after connector
Expand All @@ -42,17 +38,10 @@ func (c *traceToTraceConnector) Shutdown(_ context.Context) error {
// Capabilities implements the consumer interface.
// tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data
func (c *traceToTraceConnector) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true} // ConsumeTraces puts a new attribute _dd.stats_computed
return consumer.Capabilities{MutatesData: false}
}

// ConsumeTraces implements the consumer interface.
func (c *traceToTraceConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
for i := 0; i < traces.ResourceSpans().Len(); i++ {
rs := traces.ResourceSpans().At(i)
// Stats will be computed for p. Mark the original resource spans to ensure that they don't
// get computed twice in case these spans pass through here again.
rs.Resource().Attributes().PutBool(keyStatsComputed, true)

}
return c.tracesConsumer.ConsumeTraces(ctx, traces)
}
9 changes: 3 additions & 6 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,9 @@ func (f *factory) createMetricsExporter(
set.Logger.Debug("Starting Datadog Trace-Agent StatsWriter")
go statsWriter.Run()

var statsIn chan []byte
if datadog.ConnectorPerformanceFeatureGate.IsEnabled() {
statsIn = make(chan []byte, 1000)
statsv := set.BuildInfo.Command + set.BuildInfo.Version
f.consumeStatsPayload(ctx, statsIn, statsToAgent, statsv, set.Logger)
}
statsIn := make(chan []byte, 1000)
statsv := set.BuildInfo.Command + set.BuildInfo.Version
f.consumeStatsPayload(ctx, statsIn, statsToAgent, statsv, set.Logger)
pcfg := newMetadataConfigfromConfig(cfg)
metadataReporter, err := f.Reporter(set, pcfg)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions exporter/datadogexporter/integrationtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/DataDog/datadog-agent/pkg/proto v0.52.0-devel
github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector v0.95.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter v0.95.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.95.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.95.0
github.com/stretchr/testify v1.8.4
github.com/tinylib/msgp v1.1.9
Expand All @@ -16,7 +15,6 @@ require (
go.opentelemetry.io/collector/connector v0.95.0
go.opentelemetry.io/collector/exporter v0.95.0
go.opentelemetry.io/collector/exporter/debugexporter v0.95.0
go.opentelemetry.io/collector/featuregate v1.2.0
go.opentelemetry.io/collector/otelcol v0.95.0
go.opentelemetry.io/collector/processor v0.95.0
go.opentelemetry.io/collector/processor/batchprocessor v0.95.0
Expand Down Expand Up @@ -113,6 +111,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.95.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.95.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.95.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.95.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.95.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.95.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.95.0 // indirect
Expand Down Expand Up @@ -161,6 +160,7 @@ require (
go.opentelemetry.io/collector/consumer v0.95.0 // indirect
go.opentelemetry.io/collector/extension v0.95.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.95.0 // indirect
go.opentelemetry.io/collector/featuregate v1.2.0 // indirect
go.opentelemetry.io/collector/pdata v1.2.0 // indirect
go.opentelemetry.io/collector/semconv v0.95.0 // indirect
go.opentelemetry.io/collector/service v0.95.0 // indirect
Expand Down
136 changes: 52 additions & 84 deletions exporter/datadogexporter/integrationtest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/debugexporter"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.opentelemetry.io/collector/processor"
Expand All @@ -42,102 +41,71 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
)

func TestIntegration(t *testing.T) {
tests := []struct {
name string
featureGateEnabled bool
}{
{
name: "with feature gate enabled",
featureGateEnabled: true,
},
{
name: "with feature gate disabled",
featureGateEnabled: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 1. Set up mock Datadog server
// See also https://github.com/DataDog/datadog-agent/blob/49c16e0d4deab396626238fa1d572b684475a53f/cmd/trace-agent/test/backend.go
apmstatsRec := &testutil.HTTPRequestRecorderWithChan{Pattern: testutil.APMStatsEndpoint, ReqChan: make(chan []byte)}
tracesRec := &testutil.HTTPRequestRecorderWithChan{Pattern: testutil.TraceEndpoint, ReqChan: make(chan []byte)}
server := testutil.DatadogServerMock(apmstatsRec.HandlerFunc, tracesRec.HandlerFunc)
defer server.Close()
// 1. Set up mock Datadog server
// See also https://github.com/DataDog/datadog-agent/blob/49c16e0d4deab396626238fa1d572b684475a53f/cmd/trace-agent/test/backend.go
apmstatsRec := &testutil.HTTPRequestRecorderWithChan{Pattern: testutil.APMStatsEndpoint, ReqChan: make(chan []byte)}
tracesRec := &testutil.HTTPRequestRecorderWithChan{Pattern: testutil.TraceEndpoint, ReqChan: make(chan []byte)}
server := testutil.DatadogServerMock(apmstatsRec.HandlerFunc, tracesRec.HandlerFunc)
defer server.Close()

// 2. Start in-process collector
factories := getIntegrationTestComponents(t)
app, confFilePath := getIntegrationTestCollector(t, server.URL, factories)
if tt.featureGateEnabled {
err := featuregate.GlobalRegistry().Set(datadog.ConnectorPerformanceFeatureGate.ID(), true)
assert.NoError(t, err)
defer func() {
_ = featuregate.GlobalRegistry().Set(datadog.ConnectorPerformanceFeatureGate.ID(), false)
}()
}
go func() {
assert.NoError(t, app.Run(context.Background()))
}()
defer app.Shutdown()
defer os.Remove(confFilePath)
waitForReadiness(app)
// 2. Start in-process collector
factories := getIntegrationTestComponents(t)
app, confFilePath := getIntegrationTestCollector(t, server.URL, factories)
go func() {
assert.NoError(t, app.Run(context.Background()))
}()
defer app.Shutdown()
defer os.Remove(confFilePath)
waitForReadiness(app)

// 3. Generate and send traces
sendTraces(t)
// 3. Generate and send traces
sendTraces(t)

// 4. Validate traces and APM stats from the mock server
var spans []*pb.Span
var stats []*pb.ClientGroupedStats
// 4. Validate traces and APM stats from the mock server
var spans []*pb.Span
var stats []*pb.ClientGroupedStats

// 5 sampled spans + APM stats on 10 spans are sent to datadog exporter
for len(spans) < 5 || len(stats) < 10 {
select {
case tracesBytes := <-tracesRec.ReqChan:
gz := getGzipReader(t, tracesBytes)
slurp, err := io.ReadAll(gz)
require.NoError(t, err)
var traces pb.AgentPayload
require.NoError(t, proto.Unmarshal(slurp, &traces))
for _, tps := range traces.TracerPayloads {
for _, chunks := range tps.Chunks {
spans = append(spans, chunks.Spans...)
for _, span := range chunks.Spans {
assert.Equal(t, "true", span.Meta["_dd.stats_computed"])
}
}
}
// 5 sampled spans + APM stats on 10 spans are sent to datadog exporter
for len(spans) < 5 || len(stats) < 10 {
select {
case tracesBytes := <-tracesRec.ReqChan:
gz := getGzipReader(t, tracesBytes)
slurp, err := io.ReadAll(gz)
require.NoError(t, err)
var traces pb.AgentPayload
require.NoError(t, proto.Unmarshal(slurp, &traces))
for _, tps := range traces.TracerPayloads {
for _, chunks := range tps.Chunks {
spans = append(spans, chunks.Spans...)
}
}

case apmstatsBytes := <-apmstatsRec.ReqChan:
gz := getGzipReader(t, apmstatsBytes)
var spl pb.StatsPayload
require.NoError(t, msgp.Decode(gz, &spl))
for _, csps := range spl.Stats {
for _, csbs := range csps.Stats {
stats = append(stats, csbs.Stats...)
for _, stat := range csbs.Stats {
assert.True(t, strings.HasPrefix(stat.Resource, "TestSpan"))
assert.Equal(t, uint64(1), stat.Hits)
assert.Equal(t, uint64(1), stat.TopLevelHits)
if tt.featureGateEnabled {
// Peer tags aggregation is supported only when the feature gate is enabled (it's enabled by default)
assert.Equal(t, "client", stat.SpanKind)
assert.Equal(t, []string{"extra_peer_tag:tag_val", "peer.service:svc"}, stat.PeerTags)
}
}
}
case apmstatsBytes := <-apmstatsRec.ReqChan:
gz := getGzipReader(t, apmstatsBytes)
var spl pb.StatsPayload
require.NoError(t, msgp.Decode(gz, &spl))
for _, csps := range spl.Stats {
for _, csbs := range csps.Stats {
stats = append(stats, csbs.Stats...)
for _, stat := range csbs.Stats {
assert.True(t, strings.HasPrefix(stat.Resource, "TestSpan"))
assert.Equal(t, uint64(1), stat.Hits)
assert.Equal(t, uint64(1), stat.TopLevelHits)
assert.Equal(t, "client", stat.SpanKind)
assert.Equal(t, []string{"extra_peer_tag:tag_val", "peer.service:svc"}, stat.PeerTags)
}
}
}

// Verify we don't receive more than the expected numbers
assert.Len(t, spans, 5)
assert.Len(t, stats, 10)
})
}
}

// Verify we don't receive more than the expected numbers
assert.Len(t, spans, 5)
assert.Len(t, stats, 10)
}

func getIntegrationTestComponents(t *testing.T) otelcol.Factories {
Expand Down
5 changes: 1 addition & 4 deletions exporter/datadogexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics/sketches"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
)

type metricsExporter struct {
Expand Down Expand Up @@ -86,9 +85,7 @@ func translatorFromConfig(set component.TelemetrySettings, cfg *Config, attrsTra
options = append(options, otlpmetrics.WithInitialCumulMonoValueMode(
otlpmetrics.InitialCumulMonoValueMode(cfg.Metrics.SumConfig.InitialCumulativeMonotonicMode)))

if datadog.ConnectorPerformanceFeatureGate.IsEnabled() {
options = append(options, otlpmetrics.WithStatsOut(statsOut))
}
options = append(options, otlpmetrics.WithStatsOut(statsOut))
return otlpmetrics.NewTranslator(set, attrsTranslator, options...)
}

Expand Down
6 changes: 3 additions & 3 deletions internal/datadog/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ type TraceAgent struct {
exit chan struct{}
}

// ConnectorPerformanceFeatureGate uses optimized code paths for the Datadog Connector.
var ConnectorPerformanceFeatureGate = featuregate.GlobalRegistry().MustRegister(
var _ = featuregate.GlobalRegistry().MustRegister(
"connector.datadogconnector.performance",
featuregate.StageBeta,
featuregate.StageStable,
featuregate.WithRegisterDescription("Datadog Connector will use optimized code"),
featuregate.WithRegisterToVersion("0.97.0"),
)

// newAgent creates a new unstarted traceagent using the given context. Call Start to start the traceagent.
Expand Down

0 comments on commit 2801df5

Please sign in to comment.