From 1306da5d75f6c708d1dd4d8e98576832f69a6d2c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 4 Aug 2021 15:29:41 -0400 Subject: [PATCH] kv: include RangeID in rangefeed goroutine stacks This commit includes the RangeID in each of a rangefeed processor and its registations' associated goroutine stacks. This is a cheap and easy way to get better observability into the ranges that have active rangefeeds. It also tells us where those goroutines are spending their time. This will also become easier to use in Go 1.17, which improved the format of stack traces. --- .../kvclient/kvcoord/dist_sender_rangefeed.go | 4 +-- pkg/kv/kvserver/rangefeed/processor.go | 14 ++++++---- pkg/kv/kvserver/rangefeed/registry.go | 2 +- pkg/kv/kvserver/rangefeed/registry_test.go | 26 +++++++++---------- pkg/kv/kvserver/replica_rangefeed.go | 9 +++++++ pkg/kv/kvserver/replica_send.go | 2 +- 6 files changed, 35 insertions(+), 22 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index a2d1269eacba..fc2cb46a7a5b 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -189,7 +189,7 @@ func (ds *DistSender) partialRangeFeed( case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT, roachpb.RangeFeedRetryError_REASON_RANGE_MERGED, roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: - // Evict the decriptor from the cache. + // Evict the descriptor from the cache. rangeInfo.token.Evict(ctx) return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh) default: @@ -204,7 +204,7 @@ func (ds *DistSender) partialRangeFeed( } // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed -// RPC call. Results will be send on the provided channel. Returns the timestamp +// RPC call. Results will be sent on the provided channel. Returns the timestamp // of the maximum rangefeed checkpoint seen, which can be used to re-establish // the rangefeed with a larger starting timestamp, reflecting the fact that all // values up to the last checkpoint have already been observed. Returns the diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 0d56a46f0e63..bc1f835116ed 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -50,8 +50,9 @@ func newErrBufferCapacityExceeded() *roachpb.Error { // Config encompasses the configuration required to create a Processor. type Config struct { log.AmbientContext - Clock *hlc.Clock - Span roachpb.RSpan + Clock *hlc.Clock + RangeID roachpb.RangeID + Span roachpb.RSpan TxnPusher TxnPusher // PushTxnsInterval specifies the interval at which a Processor will push @@ -193,7 +194,7 @@ type IteratorConstructor func() storage.SimpleMVCCIterator func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { - p.run(ctx, rtsIterFunc, stopper) + p.run(ctx, p.RangeID, rtsIterFunc, stopper) }); err != nil { pErr := roachpb.NewError(err) p.reg.DisconnectWithErr(all, pErr) @@ -203,7 +204,10 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor // run is called from Start and runs the rangefeed. func (p *Processor) run( - ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper, + ctx context.Context, + _forStacks roachpb.RangeID, + rtsIterFunc IteratorConstructor, + stopper *stop.Stopper, ) { defer close(p.stoppedC) ctx, cancelOutputLoops := context.WithCancel(ctx) @@ -256,7 +260,7 @@ func (p *Processor) run( // Run an output loop for the registry. runOutputLoop := func(ctx context.Context) { - r.runOutputLoop(ctx) + r.runOutputLoop(ctx, p.RangeID) select { case p.unregC <- &r: case <-p.stoppedC: diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index d6041ad62faf..cff3d8ce6af5 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -264,7 +264,7 @@ func (r *registration) outputLoop(ctx context.Context) error { } } -func (r *registration) runOutputLoop(ctx context.Context) { +func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) { r.mu.Lock() ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx) r.mu.Unlock() diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index c52983006bcf..ced4abaabdd9 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -142,7 +142,7 @@ func TestRegistrationBasic(t *testing.T) { noCatchupReg.publish(ev1) noCatchupReg.publish(ev2) require.Equal(t, len(noCatchupReg.buf), 2) - go noCatchupReg.runOutputLoop(context.Background()) + go noCatchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, noCatchupReg.waitForCaughtUp()) require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events()) noCatchupReg.disconnect(nil) @@ -158,7 +158,7 @@ func TestRegistrationBasic(t *testing.T) { catchupReg.publish(ev1) catchupReg.publish(ev2) require.Equal(t, len(catchupReg.buf), 2) - go catchupReg.runOutputLoop(context.Background()) + go catchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, catchupReg.waitForCaughtUp()) events := catchupReg.stream.Events() require.Equal(t, 6, len(events)) @@ -171,7 +171,7 @@ func TestRegistrationBasic(t *testing.T) { disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) disconnectReg.publish(ev1) disconnectReg.publish(ev2) - go disconnectReg.runOutputLoop(context.Background()) + go disconnectReg.runOutputLoop(context.Background(), 0) require.NoError(t, disconnectReg.waitForCaughtUp()) discErr := roachpb.NewError(fmt.Errorf("disconnection error")) disconnectReg.disconnect(discErr) @@ -183,7 +183,7 @@ func TestRegistrationBasic(t *testing.T) { for i := 0; i < cap(overflowReg.buf)+3; i++ { overflowReg.publish(ev1) } - go overflowReg.runOutputLoop(context.Background()) + go overflowReg.runOutputLoop(context.Background(), 0) err = <-overflowReg.errC require.Equal(t, newErrBufferCapacityExceeded(), err) require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events())) @@ -192,7 +192,7 @@ func TestRegistrationBasic(t *testing.T) { streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) streamErr := fmt.Errorf("stream error") streamErrReg.stream.SetSendErr(streamErr) - go streamErrReg.runOutputLoop(context.Background()) + go streamErrReg.runOutputLoop(context.Background(), 0) streamErrReg.publish(ev1) err = <-streamErrReg.errC require.Equal(t, streamErr.Error(), err.GoError().Error()) @@ -200,7 +200,7 @@ func TestRegistrationBasic(t *testing.T) { // Stream Context Canceled. streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) streamCancelReg.stream.Cancel() - go streamCancelReg.runOutputLoop(context.Background()) + go streamCancelReg.runOutputLoop(context.Background(), 0) require.NoError(t, streamCancelReg.waitForCaughtUp()) err = <-streamCancelReg.errC require.Equal(t, streamCancelReg.stream.Context().Err().Error(), err.GoError().Error()) @@ -337,10 +337,10 @@ func TestRegistryBasic(t *testing.T) { rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */) rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */) rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */) - go rAB.runOutputLoop(context.Background()) - go rBC.runOutputLoop(context.Background()) - go rCD.runOutputLoop(context.Background()) - go rAC.runOutputLoop(context.Background()) + go rAB.runOutputLoop(context.Background(), 0) + go rBC.runOutputLoop(context.Background(), 0) + go rCD.runOutputLoop(context.Background(), 0) + go rAC.runOutputLoop(context.Background(), 0) defer rAB.disconnect(nil) defer rBC.disconnect(nil) defer rCD.disconnect(nil) @@ -446,11 +446,11 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) { reg := makeRegistry() rNoDiff := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */) - go rNoDiff.runOutputLoop(context.Background()) + go rNoDiff.runOutputLoop(context.Background(), 0) reg.Register(&rNoDiff.registration) rWithDiff := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */) - go rWithDiff.runOutputLoop(context.Background()) + go rWithDiff.runOutputLoop(context.Background(), 0) reg.Register(&rWithDiff.registration) key := roachpb.Key("a") @@ -498,7 +498,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { reg := makeRegistry() r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false) - go r.runOutputLoop(context.Background()) + go r.runOutputLoop(context.Background(), 0) reg.Register(&r.registration) // Publish a value with a timestamp beneath the registration's start diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index a7a9c0d7a553..ae1becf2a18b 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -139,6 +139,14 @@ func (i iteratorWithCloser) Close() { // of rangefeeds using catchup iterators at the same time. func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) *roachpb.Error { + return r.rangeFeedWithRangeID(r.RangeID, args, stream) +} + +func (r *Replica) rangeFeedWithRangeID( + _forStacks roachpb.RangeID, + args *roachpb.RangeFeedRequest, + stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", @@ -352,6 +360,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( cfg := rangefeed.Config{ AmbientContext: r.AmbientContext, Clock: r.Clock(), + RangeID: r.RangeID, Span: desc.RSpan(), TxnPusher: &tp, PushTxnsInterval: r.store.TestingKnobs().RangeFeedPushTxnsInterval, diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index d6e55d141b3d..6313e8cabb73 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -55,7 +55,7 @@ func (r *Replica) Send( // // github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...) func (r *Replica) sendWithRangeID( - ctx context.Context, rangeID roachpb.RangeID, ba *roachpb.BatchRequest, + ctx context.Context, _forStacks roachpb.RangeID, ba *roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { var br *roachpb.BatchResponse if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {