Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TracesWrapper to dissallow access to internal representation #2721

Merged
merged 1 commit into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 29 additions & 38 deletions consumer/pdata/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package pdata

import (
"go.opentelemetry.io/collector/internal"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)
Expand All @@ -23,52 +24,49 @@ import (

// Traces is the top-level struct that is propagated through the traces pipeline.
type Traces struct {
orig *[]*otlptrace.ResourceSpans
orig *otlpcollectortrace.ExportTraceServiceRequest
}

// NewTraces creates a new Traces.
func NewTraces() Traces {
orig := []*otlptrace.ResourceSpans(nil)
return Traces{&orig}
return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

// TracesFromOtlp creates the internal Traces representation from the OTLP.
func TracesFromOtlp(orig []*otlptrace.ResourceSpans) Traces {
return Traces{&orig}
// TracesFromInternalRep creates Traces from the internal representation.
// Should not be used outside this module.
func TracesFromInternalRep(wrapper internal.TracesWrapper) Traces {
return Traces{orig: internal.TracesToOtlp(wrapper)}
}

// TracesToOtlp converts the internal Traces to the OTLP.
func TracesToOtlp(td Traces) []*otlptrace.ResourceSpans {
return *td.orig
// TracesFromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes to the internal Traces.
//
// Returns an invalid Traces instance if error is not nil.
func TracesFromOtlpProtoBytes(data []byte) (Traces, error) {
req := otlpcollectortrace.ExportTraceServiceRequest{}
if err := req.Unmarshal(data); err != nil {
return Traces{}, err
}
return Traces{orig: &req}, nil
}

// InternalRep returns internal representation of the Traces.
// Should not be used outside this module.
func (td Traces) InternalRep() internal.TracesWrapper {
return internal.TracesFromOtlp(td.orig)
}

// ToOtlpProtoBytes converts the internal Traces to OTLP Collector
// ExportTraceServiceRequest ProtoBuf bytes.
func (td Traces) ToOtlpProtoBytes() ([]byte, error) {
traces := otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: *td.orig,
}
return traces.Marshal()
}

// FromOtlpProtoBytes converts OTLP Collector ExportTraceServiceRequest
// ProtoBuf bytes to the internal Traces. Overrides current data.
// Calling this function on zero-initialized structure causes panic.
// Use it with NewTraces or on existing initialized Traces.
func (td Traces) FromOtlpProtoBytes(data []byte) error {
traces := otlpcollectortrace.ExportTraceServiceRequest{}
if err := traces.Unmarshal(data); err != nil {
return err
}
*td.orig = traces.ResourceSpans
return nil
return td.orig.Marshal()
}

// Clone returns a copy of Traces.
func (td Traces) Clone() Traces {
rss := NewResourceSpansSlice()
td.ResourceSpans().CopyTo(rss)
return Traces(rss)
cloneTd := NewTraces()
td.ResourceSpans().CopyTo(cloneTd.ResourceSpans())
return cloneTd
}

// SpanCount calculates the total number of spans.
Expand All @@ -87,18 +85,11 @@ func (td Traces) SpanCount() int {

// Size returns size in bytes.
func (td Traces) Size() int {
size := 0
for i := 0; i < len(*td.orig); i++ {
if (*td.orig)[i] == nil {
continue
}
size += (*td.orig)[i].Size()
}
return size
return td.orig.Size()
}

func (td Traces) ResourceSpans() ResourceSpansSlice {
return newResourceSpansSlice(td.orig)
return newResourceSpansSlice(&td.orig.ResourceSpans)
}

// TraceState in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header
Expand Down
70 changes: 36 additions & 34 deletions consumer/pdata/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"

"go.opentelemetry.io/collector/internal"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)

Expand Down Expand Up @@ -50,46 +52,47 @@ func TestSpanCount(t *testing.T) {
}

func TestSize(t *testing.T) {
md := NewTraces()
assert.Equal(t, 0, md.Size())
rms := md.ResourceSpans()
td := NewTraces()
assert.Equal(t, 0, td.Size())
rms := td.ResourceSpans()
rms.Resize(1)
rms.At(0).InstrumentationLibrarySpans().Resize(1)
rms.At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
rms.At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo")
otlp := TracesToOtlp(md)
size := 0
sizeBytes := 0
for _, rspans := range otlp {
size += rspans.Size()
bts, err := rspans.Marshal()
require.NoError(t, err)
sizeBytes += len(bts)
}
assert.Equal(t, size, md.Size())
assert.Equal(t, sizeBytes, md.Size())
otlp := internal.TracesToOtlp(td.InternalRep())
size := otlp.Size()
bytes, err := otlp.Marshal()
require.NoError(t, err)
assert.Equal(t, size, td.Size())
assert.Equal(t, len(bytes), td.Size())
}

func TestTracesSizeWithNil(t *testing.T) {
assert.Equal(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{nil}).Size())
assert.Equal(t, 0, NewTraces().Size())
}

func TestSpanCountWithEmpty(t *testing.T) {
assert.EqualValues(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{{}}).SpanCount())
assert.EqualValues(t, 0, TracesFromOtlp([]*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{{}},
assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{{}},
}}.SpanCount())
assert.EqualValues(t, 0, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{{}},
},
},
}).SpanCount())
assert.EqualValues(t, 1, TracesFromOtlp([]*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{{}},
}}.SpanCount())
assert.EqualValues(t, 1, Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{{}},
},
},
},
},
}).SpanCount())
}}.SpanCount())
}

