Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/trace/api: remove unused internal OTLP HTTP server #14965

Merged
merged 4 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/trace-agent/config/config_otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,5 @@ func TestFullYamlConfigWithOTLP(t *testing.T) {
assert.NoError(applyDatadogConfig(c))

assert.Equal("0.0.0.0", c.OTLPReceiver.BindHost)
assert.Equal(0, c.OTLPReceiver.HTTPPort)
assert.Equal(50053, c.OTLPReceiver.GRPCPort)
}
100 changes: 9 additions & 91 deletions pkg/trace/api/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
package api

import (
"compress/gzip"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net"
"net/http"
"strconv"
Expand All @@ -21,7 +19,6 @@ import (

"github.com/DataDog/datadog-agent/pkg/otlp/model/attributes"
"github.com/DataDog/datadog-agent/pkg/otlp/model/source"
"github.com/DataDog/datadog-agent/pkg/trace/api/apiutil"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/info"
"github.com/DataDog/datadog-agent/pkg/trace/log"
Expand All @@ -43,18 +40,10 @@ import (
// computed for the resource spans.
const keyStatsComputed = "_dd.stats_computed"

const (
// otlpProtocolHTTP specifies that the incoming connection was made over plain HTTP.
otlpProtocolHTTP = "http"
// otlpProtocolGRPC specifies that the incoming connection was made over gRPC.
otlpProtocolGRPC = "grpc"
)

// OTLPReceiver implements an OpenTelemetry Collector receiver which accepts incoming
// data on two ports for both plain HTTP and gRPC.
type OTLPReceiver struct {
wg sync.WaitGroup // waits for a graceful shutdown
httpsrv *http.Server // the running HTTP server on a started receiver, if enabled
grpcsrv *grpc.Server // the running GRPC server on a started receiver, if enabled
out chan<- *Payload // the outgoing payload channel
conf *config.AgentConfig // receiver config
Expand All @@ -69,23 +58,6 @@ func NewOTLPReceiver(out chan<- *Payload, cfg *config.AgentConfig) *OTLPReceiver
// Start starts the OTLPReceiver, if any of the servers were configured as active.
func (o *OTLPReceiver) Start() {
cfg := o.conf.OTLPReceiver
if cfg.HTTPPort != 0 {
o.httpsrv = &http.Server{
Addr: net.JoinHostPort(cfg.BindHost, strconv.Itoa(cfg.HTTPPort)),
Handler: o,
ConnContext: connContext,
}
o.wg.Add(1)
go func() {
defer o.wg.Done()
if err := o.httpsrv.ListenAndServe(); err != nil {
if err != http.ErrServerClosed {
log.Criticalf("Error starting OpenTelemetry HTTP server: %v", err)
}
}
}()
log.Debugf("Listening to core Agent for OTLP traces on internal HTTP port (http://%s:%d, internal use only). Check core Agent logs for information on the OTLP ingest status.", cfg.BindHost, cfg.HTTPPort)
}
if cfg.GRPCPort != 0 {
ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.BindHost, cfg.GRPCPort))
if err != nil {
Expand All @@ -107,15 +79,6 @@ func (o *OTLPReceiver) Start() {

// Stop stops any running server.
func (o *OTLPReceiver) Stop() {
if o.httpsrv != nil {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
go func() {
if err := o.httpsrv.Shutdown(timeout); err != nil {
log.Errorf("Error shutting down OTLP HTTP server: %v", err)
}
cancel()
}()
}
if o.grpcsrv != nil {
go o.grpcsrv.Stop()
}
Expand All @@ -126,57 +89,13 @@ func (o *OTLPReceiver) Stop() {
func (o *OTLPReceiver) Export(ctx context.Context, in ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
defer timing.Since("datadog.trace_agent.otlp.process_grpc_request_ms", time.Now())
md, _ := metadata.FromIncomingContext(ctx)
metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md), otlpProtocolGRPC), 1)
o.processRequest(ctx, otlpProtocolGRPC, http.Header(md), in)
metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md)), 1)
o.processRequest(ctx, http.Header(md), in)
return ptraceotlp.NewExportResponse(), nil
}

// ServeHTTP implements http.Handler
func (o *OTLPReceiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer timing.Since("datadog.trace_agent.otlp.process_http_request_ms", time.Now())
mtags := tagsFromHeaders(req.Header, otlpProtocolHTTP)
metrics.Count("datadog.trace_agent.otlp.payload", 1, mtags, 1)

r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
gzipr, err := gzip.NewReader(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:corrupt_gzip"), 1)
return
}
r = gzipr
}
rd := apiutil.NewLimitedReader(r, o.conf.OTLPReceiver.MaxRequestBytes)
slurp, err := io.ReadAll(rd)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:read_body"), 1)
return
}
metrics.Count("datadog.trace_agent.otlp.bytes", int64(len(slurp)), mtags, 1)
in := ptraceotlp.NewExportRequest()
switch getMediaType(req) {
case "application/x-protobuf":
if err := in.UnmarshalProto(slurp); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:decode_proto"), 1)
return
}
case "application/json":
fallthrough
default:
if err := in.UnmarshalJSON(slurp); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
metrics.Count("datadog.trace_agent.otlp.error", 1, append(mtags, "reason:decode_json"), 1)
return
}
}
o.processRequest(req.Context(), otlpProtocolHTTP, req.Header, in)
}

