Skip to content

Commit

Permalink
Unify TracerProvider span processor lookups (#3942)
Browse files Browse the repository at this point in the history
* Pre-allocate spanProcessorStates slice

* Make sync.Once a non-pointer

It doesn't need to be a pointer, can be part of the struct to avoid allocating a separate object for it

* getSpanProcessors() helper

* Add tests for UnregisterSpanProcessor()
  • Loading branch information
ash2k authored Apr 1, 2023
1 parent 271df1d commit 22fd104
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 16 deletions.
23 changes: 14 additions & 9 deletions sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
}
global.Info("TracerProvider created", "config", o)

spss := spanProcessorStates{}
spss := make(spanProcessorStates, 0, len(o.processors))
for _, sp := range o.processors {
spss = append(spss, newSpanProcessorState(sp))
}
Expand Down Expand Up @@ -192,8 +192,10 @@ func (p *TracerProvider) RegisterSpanProcessor(sp SpanProcessor) {
if p.isShutdown.Load() {
return
}
newSPS := spanProcessorStates{}
newSPS = append(newSPS, *(p.spanProcessors.Load())...)

current := p.getSpanProcessors()
newSPS := make(spanProcessorStates, 0, len(current)+1)
newSPS = append(newSPS, current...)
newSPS = append(newSPS, newSpanProcessorState(sp))
p.spanProcessors.Store(&newSPS)
}
Expand All @@ -210,12 +212,12 @@ func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
if p.isShutdown.Load() {
return
}
old := *(p.spanProcessors.Load())
old := p.getSpanProcessors()
if len(old) == 0 {
return
}
spss := spanProcessorStates{}
spss = append(spss, old...)
spss := make(spanProcessorStates, len(old))
copy(spss, old)

// stop the span processor if it is started and remove it from the list
var stopOnce *spanProcessorState
Expand Down Expand Up @@ -245,7 +247,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
// ForceFlush immediately exports all spans that have not yet been exported for
// all the registered span processors.
func (p *TracerProvider) ForceFlush(ctx context.Context) error {
spss := *(p.spanProcessors.Load())
spss := p.getSpanProcessors()
if len(spss) == 0 {
return nil
}
Expand Down Expand Up @@ -278,10 +280,9 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {
if !p.isShutdown.CompareAndSwap(false, true) { // did toggle?
return nil
}
spss := *(p.spanProcessors.Load())

var retErr error
for _, sps := range spss {
for _, sps := range p.getSpanProcessors() {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -305,6 +306,10 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {
return retErr
}

func (p *TracerProvider) getSpanProcessors() spanProcessorStates {
return *(p.spanProcessors.Load())
}

// TracerProviderOption configures a TracerProvider.
type TracerProviderOption interface {
apply(tracerProviderConfig) tracerProviderConfig
Expand Down
57 changes: 54 additions & 3 deletions sdk/trace/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,57 @@ func TestForceFlushAndShutdownTraceProviderWithoutProcessor(t *testing.T) {
assert.True(t, stp.isShutdown.Load())
}

func TestUnregisterFirst(t *testing.T) {
stp := NewTracerProvider()
sp1 := &basicSpanProcessor{}
sp2 := &basicSpanProcessor{}
sp3 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp1)
stp.RegisterSpanProcessor(sp2)
stp.RegisterSpanProcessor(sp3)

stp.UnregisterSpanProcessor(sp1)

sps := stp.getSpanProcessors()
require.Len(t, sps, 2)
assert.Same(t, sp2, sps[0].sp)
assert.Same(t, sp3, sps[1].sp)
}

func TestUnregisterMiddle(t *testing.T) {
stp := NewTracerProvider()
sp1 := &basicSpanProcessor{}
sp2 := &basicSpanProcessor{}
sp3 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp1)
stp.RegisterSpanProcessor(sp2)
stp.RegisterSpanProcessor(sp3)

stp.UnregisterSpanProcessor(sp2)

sps := stp.getSpanProcessors()
require.Len(t, sps, 2)
assert.Same(t, sp1, sps[0].sp)
assert.Same(t, sp3, sps[1].sp)
}

func TestUnregisterLast(t *testing.T) {
stp := NewTracerProvider()
sp1 := &basicSpanProcessor{}
sp2 := &basicSpanProcessor{}
sp3 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp1)
stp.RegisterSpanProcessor(sp2)
stp.RegisterSpanProcessor(sp3)

stp.UnregisterSpanProcessor(sp3)

sps := stp.getSpanProcessors()
require.Len(t, sps, 2)
assert.Same(t, sp1, sps[0].sp)
assert.Same(t, sp2, sps[1].sp)
}

func TestShutdownTraceProvider(t *testing.T) {
stp := NewTracerProvider()
sp := &basicSpanProcessor{}
Expand Down Expand Up @@ -162,7 +213,7 @@ func TestRegisterAfterShutdownWithoutProcessors(t *testing.T) {

sp := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp) // no-op
assert.Empty(t, stp.spanProcessors.Load())
assert.Empty(t, stp.getSpanProcessors())
}

func TestRegisterAfterShutdownWithProcessors(t *testing.T) {
Expand All @@ -173,11 +224,11 @@ func TestRegisterAfterShutdownWithProcessors(t *testing.T) {
err := stp.Shutdown(context.Background())
assert.NoError(t, err)
assert.True(t, stp.isShutdown.Load())
assert.Empty(t, stp.spanProcessors.Load())
assert.Empty(t, stp.getSpanProcessors())

sp2 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp2) // no-op
assert.Empty(t, stp.spanProcessors.Load())
assert.Empty(t, stp.getSpanProcessors())
}

func TestTracerProviderSamplerConfigFromEnv(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sdk/trace/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
}
s.mu.Unlock()

sps := *(s.tracer.provider.spanProcessors.Load())
sps := s.tracer.provider.getSpanProcessors()
if len(sps) == 0 {
return
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/trace/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ type SpanProcessor interface {

type spanProcessorState struct {
sp SpanProcessor
state *sync.Once
state sync.Once
}

func newSpanProcessorState(sp SpanProcessor) *spanProcessorState {
return &spanProcessorState{sp: sp, state: &sync.Once{}}
return &spanProcessorState{sp: sp}
}

type spanProcessorStates []*spanProcessorState
2 changes: 1 addition & 1 deletion sdk/trace/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanS

s := tr.newSpan(ctx, name, &config)
if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() {
sps := *(tr.provider.spanProcessors.Load())
sps := tr.provider.getSpanProcessors()
for _, sp := range sps {
sp.sp.OnStart(ctx, rw)
}
Expand Down

0 comments on commit 22fd104

Please sign in to comment.