Skip to content

Commit

Permalink
Remove locking from Jaeger exporter shutdown/export (open-telemetry#1807
Browse files Browse the repository at this point in the history
)
  • Loading branch information
MrAlias authored Apr 17, 2021
1 parent 4f9fec2 commit 2de86f2
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"sync"

"google.golang.org/api/support/bundler"

Expand Down Expand Up @@ -115,8 +114,10 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
return nil, fmt.Errorf("failed to get service name from default resource")
}

stopCh := make(chan struct{})
e := &Exporter{
uploader: uploader,
stopCh: stopCh,
defaultServiceName: defaultServiceName,
}
bundler := bundler.NewBundler((*sdktrace.SpanSnapshot)(nil), func(bundle interface{}) {
Expand Down Expand Up @@ -180,8 +181,7 @@ type Exporter struct {
bundler *bundler.Bundler
uploader batchUploader

stoppedMu sync.RWMutex
stopped bool
stopCh chan struct{}

defaultServiceName string
}
Expand All @@ -190,13 +190,27 @@ var _ sdktrace.SpanExporter = (*Exporter)(nil)

// ExportSpans exports SpanSnapshots to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error {
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
// Return fast if context is already canceled or Exporter shutdown.
select {
case <-ctx.Done():
return ctx.Err()
case <-e.stopCh:
return nil
default:
}

// Cancel export if Exporter is shutdown.
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
case <-e.stopCh:
cancel()
}
}(ctx, cancel)

for _, span := range ss {
// TODO(jbd): Handle oversized bundlers.
err := e.bundler.AddWait(ctx, span, 1)
Expand All @@ -220,9 +234,8 @@ var flush = func(e *Exporter) {

// Shutdown stops the exporter flushing any pending exports.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
// Stop any active and subsequent exports.
close(e.stopCh)

done := make(chan struct{}, 1)
// Shadow so if the goroutine is leaked in testing it doesn't cause a race
Expand Down Expand Up @@ -408,6 +421,12 @@ func getBoolTag(k string, b bool) *gen.Tag {
//
// This is useful if your program is ending and you do not want to lose recent spans.
func (e *Exporter) Flush() {
// Return fast if Exporter shutdown.
select {
case <-e.stopCh:
return
default:
}
flush(e)
}

Expand Down

0 comments on commit 2de86f2

Please sign in to comment.