func tagsFromHeaders(h http.Header, protocol string) []string {
tags := []string{"endpoint_version:opentelemetry_" + protocol + "_v1"}
func tagsFromHeaders(h http.Header) []string {
tags := []string{"endpoint_version:opentelemetry_grpc_v1"}
if v := fastHeaderGet(h, headerLang); v != "" {
tags = append(tags, "lang:"+v)
}
Expand Down Expand Up @@ -205,17 +124,16 @@ func fastHeaderGet(h http.Header, canonicalKey string) string {
return v[0]
}

// processRequest processes the incoming request in. It marks it as received by the given protocol
// using the given headers.
func (o *OTLPReceiver) processRequest(ctx context.Context, protocol string, header http.Header, in ptraceotlp.ExportRequest) {
// processRequest processes the incoming request in.
func (o *OTLPReceiver) processRequest(ctx context.Context, header http.Header, in ptraceotlp.ExportRequest) {
for i := 0; i < in.Traces().ResourceSpans().Len(); i++ {
rspans := in.Traces().ResourceSpans().At(i)
o.ReceiveResourceSpans(ctx, rspans, header, protocol)
o.ReceiveResourceSpans(ctx, rspans, header)
}
}

// ReceiveResourceSpans processes the given rspans and returns the source that it identified from processing them.
func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header, protocol string) source.Source {
func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header) source.Source {
// each rspans is coming from a different resource and should be considered
// a separate payload; typically there is only one item in this slice
attr := rspans.Resource().Attributes()
Expand Down Expand Up @@ -254,7 +172,7 @@ func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.R
Interpreter: fastHeaderGet(header, headerLangInterpreter),
LangVendor: fastHeaderGet(header, headerLangInterpreterVendor),
TracerVersion: fmt.Sprintf("otlp-%s", rattr[string(semconv.AttributeTelemetrySDKVersion)]),
EndpointVersion: fmt.Sprintf("opentelemetry_%s_v1", protocol),
EndpointVersion: "opentelemetry_grpc_v1",
},
Stats: info.NewStats(),
}
Expand Down
58 changes: 13 additions & 45 deletions pkg/trace/api/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ func TestOTLPMetrics(t *testing.T) {
},
}).Traces().ResourceSpans()

rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{}, "")
rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{}, "")
rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{})
rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{})

calls := stats.CountCalls
assert.Equal(4, len(calls))
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
}

