Skip to content

Commit

Permalink
pkg/trace/api: remove unused internal OTLP HTTP server (#14965)
Browse files Browse the repository at this point in the history
* [pkg/trace/api] Remove unused OTLP HTTP server

* [pkg/trace] Remove protocol argument

* Remove unnecessary fmt.Sprintf

* Fix tests
  • Loading branch information
mx-psi authored and val06 committed Jan 16, 2023
1 parent 2443542 commit fab4ef7
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 141 deletions.
1 change: 0 additions & 1 deletion cmd/trace-agent/config/config_otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,5 @@ func TestFullYamlConfigWithOTLP(t *testing.T) {
assert.NoError(applyDatadogConfig(c))

assert.Equal("0.0.0.0", c.OTLPReceiver.BindHost)
assert.Equal(0, c.OTLPReceiver.HTTPPort)
assert.Equal(50053, c.OTLPReceiver.GRPCPort)
}
100 changes: 9 additions & 91 deletions pkg/trace/api/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
package api

import (
"compress/gzip"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net"
"net/http"
"strconv"
Expand All @@ -21,7 +19,6 @@ import (

"github.com/DataDog/datadog-agent/pkg/otlp/model/attributes"
"github.com/DataDog/datadog-agent/pkg/otlp/model/source"
"github.com/DataDog/datadog-agent/pkg/trace/api/apiutil"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/info"
"github.com/DataDog/datadog-agent/pkg/trace/log"
Expand All @@ -43,18 +40,10 @@ import (
// computed for the resource spans.
const keyStatsComputed = "_dd.stats_computed"

const (
// otlpProtocolHTTP specifies that the incoming connection was made over plain HTTP.
otlpProtocolHTTP = "http"
// otlpProtocolGRPC specifies that the incoming connection was made over gRPC.
otlpProtocolGRPC = "grpc"
)

// OTLPReceiver implements an OpenTelemetry Collector receiver which accepts incoming
// data on two ports for both plain HTTP and gRPC.
type OTLPReceiver struct {
wg sync.WaitGroup // waits for a graceful shutdown
httpsrv *http.Server // the running HTTP server on a started receiver, if enabled
grpcsrv *grpc.Server // the running GRPC server on a started receiver, if enabled
out chan<- *Payload // the outgoing payload channel
conf *config.AgentConfig // receiver config
Expand All @@ -69,23 +58,6 @@ func NewOTLPReceiver(out chan<- *Payload, cfg *config.AgentConfig) *OTLPReceiver
// Start starts the OTLPReceiver, if any of the servers were configured as active.
func (o *OTLPReceiver) Start() {
cfg := o.conf.OTLPReceiver
if cfg.HTTPPort != 0 {
o.httpsrv = &http.Server{
Addr: net.JoinHostPort(cfg.BindHost, strconv.Itoa(cfg.HTTPPort)),
Handler: o,
ConnContext: connContext,
}
o.wg.Add(1)
go func() {
defer o.wg.Done()
if err := o.httpsrv.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
log.Criticalf("Error starting OpenTelemetry HTTP server: %v", err)
}
}
}()
log.Debugf("Listening to core Agent for OTLP traces on internal HTTP port (http://%s:%d, internal use only). Check core Agent logs for information on the OTLP ingest status.", cfg.BindHost, cfg.HTTPPort)
}
if cfg.GRPCPort != 0 {
ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.BindHost, cfg.GRPCPort))
if err != nil {
Expand All @@ -107,15 +79,6 @@ func (o *OTLPReceiver) Start() {

// Stop stops any running server.
func (o *OTLPReceiver) Stop() {
if o.httpsrv != nil {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
go func() {
if err := o.httpsrv.Shutdown(timeout); err != nil {
log.Errorf("Error shutting down OTLP HTTP server: %v", err)
}
cancel()
}()
}
if o.grpcsrv != nil {
go o.grpcsrv.Stop()
}
Expand All @@ -126,57 +89,13 @@ func (o *OTLPReceiver) Stop() {
func (o *OTLPReceiver) Export(ctx context.Context, in ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
defer timing.Since("datadog.trace_agent.otlp.process_grpc_request_ms", time.Now())
md, _ := metadata.FromIncomingContext(ctx)
metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md), otlpProtocolGRPC), 1)
o.processRequest(ctx, otlpProtocolGRPC, http.Header(md), in)
metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md)), 1)
o.processRequest(ctx, http.Header(md), in)
return ptraceotlp.NewExportResponse(), nil
}

// ServeHTTP implements http.Handler
func (o *OTLPReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer timing.Since("datadog.trace_agent.otlp.process_http_request_ms", time.Now())
mtags := tagsFromHeaders(req.Header, otlpProtocolHTTP)
metrics.Count("datadog.trace_agent.otlp.payload", 1, mtags, 1)

r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
gzipr, err := gzip.NewReader(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:corrupt_gzip"), 1)
return
}
r = gzipr
}
rd := apiutil.NewLimitedReader(r, o.conf.OTLPReceiver.MaxRequestBytes)
slurp, err := io.ReadAll(rd)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:read_body"), 1)
return
}
metrics.Count("datadog.trace_agent.otlp.bytes", int64(len(slurp)), mtags, 1)
in := ptraceotlp.NewExportRequest()
switch getMediaType(req) {
case "application/x-protobuf":
if err := in.UnmarshalProto(slurp); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:decode_proto"), 1)
return
}
case "application/json":
fallthrough
default:
if err := in.UnmarshalJSON(slurp); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:decode_json"), 1)
return
}
}
o.processRequest(req.Context(), otlpProtocolHTTP, req.Header, in)
}

