Skip to content

Commit

Permalink
[pkg/trace] Remove protocol argument
Browse files Browse the repository at this point in the history
  • Loading branch information
mx-psi committed Jan 11, 2023
1 parent e18cf3d commit a3737b9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 28 deletions.
24 changes: 9 additions & 15 deletions pkg/trace/api/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ import (
// computed for the resource spans.
const keyStatsComputed = "_dd.stats_computed"

const (
// otlpProtocolGRPC specifies that the incoming connection was made over gRPC.
otlpProtocolGRPC = "grpc"
)

// OTLPReceiver implements an OpenTelemetry Collector receiver which accepts incoming
// data on two ports for both plain HTTP and gRPC.
type OTLPReceiver struct {
Expand Down Expand Up @@ -94,13 +89,13 @@ func (o *OTLPReceiver) Stop() {
func (o *OTLPReceiver) Export(ctx context.Context, in ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
defer timing.Since("datadog.trace_agent.otlp.process_grpc_request_ms", time.Now())
md, _ := metadata.FromIncomingContext(ctx)
metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md), otlpProtocolGRPC), 1)
o.processRequest(ctx, otlpProtocolGRPC, http.Header(md), in)
metrics.Count("datadog.trace_agent.otlp.payload", 1, tagsFromHeaders(http.Header(md)), 1)
o.processRequest(ctx, http.Header(md), in)
return ptraceotlp.NewExportResponse(), nil
}

func tagsFromHeaders(h http.Header, protocol string) []string {
tags := []string{"endpoint_version:opentelemetry_" + protocol + "_v1"}
func tagsFromHeaders(h http.Header) []string {
tags := []string{"endpoint_version:opentelemetry_grpc_v1"}
if v := fastHeaderGet(h, headerLang); v != "" {
tags = append(tags, "lang:"+v)
}
Expand Down Expand Up @@ -129,17 +124,16 @@ func fastHeaderGet(h http.Header, canonicalKey string) string {
return v[0]
}

// processRequest processes the incoming request in. It marks it as received by the given protocol
// using the given headers.
func (o *OTLPReceiver) processRequest(ctx context.Context, protocol string, header http.Header, in ptraceotlp.ExportRequest) {
// processRequest processes the incoming request in.
func (o *OTLPReceiver) processRequest(ctx context.Context, header http.Header, in ptraceotlp.ExportRequest) {
for i := 0; i < in.Traces().ResourceSpans().Len(); i++ {
rspans := in.Traces().ResourceSpans().At(i)
o.ReceiveResourceSpans(ctx, rspans, header, protocol)
o.ReceiveResourceSpans(ctx, rspans, header)
}
}

// ReceiveResourceSpans processes the given rspans and returns the source that it identified from processing them.
func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header, protocol string) source.Source {
func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, header http.Header) source.Source {
// each rspans is coming from a different resource and should be considered
// a separate payload; typically there is only one item in this slice
attr := rspans.Resource().Attributes()
Expand Down Expand Up @@ -178,7 +172,7 @@ func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.R
Interpreter: fastHeaderGet(header, headerLangInterpreter),
LangVendor: fastHeaderGet(header, headerLangInterpreterVendor),
TracerVersion: fmt.Sprintf("otlp-%s", rattr[string(semconv.AttributeTelemetrySDKVersion)]),
EndpointVersion: fmt.Sprintf("opentelemetry_%s_v1", protocol),
EndpointVersion: fmt.Sprintf("opentelemetry_grpc_v1"),
},
Stats: info.NewStats(),
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/trace/api/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ func TestOTLPMetrics(t *testing.T) {
},
}).Traces().ResourceSpans()

rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{}, "")
rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{}, "")
rcv.ReceiveResourceSpans(context.Background(), rspans.At(0), http.Header{})
rcv.ReceiveResourceSpans(context.Background(), rspans.At(1), http.Header{})

calls := stats.CountCalls
assert.Equal(4, len(calls))
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry__v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 3, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.spans", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 1, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
assert.Contains(calls, teststatsd.MetricsArgs{Name: "datadog.trace_agent.otlp.traces", Value: 2, Tags: []string{"tracer_version:otlp-", "endpoint_version:opentelemetry_grpc_v1"}, Rate: 1})
}

func TestOTLPNameRemapping(t *testing.T) {
Expand All @@ -142,7 +142,7 @@ func TestOTLPNameRemapping(t *testing.T) {
{Name: "asd"},
},
},
}).Traces().ResourceSpans().At(0), http.Header{}, "")
}).Traces().ResourceSpans().At(0), http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) {
} {
t.Run("", func(t *testing.T) {
rspans := testutil.NewOTLPTracesRequest(tt.in).Traces().ResourceSpans().At(0)
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests")
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand All @@ -363,7 +363,7 @@ func TestOTLPReceiveResourceSpans(t *testing.T) {
},
},
}).Traces().ResourceSpans().At(0)
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{}, "agent_tests")
rcv.ReceiveResourceSpans(context.Background(), rspans, http.Header{})
timeout := time.After(500 * time.Millisecond)
select {
case <-timeout:
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestOTLPHostname(t *testing.T) {
Attributes: rattr,
Spans: []*testutil.OTLPSpan{{Attributes: sattr}},
},
}).Traces().ResourceSpans().At(0), http.Header{}, "")
}).Traces().ResourceSpans().At(0), http.Header{})
assert.Equal(t, src.Kind, source.HostnameKind)
assert.Equal(t, src.Identifier, tt.out)
timeout := time.After(500 * time.Millisecond)
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestOTLPReceiver(t *testing.T) {
t.Run("processRequest", func(t *testing.T) {
out := make(chan *Payload, 5)
o := NewOTLPReceiver(out, config.New())
o.processRequest(context.Background(), otlpProtocolGRPC, http.Header(map[string][]string{
o.processRequest(context.Background(), http.Header(map[string][]string{
headerLang: {"go"},
headerContainerID: {"containerdID"},
}), otlpTestTracesRequest)
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestOTLPHelpers(t *testing.T) {
headerLangVersion: {"1.14"},
headerLangInterpreter: {"x"},
headerLangInterpreterVendor: {"y"},
}), otlpProtocolGRPC)
}))
assert.Equal(t, []string{"endpoint_version:opentelemetry_grpc_v1", "lang:go", "lang_version:1.14", "interpreter:x", "lang_vendor:y"}, out)
})
}
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func BenchmarkProcessRequest(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
r.processRequest(context.Background(), otlpProtocolGRPC, metadata, otlpTestTracesRequest)
r.processRequest(context.Background(), metadata, otlpTestTracesRequest)
}
b.StopTimer()
end <- struct{}{}
Expand Down

0 comments on commit a3737b9

Please sign in to comment.