Skip to content

Commit

Permalink
Add TracesWrapper to dissallow access to internal representation
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Mar 17, 2021
1 parent 43f131a commit 34beafa
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 214 deletions.
67 changes: 29 additions & 38 deletions consumer/pdata/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package pdata

import (
"go.opentelemetry.io/collector/internal"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)
Expand All @@ -23,52 +24,49 @@ import (

// Traces is the top-level struct that is propagated through the traces pipeline.
type Traces struct {
orig *[]*otlptrace.ResourceSpans
orig *otlpcollectortrace.ExportTraceServiceRequest
}

// NewTraces creates a new Traces.
func NewTraces() Traces {
orig := []*otlptrace.ResourceSpans(nil)
return Traces{&orig}
return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

// TracesFromOtlp creates the internal Traces representation from the OTLP.
func TracesFromOtlp(orig []*otlptrace.ResourceSpans) Traces {
return Traces{&orig}
// TracesFromInternalRep creates Traces from the internal representation.
// Should not be used outside this module.
func TracesFromInternalRep(wrapper internal.TracesWrapper) Traces {
return Traces{orig: internal.TracesToOtlp(wrapper)}
}

// TracesToOtlp converts the internal Traces to the OTLP.
func TracesToOtlp(td Traces) []*otlptrace.ResourceSpans {
return *td.orig
// TracesFromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes to the internal Traces.
//
// Returns an invalid Traces instance if error is not nil.
func TracesFromOtlpProtoBytes(data []byte) (Traces, error) {
req := otlpcollectortrace.ExportTraceServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Traces{}, err
}
return Traces{orig: &req}, nil
}

// InternalRep returns internal representation of the Traces.
// Should not be used outside this module.
func (td Traces) InternalRep() internal.TracesWrapper {
return internal.TracesFromOtlp(td.orig)
}

// ToOtlpProtoBytes converts the internal Traces to OTLP Collector
// ExportTraceServiceRequest ProtoBuf bytes.
func (td Traces) ToOtlpProtoBytes() ([]byte, error) {
traces := otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: *td.orig,
}
return traces.Marshal()
}

// FromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes to the internal Traces. Overrides current data.
// Calling this function on zero-initialized structure causes panic.
// Use it with NewTraces or on existing initialized Traces.
func (td Traces) FromOtlpProtoBytes(data []byte) error {
traces := otlpcollectortrace.ExportTraceServiceRequest{}
if err := traces.Unmarshal(data); err != nil {
return err
}
*td.orig = traces.ResourceSpans
return nil
return td.orig.Marshal()
}

// Clone returns a copy of Traces.
func (td Traces) Clone() Traces {
rss := NewResourceSpansSlice()
td.ResourceSpans().CopyTo(rss)
return Traces(rss)
cloneTd := NewTraces()
td.ResourceSpans().CopyTo(cloneTd.ResourceSpans())
return cloneTd
}

// SpanCount calculates the total number of spans.
Expand All @@ -87,18 +85,11 @@ func (td Traces) SpanCount() int {

// Size returns size in bytes.
func (td Traces) Size() int {
size := 0
for i := 0; i < len(*td.orig); i++ {
if (*td.orig)[i] == nil {
continue
}
size += (*td.orig)[i].Size()
}
return size
return td.orig.Size()
}

func (td Traces) ResourceSpans() ResourceSpansSlice {
return newResourceSpansSlice(td.orig)
return newResourceSpansSlice(&td.orig.ResourceSpans)
}

// TraceState in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header
Expand Down
70 changes: 36 additions & 34 deletions consumer/pdata/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"

"go.opentelemetry.io/collector/internal"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)

Expand Down Expand Up @@ -50,46 +52,47 @@ func TestSpanCount(t *testing.T) {
}

func TestSize(t *testing.T) {
md := NewTraces()
assert.Equal(t, 0, md.Size())
rms := md.ResourceSpans()
td := NewTraces()
assert.Equal(t, 0, td.Size())
rms := td.ResourceSpans()
rms.Resize(1)
rms.At(0).InstrumentationLibrarySpans().Resize(1)
rms.At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
rms.At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo")
otlp := TracesToOtlp(md)
size := 0
sizeBytes := 0
for _, rspans := range otlp {
size += rspans.Size()
bts, err := rspans.Marshal()
require.NoError(t, err)
sizeBytes += len(bts)
}
assert.Equal(t, size, md.Size())
assert.Equal(t, sizeBytes, md.Size())
otlp := internal.TracesToOtlp(td.InternalRep())
size := otlp.Size()
bytes, err := otlp.Marshal()
require.NoError(t, err)
assert.Equal(t, size, td.Size())
assert.Equal(t, len(bytes), td.Size())
}

func TestTracesSizeWithNil(t *testing.T) {
assert.Equal(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{nil}).Size())
assert.Equal(t, 0, NewTraces().Size())
}

func TestSpanCountWithEmpty(t *testing.T) {
assert.EqualValues(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{{}}).SpanCount())
assert.EqualValues(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{{}},
assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{{}},
}}.SpanCount())
assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{{}},
},
},
}).SpanCount())
assert.EqualValues(t, 1, TracesFromOtlp([]*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{{}},
}}.SpanCount())
assert.EqualValues(t, 1, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{{}},
},
},
},
},
}).SpanCount())
}}.SpanCount())
}