func tagsFromHeaders(h http.Header, protocol string) []string {
tags := []string{"endpoint_version:opentelemetry_" + protocol + "_v1"}
func tagsFromHeaders(h http.Header) []string {
tags := []string{"endpoint_version:opentelemetry_grpc_v1"}
if v := fastHeaderGet(h, headerLang); v != "" {
tags = append(tags, "lang:"+v)
}
Expand Down Expand Up @@ -205,17 +124,16 @@ func fastHeaderGet(h http.Header, canonicalKey string) string {
return v[0]
}

// processRequest processes the incoming request in. It marks it as received by the given protocol
// using the given headers.
func (o *OTLPReceiver) processRequest(ctx context.Context, protocol string, header http.Header, in ptraceotlp.ExportRequest) {
// processRequest processes the incoming request in.
func (o *OTLPReceiver) processRequest(ctx context.Context, header http.Header, in ptraceotlp.ExportRequest) {
for i := 0; i < in.Traces().ResourceSpans().Len(); i++ {
rspans := in.Traces().ResourceSpans().At(i)
o.ReceiveResourceSpans(ctx, rspans, header, protocol)
o.ReceiveResourceSpans(ctx, rspans, header)
}
}

// ReceiveResourceSpans processes the given rspans and returns the source that it identified from processing them.
func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header, protocol string) source.Source {
func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header) source.Source {
// each rspans is coming from a different resource and should be considered
// a separate payload; typically there is only one item in this slice
attr := rspans.Resource().Attributes()
Expand Down Expand Up @@ -254,7 +172,7 @@ func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.R
Interpreter: fastHeaderGet(header, headerLangInterpreter),
LangVendor: fastHeaderGet(header, headerLangInterpreterVendor),
TracerVersion: fmt.Sprintf("otlp-%s", rattr[string(semconv.AttributeTelemetrySDKVersion)]),
EndpointVersion: fmt.Sprintf("opentelemetry_%s_v1", protocol),
EndpointVersion: "opentelemetry_grpc_v1",
},
Stats: info.NewStats(),
}
Expand Down
58 changes: 13 additions & 45 deletions pkg/trace/api/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ func TestOTLPMetrics(t *testing.T) {
},
}).Traces().ResourceSpans()

rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{}, "")
rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{}, "")
rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{})
rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{})

calls := stats.CountCalls
assert.Equal(4, len(calls))
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
}

