kv: include RangeID in rangefeed goroutine stacks
This commit includes the RangeID in the goroutine stacks associated with
each rangefeed processor and its registrations. This is a cheap and easy
way to get better observability into which ranges have active rangefeeds.
It also tells us where those goroutines are spending their time.

This will also become easier to use in Go 1.17, which improved the
format of stack traces.
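
For illustration only (not part of this commit), here is a minimal standalone sketch of the trick, with hypothetical names: Go prints a function's arguments in goroutine stack dumps, so threading an otherwise-unused ID argument through a goroutine's entry point makes that ID visible in every traceback containing the frame. A plain int64 stands in for roachpb.RangeID.

package main

import (
	"fmt"
	"runtime"
	"time"
)

// runOutputLoop parks until done is closed. The _forStacks argument is
// never read; it exists only so that its value shows up in this frame
// of any goroutine stack dump.
func runOutputLoop(_forStacks int64, done chan struct{}) {
	<-done
}

func main() {
	done := make(chan struct{})
	go runOutputLoop(42, done) // 42 stands in for a RangeID

	time.Sleep(100 * time.Millisecond) // let the goroutine start
	buf := make([]byte, 1<<16)
	n := runtime.Stack(buf, true) // true: dump all goroutines
	fmt.Printf("%s\n", buf[:n])   // frame appears as runOutputLoop(0x2a, ...)
	close(done)
}

On pre-1.17 runtimes the raw argument words print in hex (0x2a above); the Go 1.17 traceback changes the commit message refers to make such arguments easier to read.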
nvanbenschoten committed Aug 4, 2021
1 parent 1443ced commit 1306da5
Showing 6 changed files with 35 additions and 22 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -189,7 +189,7 @@ func (ds *DistSender) partialRangeFeed(
 		case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
 			roachpb.RangeFeedRetryError_REASON_RANGE_MERGED,
 			roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER:
-			// Evict the decriptor from the cache.
+			// Evict the descriptor from the cache.
 			rangeInfo.token.Evict(ctx)
 			return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
 		default:
@@ -204,7 +204,7 @@ func (ds *DistSender) partialRangeFeed(
 	}
 }
 
 // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed
-// RPC call. Results will be send on the provided channel. Returns the timestamp
+// RPC call. Results will be sent on the provided channel. Returns the timestamp
 // of the maximum rangefeed checkpoint seen, which can be used to re-establish
 // the rangefeed with a larger starting timestamp, reflecting the fact that all
 // values up to the last checkpoint have already been observed. Returns the
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -50,8 +50,9 @@ func newErrBufferCapacityExceeded() *roachpb.Error {
 // Config encompasses the configuration required to create a Processor.
 type Config struct {
 	log.AmbientContext
-	Clock *hlc.Clock
-	Span  roachpb.RSpan
+	Clock   *hlc.Clock
+	RangeID roachpb.RangeID
+	Span    roachpb.RSpan
 
 	TxnPusher TxnPusher
 	// PushTxnsInterval specifies the interval at which a Processor will push
@@ -193,7 +194,7 @@ type IteratorConstructor func() storage.SimpleMVCCIterator
 func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
 	ctx := p.AnnotateCtx(context.Background())
 	if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
-		p.run(ctx, rtsIterFunc, stopper)
+		p.run(ctx, p.RangeID, rtsIterFunc, stopper)
 	}); err != nil {
 		pErr := roachpb.NewError(err)
 		p.reg.DisconnectWithErr(all, pErr)
@@ -203,7 +204,10 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor
 
 // run is called from Start and runs the rangefeed.
 func (p *Processor) run(
-	ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper,
+	ctx context.Context,
+	_forStacks roachpb.RangeID,
+	rtsIterFunc IteratorConstructor,
+	stopper *stop.Stopper,
 ) {
 	defer close(p.stoppedC)
 	ctx, cancelOutputLoops := context.WithCancel(ctx)
@@ -256,7 +260,7 @@ func (p *Processor) run(
 
 	// Run an output loop for the registry.
 	runOutputLoop := func(ctx context.Context) {
-		r.runOutputLoop(ctx)
+		r.runOutputLoop(ctx, p.RangeID)
 		select {
 		case p.unregC <- &r:
 		case <-p.stoppedC:
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/registry.go
@@ -264,7 +264,7 @@ func (r *registration) outputLoop(ctx context.Context) error {
 	}
 }
 
-func (r *registration) runOutputLoop(ctx context.Context) {
+func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) {
 	r.mu.Lock()
 	ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx)
 	r.mu.Unlock()
26 changes: 13 additions & 13 deletions pkg/kv/kvserver/rangefeed/registry_test.go
@@ -142,7 +142,7 @@ func TestRegistrationBasic(t *testing.T) {
 	noCatchupReg.publish(ev1)
 	noCatchupReg.publish(ev2)
 	require.Equal(t, len(noCatchupReg.buf), 2)
-	go noCatchupReg.runOutputLoop(context.Background())
+	go noCatchupReg.runOutputLoop(context.Background(), 0)
 	require.NoError(t, noCatchupReg.waitForCaughtUp())
 	require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events())
 	noCatchupReg.disconnect(nil)
@@ -158,7 +158,7 @@ func TestRegistrationBasic(t *testing.T) {
 	catchupReg.publish(ev1)
 	catchupReg.publish(ev2)
 	require.Equal(t, len(catchupReg.buf), 2)
-	go catchupReg.runOutputLoop(context.Background())
+	go catchupReg.runOutputLoop(context.Background(), 0)
 	require.NoError(t, catchupReg.waitForCaughtUp())
 	events := catchupReg.stream.Events()
 	require.Equal(t, 6, len(events))
@@ -171,7 +171,7 @@ func TestRegistrationBasic(t *testing.T) {
 	disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
 	disconnectReg.publish(ev1)
 	disconnectReg.publish(ev2)
-	go disconnectReg.runOutputLoop(context.Background())
+	go disconnectReg.runOutputLoop(context.Background(), 0)
 	require.NoError(t, disconnectReg.waitForCaughtUp())
 	discErr := roachpb.NewError(fmt.Errorf("disconnection error"))
 	disconnectReg.disconnect(discErr)
@@ -183,7 +183,7 @@ func TestRegistrationBasic(t *testing.T) {
 	for i := 0; i < cap(overflowReg.buf)+3; i++ {
 		overflowReg.publish(ev1)
 	}
-	go overflowReg.runOutputLoop(context.Background())
+	go overflowReg.runOutputLoop(context.Background(), 0)
 	err = <-overflowReg.errC
 	require.Equal(t, newErrBufferCapacityExceeded(), err)
 	require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events()))
@@ -192,15 +192,15 @@ func TestRegistrationBasic(t *testing.T) {
 	streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
 	streamErr := fmt.Errorf("stream error")
 	streamErrReg.stream.SetSendErr(streamErr)
-	go streamErrReg.runOutputLoop(context.Background())
+	go streamErrReg.runOutputLoop(context.Background(), 0)
 	streamErrReg.publish(ev1)
 	err = <-streamErrReg.errC
 	require.Equal(t, streamErr.Error(), err.GoError().Error())
 
 	// Stream Context Canceled.
 	streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
 	streamCancelReg.stream.Cancel()
-	go streamCancelReg.runOutputLoop(context.Background())
+	go streamCancelReg.runOutputLoop(context.Background(), 0)
 	require.NoError(t, streamCancelReg.waitForCaughtUp())
 	err = <-streamCancelReg.errC
 	require.Equal(t, streamCancelReg.stream.Context().Err().Error(), err.GoError().Error())
@@ -337,10 +337,10 @@ func TestRegistryBasic(t *testing.T) {
 	rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */)
 	rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */)
 	rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */)
-	go rAB.runOutputLoop(context.Background())
-	go rBC.runOutputLoop(context.Background())
-	go rCD.runOutputLoop(context.Background())
-	go rAC.runOutputLoop(context.Background())
+	go rAB.runOutputLoop(context.Background(), 0)
+	go rBC.runOutputLoop(context.Background(), 0)
+	go rCD.runOutputLoop(context.Background(), 0)
+	go rAC.runOutputLoop(context.Background(), 0)
 	defer rAB.disconnect(nil)
 	defer rBC.disconnect(nil)
 	defer rCD.disconnect(nil)
@@ -446,11 +446,11 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) {
 	reg := makeRegistry()
 
 	rNoDiff := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */)
-	go rNoDiff.runOutputLoop(context.Background())
+	go rNoDiff.runOutputLoop(context.Background(), 0)
 	reg.Register(&rNoDiff.registration)
 
 	rWithDiff := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */)
-	go rWithDiff.runOutputLoop(context.Background())
+	go rWithDiff.runOutputLoop(context.Background(), 0)
 	reg.Register(&rWithDiff.registration)
 
 	key := roachpb.Key("a")
@@ -498,7 +498,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) {
 	reg := makeRegistry()
 
 	r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false)
-	go r.runOutputLoop(context.Background())
+	go r.runOutputLoop(context.Background(), 0)
 	reg.Register(&r.registration)
 
 	// Publish a value with a timestamp beneath the registration's start
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -139,6 +139,14 @@ func (i iteratorWithCloser) Close() {
 // of rangefeeds using catchup iterators at the same time.
 func (r *Replica) RangeFeed(
 	args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
+) *roachpb.Error {
+	return r.rangeFeedWithRangeID(r.RangeID, args, stream)
+}
+
+func (r *Replica) rangeFeedWithRangeID(
+	_forStacks roachpb.RangeID,
+	args *roachpb.RangeFeedRequest,
+	stream roachpb.Internal_RangeFeedServer,
 ) *roachpb.Error {
 	if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
 		return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
@@ -352,6 +360,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
 	cfg := rangefeed.Config{
 		AmbientContext:   r.AmbientContext,
 		Clock:            r.Clock(),
+		RangeID:          r.RangeID,
 		Span:             desc.RSpan(),
 		TxnPusher:        &tp,
 		PushTxnsInterval: r.store.TestingKnobs().RangeFeedPushTxnsInterval,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
@@ -55,7 +55,7 @@ func (r *Replica) Send(
 //
 // github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...)
 func (r *Replica) sendWithRangeID(
-	ctx context.Context, rangeID roachpb.RangeID, ba *roachpb.BatchRequest,
+	ctx context.Context, _forStacks roachpb.RangeID, ba *roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
 	var br *roachpb.BatchResponse
 	if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