func TestSpanID(t *testing.T) {
Expand Down Expand Up @@ -146,10 +149,10 @@ func TestSpanStatusCode(t *testing.T) {
}

func TestToFromOtlp(t *testing.T) {
otlp := []*otlptrace.ResourceSpans(nil)
td := TracesFromOtlp(otlp)
otlp := &otlpcollectortrace.ExportTraceServiceRequest{}
td := TracesFromInternalRep(internal.TracesFromOtlp(otlp))
assert.EqualValues(t, NewTraces(), td)
assert.EqualValues(t, otlp, TracesToOtlp(td))
assert.EqualValues(t, otlp, internal.TracesToOtlp(td.InternalRep()))
// More tests in ./tracedata/trace_test.go. Cannot have them here because of
// circular dependency.
}
Expand Down Expand Up @@ -193,14 +196,13 @@ func TestTracesToFromOtlpProtoBytes(t *testing.T) {
bytes, err := send.ToOtlpProtoBytes()
assert.NoError(t, err)

recv := NewTraces()
err = recv.FromOtlpProtoBytes(bytes)
recv, err := TracesFromOtlpProtoBytes(bytes)
assert.NoError(t, err)
assert.EqualValues(t, send, recv)
}

func TestTracesFromInvalidOtlpProtoBytes(t *testing.T) {
err := NewTraces().FromOtlpProtoBytes([]byte{0xFF})
_, err := TracesFromOtlpProtoBytes([]byte{0xFF})
assert.EqualError(t, err, "unexpected EOF")
}

Expand Down Expand Up @@ -242,8 +244,8 @@ func BenchmarkTracesFromOtlp(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
traces := NewTraces()
require.NoError(b, traces.FromOtlpProtoBytes(buf))
traces, err := TracesFromOtlpProtoBytes(buf)
require.NoError(b, err)
assert.Equal(b, baseTraces.ResourceSpans().Len(), traces.ResourceSpans().Len())
}
}
6 changes: 1 addition & 5 deletions exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

// Marshaler configuration used for marhsaling Protobuf to JSON. Use default config.
Expand All @@ -39,10 +38,7 @@ type fileExporter struct {
}

func (e *fileExporter) ConsumeTraces(_ context.Context, td pdata.Traces) error {
request := otlptrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(td),
}
return exportMessageAsLine(e, &request)
return exportMessageAsLine(e, internal.TracesToOtlp(td.InternalRep()))
}

func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
Expand Down
6 changes: 3 additions & 3 deletions exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func TestFileTraceExporterNoErrors(t *testing.T) {
assert.NoError(t, lte.Shutdown(context.Background()))

var unmarshaler = &jsonpb.Unmarshaler{}
var j collectortrace.ExportTraceServiceRequest
assert.NoError(t, unmarshaler.Unmarshal(mf, &j))
got := &collectortrace.ExportTraceServiceRequest{}
assert.NoError(t, unmarshaler.Unmarshal(mf, got))

assert.EqualValues(t, pdata.TracesToOtlp(td), j.ResourceSpans)
assert.EqualValues(t, internal.TracesToOtlp(td.InternalRep()), got)
}

