From 6a410f029b4b48ea1979bf5c2b6b4dcca6432843 Mon Sep 17 00:00:00 2001
From: Eric Mustin
Date: Wed, 21 Oct 2020 15:41:24 +0200
Subject: [PATCH] Datadog trace flushing/export (#1266)

This PR adds flushing and export of traces and trace-related statistics to the
`datadogexporter`, along with some minor changes to the translation of internal
traces into Datadog format. It is the second of two PRs for the work contained in
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/1203.
It builds on top of the current master branch and follows up on the work
[done here](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/1208).
A final PR that explicitly enables the Datadog exporter will follow, allowing
users to export traces to Datadog's API intake. This PR split was requested by
@tigrannajaryan and should make code review less cumbersome. If there are any
questions, or if changes to the PR format are needed, please let me know.

**Testing:** There are unit tests for the different methods and helper methods
within the export code.

**Documentation:** Appropriate usage, including best practices for which
processors to enable alongside this exporter, is documented in the README and in
the `testdata/config.yaml` and `example/config.yaml` samples.

**Notes:** This PR includes a trace exporter for non-Windows environments only
(metrics work fine on Windows; only traces are affected), for the reasons
explained in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/1274.
In short, the Windows trace export code would currently rely on CGO, which is not
permitted in the collector.

---
 exporter/datadogexporter/README.md            |  44 +++-
 exporter/datadogexporter/factory.go           |  33 +++
 exporter/datadogexporter/factory_test.go      |  35 ++-
 exporter/datadogexporter/go.mod               |   1 +
 exporter/datadogexporter/stats.go             |  79 ++++++
 exporter/datadogexporter/trace_connection.go  | 180 ++++++++++++++
 exporter/datadogexporter/traces_exporter.go   | 131 ++++++++++
 .../datadogexporter/traces_exporter_test.go   | 233 ++++++++++++++++++
 .../traces_exporter_windows.go                |  40 +++
 exporter/datadogexporter/translate_traces.go  |  12 +-
 .../datadogexporter/translate_traces_test.go  |  12 +-
 exporter/datadogexporter/utils/http.go        |  71 ++++++
 12 files changed, 856 insertions(+), 15 deletions(-)
 create mode 100644 exporter/datadogexporter/stats.go
 create mode 100644 exporter/datadogexporter/trace_connection.go
 create mode 100644 exporter/datadogexporter/traces_exporter.go
 create mode 100644 exporter/datadogexporter/traces_exporter_test.go
 create mode 100644 exporter/datadogexporter/traces_exporter_windows.go
 create mode 100644 exporter/datadogexporter/utils/http.go

diff --git a/exporter/datadogexporter/README.md b/exporter/datadogexporter/README.md
index 3e420ed2da9c..b336f190fa58 100644
--- a/exporter/datadogexporter/README.md
+++ b/exporter/datadogexporter/README.md
@@ -12,6 +12,7 @@ datadog:
 ```
 
 To send data to the Datadog EU site, set the `api.site` parameter to `datadoghq.eu`:
+
 ```yaml
 datadog:
   api:
@@ -25,4 +26,45 @@ See the sample configuration file under the `example` folder for other available
 
 ## Trace Export Configuration
 
-_Note: Trace Export is not supported on windows at the moment_
\ No newline at end of file
+_Note: Trace Export is not supported on Windows at the moment_
+
+### **Important Pipeline Setup Details**
+
+This exporter assumes a pipeline using the Datadog exporter also includes a [batch
processor](https://github.com/open-telemetry/opentelemetry-collector/tree/master/processor/batchprocessor) configured with the following: + - a `timeout` setting of `10s`(10 seconds). + +Please make sure to include this processor in your pipeline. An example pipeline can be found below. + +A batch representing 10 seconds of traces is a constraint of Datadog's API Intake for Trace Related Statistics. Without this setting, trace related metrics including `.hits` `.errors` and `.duration` for different services and service resources may be inaccurate over periods of time. + +Example: + + ``` +receivers: + examplereceiver: + +processors: + batch: + timeout: 10s + +exporters: + datadog/api: + hostname: customhostname + env: prod + service: myservice + version: myversion + + tags: + - example:tag + + api: + key: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + site: datadoghq.eu + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [batch] + exporters: [datadog/api] + ``` diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index bd622649326c..9ffe07904881 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -11,10 +11,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package datadogexporter import ( "context" + "errors" + "runtime" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" @@ -35,6 +38,7 @@ func NewFactory() component.ExporterFactory { typeStr, createDefaultConfig, exporterhelper.WithMetrics(createMetricsExporter), + exporterhelper.WithTraces(createTraceExporter), ) } @@ -99,3 +103,32 @@ func createMetricsExporter( exporterhelper.WithRetry(exporterhelper.CreateDefaultRetrySettings()), ) } + +// createTraceExporter creates a trace exporter based on this config. +func createTraceExporter( + _ context.Context, + params component.ExporterCreateParams, + c configmodels.Exporter, +) (component.TraceExporter, error) { + // TODO review if trace export can be supported on Windows + if runtime.GOOS == "windows" { + return nil, errors.New("datadog trace export is currently not supported on Windows") + } + + cfg := c.(*config.Config) + + params.Logger.Info("sanitizing Datadog metrics exporter configuration") + if err := cfg.Sanitize(); err != nil { + return nil, err + } + + exp, err := newTraceExporter(params.Logger, cfg) + if err != nil { + return nil, err + } + + return exporterhelper.NewTraceExporter( + cfg, + exp.pushTraceData, + ) +} diff --git a/exporter/datadogexporter/factory_test.go b/exporter/datadogexporter/factory_test.go index 0fe8501b57c1..abd24039b6f3 100644 --- a/exporter/datadogexporter/factory_test.go +++ b/exporter/datadogexporter/factory_test.go @@ -11,11 +11,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
+ package datadogexporter import ( "context" "path" + "runtime" "testing" "github.com/stretchr/testify/assert" @@ -123,7 +125,7 @@ func TestCreateAPIMetricsExporter(t *testing.T) { logger := zap.NewNop() factories, err := componenttest.ExampleComponents() - assert.NoError(t, err) + require.NoError(t, err) factory := NewFactory() factories.Exporters[configmodels.Type(typeStr)] = factory @@ -144,6 +146,35 @@ func TestCreateAPIMetricsExporter(t *testing.T) { cfg.Exporters["datadog/api"], ) - assert.Nil(t, err) + assert.NoError(t, err) + assert.NotNil(t, exp) +} + +func TestCreateAPITracesExporter(t *testing.T) { + // TODO review if test should succeed on Windows + if runtime.GOOS == "windows" { + t.Skip() + } + + logger := zap.NewNop() + + factories, err := componenttest.ExampleComponents() + require.NoError(t, err) + + factory := NewFactory() + factories.Exporters[configmodels.Type(typeStr)] = factory + cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + + require.NoError(t, err) + require.NotNil(t, cfg) + + ctx := context.Background() + exp, err := factory.CreateTraceExporter( + ctx, + component.ExporterCreateParams{Logger: logger}, + cfg.Exporters["datadog/api"], + ) + + assert.NoError(t, err) assert.NotNil(t, exp) } diff --git a/exporter/datadogexporter/go.mod b/exporter/datadogexporter/go.mod index f091a4c152b5..714e7297d002 100644 --- a/exporter/datadogexporter/go.mod +++ b/exporter/datadogexporter/go.mod @@ -7,6 +7,7 @@ replace gopkg.in/zorkian/go-datadog-api.v2 v2.29.0 => github.com/zorkian/go-data require ( github.com/DataDog/datadog-agent v0.0.0-20200417180928-f454c60bc16f github.com/DataDog/viper v1.8.0 // indirect + github.com/census-instrumentation/opencensus-proto v0.3.0 github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect github.com/gogo/protobuf v1.3.1 github.com/klauspost/compress v1.10.10 diff --git a/exporter/datadogexporter/stats.go b/exporter/datadogexporter/stats.go new file mode 100644 index 000000000000..b98bf39628ac --- /dev/null +++ b/exporter/datadogexporter/stats.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+// +build !windows
+
+package datadogexporter
+
+import (
+	"time"
+
+	"github.com/DataDog/datadog-agent/pkg/trace/pb"
+	"github.com/DataDog/datadog-agent/pkg/trace/stats"
+)
+
+const (
+	statsBucketDuration int64 = int64(10 * time.Second)
+)
+
+// ComputeAPMStats calculates the stats that should be submitted to APM about a given trace
+func ComputeAPMStats(tracePayload *pb.TracePayload, pushTime int64) *stats.Payload {
+
+	statsRawBuckets := make(map[int64]*stats.RawBucket)
+
+	bucketTS := pushTime - statsBucketDuration
+
+	for _, trace := range tracePayload.Traces {
+		spans := GetAnalyzedSpans(trace.Spans)
+		sublayers := stats.ComputeSublayers(trace.Spans)
+		for _, span := range spans {
+
+			// TODO: While this is hardcoded to assume a single 10s bucket for now,
+			// an improvement would be to keep multiple 10s buckets in a buffer,
+			// e.g. [0-10][10-20][20-30], flushing only the oldest bucket, so traces
+			// reported late are still counted in the correct bucket. This is how the
+			// datadog-agent handles stats buckets, but it would be non-trivial to add.
+
+			var statsRawBucket *stats.RawBucket
+			if existingBucket, ok := statsRawBuckets[bucketTS]; ok {
+				statsRawBucket = existingBucket
+			} else {
+				statsRawBucket = stats.NewRawBucket(bucketTS, statsBucketDuration)
+				statsRawBuckets[bucketTS] = statsRawBucket
+			}
+
+			// Use weight 1, as sampling in OpenTelemetry would occur upstream in a processor.
+			// Generally we want to ship 100% of traces to the backend, where more accurate tail-based sampling can be performed.
+			// TopLevel is always "true" since we only compute stats for top-level spans.
+			weightedSpan := &stats.WeightedSpan{
+				Span:     span,
+				Weight:   1,
+				TopLevel: true,
+			}
+			statsRawBucket.HandleSpan(weightedSpan, tracePayload.Env, []string{}, sublayers)
+		}
+	}
+
+	// Export statsRawBuckets to statsBuckets
+	statsBuckets := make([]stats.Bucket, 0)
+	for _, statsRawBucket := range statsRawBuckets {
+		statsBuckets = append(statsBuckets, statsRawBucket.Export())
+	}
+
+	return &stats.Payload{
+		HostName: tracePayload.HostName,
+		Env:      tracePayload.Env,
+		Stats:    statsBuckets,
+	}
+}
diff --git a/exporter/datadogexporter/trace_connection.go b/exporter/datadogexporter/trace_connection.go
new file mode 100644
index 000000000000..5f904c8b337e
--- /dev/null
+++ b/exporter/datadogexporter/trace_connection.go
@@ -0,0 +1,180 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +// +build !windows + +package datadogexporter + +import ( + "bytes" + "context" + "fmt" + "net/http" + "time" + + "github.com/DataDog/datadog-agent/pkg/trace/pb" + "github.com/DataDog/datadog-agent/pkg/trace/stats" + "github.com/gogo/protobuf/proto" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/utils" +) + +// TraceEdgeConnection is used to send data to trace edge +type TraceEdgeConnection interface { + SendTraces(ctx context.Context, trace *pb.TracePayload, maxRetries int) error + SendStats(ctx context.Context, stats *stats.Payload, maxRetries int) error +} + +type traceEdgeConnection struct { + traceURL string + statsURL string + apiKey string + InsecureSkipVerify bool +} + +const ( + traceEdgeTimeout time.Duration = 10 * time.Second + traceEdgeRetryInterval time.Duration = 10 * time.Second +) + +// CreateTraceEdgeConnection returns a new TraceEdgeConnection +func CreateTraceEdgeConnection(rootURL, apiKey string) TraceEdgeConnection { + + return &traceEdgeConnection{ + traceURL: rootURL + "/api/v0.2/traces", + statsURL: rootURL + "/api/v0.2/stats", + apiKey: apiKey, + } +} + +// Payload represents a data payload to be sent to some endpoint +type Payload struct { + CreationDate time.Time + Bytes []byte + Headers map[string]string +} + +// SendTraces serializes a trace payload to protobuf and sends it to Trace Edge +func (con *traceEdgeConnection) SendTraces(ctx context.Context, trace *pb.TracePayload, maxRetries int) error { + binary, marshallErr := proto.Marshal(trace) + if marshallErr != nil { + return fmt.Errorf("failed to serialize trace payload to protobuf: %w", marshallErr) + } + if len(trace.Traces) == 0 { + return fmt.Errorf("no traces in payload") + } + + // Set Headers + headers := utils.ProtobufHeaders + + // Construct a Payload{} from the headers and binary + payload := Payload{ + CreationDate: time.Now().UTC(), + Bytes: binary, + Headers: headers, + } + + var sendErr error + var shouldRetry bool + // If error while sending to trace-edge, retry maximum maxRetries number of times + // NOTE: APM stores traces by trace id, however, Logs pipeline does NOT dedupe APM events, + // and retries may potentially cause duplicate APM events in Trace Search + for retries := 1; retries <= maxRetries; retries++ { + if shouldRetry, sendErr = con.sendPayloadToTraceEdge(ctx, con.apiKey, &payload, con.traceURL); sendErr == nil { + return nil + } + + if !shouldRetry { + break + } + + time.Sleep(traceEdgeRetryInterval) + } + return fmt.Errorf("failed to send trace payload to trace edge: %w", sendErr) +} + +// SendStats serializes a stats payload to json and sends it to Trace Edge +func (con *traceEdgeConnection) SendStats(ctx context.Context, sts *stats.Payload, maxRetries int) error { + var b bytes.Buffer + err := stats.EncodePayload(&b, sts) + if err != nil { + return fmt.Errorf("failed to encode stats payload: %w", err) + } + binary := b.Bytes() + + // Set Headers + headers := utils.JSONHeaders + + // Construct a Payload{} from the headers and binary + payload := Payload{ + CreationDate: time.Now().UTC(), + Bytes: binary, + Headers: headers, + } + + var sendErr error + var shouldRetry bool + // If error while sending to trace-edge, retry maximum maxRetries number of times + // NOTE: APM does NOT dedupe, and retries may potentially cause duplicate/inaccurate stats + for retries := 1; retries <= maxRetries; retries++ { + if shouldRetry, sendErr = con.sendPayloadToTraceEdge(ctx, con.apiKey, &payload, con.statsURL); sendErr == nil { + return nil + } + + 
if !shouldRetry { + break + } + + time.Sleep(traceEdgeRetryInterval) + } + return fmt.Errorf("failed to send stats payload to trace edge: %w", sendErr) +} + +// sendPayloadToTraceEdge sends a payload to Trace Edge +func (con *traceEdgeConnection) sendPayloadToTraceEdge(ctx context.Context, apiKey string, payload *Payload, url string) (bool, error) { + // Create the request to be sent to the API + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(payload.Bytes)) + + if err != nil { + return false, err + } + + utils.SetDDHeaders(req.Header, apiKey) + utils.SetExtraHeaders(req.Header, payload.Headers) + + client := utils.NewHTTPClient(traceEdgeTimeout) + resp, err := client.Do(req) + + if err != nil { + // in this case, the payload and client are malformed in some way, so we should not retry + return false, err + } + defer resp.Body.Close() + + // We check the status code to see if the request has succeeded. + // TODO: define all legit status code and behave accordingly. + if resp.StatusCode/100 != 2 { + err := fmt.Errorf("request to %s responded with %s", url, resp.Status) + if resp.StatusCode/100 == 5 { + // 5xx errors are retriable + return true, err + } + + // All others aren't + return false, err + } + + // Everything went fine + return false, nil +} diff --git a/exporter/datadogexporter/traces_exporter.go b/exporter/datadogexporter/traces_exporter.go new file mode 100644 index 000000000000..06fbd45c8b9a --- /dev/null +++ b/exporter/datadogexporter/traces_exporter.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// +build !windows + +package datadogexporter + +import ( + "context" + "time" + + "github.com/DataDog/datadog-agent/pkg/trace/obfuscate" + "github.com/DataDog/datadog-agent/pkg/trace/pb" + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" +) + +type traceExporter struct { + logger *zap.Logger + cfg *config.Config + edgeConnection TraceEdgeConnection + obfuscator *obfuscate.Obfuscator + tags []string +} + +func newTraceExporter(logger *zap.Logger, cfg *config.Config) (*traceExporter, error) { + // removes potentially sensitive info and PII, approach taken from serverless approach + // https://github.com/DataDog/datadog-serverless-functions/blob/11f170eac105d66be30f18eda09eca791bc0d31b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L43 + obfuscator := obfuscate.NewObfuscator(&obfuscate.Config{ + ES: obfuscate.JSONSettings{ + Enabled: true, + }, + Mongo: obfuscate.JSONSettings{ + Enabled: true, + }, + RemoveQueryString: true, + RemovePathDigits: true, + RemoveStackTraces: true, + Redis: true, + Memcached: true, + }) + + // Calculate tags at startup + tags := cfg.TagsConfig.GetTags(false) + exporter := &traceExporter{ + logger: logger, + cfg: cfg, + edgeConnection: CreateTraceEdgeConnection(cfg.Traces.TCPAddr.Endpoint, cfg.API.Key), + obfuscator: obfuscator, + tags: tags, + } + + return exporter, nil +} + +// TODO: when component.Host exposes a way to retrieve processors, check for batch processors +// and log a warning if not set + +// Start tells the exporter to start. The exporter may prepare for exporting +// by connecting to the endpoint. Host parameter can be used for communicating +// with the host after Start() has already returned. If error is returned by +// Start() then the collector startup will be aborted. +// func (exp *traceExporter) Start(_ context.Context, _ component.Host) error { +// return nil +// } + +func (exp *traceExporter) pushTraceData( + ctx context.Context, + td pdata.Traces, +) (int, error) { + + // convert traces to datadog traces and group trace payloads by env + // we largely apply the same logic as the serverless implementation, simplified a bit + // https://github.com/DataDog/datadog-serverless-functions/blob/f5c3aedfec5ba223b11b76a4239fcbf35ec7d045/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L61-L83 + ddTraces, err := ConvertToDatadogTd(td, exp.cfg, exp.tags) + + if err != nil { + exp.logger.Info("failed to convert traces", zap.Error(err)) + return 0, err + } + + // group the traces by env to reduce the number of flushes + aggregatedTraces := AggregateTracePayloadsByEnv(ddTraces) + + // security/obfuscation for db, query strings, stack traces, pii, etc + // TODO: is there any config we want here? 
OTEL has their own pipeline for regex obfuscation + ObfuscatePayload(exp.obfuscator, aggregatedTraces) + + pushTime := time.Now().UTC().UnixNano() + for _, ddTracePayload := range aggregatedTraces { + // currently we don't want to do retries since api endpoints may not dedupe in certain situations + // adding a helper function here to make custom retry logic easier in the future + exp.pushWithRetry(ctx, ddTracePayload, 1, pushTime, func() error { + return nil + }) + } + + return len(aggregatedTraces), nil +} + +// gives us flexibility to add custom retry logic later +func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, maxRetries int, pushTime int64, fn func() error) error { + err := exp.edgeConnection.SendTraces(ctx, ddTracePayload, maxRetries) + + if err != nil { + exp.logger.Info("failed to send traces", zap.Error(err)) + } + + // this is for generating metrics like hits, errors, and latency, it uses a separate endpoint than Traces + stats := ComputeAPMStats(ddTracePayload, pushTime) + errStats := exp.edgeConnection.SendStats(context.Background(), stats, maxRetries) + + if errStats != nil { + exp.logger.Info("failed to send trace stats", zap.Error(errStats)) + } + + return fn() +} diff --git a/exporter/datadogexporter/traces_exporter_test.go b/exporter/datadogexporter/traces_exporter_test.go new file mode 100644 index 000000000000..753eb1d87794 --- /dev/null +++ b/exporter/datadogexporter/traces_exporter_test.go @@ -0,0 +1,233 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows + +package datadogexporter + +import ( + "compress/gzip" + "context" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/DataDog/datadog-agent/pkg/trace/pb" + "github.com/DataDog/datadog-agent/pkg/trace/stats" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" +) + +func testTraceExporterHelper(td pdata.Traces, t *testing.T) []string { + var got []string + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + assert.Equal(t, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", req.Header.Get("DD-Api-Key")) + + contentType := req.Header.Get("Content-Type") + + data := []string{contentType} + got = append(got, data...) 
+ + if contentType == "application/x-protobuf" { + testProtobufTracePayload(t, rw, req) + } else if contentType == "application/json" { + testJSONTraceStatsPayload(t, rw, req) + } + rw.WriteHeader(http.StatusAccepted) + })) + + defer server.Close() + cfg := config.Config{ + API: config.APIConfig{ + Key: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + }, + TagsConfig: config.TagsConfig{ + Hostname: "test_host", + Env: "test_env", + Tags: []string{"key:val"}, + }, + Traces: config.TracesConfig{ + SampleRate: 1, + TCPAddr: confignet.TCPAddr{ + Endpoint: server.URL, + }, + }, + } + + params := component.ExporterCreateParams{Logger: zap.NewNop()} + + exporter, err := createTraceExporter(context.Background(), params, &cfg) + + assert.NoError(t, err) + + defer exporter.Shutdown(context.Background()) + + ctx := context.Background() + errConsume := exporter.ConsumeTraces(ctx, td) + assert.NoError(t, errConsume) + + return got +} + +func testProtobufTracePayload(t *testing.T, rw http.ResponseWriter, req *http.Request) { + var traceData pb.TracePayload + b, err := ioutil.ReadAll(req.Body) + + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + assert.NoError(t, err, "http server received malformed trace payload") + return + } + + defer req.Body.Close() + + if marshallErr := proto.Unmarshal(b, &traceData); marshallErr != nil { + http.Error(rw, marshallErr.Error(), http.StatusInternalServerError) + assert.NoError(t, marshallErr, "http server received malformed trace payload") + return + } + + assert.NotNil(t, traceData.Env) + assert.NotNil(t, traceData.HostName) + assert.NotNil(t, traceData.Traces) +} + +func testJSONTraceStatsPayload(t *testing.T, rw http.ResponseWriter, req *http.Request) { + var statsData stats.Payload + + gz, err := gzip.NewReader(req.Body) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + require.NoError(t, err, "http server received malformed stats payload") + return + } + + defer req.Body.Close() + defer gz.Close() + + statsBytes, err := ioutil.ReadAll(gz) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + require.NoError(t, err, "http server received malformed stats payload") + return + } + + if marshallErr := json.Unmarshal(statsBytes, &statsData); marshallErr != nil { + http.Error(rw, marshallErr.Error(), http.StatusInternalServerError) + require.NoError(t, marshallErr, "http server received malformed stats payload") + return + } + + assert.NotNil(t, statsData.Env) + assert.NotNil(t, statsData.HostName) + assert.NotNil(t, statsData.Stats) +} + +func TestNewTraceExporter(t *testing.T) { + cfg := &config.Config{} + cfg.API.Key = "ddog_32_characters_long_api_key1" + logger := zap.NewNop() + + // The client should have been created correctly + exp, err := newTraceExporter(logger, cfg) + assert.NoError(t, err) + assert.NotNil(t, exp) +} + +func TestPushTraceData(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + assert.Equal(t, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", req.Header.Get("DD-Api-Key")) + rw.WriteHeader(http.StatusAccepted) + })) + + defer server.Close() + cfg := &config.Config{ + API: config.APIConfig{ + Key: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + }, + TagsConfig: config.TagsConfig{ + Hostname: "test_host", + Env: "test_env", + Tags: []string{"key:val"}, + }, + Traces: config.TracesConfig{ + SampleRate: 1, + TCPAddr: confignet.TCPAddr{ + Endpoint: server.URL, + }, + }, + } + logger := zap.NewNop() + + exp, err := 
newTraceExporter(logger, cfg) + + assert.NoError(t, err) + + tracesLength, err := exp.pushTraceData(context.Background(), func() pdata.Traces { + traces := pdata.NewTraces() + resourceSpans := traces.ResourceSpans() + resourceSpans.Resize(1) + resourceSpans.At(0).InitEmpty() + resourceSpans.At(0).InstrumentationLibrarySpans().Resize(1) + resourceSpans.At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) + return traces + }()) + + assert.NoError(t, err) + assert.Equal(t, 1, tracesLength) + +} + +func TestTraceAndStatsExporter(t *testing.T) { + // ensure that the protobuf serialized traces payload contains HostName Env and Traces + // ensure that the json gzipped stats payload contains HostName Env and Stats + got := testTraceExporterHelper(simpleTraces(), t) + + // ensure a protobuf and json payload are sent + assert.Equal(t, 2, len(got)) + assert.Equal(t, "application/json", got[1]) + assert.Equal(t, "application/x-protobuf", got[0]) +} + +func simpleTraces() pdata.Traces { + return simpleTracesWithID(pdata.NewTraceID([]byte{1, 2, 3, 4})) +} + +func simpleTracesWithID(traceID pdata.TraceID) pdata.Traces { + span := pdata.NewSpan() + span.InitEmpty() + span.SetTraceID(traceID) + + ils := pdata.NewInstrumentationLibrarySpans() + ils.InitEmpty() + ils.Spans().Append(span) + + rs := pdata.NewResourceSpans() + rs.InitEmpty() + rs.InstrumentationLibrarySpans().Append(ils) + + traces := pdata.NewTraces() + traces.ResourceSpans().Append(rs) + + return traces +} diff --git a/exporter/datadogexporter/traces_exporter_windows.go b/exporter/datadogexporter/traces_exporter_windows.go new file mode 100644 index 000000000000..8074860dde61 --- /dev/null +++ b/exporter/datadogexporter/traces_exporter_windows.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// +build windows + +package datadogexporter + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" +) + +type traceExporterStub struct {} + +func newTraceExporter(logger *zap.Logger, cfg *config.Config) (*traceExporterStub, error) { + return &traceExporterStub{}, errors.New("datadog trace export is currently not supported on Windows") +} + +func (exp *traceExporterStub) pushTraceData( + _ context.Context, + _ pdata.Traces, +) (int, error) { + return 0, errors.New("datadog trace export is currently not supported on Windows") +} diff --git a/exporter/datadogexporter/translate_traces.go b/exporter/datadogexporter/translate_traces.go index 0ecba31a4e85..f47c8ec4d358 100644 --- a/exporter/datadogexporter/translate_traces.go +++ b/exporter/datadogexporter/translate_traces.go @@ -384,9 +384,9 @@ func attributeMapToStringMap(attrMap pdata.AttributeMap) map[string]string { func spanKindToDatadogType(kind pdata.SpanKind) string { switch kind { case pdata.SpanKindCLIENT: - return "client" + return "http" case pdata.SpanKindSERVER: - return "server" + return "web" default: return "custom" } @@ -450,18 +450,18 @@ func getDatadogSpanName(s pdata.Span, datadogTags map[string]string) string { // The spec has changed over time and, depending on the original exporter, IL Name could represented a few different ways // so we try to account for all permutations if ilnOtlp, okOtlp := datadogTags[tracetranslator.TagInstrumentationName]; okOtlp { - return fmt.Sprintf("%s.%s", ilnOtlp, s.Kind()) + return strings.ReplaceAll(fmt.Sprintf("%s.%s", ilnOtlp, s.Kind()), "::", "_") } if ilnOtelCur, okOtelCur := datadogTags[currentILNameTag]; okOtelCur { - return fmt.Sprintf("%s.%s", ilnOtelCur, s.Kind()) + return strings.ReplaceAll(fmt.Sprintf("%s.%s", ilnOtelCur, s.Kind()), "::", "_") } if ilnOtelOld, okOtelOld := datadogTags[oldILNameTag]; okOtelOld { - return fmt.Sprintf("%s.%s", ilnOtelOld, s.Kind()) + return strings.ReplaceAll(fmt.Sprintf("%s.%s", ilnOtelOld, s.Kind()), "::", "_") } - return fmt.Sprintf("%s.%s", "opentelemetry", s.Kind()) + return strings.ReplaceAll(fmt.Sprintf("%s.%s", "opentelemetry", s.Kind()), "::", "_") } func getDatadogResourceName(s pdata.Span, datadogTags map[string]string) string { diff --git a/exporter/datadogexporter/translate_traces_test.go b/exporter/datadogexporter/translate_traces_test.go index 108bd8c601e3..9e917b669b56 100644 --- a/exporter/datadogexporter/translate_traces_test.go +++ b/exporter/datadogexporter/translate_traces_test.go @@ -128,7 +128,7 @@ func TestConvertToDatadogTd(t *testing.T) { outputTraces, err := ConvertToDatadogTd(traces, &config.Config{}, []string{}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 1, len(outputTraces)) } @@ -137,7 +137,7 @@ func TestConvertToDatadogTdNoResourceSpans(t *testing.T) { outputTraces, err := ConvertToDatadogTd(traces, &config.Config{}, []string{}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 0, len(outputTraces)) } @@ -174,7 +174,7 @@ func TestObfuscation(t *testing.T) { outputTraces, err := ConvertToDatadogTd(traces, &config.Config{}, []string{}) - assert.Nil(t, err) + assert.NoError(t, err) aggregatedTraces := AggregateTracePayloadsByEnv(outputTraces) @@ -244,7 +244,7 @@ func TestBasicTracesTranslation(t *testing.T) { assert.Equal(t, fmt.Sprintf("%s.%s", datadogPayload.Traces[0].Spans[0].Meta[tracetranslator.TagInstrumentationName], 
pdata.SpanKindSERVER), datadogPayload.Traces[0].Spans[0].Name) // ensure that span.type is based on otlp span.kind - assert.Equal(t, "server", datadogPayload.Traces[0].Spans[0].Type) + assert.Equal(t, "web", datadogPayload.Traces[0].Spans[0].Type) // ensure that span.meta and span.metrics pick up attibutes, instrumentation ibrary and resource attribs assert.Equal(t, 10, len(datadogPayload.Traces[0].Spans[0].Meta)) @@ -441,8 +441,8 @@ func TestSpanTypeTranslation(t *testing.T) { spanTypeServer := spanKindToDatadogType(pdata.SpanKindSERVER) spanTypeCustom := spanKindToDatadogType(pdata.SpanKindUNSPECIFIED) - assert.Equal(t, "client", spanTypeClient) - assert.Equal(t, "server", spanTypeServer) + assert.Equal(t, "http", spanTypeClient) + assert.Equal(t, "web", spanTypeServer) assert.Equal(t, "custom", spanTypeCustom) } diff --git a/exporter/datadogexporter/utils/http.go b/exporter/datadogexporter/utils/http.go new file mode 100644 index 000000000000..e7ca004ffe9a --- /dev/null +++ b/exporter/datadogexporter/utils/http.go @@ -0,0 +1,71 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "time" +) + +var ( + JSONHeaders map[string]string = map[string]string{ + "Content-Type": "application/json", + "Content-Encoding": "gzip", + } + ProtobufHeaders map[string]string = map[string]string{ + "Content-Type": "application/x-protobuf", + "Content-Encoding": "identity", + } +) + +// NewClient returns a http.Client configured with the Agent options. +func NewHTTPClient(timeout time.Duration) *http.Client { + return &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + // Disable RFC 6555 Fast Fallback ("Happy Eyeballs") + FallbackDelay: -1 * time.Nanosecond, + }).DialContext, + MaxIdleConns: 100, + // Not supported by intake + ForceAttemptHTTP2: false, + TLSClientConfig: &tls.Config{InsecureSkipVerify: false}, + }, + } +} + +// SetExtraHeaders appends a header map to HTTP headers. +func SetExtraHeaders(h http.Header, extras map[string]string) { + for key, value := range extras { + h.Set(key, value) + } +} + +func SetDDHeaders(reqHeader http.Header, apiKey string) { + // userAgent is the computed user agent we'll use when + // communicating with Datadog + var userAgent = fmt.Sprintf( + "%s/%s/%s (+%s)", + "otel-collector-exporter", "0.1", "1", "http://localhost", + ) + + reqHeader.Set("DD-Api-Key", apiKey) + reqHeader.Set("User-Agent", userAgent) +}
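
For reviewers who want to exercise the new `utils` helpers in isolation, below is a minimal, hypothetical sketch (not part of the patch) that composes `NewHTTPClient`, `SetDDHeaders`, and `SetExtraHeaders` the same way `sendPayloadToTraceEdge` does. The local `httptest` server, placeholder API key, and empty JSON body are stand-ins; the real exporter sends a gzip-compressed stats JSON (via `stats.EncodePayload`) or a protobuf trace payload to the configured intake endpoint.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/utils"
)

func main() {
	// Stand-in for the trace-edge intake; echoes the headers the helpers set.
	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		fmt.Println("DD-Api-Key:", req.Header.Get("DD-Api-Key"))
		fmt.Println("Content-Type:", req.Header.Get("Content-Type"))
		rw.WriteHeader(http.StatusAccepted)
	}))
	defer server.Close()

	// Stand-in payload; the exporter would send serialized trace or stats bytes here.
	body := []byte(`{}`)

	req, err := http.NewRequestWithContext(context.Background(), "POST", server.URL+"/api/v0.2/stats", bytes.NewBuffer(body))
	if err != nil {
		panic(err)
	}

	// DD-Api-Key and User-Agent, plus the JSON/gzip content headers used for stats payloads.
	utils.SetDDHeaders(req.Header, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
	utils.SetExtraHeaders(req.Header, utils.JSONHeaders)

	// Same 10s timeout as traceEdgeTimeout in trace_connection.go.
	client := utils.NewHTTPClient(10 * time.Second)
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```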