func TestSpanID(t *testing.T) {
Expand Down Expand Up @@ -146,10 +149,10 @@ func TestSpanStatusCode(t *testing.T) {
}

func TestToFromOtlp(t *testing.T) {
otlp := []*otlptrace.ResourceSpans(nil)
td := TracesFromOtlp(otlp)
otlp := &otlpcollectortrace.ExportTraceServiceRequest{}
td := TracesFromInternalRep(internal.TracesFromOtlp(otlp))
assert.EqualValues(t, NewTraces(), td)
assert.EqualValues(t, otlp, TracesToOtlp(td))
assert.EqualValues(t, otlp, internal.TracesToOtlp(td.InternalRep()))
// More tests in ./tracedata/trace_test.go. Cannot have them here because of
// circular dependency.
}
Expand Down Expand Up @@ -193,14 +196,13 @@ func TestTracesToFromOtlpProtoBytes(t *testing.T) {
bytes, err := send.ToOtlpProtoBytes()
assert.NoError(t, err)

recv := NewTraces()
err = recv.FromOtlpProtoBytes(bytes)
recv, err := TracesFromOtlpProtoBytes(bytes)
assert.NoError(t, err)
assert.EqualValues(t, send, recv)
}

func TestTracesFromInvalidOtlpProtoBytes(t *testing.T) {
err := NewTraces().FromOtlpProtoBytes([]byte{0xFF})
_, err := TracesFromOtlpProtoBytes([]byte{0xFF})
assert.EqualError(t, err, "unexpected EOF")
}

Expand Down Expand Up @@ -242,8 +244,8 @@ func BenchmarkTracesFromOtlp(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
traces := NewTraces()
require.NoError(b, traces.FromOtlpProtoBytes(buf))
traces, err := TracesFromOtlpProtoBytes(buf)
require.NoError(b, err)
assert.Equal(b, baseTraces.ResourceSpans().Len(), traces.ResourceSpans().Len())
}
}
6 changes: 1 addition & 5 deletions exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

// Marshaler configuration used for marhsaling Protobuf to JSON. Use default config.
Expand All @@ -39,10 +38,7 @@ type fileExporter struct {
}

func (e *fileExporter) ConsumeTraces(_ context.Context, td pdata.Traces) error {
request := otlptrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(td),
}
return exportMessageAsLine(e, &request)
return exportMessageAsLine(e, internal.TracesToOtlp(td.InternalRep()))
}

func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
Expand Down
6 changes: 3 additions & 3 deletions exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func TestFileTraceExporterNoErrors(t *testing.T) {
assert.NoError(t, lte.Shutdown(context.Background()))

var unmarshaler = &jsonpb.Unmarshaler{}
var j collectortrace.ExportTraceServiceRequest
assert.NoError(t, unmarshaler.Unmarshal(mf, &j))
got := &collectortrace.ExportTraceServiceRequest{}
assert.NoError(t, unmarshaler.Unmarshal(mf, got))

assert.EqualValues(t, pdata.TracesToOtlp(td), j.ResourceSpans)
assert.EqualValues(t, internal.TracesToOtlp(td.InternalRep()), got)
}

