diff --git a/cmd/trace-agent/config/config_otlp_test.go b/cmd/trace-agent/config/config_otlp_test.go index 43658bcb2003a..e33f65fb42d73 100644 --- a/cmd/trace-agent/config/config_otlp_test.go +++ b/cmd/trace-agent/config/config_otlp_test.go @@ -32,6 +32,5 @@ func TestFullYamlConfigWithOTLP(t *testing.T) { assert.NoError(applyDatadogConfig(c)) assert.Equal("0.0.0.0", c.OTLPReceiver.BindHost) - assert.Equal(0, c.OTLPReceiver.HTTPPort) assert.Equal(50053, c.OTLPReceiver.GRPCPort) } diff --git a/pkg/trace/api/otlp.go b/pkg/trace/api/otlp.go index d655962227359..c2765e561b1cf 100644 --- a/pkg/trace/api/otlp.go +++ b/pkg/trace/api/otlp.go @@ -6,12 +6,10 @@ package api import ( - "compress/gzip" "context" "encoding/binary" "encoding/hex" "fmt" - "io" "net" "net/http" "strconv" @@ -21,7 +19,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/otlp/model/attributes" "github.com/DataDog/datadog-agent/pkg/otlp/model/source" - "github.com/DataDog/datadog-agent/pkg/trace/api/apiutil" "github.com/DataDog/datadog-agent/pkg/trace/config" "github.com/DataDog/datadog-agent/pkg/trace/info" "github.com/DataDog/datadog-agent/pkg/trace/log" @@ -43,18 +40,10 @@ import ( // computed for the resource spans. const keyStatsComputed = "_dd.stats_computed" -const ( - // otlpProtocolHTTP specifies that the incoming connection was made over plain HTTP. - otlpProtocolHTTP = "http" - // otlpProtocolGRPC specifies that the incoming connection was made over gRPC. - otlpProtocolGRPC = "grpc" -) - // OTLPReceiver implements an OpenTelemetry Collector receiver which accepts incoming // data on two ports for both plain HTTP and gRPC. type OTLPReceiver struct { wg sync.WaitGroup // waits for a graceful shutdown - httpsrv *http.Server // the running HTTP server on a started receiver, if enabled grpcsrv *grpc.Server // the running GRPC server on a started receiver, if enabled out chan<- *Payload // the outgoing payload channel conf *config.AgentConfig // receiver config @@ -69,23 +58,6 @@ func NewOTLPReceiver(out chan<- *Payload, cfg *config.AgentConfig) *OTLPReceiver // Start starts the OTLPReceiver, if any of the servers were configured as active. func (o *OTLPReceiver) Start() { cfg := o.conf.OTLPReceiver - if cfg.HTTPPort != 0 { - o.httpsrv = &http.Server{ - Addr: net.JoinHostPort(cfg.BindHost, strconv.Itoa(cfg.HTTPPort)), - Handler: o, - ConnContext: connContext, - } - o.wg.Add(1) - go func() { - defer o.wg.Done() - if err := o.httpsrv.ListenAndServe(); err != nil { - if err != http.ErrServerClosed { - log.Criticalf("Error starting OpenTelemetry HTTP server: %v", err) - } - } - }() - log.Debugf("Listening to core Agent for OTLP traces on internal HTTP port (http://%s:%d, internal use only). Check core Agent logs for information on the OTLP ingest status.", cfg.BindHost, cfg.HTTPPort) - } if cfg.GRPCPort != 0 { ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.BindHost, cfg.GRPCPort)) if err != nil { @@ -107,15 +79,6 @@ func (o *OTLPReceiver) Start() { // Stop stops any running server. func (o *OTLPReceiver) Stop() { - if o.httpsrv != nil { - timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second) - go func() { - if err := o.httpsrv.Shutdown(timeout); err != nil { - log.Errorf("Error shutting down OTLP HTTP server: %v", err) - } - cancel() - }() - } if o.grpcsrv != nil { go o.grpcsrv.Stop() } @@ -126,57 +89,13 @@ func (o *OTLPReceiver) Stop() { func (o *OTLPReceiver) Export(ctx context.Context, in ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { defer timing.Since("datadog.trace_agent.otlp.process_grpc_request_ms", time.Now()) md, _ := metadata.FromIncomingContext(ctx) - metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md), otlpProtocolGRPC), 1) - o.processRequest(ctx, otlpProtocolGRPC, http.Header(md), in) + metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md)), 1) + o.processRequest(ctx, http.Header(md), in) return ptraceotlp.NewExportResponse(), nil } -// ServeHTTP implements http.Handler -func (o *OTLPReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) { - defer timing.Since("datadog.trace_agent.otlp.process_http_request_ms", time.Now()) - mtags := tagsFromHeaders(req.Header, otlpProtocolHTTP) - metrics.Count("datadog.trace_agent.otlp.payload", 1, mtags, 1) - - r := req.Body - if req.Header.Get("Content-Encoding") == "gzip" { - gzipr, err := gzip.NewReader(r) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:corrupt_gzip"), 1) - return - } - r = gzipr - } - rd := apiutil.NewLimitedReader(r, o.conf.OTLPReceiver.MaxRequestBytes) - slurp, err := io.ReadAll(rd) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:read_body"), 1) - return - } - metrics.Count("datadog.trace_agent.otlp.bytes", int64(len(slurp)), mtags, 1) - in := ptraceotlp.NewExportRequest() - switch getMediaType(req) { - case "application/x-protobuf": - if err := in.UnmarshalProto(slurp); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:decode_proto"), 1) - return - } - case "application/json": - fallthrough - default: - if err := in.UnmarshalJSON(slurp); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:decode_json"), 1) - return - } - } - o.processRequest(req.Context(), otlpProtocolHTTP, req.Header, in) -} - -func tagsFromHeaders(h http.Header, protocol string) []string { - tags := []string{"endpoint_version:opentelemetry_" + protocol + "_v1"} +func tagsFromHeaders(h http.Header) []string { + tags := []string{"endpoint_version:opentelemetry_grpc_v1"} if v := fastHeaderGet(h, headerLang); v != "" { tags = append(tags, "lang:"+v) } @@ -205,17 +124,16 @@ func fastHeaderGet(h http.Header, canonicalKey string) string { return v[0] } -// processRequest processes the incoming request in. It marks it as received by the given protocol -// using the given headers. -func (o *OTLPReceiver) processRequest(ctx context.Context, protocol string, header http.Header, in ptraceotlp.ExportRequest) { +// processRequest processes the incoming request in. +func (o *OTLPReceiver) processRequest(ctx context.Context, header http.Header, in ptraceotlp.ExportRequest) { for i := 0; i < in.Traces().ResourceSpans().Len(); i++ { rspans := in.Traces().ResourceSpans().At(i) - o.ReceiveResourceSpans(ctx, rspans, header, protocol) + o.ReceiveResourceSpans(ctx, rspans, header) } } // ReceiveResourceSpans processes the given rspans and returns the source that it identified from processing them. -func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header, protocol string) source.Source { +func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header) source.Source { // each rspans is coming from a different resource and should be considered // a separate payload; typically there is only one item in this slice attr := rspans.Resource().Attributes() @@ -254,7 +172,7 @@ func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.R Interpreter: fastHeaderGet(header, headerLangInterpreter), LangVendor: fastHeaderGet(header, headerLangInterpreterVendor), TracerVersion: fmt.Sprintf("otlp-%s", rattr[string(semconv.AttributeTelemetrySDKVersion)]), - EndpointVersion: fmt.Sprintf("opentelemetry_%s_v1", protocol), + EndpointVersion: "opentelemetry_grpc_v1", }, Stats: info.NewStats(), } diff --git a/pkg/trace/api/otlp_test.go b/pkg/trace/api/otlp_test.go index ecb1b832de003..f0c0b81cbf246 100644 --- a/pkg/trace/api/otlp_test.go +++ b/pkg/trace/api/otlp_test.go @@ -117,15 +117,15 @@ func TestOTLPMetrics(t *testing.T) { }, }).Traces().ResourceSpans() - rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{}, "") - rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{}, "") + rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{}) + rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{}) calls := stats.CountCalls assert.Equal(4, len(calls)) - assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1}) - assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1}) - assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1}) - assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1}) + assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1}) + assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1}) + assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1}) + assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1}) } func TestOTLPNameRemapping(t *testing.T) { @@ -142,7 +142,7 @@ func TestOTLPNameRemapping(t *testing.T) { {Name: "asd"}, }, }, - }).Traces().ResourceSpans().At(0), http.Header{}, "") + }).Traces().ResourceSpans().At(0), http.Header{}) timeout := time.After(500 * time.Millisecond) select { case <-timeout: @@ -341,7 +341,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) { } { t.Run("", func(t *testing.T) { rspans := testutil.NewOTLPTracesRequest(tt.in).Traces().ResourceSpans().At(0) - rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests") + rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}) timeout := time.After(500 * time.Millisecond) select { case <-timeout: @@ -363,7 +363,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) { }, }, }).Traces().ResourceSpans().At(0) - rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests") + rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}) timeout := time.After(500 * time.Millisecond) select { case <-timeout: @@ -452,7 +452,7 @@ func TestOTLPHostname(t *testing.T) { Attributes: rattr, Spans: []*testutil.OTLPSpan{{Attributes: sattr}}, }, - }).Traces().ResourceSpans().At(0), http.Header{}, "") + }).Traces().ResourceSpans().At(0), http.Header{}) assert.Equal(t, src.Kind, source.HostnameKind) assert.Equal(t, src.Identifier, tt.out) timeout := time.After(500 * time.Millisecond) @@ -475,25 +475,9 @@ func TestOTLPReceiver(t *testing.T) { o := NewOTLPReceiver(nil, config.New()) o.Start() defer o.Stop() - assert.Nil(t, o.httpsrv) assert.Nil(t, o.grpcsrv) }) - t.Run("Start/http", func(t *testing.T) { - port := testutil.FreeTCPPort(t) - cfg := config.New() - cfg.OTLPReceiver = &config.OTLP{ - BindHost: "localhost", - HTTPPort: port, - } - o := NewOTLPReceiver(nil, cfg) - o.Start() - defer o.Stop() - assert.Nil(t, o.grpcsrv) - assert.NotNil(t, o.httpsrv) - assert.Equal(t, fmt.Sprintf("localhost:%d", port), o.httpsrv.Addr) - }) - t.Run("Start/grpc", func(t *testing.T) { port := testutil.FreeTCPPort(t) cfg := config.New() @@ -505,7 +489,6 @@ func TestOTLPReceiver(t *testing.T) { o.Start() defer o.Stop() assert := assert.New(t) - assert.Nil(o.httpsrv) assert.NotNil(o.grpcsrv) svc, ok := o.grpcsrv.GetServiceInfo()["opentelemetry.proto.collector.trace.v1.TraceService"] assert.True(ok) @@ -513,25 +496,10 @@ func TestOTLPReceiver(t *testing.T) { assert.Equal("Export", svc.Methods[0].Name) }) - t.Run("Start/http+grpc", func(t *testing.T) { - port1, port2 := testutil.FreeTCPPort(t), testutil.FreeTCPPort(t) - cfg := config.New() - cfg.OTLPReceiver = &config.OTLP{ - BindHost: "localhost", - HTTPPort: port1, - GRPCPort: port2, - } - o := NewOTLPReceiver(nil, cfg) - o.Start() - defer o.Stop() - assert.NotNil(t, o.grpcsrv) - assert.NotNil(t, o.httpsrv) - }) - t.Run("processRequest", func(t *testing.T) { out := make(chan *Payload, 5) o := NewOTLPReceiver(out, config.New()) - o.processRequest(context.Background(), otlpProtocolGRPC, http.Header(map[string][]string{ + o.processRequest(context.Background(), http.Header(map[string][]string{ headerLang: {"go"}, headerContainerID: {"containerdID"}, }), otlpTestTracesRequest) @@ -768,7 +736,7 @@ func TestOTLPHelpers(t *testing.T) { headerLangVersion: {"1.14"}, headerLangInterpreter: {"x"}, headerLangInterpreterVendor: {"y"}, - }), otlpProtocolGRPC) + })) assert.Equal(t, []string{"endpoint_version:opentelemetry_grpc_v1", "lang:go", "lang_version:1.14", "interpreter:x", "lang_vendor:y"}, out) }) } @@ -1287,7 +1255,7 @@ func BenchmarkProcessRequest(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - r.processRequest(context.Background(), otlpProtocolHTTP, metadata, otlpTestTracesRequest) + r.processRequest(context.Background(), metadata, otlpTestTracesRequest) } b.StopTimer() end <- struct{}{} diff --git a/pkg/trace/config/config.go b/pkg/trace/config/config.go index cd281d21a3949..2dd7e2b867769 100644 --- a/pkg/trace/config/config.go +++ b/pkg/trace/config/config.go @@ -46,10 +46,6 @@ type OTLP struct { // BindHost specifies the host to bind the receiver to. BindHost string `mapstructure:"-"` - // HTTPPort specifies the port to use for the plain HTTP receiver. - // If unset (or 0), the receiver will be off. - HTTPPort int `mapstructure:"http_port"` - // GRPCPort specifies the port to use for the plain HTTP receiver. // If unset (or 0), the receiver will be off. GRPCPort int `mapstructure:"grpc_port"`