From 34beafa5100513ba67252ebb12ce3e2555b44e6b Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 16 Mar 2021 19:26:10 -0700 Subject: [PATCH] Add TracesWrapper to dissallow access to internal representation Signed-off-by: Bogdan Drutu --- consumer/pdata/trace.go | 67 ++++----- consumer/pdata/trace_test.go | 70 +++++----- exporter/fileexporter/file_exporter.go | 6 +- exporter/fileexporter/file_exporter_test.go | 6 +- .../kafkaexporter/otlp_marshaller_test.go | 3 +- exporter/otlpexporter/otlp.go | 5 +- exporter/otlpexporter/otlp_test.go | 11 +- internal/goldendataset/traces_generator.go | 7 +- internal/otlp_wrapper.go | 16 +++ internal/testdata/trace.go | 127 ++++++++++-------- internal/testdata/trace_test.go | 11 +- receiver/kafkareceiver/otlp_unmarshaller.go | 4 +- .../kafkareceiver/otlp_unmarshaller_test.go | 3 +- receiver/otlpreceiver/marshal_jsonpb_test.go | 10 +- receiver/otlpreceiver/otlp_test.go | 34 ++--- receiver/otlpreceiver/trace/otlp.go | 3 +- receiver/otlpreceiver/trace/otlp_test.go | 39 +++--- testbed/testbed/data_providers.go | 4 +- testbed/testbed/validator.go | 3 +- .../internaldata/resource_to_oc_test.go | 6 +- 20 files changed, 221 insertions(+), 214 deletions(-) diff --git a/consumer/pdata/trace.go b/consumer/pdata/trace.go index a26fa18a7fb..601d6ee150f 100644 --- a/consumer/pdata/trace.go +++ b/consumer/pdata/trace.go @@ -15,6 +15,7 @@ package pdata import ( + "go.opentelemetry.io/collector/internal" otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" ) @@ -23,52 +24,49 @@ import ( // Traces is the top-level struct that is propagated through the traces pipeline. type Traces struct { - orig *[]*otlptrace.ResourceSpans + orig *otlpcollectortrace.ExportTraceServiceRequest } // NewTraces creates a new Traces. func NewTraces() Traces { - orig := []*otlptrace.ResourceSpans(nil) - return Traces{&orig} + return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{}} } -// TracesFromOtlp creates the internal Traces representation from the OTLP. -func TracesFromOtlp(orig []*otlptrace.ResourceSpans) Traces { - return Traces{&orig} +// TracesFromInternalRep creates Traces from the internal representation. +// Should not be used outside this module. +func TracesFromInternalRep(wrapper internal.TracesWrapper) Traces { + return Traces{orig: internal.TracesToOtlp(wrapper)} } -// TracesToOtlp converts the internal Traces to the OTLP. -func TracesToOtlp(td Traces) []*otlptrace.ResourceSpans { - return *td.orig +// TracesFromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest +// ProtoBuf bytes to the internal Traces. +// +// Returns an invalid Traces instance if error is not nil. +func TracesFromOtlpProtoBytes(data []byte) (Traces, error) { + req := otlpcollectortrace.ExportTraceServiceRequest{} + if err := req.Unmarshal(data); err != nil { + return Traces{}, err + } + return Traces{orig: &req}, nil +} + +// InternalRep returns internal representation of the Traces. +// Should not be used outside this module. +func (td Traces) InternalRep() internal.TracesWrapper { + return internal.TracesFromOtlp(td.orig) } // ToOtlpProtoBytes converts the internal Traces to OTLP Collector // ExportTraceServiceRequest ProtoBuf bytes. func (td Traces) ToOtlpProtoBytes() ([]byte, error) { - traces := otlpcollectortrace.ExportTraceServiceRequest{ - ResourceSpans: *td.orig, - } - return traces.Marshal() -} - -// FromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest -// ProtoBuf bytes to the internal Traces. Overrides current data. -// Calling this function on zero-initialized structure causes panic. -// Use it with NewTraces or on existing initialized Traces. -func (td Traces) FromOtlpProtoBytes(data []byte) error { - traces := otlpcollectortrace.ExportTraceServiceRequest{} - if err := traces.Unmarshal(data); err != nil { - return err - } - *td.orig = traces.ResourceSpans - return nil + return td.orig.Marshal() } // Clone returns a copy of Traces. func (td Traces) Clone() Traces { - rss := NewResourceSpansSlice() - td.ResourceSpans().CopyTo(rss) - return Traces(rss) + cloneTd := NewTraces() + td.ResourceSpans().CopyTo(cloneTd.ResourceSpans()) + return cloneTd } // SpanCount calculates the total number of spans. @@ -87,18 +85,11 @@ func (td Traces) SpanCount() int { // Size returns size in bytes. func (td Traces) Size() int { - size := 0 - for i := 0; i < len(*td.orig); i++ { - if (*td.orig)[i] == nil { - continue - } - size += (*td.orig)[i].Size() - } - return size + return td.orig.Size() } func (td Traces) ResourceSpans() ResourceSpansSlice { - return newResourceSpansSlice(td.orig) + return newResourceSpansSlice(&td.orig.ResourceSpans) } // TraceState in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header diff --git a/consumer/pdata/trace_test.go b/consumer/pdata/trace_test.go index 07afd4972e6..7ab095182fe 100644 --- a/consumer/pdata/trace_test.go +++ b/consumer/pdata/trace_test.go @@ -23,6 +23,8 @@ import ( goproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" + "go.opentelemetry.io/collector/internal" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" ) @@ -50,46 +52,47 @@ func TestSpanCount(t *testing.T) { } func TestSize(t *testing.T) { - md := NewTraces() - assert.Equal(t, 0, md.Size()) - rms := md.ResourceSpans() + td := NewTraces() + assert.Equal(t, 0, td.Size()) + rms := td.ResourceSpans() rms.Resize(1) rms.At(0).InstrumentationLibrarySpans().Resize(1) rms.At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) rms.At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo") - otlp := TracesToOtlp(md) - size := 0 - sizeBytes := 0 - for _, rspans := range otlp { - size += rspans.Size() - bts, err := rspans.Marshal() - require.NoError(t, err) - sizeBytes += len(bts) - } - assert.Equal(t, size, md.Size()) - assert.Equal(t, sizeBytes, md.Size()) + otlp := internal.TracesToOtlp(td.InternalRep()) + size := otlp.Size() + bytes, err := otlp.Marshal() + require.NoError(t, err) + assert.Equal(t, size, td.Size()) + assert.Equal(t, len(bytes), td.Size()) } func TestTracesSizeWithNil(t *testing.T) { - assert.Equal(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{nil}).Size()) + assert.Equal(t, 0, NewTraces().Size()) } func TestSpanCountWithEmpty(t *testing.T) { - assert.EqualValues(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{{}}).SpanCount()) - assert.EqualValues(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{ - { - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{{}}, + assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{{}}, + }}.SpanCount()) + assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{{}}, + }, }, - }).SpanCount()) - assert.EqualValues(t, 1, TracesFromOtlp([]*otlptrace.ResourceSpans{ - { - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{{}}, + }}.SpanCount()) + assert.EqualValues(t, 1, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{{}}, + }, }, }, }, - }).SpanCount()) + }}.SpanCount()) } func TestSpanID(t *testing.T) { @@ -146,10 +149,10 @@ func TestSpanStatusCode(t *testing.T) { } func TestToFromOtlp(t *testing.T) { - otlp := []*otlptrace.ResourceSpans(nil) - td := TracesFromOtlp(otlp) + otlp := &otlpcollectortrace.ExportTraceServiceRequest{} + td := TracesFromInternalRep(internal.TracesFromOtlp(otlp)) assert.EqualValues(t, NewTraces(), td) - assert.EqualValues(t, otlp, TracesToOtlp(td)) + assert.EqualValues(t, otlp, internal.TracesToOtlp(td.InternalRep())) // More tests in ./tracedata/trace_test.go. Cannot have them here because of // circular dependency. } @@ -193,14 +196,13 @@ func TestTracesToFromOtlpProtoBytes(t *testing.T) { bytes, err := send.ToOtlpProtoBytes() assert.NoError(t, err) - recv := NewTraces() - err = recv.FromOtlpProtoBytes(bytes) + recv, err := TracesFromOtlpProtoBytes(bytes) assert.NoError(t, err) assert.EqualValues(t, send, recv) } func TestTracesFromInvalidOtlpProtoBytes(t *testing.T) { - err := NewTraces().FromOtlpProtoBytes([]byte{0xFF}) + _, err := TracesFromOtlpProtoBytes([]byte{0xFF}) assert.EqualError(t, err, "unexpected EOF") } @@ -242,8 +244,8 @@ func BenchmarkTracesFromOtlp(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - traces := NewTraces() - require.NoError(b, traces.FromOtlpProtoBytes(buf)) + traces, err := TracesFromOtlpProtoBytes(buf) + require.NoError(b, err) assert.Equal(b, baseTraces.ResourceSpans().Len(), traces.ResourceSpans().Len()) } } diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index 580cb4f73dc..b9053cfa806 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal" - otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" ) // Marshaler configuration used for marhsaling Protobuf to JSON. Use default config. @@ -39,10 +38,7 @@ type fileExporter struct { } func (e *fileExporter) ConsumeTraces(_ context.Context, td pdata.Traces) error { - request := otlptrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(td), - } - return exportMessageAsLine(e, &request) + return exportMessageAsLine(e, internal.TracesToOtlp(td.InternalRep())) } func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { diff --git a/exporter/fileexporter/file_exporter_test.go b/exporter/fileexporter/file_exporter_test.go index 8a7b3071594..dce3fd5a137 100644 --- a/exporter/fileexporter/file_exporter_test.go +++ b/exporter/fileexporter/file_exporter_test.go @@ -45,10 +45,10 @@ func TestFileTraceExporterNoErrors(t *testing.T) { assert.NoError(t, lte.Shutdown(context.Background())) var unmarshaler = &jsonpb.Unmarshaler{} - var j collectortrace.ExportTraceServiceRequest - assert.NoError(t, unmarshaler.Unmarshal(mf, &j)) + got := &collectortrace.ExportTraceServiceRequest{} + assert.NoError(t, unmarshaler.Unmarshal(mf, got)) - assert.EqualValues(t, pdata.TracesToOtlp(td), j.ResourceSpans) + assert.EqualValues(t, internal.TracesToOtlp(td.InternalRep()), got) } func TestFileMetricsExporterNoErrors(t *testing.T) { diff --git a/exporter/kafkaexporter/otlp_marshaller_test.go b/exporter/kafkaexporter/otlp_marshaller_test.go index 069c8a4614c..098b208be2a 100644 --- a/exporter/kafkaexporter/otlp_marshaller_test.go +++ b/exporter/kafkaexporter/otlp_marshaller_test.go @@ -31,8 +31,7 @@ func TestOTLPTracesPbMarshaller(t *testing.T) { messages, err := m.Marshal(td) require.NoError(t, err) require.Len(t, messages, 1) - extracted := pdata.NewTraces() - err = extracted.FromOtlpProtoBytes(messages[0].Value) + extracted, err := pdata.TracesFromOtlpProtoBytes(messages[0].Value) require.NoError(t, err) assert.EqualValues(t, td, extracted) } diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index f03023cb525..26aa08c2cf5 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -66,10 +66,7 @@ func (e *exporterImp) shutdown(context.Context) error { } func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) error { - request := &otlptrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(td), - } - if err := e.w.exportTrace(ctx, request); err != nil { + if err := e.w.exportTrace(ctx, internal.TracesToOtlp(td.InternalRep())); err != nil { return fmt.Errorf("failed to push trace data via OTLP exporter: %w", err) } return nil diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index afc94a24f93..26d99c27194 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -239,9 +239,7 @@ func TestSendTraces(t *testing.T) { // A trace with 2 spans. td = testdata.GenerateTraceDataTwoSpansSameResource() - expectedOTLPReq := &otlptraces.ExportTraceServiceRequest{ - ResourceSpans: testdata.GenerateTraceOtlpSameResourceTwoSpans(), - } + expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep()) err = exp.ConsumeTraces(context.Background(), td) assert.NoError(t, err) @@ -449,6 +447,9 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pd // Ensure that initially there is no data in the receiver. assert.EqualValues(t, 0, atomic.LoadInt32(&rcv.requestCount)) + // Clone the request and store as expected. + expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep()) + // Resend the request, this should succeed. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) assert.NoError(t, exp.ConsumeTraces(ctx, td)) @@ -459,10 +460,6 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pd return atomic.LoadInt32(&rcv.requestCount) > 0 }, "receive a request") - expectedOTLPReq := &otlptraces.ExportTraceServiceRequest{ - ResourceSpans: testdata.GenerateTraceOtlpSameResourceTwoSpans(), - } - // Verify received span. assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.totalItems)) assert.EqualValues(t, expectedOTLPReq, rcv.GetLastRequest()) diff --git a/internal/goldendataset/traces_generator.go b/internal/goldendataset/traces_generator.go index df1d67a6cc4..4cbd807f915 100644 --- a/internal/goldendataset/traces_generator.go +++ b/internal/goldendataset/traces_generator.go @@ -20,6 +20,8 @@ import ( "math/rand" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" ) @@ -49,7 +51,10 @@ func GenerateTraces(tracePairsFile string, spanPairsFile string) ([]pdata.Traces if spanErr != nil { err = spanErr } - traces[index-1] = pdata.TracesFromOtlp([]*otlptrace.ResourceSpans{rscSpan}) + traces[index-1] = pdata.TracesFromInternalRep( + internal.TracesFromOtlp(&otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{rscSpan}, + })) } return traces, err } diff --git a/internal/otlp_wrapper.go b/internal/otlp_wrapper.go index a5c187c3ed2..c80e96b50a7 100644 --- a/internal/otlp_wrapper.go +++ b/internal/otlp_wrapper.go @@ -17,6 +17,7 @@ package internal import ( otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" ) // MetricsWrapper is an intermediary struct that is declared in an internal package @@ -34,6 +35,21 @@ func MetricsFromOtlp(req *otlpcollectormetrics.ExportMetricsServiceRequest) Metr return MetricsWrapper{req: req} } +// TracesWrapper is an intermediary struct that is declared in an internal package +// as a way to prevent certain functions of pdata.Traces data type to be callable by +// any code outside of this module. +type TracesWrapper struct { + req *otlpcollectortrace.ExportTraceServiceRequest +} + +func TracesToOtlp(mw TracesWrapper) *otlpcollectortrace.ExportTraceServiceRequest { + return mw.req +} + +func TracesFromOtlp(req *otlpcollectortrace.ExportTraceServiceRequest) TracesWrapper { + return TracesWrapper{req: req} +} + // LogsWrapper is an intermediary struct that is declared in an internal package // as a way to prevent certain functions of pdata.Logs data type to be callable by // any code outside of this module. diff --git a/internal/testdata/trace.go b/internal/testdata/trace.go index 8e113190ff4..0a7cb0a928b 100644 --- a/internal/testdata/trace.go +++ b/internal/testdata/trace.go @@ -17,6 +17,7 @@ package testdata import ( "time" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/consumer/pdata" @@ -38,8 +39,8 @@ func GenerateTraceDataEmpty() pdata.Traces { return td } -func generateTraceOtlpEmpty() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans(nil) +func generateTraceOtlpEmpty() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{} } func GenerateTraceDataOneEmptyResourceSpans() pdata.Traces { @@ -48,9 +49,11 @@ func GenerateTraceDataOneEmptyResourceSpans() pdata.Traces { return td } -func generateTraceOtlpOneEmptyResourceSpans() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans{ - {}, +func generateTraceOtlpOneEmptyResourceSpans() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + {}, + }, } } @@ -61,10 +64,12 @@ func GenerateTraceDataNoLibraries() pdata.Traces { return td } -func generateTraceOtlpNoLibraries() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans{ - { - Resource: generateOtlpResource1(), +func generateTraceOtlpNoLibraries() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + Resource: generateOtlpResource1(), + }, }, } } @@ -76,12 +81,14 @@ func GenerateTraceDataOneEmptyInstrumentationLibrary() pdata.Traces { return td } -func generateTraceOtlpOneEmptyInstrumentationLibrary() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans{ - { - Resource: generateOtlpResource1(), - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - {}, +func generateTraceOtlpOneEmptyInstrumentationLibrary() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + Resource: generateOtlpResource1(), + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + {}, + }, }, }, } @@ -97,13 +104,15 @@ func GenerateTraceDataOneSpanNoResource() pdata.Traces { return td } -func generateTraceOtlpOneSpanNoResource() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans{ - { - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - generateOtlpSpanOne(), +func generateTraceOtlpOneSpanNoResource() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + generateOtlpSpanOne(), + }, }, }, }, @@ -119,14 +128,16 @@ func GenerateTraceDataOneSpan() pdata.Traces { return td } -func generateTraceOtlpOneSpan() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans{ - { - Resource: generateOtlpResource1(), - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - generateOtlpSpanOne(), +func generateTraceOtlpOneSpan() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + Resource: generateOtlpResource1(), + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + generateOtlpSpanOne(), + }, }, }, }, @@ -143,16 +154,18 @@ func GenerateTraceDataTwoSpansSameResource() pdata.Traces { return td } -// GenerateTraceOtlpSameResourceTwoSpans returns the OTLP representation of the GenerateTraceOtlpSameResourceTwoSpans. -func GenerateTraceOtlpSameResourceTwoSpans() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans{ - { - Resource: generateOtlpResource1(), - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - generateOtlpSpanOne(), - generateOtlpSpanTwo(), +// generateTraceOtlpSameResourceTwoSpans returns the OTLP representation of the generateTraceOtlpSameResourceTwoSpans. +func generateTraceOtlpSameResourceTwoSpans() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + Resource: generateOtlpResource1(), + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + generateOtlpSpanOne(), + generateOtlpSpanTwo(), + }, }, }, }, @@ -189,25 +202,27 @@ func GenerateTraceDataManySpansSameResource(spansCount int) pdata.Traces { return td } -func generateTraceOtlpTwoSpansSameResourceOneDifferent() []*otlptrace.ResourceSpans { - return []*otlptrace.ResourceSpans{ - { - Resource: generateOtlpResource1(), - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - generateOtlpSpanOne(), - generateOtlpSpanTwo(), +func generateTraceOtlpTwoSpansSameResourceOneDifferent() *otlpcollectortrace.ExportTraceServiceRequest { + return &otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + Resource: generateOtlpResource1(), + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + generateOtlpSpanOne(), + generateOtlpSpanTwo(), + }, }, }, }, - }, - { - Resource: generateOtlpResource2(), - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - generateOtlpSpanThree(), + { + Resource: generateOtlpResource2(), + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + generateOtlpSpanThree(), + }, }, }, }, diff --git a/internal/testdata/trace_test.go b/internal/testdata/trace_test.go index 1f3547e1ea5..be09c77f1ea 100644 --- a/internal/testdata/trace_test.go +++ b/internal/testdata/trace_test.go @@ -19,7 +19,8 @@ import ( "github.com/stretchr/testify/assert" - otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" + "go.opentelemetry.io/collector/internal" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" "go.opentelemetry.io/collector/consumer/pdata" ) @@ -27,7 +28,7 @@ import ( type traceTestCase struct { name string td pdata.Traces - otlp []*otlptrace.ResourceSpans + otlp *otlpcollectortrace.ExportTraceServiceRequest } func generateAllTraceTestCases() []traceTestCase { @@ -65,7 +66,7 @@ func generateAllTraceTestCases() []traceTestCase { { name: "two-spans-same-resource", td: GenerateTraceDataTwoSpansSameResource(), - otlp: GenerateTraceOtlpSameResourceTwoSpans(), + otlp: generateTraceOtlpSameResourceTwoSpans(), }, { name: "two-spans-same-resource-one-different", @@ -81,9 +82,9 @@ func TestToFromOtlpTrace(t *testing.T) { for i := range allTestCases { test := allTestCases[i] t.Run(test.name, func(t *testing.T) { - td := pdata.TracesFromOtlp(test.otlp) + td := pdata.TracesFromInternalRep(internal.TracesFromOtlp(test.otlp)) assert.EqualValues(t, test.td, td) - otlp := pdata.TracesToOtlp(td) + otlp := internal.TracesToOtlp(td.InternalRep()) assert.EqualValues(t, test.otlp, otlp) }) } diff --git a/receiver/kafkareceiver/otlp_unmarshaller.go b/receiver/kafkareceiver/otlp_unmarshaller.go index 4a1621377a7..c4ba90d3908 100644 --- a/receiver/kafkareceiver/otlp_unmarshaller.go +++ b/receiver/kafkareceiver/otlp_unmarshaller.go @@ -24,9 +24,7 @@ type otlpTracesPbUnmarshaller struct { var _ Unmarshaller = (*otlpTracesPbUnmarshaller)(nil) func (p *otlpTracesPbUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { - td := pdata.NewTraces() - err := td.FromOtlpProtoBytes(bytes) - return td, err + return pdata.TracesFromOtlpProtoBytes(bytes) } func (*otlpTracesPbUnmarshaller) Encoding() string { diff --git a/receiver/kafkareceiver/otlp_unmarshaller_test.go b/receiver/kafkareceiver/otlp_unmarshaller_test.go index 09cfb28d90c..cd3a77b5f09 100644 --- a/receiver/kafkareceiver/otlp_unmarshaller_test.go +++ b/receiver/kafkareceiver/otlp_unmarshaller_test.go @@ -41,7 +41,6 @@ func TestUnmarshallOTLP(t *testing.T) { func TestUnmarshallOTLP_error(t *testing.T) { p := otlpTracesPbUnmarshaller{} - got, err := p.Unmarshal([]byte("+$%")) - assert.Equal(t, pdata.NewTraces(), got) + _, err := p.Unmarshal([]byte("+$%")) assert.Error(t, err) } diff --git a/receiver/otlpreceiver/marshal_jsonpb_test.go b/receiver/otlpreceiver/marshal_jsonpb_test.go index 16f7a5c4aad..a95fba08563 100644 --- a/receiver/otlpreceiver/marshal_jsonpb_test.go +++ b/receiver/otlpreceiver/marshal_jsonpb_test.go @@ -19,7 +19,7 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" v1 "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/internal/testdata" ) @@ -84,8 +84,8 @@ func TestJSONPbMarshal(t *testing.T) { Indent: " ", } td := testdata.GenerateTraceDataOneSpan() - otlp := pdata.TracesToOtlp(td) - bytes, err := jpb.Marshal(otlp[0]) + otlp := internal.TracesToOtlp(td.InternalRep()) + bytes, err := jpb.Marshal(otlp.ResourceSpans[0]) assert.NoError(t, err) assert.JSONEq(t, expectedJSON, string(bytes)) } @@ -98,6 +98,6 @@ func TestJSONPbUnmarshal(t *testing.T) { err := jpb.Unmarshal([]byte(expectedJSON), &proto) assert.NoError(t, err) td := testdata.GenerateTraceDataOneSpan() - otlp := pdata.TracesToOtlp(td) - assert.EqualValues(t, &proto, otlp[0]) + otlp := internal.TracesToOtlp(td.InternalRep()) + assert.EqualValues(t, &proto, otlp.ResourceSpans[0]) } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index ebd68625e95..100c6894df1 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -47,6 +47,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/internal/data" collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1" @@ -127,7 +128,9 @@ var resourceSpansOtlp = otlptrace.ResourceSpans{ }, } -var traceOtlp = pdata.TracesFromOtlp([]*otlptrace.ResourceSpans{&resourceSpansOtlp}) +var traceOtlp = pdata.TracesFromInternalRep(internal.TracesFromOtlp(&collectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{&resourceSpansOtlp}, +})) func TestJsonHttp(t *testing.T) { tests := []struct { @@ -345,10 +348,7 @@ func TestProtoHttp(t *testing.T) { // Wait for the servers to start <-time.After(10 * time.Millisecond) - wantOtlp := pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan()) - traceProto := collectortrace.ExportTraceServiceRequest{ - ResourceSpans: wantOtlp, - } + traceProto := internal.TracesToOtlp(testdata.GenerateTraceDataOneSpan().InternalRep()) traceBytes, err := traceProto.Marshal() if err != nil { t.Errorf("Error marshaling protobuf: %v", err) @@ -365,7 +365,7 @@ func TestProtoHttp(t *testing.T) { t.Run(test.name+targetURLPath, func(t *testing.T) { url := fmt.Sprintf("http://%s%s", addr, targetURLPath) tSink.Reset() - testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, wantOtlp) + testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, traceProto) }) } } @@ -400,7 +400,7 @@ func testHTTPProtobufRequest( encoding string, traceBytes []byte, expectedErr error, - wantOtlp []*otlptrace.ResourceSpans, + wantOtlp *collectortrace.ExportTraceServiceRequest, ) { tSink.SetConsumeError(expectedErr) @@ -426,20 +426,8 @@ func testHTTPProtobufRequest( require.Len(t, allTraces, 1) - gotOtlp := pdata.TracesToOtlp(allTraces[0]) - - if len(gotOtlp) != len(wantOtlp) { - t.Fatalf("len(traces):\nGot: %d\nWant: %d\n", len(gotOtlp), len(wantOtlp)) - } - - got := gotOtlp[0] - want := wantOtlp[0] - - if !assert.EqualValues(t, got, want) { - t.Errorf("Sending trace proto over http failed\nGot:\n%v\nWant:\n%v\n", - got.String(), - want.String()) - } + gotOtlp := internal.TracesToOtlp(allTraces[0].InternalRep()) + assert.EqualValues(t, gotOtlp, wantOtlp) } else { errStatus := &spb.Status{} assert.NoError(t, proto.Unmarshal(respBytes, errStatus)) @@ -569,9 +557,7 @@ func TestHTTPStartWithoutConsumers(t *testing.T) { } func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest { - return &collectortrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan()), - } + return internal.TracesToOtlp(testdata.GenerateTraceDataOneSpan().InternalRep()) } // TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver diff --git a/receiver/otlpreceiver/trace/otlp.go b/receiver/otlpreceiver/trace/otlp.go index 6dd2d3cf887..b6b36bf8ac6 100644 --- a/receiver/otlpreceiver/trace/otlp.go +++ b/receiver/otlpreceiver/trace/otlp.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/obsreport" @@ -76,7 +77,7 @@ func (r *Receiver) Export(ctx context.Context, req *collectortrace.ExportTraceSe } } - td := pdata.TracesFromOtlp(req.ResourceSpans) + td := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req)) err := r.sendToNextConsumer(ctxWithReceiverName, td) if err != nil { return nil, err diff --git a/receiver/otlpreceiver/trace/otlp_test.go b/receiver/otlpreceiver/trace/otlp_test.go index 673c6021d16..ae2fe8cfb9e 100644 --- a/receiver/otlpreceiver/trace/otlp_test.go +++ b/receiver/otlpreceiver/trace/otlp_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/internal/data" collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" @@ -54,20 +55,22 @@ func TestExport(t *testing.T) { unixnanos := uint64(12578940000000012345) traceID := [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1} spanID := [8]byte{8, 7, 6, 5, 4, 3, 2, 1} - resourceSpans := []*otlptrace.ResourceSpans{ - { - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - { - TraceId: data.NewTraceID(traceID), - SpanId: data.NewSpanID(spanID), - Name: "operationB", - Kind: otlptrace.Span_SPAN_KIND_SERVER, - StartTimeUnixNano: unixnanos, - EndTimeUnixNano: unixnanos, - Status: otlptrace.Status{Message: "status-cancelled", Code: otlptrace.Status_STATUS_CODE_ERROR}, - TraceState: "a=text,b=123", + req := &collectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + { + TraceId: data.NewTraceID(traceID), + SpanId: data.NewSpanID(spanID), + Name: "operationB", + Kind: otlptrace.Span_SPAN_KIND_SERVER, + StartTimeUnixNano: unixnanos, + EndTimeUnixNano: unixnanos, + Status: otlptrace.Status{Message: "status-cancelled", Code: otlptrace.Status_STATUS_CODE_ERROR}, + TraceState: "a=text,b=123", + }, }, }, }, @@ -77,11 +80,7 @@ func TestExport(t *testing.T) { // Keep trace data to compare the test result against it // Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream - traceData := pdata.TracesFromOtlp(resourceSpans).Clone() - - req := &collectortrace.ExportTraceServiceRequest{ - ResourceSpans: resourceSpans, - } + traceData := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req)).Clone() resp, err := traceClient.Export(context.Background(), req) require.NoError(t, err, "Failed to export trace: %v", err) @@ -288,7 +287,7 @@ func TestDeprecatedStatusCode(t *testing.T) { // Check that Code is as expected. assert.EqualValues(t, rcvdStatus.Code(), test.expectedRcvCode) - spanProto := pdata.TracesToOtlp(traceSink.AllTraces()[0])[0].InstrumentationLibrarySpans[0].Spans[0] + spanProto := internal.TracesToOtlp(traceSink.AllTraces()[0].InternalRep()).ResourceSpans[0].InstrumentationLibrarySpans[0].Spans[0] // Check that DeprecatedCode is passed as is. assert.EqualValues(t, spanProto.Status.DeprecatedCode, test.expectedDeprecatedCode) diff --git a/testbed/testbed/data_providers.go b/testbed/testbed/data_providers.go index 7f57f5aaf4d..54c17080752 100644 --- a/testbed/testbed/data_providers.go +++ b/testbed/testbed/data_providers.go @@ -287,7 +287,7 @@ func (dp *GoldenDataProvider) GetGeneratedSpan(traceID pdata.TraceID, spanID pda if dp.spansMap == nil { var resourceSpansList []*otlptrace.ResourceSpans for _, td := range dp.tracesGenerated { - resourceSpansList = append(resourceSpansList, pdata.TracesToOtlp(td)...) + resourceSpansList = append(resourceSpansList, internal.TracesToOtlp(td.InternalRep()).ResourceSpans...) } dp.spansMap = populateSpansMap(resourceSpansList) } @@ -345,7 +345,7 @@ func NewFileDataProvider(filePath string, dataType configmodels.DataType) (*File } message = &msg - md := pdata.TracesFromOtlp(msg.ResourceSpans) + md := pdata.TracesFromInternalRep(internal.TracesFromOtlp(&msg)) dataPointCount = md.SpanCount() case configmodels.MetricsDataType: diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go index ebf2401546c..b0d681f6e49 100644 --- a/testbed/testbed/validator.go +++ b/testbed/testbed/validator.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" ) @@ -123,7 +124,7 @@ func (v *CorrectnessTestValidator) RecordResults(tc *TestCase) { func (v *CorrectnessTestValidator) assertSentRecdTracingDataEqual(tracesList []pdata.Traces) { for _, td := range tracesList { - resourceSpansList := pdata.TracesToOtlp(td) + resourceSpansList := internal.TracesToOtlp(td.InternalRep()).ResourceSpans for _, rs := range resourceSpansList { for _, ils := range rs.InstrumentationLibrarySpans { for _, recdSpan := range ils.Spans { diff --git a/translator/internaldata/resource_to_oc_test.go b/translator/internaldata/resource_to_oc_test.go index 78d004decc4..7f90314b99f 100644 --- a/translator/internaldata/resource_to_oc_test.go +++ b/translator/internaldata/resource_to_oc_test.go @@ -28,6 +28,8 @@ import ( "google.golang.org/protobuf/testing/protocmp" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal" + otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/internal/goldendataset" "go.opentelemetry.io/collector/translator/conventions" @@ -231,7 +233,9 @@ func TestResourceToOCAndBack(t *testing.T) { } for _, test := range tests { t.Run(string(test), func(t *testing.T) { - traces := pdata.TracesFromOtlp([]*otlptrace.ResourceSpans{{Resource: goldendataset.GenerateResource(test)}}) + traces := pdata.TracesFromInternalRep(internal.TracesFromOtlp(&otlpcollectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{{Resource: goldendataset.GenerateResource(test)}}, + })) expected := traces.ResourceSpans().At(0).Resource() ocNode, ocResource := internalResourceToOC(expected) actual := pdata.NewResource()