Skip to content

Commit

Permalink
initial changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stevencl1013 committed Jul 10, 2020
1 parent 9bdff63 commit 120c4ff
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 9 deletions.
2 changes: 1 addition & 1 deletion exporter/stackdriverexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
}

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(logger *zap.Logger, cfg configmodels.Exporter) (component.TraceExporterOld, error) {
func (f *Factory) CreateTraceExporter(logger *zap.Logger, cfg configmodels.Exporter) (component.TraceExporter, error) {
eCfg := cfg.(*Config)
return newStackdriverTraceExporter(eCfg)
}
Expand Down
95 changes: 95 additions & 0 deletions exporter/stackdriverexporter/spandata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ package stackdriverexporter

import (
"errors"
"time"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
"go.opencensus.io/trace"
"go.opencensus.io/trace/tracestate"
"go.opentelemetry.io/collector/consumer/pdata"
// "go.opentelemetry.io/otel/api/kv"
apitrace "go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
"google.golang.org/grpc/codes"
)

var errNilSpan = errors.New("expected a non-nil span")
Expand Down Expand Up @@ -65,6 +72,94 @@ func protoSpanToOCSpanData(span *tracepb.Span, resource *resourcepb.Resource) (*
return sd, nil
}

func pdataResourceSpansToOTSpanData(rs pdata.ResourceSpans) ([]*export.SpanData, error) {
resource := rs.Resource()
var sds []*export.SpanData
ilss := rs.InstrumentationLibrarySpans()
for i := 0; i < ilss.Len(); i++ {
ils := ilss.At(i)
spans := ils.Spans()
for j := 0; j < spans.Len(); j++ {
sd, err := pdataSpanToOTSpanData(spans.At(j), resource, ils.InstrumentationLibrary())
if err != nil {
return nil, err
}
sds = append(sds, sd)
}
}

return sds, nil
}

func pdataSpanToOTSpanData(
span pdata.Span,
resource pdata.Resource,
il pdata.InstrumentationLibrary,
) (*export.SpanData, error) {
sc := apitrace.SpanContext{}
copy(sc.TraceID[:], span.TraceID())
copy(sc.SpanID[:], span.SpanID())
var parentSpanID apitrace.SpanID
copy(parentSpanID[:], span.ParentSpanID())
startTime := time.Unix(0, int64(span.StartTime()))
endTime := time.Unix(0, int64(span.EndTime()))
status := span.Status()
instrumentationLibrary := instrumentation.Library{
Name: il.Name(),
Version: il.Version(),
}
sd := &export.SpanData{
SpanContext: sc,
ParentSpanID: parentSpanID,
SpanKind: pdataSpanKindToOTSpanKind(span.Kind()),
StartTime: startTime,
EndTime: endTime,
Name: span.Name(),
/* // TODO
Attributes: protoAttributesToOCAttributes(span.Attributes, resource),
Links: protoLinksToOCLinks(span.Links),
MessageEvents: protoTimeEventsToOCMessageEvents(span.TimeEvents),
*/
HasRemoteParent: false, // no field for this?
StatusCode: pdataStatusCodeToGRPCCode(status.Code()),
StatusMessage: status.Message(),
DroppedAttributeCount: int(span.DroppedAttributesCount()),
DroppedMessageEventCount: int(span.DroppedEventsCount()),
DroppedLinkCount: int(span.DroppedLinksCount()),
InstrumentationLibrary: instrumentationLibrary,
}

return sd, nil
}

// func commonAttributesToOTAttributes(ckv *[]common.KeyValue, r *otresourcepb.Resource) []kv.KeyValue {
// // TODO
// return nil
// }

func pdataSpanKindToOTSpanKind(k pdata.SpanKind) apitrace.SpanKind {
switch k {
case pdata.SpanKindUNSPECIFIED:
return apitrace.SpanKindInternal
case pdata.SpanKindINTERNAL:
return apitrace.SpanKindInternal
case pdata.SpanKindSERVER:
return apitrace.SpanKindServer
case pdata.SpanKindCLIENT:
return apitrace.SpanKindClient
case pdata.SpanKindPRODUCER:
return apitrace.SpanKindProducer
case pdata.SpanKindCONSUMER:
return apitrace.SpanKindConsumer
default:
return apitrace.SpanKindUnspecified
}
}

func pdataStatusCodeToGRPCCode(c pdata.StatusCode) codes.Code {
return codes.Code(c)
}

func derefTruncatableString(ts *tracepb.TruncatableString) string {
if ts == nil {
return ""
Expand Down
68 changes: 61 additions & 7 deletions exporter/stackdriverexporter/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ import (
"fmt"

"contrib.go.opencensus.io/exporter/stackdriver"
cloudtrace "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
export "go.opentelemetry.io/otel/sdk/export/trace"
"google.golang.org/api/option"
"google.golang.org/grpc"
)

// stackdriverExporter is a wrapper struct of Stackdriver exporter
type stackdriverExporter struct {
exporter *stackdriver.Exporter
texporter *cloudtrace.Exporter
}

func (*stackdriverExporter) Name() string {
Expand All @@ -45,16 +49,40 @@ func (se *stackdriverExporter) Shutdown(context.Context) error {
return nil
}

func newStackdriverTraceExporter(cfg *Config) (component.TraceExporterOld, error) {
sde, serr := newStackdriverExporter(cfg)
if serr != nil {
return nil, fmt.Errorf("cannot configure Stackdriver Trace exporter: %v", serr)
func newStackdriverTraceExporter(cfg *Config) (component.TraceExporter, error) {
// sde, serr := newStackdriverExporter(cfg)
// if serr != nil {
// return nil, fmt.Errorf("cannot configure Stackdriver Trace exporter: %v", serr)
// }
// tExp := &stackdriverExporter{exporter: sde}
topts := []cloudtrace.Option{
cloudtrace.WithProjectID(cfg.ProjectID),
}
if cfg.Endpoint != "" {
var copts []option.ClientOption
if cfg.UseInsecure {
conn, err := grpc.Dial(cfg.Endpoint, grpc.WithInsecure())
if err != nil {
return nil, fmt.Errorf("cannot configure grpc conn: %v", err)
}
copts = append(copts, option.WithGRPCConn(conn))
} else {
copts = append(copts, option.WithEndpoint(cfg.Endpoint))
}
topts = append(topts, cloudtrace.WithTraceClientOptions(copts))
}
if cfg.NumOfWorkers > 0 {
topts = append(topts, cloudtrace.WithMaxNumberOfWorkers(cfg.NumOfWorkers))
}
exp, err := cloudtrace.NewExporter(topts...)
if err != nil {
return nil, fmt.Errorf("Error creating Stackdriver Trace exporter: %v", err)
}
tExp := &stackdriverExporter{exporter: sde}
tExp := &stackdriverExporter{texporter: exp}

return exporterhelper.NewTraceExporterOld(
return exporterhelper.NewTraceExporter(
cfg,
tExp.pushTraceData,
tExp.newPushTraceData,
exporterhelper.WithShutdown(tExp.Shutdown))
}

Expand Down Expand Up @@ -141,3 +169,29 @@ func (se *stackdriverExporter) pushTraceData(ctx context.Context, td consumerdat

return len(td.Spans) - goodSpans, componenterror.CombineErrors(errs)
}


// pushTraceData is a wrapper method on StackdriverExporter.PushTraceSpans
func (se *stackdriverExporter) newPushTraceData(ctx context.Context, td pdata.Traces) (int, error) {
var errs []error
resourceSpans := td.ResourceSpans()
numSpans := td.SpanCount()
goodSpans := 0
spans := make([]*export.SpanData, 0, numSpans)

for i := 0; i < resourceSpans.Len(); i++ {
sd, err := pdataResourceSpansToOTSpanData(resourceSpans.At(i))
if err == nil {
spans = append(spans, sd...)
} else {
errs = append(errs, err)
}
}

for _, span := range spans {
se.texporter.ExportSpan(ctx, span)
goodSpans++
}

return numSpans - goodSpans, componenterror.CombineErrors(errs)
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib
go 1.14

require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v0.2.1-0.20200710010320-8556f4b5bc41 // indirect
github.com/client9/misspell v0.3.4
github.com/golangci/golangci-lint v1.27.0
github.com/google/addlicense v0.0.0-20200301095109-7c013a14f2e2
Expand Down Expand Up @@ -37,9 +38,10 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinscribereceiver v0.0.0
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
github.com/tcnksm/ghr v0.13.0
go.opentelemetry.io/collector v0.5.0
go.opentelemetry.io/otel v0.8.0 // indirect
honnef.co/go/tools v0.0.1-2020.1.3
)

Expand Down
Loading

0 comments on commit 120c4ff

Please sign in to comment.