func TestOTLPNameRemapping(t *testing.T) {
Expand All @@ -142,7 +142,7 @@ func TestOTLPNameRemapping(t *testing.T) {
{Name: "asd"},
},
},
}).Traces().ResourceSpans().At(0), http.Header{}, "")
}).Traces().ResourceSpans().At(0), http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) {
} {
t.Run("", func(t *testing.T) {
rspans := testutil.NewOTLPTracesRequest(tt.in).Traces().ResourceSpans().At(0)
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests")
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand All @@ -363,7 +363,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) {
},
},
}).Traces().ResourceSpans().At(0)
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests")
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestOTLPHostname(t *testing.T) {
Attributes: rattr,
Spans: []*testutil.OTLPSpan{{Attributes: sattr}},
},
}).Traces().ResourceSpans().At(0), http.Header{}, "")
}).Traces().ResourceSpans().At(0), http.Header{})
assert.Equal(t, src.Kind, source.HostnameKind)
assert.Equal(t, src.Identifier, tt.out)
timeout := time.After(500 * time.Millisecond)
Expand All @@ -475,25 +475,9 @@ func TestOTLPReceiver(t *testing.T) {
o := NewOTLPReceiver(nil, config.New())
o.Start()
defer o.Stop()
assert.Nil(t, o.httpsrv)
assert.Nil(t, o.grpcsrv)
})

t.Run("Start/http", func(t *testing.T) {
port := testutil.FreeTCPPort(t)
cfg := config.New()
cfg.OTLPReceiver = &config.OTLP{
BindHost: "localhost",
HTTPPort: port,
}
o := NewOTLPReceiver(nil, cfg)
o.Start()
defer o.Stop()
assert.Nil(t, o.grpcsrv)
assert.NotNil(t, o.httpsrv)
assert.Equal(t, fmt.Sprintf("localhost:%d", port), o.httpsrv.Addr)
})

t.Run("Start/grpc", func(t *testing.T) {
port := testutil.FreeTCPPort(t)
cfg := config.New()
Expand All @@ -505,33 +489,17 @@ func TestOTLPReceiver(t *testing.T) {
o.Start()
defer o.Stop()
assert := assert.New(t)
assert.Nil(o.httpsrv)
assert.NotNil(o.grpcsrv)
svc, ok := o.grpcsrv.GetServiceInfo()["opentelemetry.proto.collector.trace.v1.TraceService"]
assert.True(ok)
assert.Equal("opentelemetry/proto/collector/trace/v1/trace_service.proto", svc.Metadata)
assert.Equal("Export", svc.Methods[0].Name)
})

t.Run("Start/http+grpc", func(t *testing.T) {
port1, port2 := testutil.FreeTCPPort(t), testutil.FreeTCPPort(t)
cfg := config.New()
cfg.OTLPReceiver = &config.OTLP{
BindHost: "localhost",
HTTPPort: port1,
GRPCPort: port2,
}
o := NewOTLPReceiver(nil, cfg)
o.Start()
defer o.Stop()
assert.NotNil(t, o.grpcsrv)
assert.NotNil(t, o.httpsrv)
})

t.Run("processRequest", func(t *testing.T) {
out := make(chan *Payload, 5)
o := NewOTLPReceiver(out, config.New())
o.processRequest(context.Background(), otlpProtocolGRPC, http.Header(map[string][]string{
o.processRequest(context.Background(), http.Header(map[string][]string{
headerLang: {"go"},
headerContainerID: {"containerdID"},
}), otlpTestTracesRequest)
Expand Down Expand Up @@ -768,7 +736,7 @@ func TestOTLPHelpers(t *testing.T) {
headerLangVersion: {"1.14"},
headerLangInterpreter: {"x"},
headerLangInterpreterVendor: {"y"},
}), otlpProtocolGRPC)
}))
assert.Equal(t, []string{"endpoint_version:opentelemetry_grpc_v1", "lang:go", "lang_version:1.14", "interpreter:x", "lang_vendor:y"}, out)
})
}
Expand Down Expand Up @@ -1287,7 +1255,7 @@ func BenchmarkProcessRequest(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
r.processRequest(context.Background(), otlpProtocolHTTP, metadata, otlpTestTracesRequest)
r.processRequest(context.Background(), metadata, otlpTestTracesRequest)
}
b.StopTimer()
end <- struct{}{}
Expand Down
4 changes: 0 additions & 4 deletions pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ type OTLP struct {
// BindHost specifies the host to bind the receiver to.
BindHost string `mapstructure:"-"`

// HTTPPort specifies the port to use for the plain HTTP receiver.
// If unset (or 0), the receiver will be off.
HTTPPort int `mapstructure:"http_port"`

// GRPCPort specifies the port to use for the plain HTTP receiver.
// If unset (or 0), the receiver will be off.
GRPCPort int `mapstructure:"grpc_port"`
Expand Down

0 comments on commit fab4ef7

Please sign in to comment.