func TestFileMetricsExporterNoErrors(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions exporter/kafkaexporter/otlp_marshaller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func TestOTLPTracesPbMarshaller(t *testing.T) {
messages, err := m.Marshal(td)
require.NoError(t, err)
require.Len(t, messages, 1)
extracted := pdata.NewTraces()
err = extracted.FromOtlpProtoBytes(messages[0].Value)
extracted, err := pdata.TracesFromOtlpProtoBytes(messages[0].Value)
require.NoError(t, err)
assert.EqualValues(t, td, extracted)
}
Expand Down
5 changes: 1 addition & 4 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ func (e *exporterImp) shutdown(context.Context) error {
}

func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) error {
request := &otlptrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(td),
}
if err := e.w.exportTrace(ctx, request); err != nil {
if err := e.w.exportTrace(ctx, internal.TracesToOtlp(td.InternalRep())); err != nil {
return fmt.Errorf("failed to push trace data via OTLP exporter: %w", err)
}
return nil
Expand Down
11 changes: 4 additions & 7 deletions exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ func TestSendTraces(t *testing.T) {
// A trace with 2 spans.
td = testdata.GenerateTraceDataTwoSpansSameResource()

expectedOTLPReq := &otlptraces.ExportTraceServiceRequest{
ResourceSpans: testdata.GenerateTraceOtlpSameResourceTwoSpans(),
}
expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep())

err = exp.ConsumeTraces(context.Background(), td)
assert.NoError(t, err)
Expand Down Expand Up @@ -449,6 +447,9 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pd
// Ensure that initially there is no data in the receiver.
assert.EqualValues(t, 0, atomic.LoadInt32(&rcv.requestCount))

// Clone the request and store as expected.
expectedOTLPReq := internal.TracesToOtlp(td.Clone().InternalRep())

// Resend the request, this should succeed.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
assert.NoError(t, exp.ConsumeTraces(ctx, td))
Expand All @@ -459,10 +460,6 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pd
return atomic.LoadInt32(&rcv.requestCount) > 0
}, "receive a request")

expectedOTLPReq := &otlptraces.ExportTraceServiceRequest{
ResourceSpans: testdata.GenerateTraceOtlpSameResourceTwoSpans(),
}

// Verify received span.
assert.EqualValues(t, 2, atomic.LoadInt32(&rcv.totalItems))
assert.EqualValues(t, expectedOTLPReq, rcv.GetLastRequest())
Expand Down
7 changes: 6 additions & 1 deletion internal/goldendataset/traces_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"math/rand"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1"
)
Expand Down Expand Up @@ -49,7 +51,10 @@ func GenerateTraces(tracePairsFile string, spanPairsFile string) ([]pdata.Traces
if spanErr != nil {
err = spanErr
}
traces[index-1] = pdata.TracesFromOtlp([]*otlptrace.ResourceSpans{rscSpan})
traces[index-1] = pdata.TracesFromInternalRep(
internal.TracesFromOtlp(&otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{rscSpan},
}))
}
return traces, err
}
Expand Down
16 changes: 16 additions & 0 deletions internal/otlp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package internal
import (
otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

// MetricsWrapper is an intermediary struct that is declared in an internal package
Expand All @@ -34,6 +35,21 @@ func MetricsFromOtlp(req *otlpcollectormetrics.ExportMetricsServiceRequest) Metr
return MetricsWrapper{req: req}
}

// TracesWrapper is an intermediary struct that is declared in an internal package
// as a way to prevent certain functions of pdata.Traces data type to be callable by
// any code outside of this module.
type TracesWrapper struct {
req *otlpcollectortrace.ExportTraceServiceRequest
}

func TracesToOtlp(mw TracesWrapper) *otlpcollectortrace.ExportTraceServiceRequest {
return mw.req
}

func TracesFromOtlp(req *otlpcollectortrace.ExportTraceServiceRequest) TracesWrapper {
return TracesWrapper{req: req}
}

// LogsWrapper is an intermediary struct that is declared in an internal package
// as a way to prevent certain functions of pdata.Logs data type to be callable by
// any code outside of this module.
Expand Down
Loading