Skip to content

Commit

Permalink
Datadog trace flushing/export (#1266)
Browse files Browse the repository at this point in the history
This PR adds flushing+export of traces and trace-related statistics to the `datadogexporter`, as well as some very minor changes to the translation of internal traces into Datadog format. It represents the second of two PRs for the work contained in #1203. It builds on top of current master branch, and follows up to the work [done here](#1208).

The final PR explicitly enabling The Datadog exporter will follow, and will allow users to export traces to Datadog's API Intake. 

This PR Split was requested by @tigrannajaryan and hopefully should make code review a bit less cumbersome. However if there are any questions or changes to the PR format needed, please let me know.

**Testing:** There are unit tests for the different methods and helper methods within the export code.
 
**Documentation:**  Appropriate usage, including best practices for which processors to also enable, has been documented in the README, `testdata/config.yaml` and `example/config.yaml` samples.

**Notes**: This PR includes a trace exporter for non-windows environments only (metrics are fine in windows, just traces that are the issue), due to reasons explained in this pr #1274 . tl;dr is our trace export code for windows env would rely on CGO for now, which is not permitted in the collector
  • Loading branch information
ericmustin authored Oct 21, 2020
1 parent 3113db5 commit 6a410f0
Show file tree
Hide file tree
Showing 12 changed files with 856 additions and 15 deletions.
44 changes: 43 additions & 1 deletion exporter/datadogexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ datadog:
```
To send data to the Datadog EU site, set the `api.site` parameter to `datadoghq.eu`:

```yaml
datadog:
api:
Expand All @@ -25,4 +26,45 @@ See the sample configuration file under the `example` folder for other available

## Trace Export Configuration

_Note: Trace Export is not supported on windows at the moment_
_Note: Trace Export is not supported on Windows at the moment_

### **Important Pipeline Setup Details**

This exporter assumes a pipeline using the datadog exporter also includes a [batch processor](https://github.com/open-telemetry/opentelemetry-collector/tree/master/processor/batchprocessor) configured with the following:
- a `timeout` setting of `10s`(10 seconds).

Please make sure to include this processor in your pipeline. An example pipeline can be found below.

A batch representing 10 seconds of traces is a constraint of Datadog's API Intake for Trace Related Statistics. Without this setting, trace related metrics including `.hits` `.errors` and `.duration` for different services and service resources may be inaccurate over periods of time.

Example:

```
receivers:
examplereceiver:
processors:
batch:
timeout: 10s
exporters:
datadog/api:
hostname: customhostname
env: prod
service: myservice
version: myversion
tags:
- example:tag
api:
key: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
site: datadoghq.eu
service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [batch]
exporters: [datadog/api]
```
33 changes: 33 additions & 0 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogexporter

import (
"context"
"errors"
"runtime"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand All @@ -35,6 +38,7 @@ func NewFactory() component.ExporterFactory {
typeStr,
createDefaultConfig,
exporterhelper.WithMetrics(createMetricsExporter),
exporterhelper.WithTraces(createTraceExporter),
)
}

Expand Down Expand Up @@ -99,3 +103,32 @@ func createMetricsExporter(
exporterhelper.WithRetry(exporterhelper.CreateDefaultRetrySettings()),
)
}

// createTraceExporter creates a trace exporter based on this config.
func createTraceExporter(
_ context.Context,
params component.ExporterCreateParams,
c configmodels.Exporter,
) (component.TraceExporter, error) {
// TODO review if trace export can be supported on Windows
if runtime.GOOS == "windows" {
return nil, errors.New("datadog trace export is currently not supported on Windows")
}

cfg := c.(*config.Config)

params.Logger.Info("sanitizing Datadog metrics exporter configuration")
if err := cfg.Sanitize(); err != nil {
return nil, err
}

exp, err := newTraceExporter(params.Logger, cfg)
if err != nil {
return nil, err
}

return exporterhelper.NewTraceExporter(
cfg,
exp.pushTraceData,
)
}
35 changes: 33 additions & 2 deletions exporter/datadogexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datadogexporter

import (
"context"
"path"
"runtime"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -123,7 +125,7 @@ func TestCreateAPIMetricsExporter(t *testing.T) {
logger := zap.NewNop()

factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)
require.NoError(t, err)

factory := NewFactory()
factories.Exporters[configmodels.Type(typeStr)] = factory
Expand All @@ -144,6 +146,35 @@ func TestCreateAPIMetricsExporter(t *testing.T) {
cfg.Exporters["datadog/api"],
)

assert.Nil(t, err)
assert.NoError(t, err)
assert.NotNil(t, exp)
}

func TestCreateAPITracesExporter(t *testing.T) {
// TODO review if test should succeed on Windows
if runtime.GOOS == "windows" {
t.Skip()
}

logger := zap.NewNop()

factories, err := componenttest.ExampleComponents()
require.NoError(t, err)

factory := NewFactory()
factories.Exporters[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

ctx := context.Background()
exp, err := factory.CreateTraceExporter(
ctx,
component.ExporterCreateParams{Logger: logger},
cfg.Exporters["datadog/api"],
)

assert.NoError(t, err)
assert.NotNil(t, exp)
}
1 change: 1 addition & 0 deletions exporter/datadogexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ replace gopkg.in/zorkian/go-datadog-api.v2 v2.29.0 => github.com/zorkian/go-data
require (
github.com/DataDog/datadog-agent v0.0.0-20200417180928-f454c60bc16f
github.com/DataDog/viper v1.8.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.3.0
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
github.com/gogo/protobuf v1.3.1
github.com/klauspost/compress v1.10.10
Expand Down
79 changes: 79 additions & 0 deletions exporter/datadogexporter/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows

package datadogexporter

import (
"time"

"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/stats"
)

const (
statsBucketDuration int64 = int64(10 * time.Second)
)

// ComputeAPMStats calculates the stats that should be submitted to APM about a given trace
func ComputeAPMStats(tracePayload *pb.TracePayload, pushTime int64) *stats.Payload {

statsRawBuckets := make(map[int64]*stats.RawBucket)

bucketTS := pushTime - statsBucketDuration

for _, trace := range tracePayload.Traces {
spans := GetAnalyzedSpans(trace.Spans)
sublayers := stats.ComputeSublayers(trace.Spans)
for _, span := range spans {

// TODO: While this is hardcoded to assume a single 10s buckets for now,
// An improvement would be to support keeping multiple 10s buckets in buffer
// ala, [0-10][10-20][20-30], only flushing the oldest bucket, to allow traces that
// get reported late to still be counted in the correct bucket. This is how the
// datadog- agent handles stats buckets, but would be non trivial to add.

var statsRawBucket *stats.RawBucket
if existingBucket, ok := statsRawBuckets[bucketTS]; ok {
statsRawBucket = existingBucket
} else {
statsRawBucket = stats.NewRawBucket(bucketTS, statsBucketDuration)
statsRawBuckets[bucketTS] = statsRawBucket
}

// Use weight 1, as sampling in opentelemetry would occur upstream in a processor.
// Generally we want to ship 100% of traces to the backend where more accurate tail based sampling can be performed.
// TopLevel is always "true" since we only compute stats for top-level spans.
weightedSpan := &stats.WeightedSpan{
Span: span,
Weight: 1,
TopLevel: true,
}
statsRawBucket.HandleSpan(weightedSpan, tracePayload.Env, []string{}, sublayers)
}
}

// Export statsRawBuckets to statsBuckets
statsBuckets := make([]stats.Bucket, 0)
for _, statsRawBucket := range statsRawBuckets {
statsBuckets = append(statsBuckets, statsRawBucket.Export())
}

return &stats.Payload{
HostName: tracePayload.HostName,
Env: tracePayload.Env,
Stats: statsBuckets,
}
}
Loading

0 comments on commit 6a410f0

Please sign in to comment.