func TestOTLPNameRemapping(t *testing.T) {
Expand All @@ -142,7 +142,7 @@ func TestOTLPNameRemapping(t *testing.T) {
{Name: "asd"},
},
},
}).Traces().ResourceSpans().At(0), http.Header{}, "")
}).Traces().ResourceSpans().At(0), http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) {
} {
t.Run("", func(t *testing.T) {
rspans := testutil.NewOTLPTracesRequest(tt.in).Traces().ResourceSpans().At(0)
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests")
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand All @@ -363,7 +363,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) {
},
},
}).Traces().ResourceSpans().At(0)
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests")
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestOTLPHostname(t *testing.T) {
Attributes: rattr,
Spans: []*testutil.OTLPSpan{{Attributes: sattr}},
},
}).Traces().ResourceSpans().At(0), http.Header{}, "")
}).Traces().ResourceSpans().At(0), http.Header{})
assert.Equal(t, src.Kind, source.HostnameKind)
assert.Equal(t, src.Identifier, tt.out)
timeout := time.After(500 * time.Millisecond)
Expand All @@ -475,25 +475,9 @@ func TestOTLPReceiver(t *testing.T) {
o := NewOTLPReceiver(nil, config.New())
o.Start()
defer o.Stop()
assert.Nil(t, o.httpsrv)
assert.Nil(t, o.grpcsrv)
})

t.Run("Start/http", func(t *testing.T) {
port := testutil.FreeTCPPort(t)
cfg := config.New()
cfg.OTLPReceiver = &config.OTLP{
BindHost: "localhost",
HTTPPort: port,
}
o := NewOTLPReceiver(nil, cfg)
o.Start()
defer o.Stop()
assert.Nil(t, o.grpcsrv)
assert.NotNil(t, o.httpsrv)
assert.Equal(t, fmt.Sprintf("localhost:%d", port), o.httpsrv.Addr)
})

t.Run("Start/grpc", func(t *testing.T) {
port := testutil.FreeTCPPort(t)
cfg := config.New()
Expand All @@ -505,33 +489,17 @@ func TestOTLPReceiver(t *testing.T) {
o.Start()
defer o.Stop()
assert := assert.New(t)
assert.Nil(o.httpsrv)
assert.NotNil(o.grpcsrv)
svc, ok := o.grpcsrv.GetServiceInfo()["opentelemetry.proto.collector.trace.v1.TraceService"]
assert.True(ok)
assert.Equal("opentelemetry/proto/collector/trace/v1/trace_service.proto", svc.Metadata)
assert.Equal("Export", svc.Methods[0].Name)
})

t.Run("Start/http+grpc", func(t *testing.T) {
port1, port2 := testutil.FreeTCPPort(t), testutil.FreeTCPPort(t)
cfg := config.New()
cfg.OTLPReceiver = &config.OTLP{
BindHost: "localhost",
HTTPPort: port1,
GRPCPort: port2,
}
o := NewOTLPReceiver(nil, cfg)
o.Start()
defer o.Stop()
assert.NotNil(t, o.grpcsrv)
assert.NotNil(t, o.httpsrv)
})

t.Run("processRequest", func(t *testing.T) {
out := make(chan *Payload, 5)
o := NewOTLPReceiver(out, config.New())
o.processRequest(context.Background(), otlpProtocolGRPC, http.Header(map[string][]string{
o.processRequest(context.Background(), http.Header(map[string][]string{
headerLang: {"go"},
headerContainerID: {"containerdID"},
}), otlpTestTracesRequest)
Expand Down Expand Up @@ -768,7 +736,7 @@ func TestOTLPHelpers(t *testing.T) {
headerLangVersion: {"1.14"},
headerLangInterpreter: {"x"},
headerLangInterpreterVendor: {"y"},
}), otlpProtocolGRPC)
}))
assert.Equal(t, []string{"endpoint_version:opentelemetry_grpc_v1", "lang:go", "lang_version:1.14", "interpreter:x", "lang_vendor:y"}, out)
})
}
Expand Down Expand Up @@ -1287,7 +1255,7 @@ func BenchmarkProcessRequest(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
r.processRequest(context.Background(), otlpProtocolHTTP, metadata, otlpTestTracesRequest)
r.processRequest(context.Background(), metadata, otlpTestTracesRequest)
}
b.StopTimer()
end <- struct{}{}
Expand Down
4 changes: 0 additions & 4 deletions pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ type OTLP struct {
// BindHost specifies the host to bind the receiver to.
BindHost string `mapstructure:"-"`

// HTTPPort specifies the port to use for the plain HTTP receiver.
// If unset (or 0), the receiver will be off.
HTTPPort int `mapstructure:"http_port"`

// GRPCPort specifies the port to use for the plain HTTP receiver.
// If unset (or 0), the receiver will be off.
GRPCPort int `mapstructure:"grpc_port"`
Expand Down