Skip to content

Commit

Permalink
add mw_account_key in datadogreciver
Browse files Browse the repository at this point in the history
  • Loading branch information
meenal-developer committed Apr 23, 2024
1 parent d445997 commit 4b3e824
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 9 deletions.
2 changes: 1 addition & 1 deletion receiver/datadogreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewFactory() receiver.Factory {
func createDefaultConfig() component.Config {
return &Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: "localhost:8126",
Endpoint: "localhost:8120",
},
ReadTimeout: 60 * time.Second,
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/datadogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ go 1.20

require (
github.com/DataDog/datadog-agent/pkg/proto v0.48.0-beta.1
github.com/gogo/protobuf v1.3.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.84.0
github.com/stretchr/testify v1.8.4
github.com/tinylib/msgp v1.1.8
github.com/vmihailenco/msgpack/v4 v4.3.12
go.opentelemetry.io/collector v0.84.0
go.opentelemetry.io/collector/component v0.84.0
Expand All @@ -23,7 +25,6 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -38,7 +39,6 @@ require (
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.9.0 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
github.com/vmihailenco/tagparser v0.1.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.84.0 // indirect
Expand Down
97 changes: 95 additions & 2 deletions receiver/datadogreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"

import (
"compress/gzip"
"compress/zlib"
"context"
"errors"
"fmt"
"net/http"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver"
"io"
"net/http"
"strings"
)

type datadogReceiver struct {
Expand Down Expand Up @@ -48,9 +51,15 @@ func newDataDogReceiver(config *Config, nextConsumer consumer.Traces, params rec

func (ddr *datadogReceiver) Start(_ context.Context, host component.Host) error {
ddmux := http.NewServeMux()
ddmux.HandleFunc("/api/v0.2/traces", ddr.handleV2Traces)
ddmux.HandleFunc("/v0.2/traces", ddr.handleV2Traces)
ddmux.HandleFunc("/api/v0.3/traces", ddr.handleTraces)
ddmux.HandleFunc("/v0.3/traces", ddr.handleTraces)
ddmux.HandleFunc("/api/v0.4/traces", ddr.handleTraces)
ddmux.HandleFunc("/v0.4/traces", ddr.handleTraces)
ddmux.HandleFunc("/api/v0.5/traces", ddr.handleTraces)
ddmux.HandleFunc("/v0.5/traces", ddr.handleTraces)
ddmux.HandleFunc("/api/v0.7/traces", ddr.handleTraces)
ddmux.HandleFunc("/v0.7/traces", ddr.handleTraces)

var err error
Expand Down Expand Up @@ -81,6 +90,90 @@ func (ddr *datadogReceiver) Shutdown(ctx context.Context) (err error) {
return ddr.server.Shutdown(ctx)
}

func readCloserFromRequest(req *http.Request) (io.ReadCloser, error) {
rc := struct {
io.Reader
io.Closer
}{
Reader: req.Body,
Closer: req.Body,
}
if req.Header.Get("Accept-Encoding") == "gzip" {
gz, err := gzip.NewReader(req.Body)
if err != nil {
return nil, err
}
defer gz.Close()
rc.Reader = gz
}
return rc, nil
}

func readAndCloseBody(resp http.ResponseWriter, req *http.Request) ([]byte, bool) {
// Check if the request body is compressed
var reader io.Reader = req.Body
if strings.Contains(req.Header.Get("Content-Encoding"), "gzip") {
// Decompress gzip
gz, err := gzip.NewReader(req.Body)
if err != nil {
fmt.Println("err", err)
// return
}
defer gz.Close()
reader = gz
} else if strings.Contains(req.Header.Get("Content-Encoding"), "deflate") {
// Decompress deflate
zlibReader, err := zlib.NewReader(req.Body)
if err != nil {
fmt.Println("err", err)
// return
}
defer zlibReader.Close()
reader = zlibReader
}

body, err := io.ReadAll(reader)
if err != nil {
fmt.Println("err", err)
return nil, false
}
if err = req.Body.Close(); err != nil {
fmt.Println("err", err)
return nil, false
}
return body, true
}

func (ddr *datadogReceiver) handleV2Traces(w http.ResponseWriter, req *http.Request) {
body, err := readAndCloseBody(w, req)
if !err {
http.Error(w, "Unable to unmarshal reqs", http.StatusBadRequest)
ddr.params.Logger.Error("Unable to unmarshal reqs")
return
}
var tracerPayload pb.AgentPayload
err1 := tracerPayload.UnmarshalVT(body)
if err1 != nil {
http.Error(w, "Unable to unmarshal reqs", http.StatusBadRequest)
ddr.params.Logger.Error("Unable to unmarshal reqs")
return
}
obsCtx := ddr.tReceiver.StartTracesOp(req.Context())
tracs := tracerPayload.GetTracerPayloads()
if len(tracs) > 0 {
otelTraces := toTraces(tracerPayload.GetTracerPayloads()[0], req)
errs := ddr.nextConsumer.ConsumeTraces(obsCtx, otelTraces)
if errs != nil {
http.Error(w, "Trace consumer errored out", http.StatusInternalServerError)
ddr.params.Logger.Error("Trace consumer errored out")
} else {
_, _ = w.Write([]byte("OK"))
}
} else {
_, _ = w.Write([]byte("OK"))
}
}

func (ddr *datadogReceiver) handleTraces(w http.ResponseWriter, req *http.Request) {
obsCtx := ddr.tReceiver.StartTracesOp(req.Context())
var err error
Expand Down
10 changes: 6 additions & 4 deletions receiver/datadogreceiver/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ func toTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
rs.SetSchemaUrl(semconv.SchemaURL)
sharedAttributes.CopyTo(rs.Resource().Attributes())
rs.Resource().Attributes().PutStr(semconv.AttributeServiceName, service)

if mwAPIKey := req.Header.Get("dd-api-key"); mwAPIKey != "" {
rs.Resource().Attributes().PutStr("mw.account_key", mwAPIKey)
}
in := rs.ScopeSpans().AppendEmpty()
in.Scope().SetName("Datadog")
in.Scope().SetVersion(payload.TracerVersion)
Expand Down Expand Up @@ -199,7 +201,7 @@ func handlePayload(req *http.Request) (tp *pb.TracerPayload, err error) {
}()

switch {
case strings.HasPrefix(req.URL.Path, "/v0.7"):
case strings.Contains(req.URL.Path, "/v0.7"):
buf := getBuffer()
defer putBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
Expand All @@ -208,7 +210,7 @@ func handlePayload(req *http.Request) (tp *pb.TracerPayload, err error) {
var tracerPayload pb.TracerPayload
_, err = tracerPayload.UnmarshalMsg(buf.Bytes())
return &tracerPayload, err
case strings.HasPrefix(req.URL.Path, "/v0.5"):
case strings.Contains(req.URL.Path, "/v0.5"):
buf := getBuffer()
defer putBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
Expand All @@ -222,7 +224,7 @@ func handlePayload(req *http.Request) (tp *pb.TracerPayload, err error) {
TracerVersion: req.Header.Get("Datadog-Meta-Tracer-Version"),
Chunks: traceChunksFromTraces(traces),
}, err
case strings.HasPrefix(req.URL.Path, "/v0.1"):
case strings.Contains(req.URL.Path, "/v0.1"):
var spans []pb.Span
if err = json.NewDecoder(req.Body).Decode(&spans); err != nil {
return nil, err
Expand Down

0 comments on commit 4b3e824

Please sign in to comment.