Skip to content

Commit

Permalink
spanconfigsubscriber: pass ctx to KVSubscriber callbacks
Browse files Browse the repository at this point in the history
One of these callbacks, in store.go, was capturing a ctx and using it
async. That was a tracing span use-after-finish. This patch fixes it by
plumbing contexts and avoiding the capture.

Release note: None
  • Loading branch information
andreimatei committed Feb 9, 2022
1 parent 90629ae commit 14001c5
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 13 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_spanconfigs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
require.True(t, added[0].Config.Equal(conf))

require.NotNil(t, mockSubscriber.callback)
mockSubscriber.callback(span) // invoke the callback
mockSubscriber.callback(ctx, span) // invoke the callback
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(keys.MustAddr(key))
gotConfig := repl.SpanConfig()
Expand All @@ -92,7 +92,7 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
}

type mockSpanConfigSubscriber struct {
callback func(config roachpb.Span)
callback func(ctx context.Context, config roachpb.Span)
spanconfig.Store
}

Expand Down Expand Up @@ -122,6 +122,6 @@ func (m *mockSpanConfigSubscriber) LastUpdated() hlc.Timestamp {
panic("unimplemented")
}

func (m *mockSpanConfigSubscriber) Subscribe(callback func(roachpb.Span)) {
func (m *mockSpanConfigSubscriber) Subscribe(callback func(context.Context, roachpb.Span)) {
m.callback = callback
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
}

if !s.cfg.SpanConfigsDisabled {
s.cfg.SpanConfigSubscriber.Subscribe(func(update roachpb.Span) {
s.cfg.SpanConfigSubscriber.Subscribe(func(ctx context.Context, update roachpb.Span) {
s.onSpanConfigUpdate(ctx, update)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type KVAccessor interface {
type KVSubscriber interface {
StoreReader
LastUpdated() hlc.Timestamp
Subscribe(func(updated roachpb.Span))
Subscribe(func(ctx context.Context, updated roachpb.Span))
}

// SQLTranslator translates SQL descriptors and their corresponding zone
Expand Down
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestDataDriven(t *testing.T) {
},
)

kvSubscriber.Subscribe(func(span roachpb.Span) {
kvSubscriber.Subscribe(func(ctx context.Context, span roachpb.Span) {
mu.Lock()
defer mu.Unlock()
mu.receivedUpdates = append(mu.receivedUpdates, span)
Expand Down
14 changes: 7 additions & 7 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *KVSubscriber) Start(ctx context.Context, stopper *stop.Stopper) error {

// Subscribe installs a callback that's invoked with whatever span may have seen
// a config update.
func (s *KVSubscriber) Subscribe(fn func(roachpb.Span)) {
func (s *KVSubscriber) Subscribe(fn func(context.Context, roachpb.Span)) {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -230,7 +230,7 @@ func (s *KVSubscriber) handleCompleteUpdate(
handlers := s.mu.handlers
s.mu.Unlock()
for _, h := range handlers {
h.invoke(keys.EverythingSpan)
h.invoke(ctx, keys.EverythingSpan)
}
}

Expand All @@ -255,28 +255,28 @@ func (s *KVSubscriber) handlePartialUpdate(
// here as well.
sp := ev.(*bufferEvent).Update.Target.GetSpan()
if sp != nil {
h.invoke(*sp)
h.invoke(ctx, *sp)
}
}
}
}

type handler struct {
initialized bool // tracks whether we need to invoke with a [min,max) span first
fn func(update roachpb.Span)
fn func(ctx context.Context, update roachpb.Span)
}

func (h *handler) invoke(update roachpb.Span) {
func (h *handler) invoke(ctx context.Context, update roachpb.Span) {
if !h.initialized {
h.fn(keys.EverythingSpan)
h.fn(ctx, keys.EverythingSpan)
h.initialized = true

if update.Equal(keys.EverythingSpan) {
return // we can opportunistically avoid re-invoking with the same update
}
}

h.fn(update)
h.fn(ctx, update)
}

type bufferEvent struct {
Expand Down

0 comments on commit 14001c5

Please sign in to comment.