func TestFileMetricsExporterNoErrors(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions exporter/kafkaexporter/otlp_marshaller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func TestOTLPTracesPbMarshaller(t *testing.T) {
messages, err := m.Marshal(td)
require.NoError(t, err)
require.Len(t, messages, 1)
extracted := pdata.NewTraces()
err = extracted.FromOtlpProtoBytes(messages[0].Value)
extracted, err := pdata.TracesFromOtlpProtoBytes(messages[0].Value)
require.NoError(t, err)
assert.EqualValues(t, td, extracted)
}
Expand Down
5 changes: 1 addition & 4 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ func (e *exporterImp) shutdown(context.Context) error {
}

func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) error {
request := &otlptrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(td),
}
if err := e.w.exportTrace(ctx, request); err != nil {
if err := e.w.exportTrace(ctx, internal.TracesToOtlp(td.InternalRep())); err != nil {
return fmt.Errorf("failed to push trace data via OTLP exporter: %w", err)
}
return nil
Expand Down
11 changes: 4 additions & 7 deletions exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ func TestSendTraces(t *testing.T) {
// A trace with 2 spans.
td = testdata.GenerateTraceDataTwoSpansSameResource()

expectedOTLPReq := &otlptraces.ExportTraceServiceRequest{
ResourceSpans: testdata.GenerateTraceOtlpSameResourceTwoSpans(),
}
expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep())

err = exp.ConsumeTraces(context.Background(), td)
assert.NoError(t, err)
Expand Down Expand Up @@ -449,6 +447,9 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pd
// Ensure that initially there is no data in the receiver.
assert.EqualValues(t, 0, atomic.LoadInt32(&rcv.requestCount))

// Clone the request and store as expected.
expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep())

// Resend the request, this should succeed.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
assert.NoError(t, exp.ConsumeTraces(ctx, td))
Expand All @@ -459,10 +460,6 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pd
return atomic.LoadInt32(&rcv.requestCount) > 0
}, "receive a request")

expectedOTLPReq := &otlptraces.ExportTraceServiceRequest{
ResourceSpans: testdata.GenerateTraceOtlpSameResourceTwoSpans(),
}

// Verify received span.
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.totalItems))
assert.EqualValues(t, expectedOTLPReq, rcv.GetLastRequest())
Expand Down
7 changes: 6 additions & 1 deletion internal/goldendataset/traces_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"math/rand"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)
Expand Down Expand Up @@ -49,7 +51,10 @@ func GenerateTraces(tracePairsFile string, spanPairsFile string) ([]pdata.Traces
if spanErr != nil {
err = spanErr
}
traces[index-1] = pdata.TracesFromOtlp([]*otlptrace.ResourceSpans{rscSpan})
traces[index-1] = pdata.TracesFromInternalRep(
internal.TracesFromOtlp(&otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{rscSpan},
}))
}
return traces, err
}
Expand Down
16 changes: 16 additions & 0 deletions internal/otlp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package internal
import (
otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

// MetricsWrapper is an intermediary struct that is declared in an internal package
Expand All @@ -34,6 +35,21 @@ func MetricsFromOtlp(req *otlpcollectormetrics.ExportMetricsServiceRequest) Metr
return MetricsWrapper{req: req}
}

// TracesWrapper is an intermediary struct that is declared in an internal package
// as a way to prevent certain functions of pdata.Traces data type to be callable by
// any code outside of this module.
type TracesWrapper struct {
req *otlpcollectortrace.ExportTraceServiceRequest
}

func TracesToOtlp(mw TracesWrapper) *otlpcollectortrace.ExportTraceServiceRequest {
return mw.req
}

func TracesFromOtlp(req *otlpcollectortrace.ExportTraceServiceRequest) TracesWrapper {
return TracesWrapper{req: req}
}

// LogsWrapper is an intermediary struct that is declared in an internal package
// as a way to prevent certain functions of pdata.Logs data type to be callable by
// any code outside of this module.
Expand Down
Loading

0 comments on commit 34beafa

Please sign in to comment.