Skip to content

Commit

Permalink
kvcoord: Release catchup reservation before re-acquire attempt
Browse files Browse the repository at this point in the history
Release the catchup scan reservation prior to attempting to re-acquire
it.  Failure to do so could result in a stuck mux rangefeed when enough
ranges encounter an error at the same time (which can happen if
e.g. a node gets restarted).

Fixes #105058

Release note (bug fix): Fix a bug in the mux rangefeed implementation that
may cause a mux rangefeed to become stuck if enough ranges encounter
certain errors concurrently.
  • Loading branch information
Yevgeniy Miretskiy committed Jun 16, 2023
1 parent 6b30777 commit 7031b98
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 8 deletions.
38 changes: 31 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func muxRangeFeed(
catchupSem: catchupSem,
eventCh: eventCh,
}

divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g)
return errors.CombineErrors(m.g.Wait(), ctx.Err())
}
Expand Down Expand Up @@ -209,18 +210,11 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()

// Before starting single rangefeed, acquire catchup scan quota.
catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
if err != nil {
return err
}

// Register active mux range feed.
stream := &activeMuxRangeFeed{
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges),
rSpan: rs,
startAfter: startAfter,
catchupRes: catchupRes,
token: token,
}

Expand All @@ -246,6 +240,15 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
streamID := atomic.AddInt64(&m.seqID, 1)

{
// Before starting single rangefeed, acquire catchup scan quota.
catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
if err != nil {
return err
}
s.catchupRes = catchupRes
}

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); {
// If we've cleared the descriptor on failure, re-lookup.
Expand Down Expand Up @@ -445,6 +448,10 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
continue
}

if m.cfg.knobs.onMuxRangefeedEvent != nil {
m.cfg.knobs.onMuxRangefeedEvent(event)
}

switch t := event.GetValue().(type) {
case *kvpb.RangeFeedCheckpoint:
if t.Span.Contains(active.Span) {
Expand Down Expand Up @@ -530,6 +537,13 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
}
active.setLastError(reason)

// Release catchup scan reservation if any -- we will acquire another
// one when we restart.
if active.catchupRes != nil {
active.catchupRes.Release()
active.catchupRes = nil
}

doRelease := true
defer func() {
if doRelease {
Expand Down Expand Up @@ -624,3 +638,13 @@ func (c *muxStream) close() []*activeMuxRangeFeed {

return toRestart
}

// withOnMuxEvent is a test-only option. It returns a RangeFeedOption that
// stores fn in the onMuxRangefeedEvent knob of the rangefeed configuration.
// fn receives a pointer to the mux rangefeed event and may therefore modify
// the event in place.
func withOnMuxEvent(fn func(event *kvpb.MuxRangeFeedEvent)) RangeFeedOption {
	installKnob := func(cfg *rangeFeedConfig) {
		cfg.knobs.onMuxRangefeedEvent = fn
	}
	return optionFunc(installKnob)
}

// TestingWithOnMuxEvent allows external tests to access the unexported
// withOnMuxEvent option.
var TestingWithOnMuxEvent = withOnMuxEvent
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type rangeFeedConfig struct {
useMuxRangeFeed bool
overSystemTable bool
withDiff bool

knobs struct {
onMuxRangefeedEvent func(event *kvpb.MuxRangeFeedEvent)
}
}

// RangeFeedOption configures a RangeFeed.
Expand Down
50 changes: 49 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ func rangeFeed(
startFrom hlc.Timestamp,
onValue func(event kvcoord.RangeFeedMessage),
useMuxRangeFeed bool,
opts ...kvcoord.RangeFeedOption,
) func() {
ds := dsI.(*kvcoord.DistSender)
events := make(chan kvcoord.RangeFeedMessage)
ctx, cancel := context.WithCancel(context.WithValue(context.Background(), testFeedCtxKey{}, struct{}{}))

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) (err error) {
var opts []kvcoord.RangeFeedOption
if useMuxRangeFeed {
opts = append(opts, kvcoord.WithMuxRangeFeed())
ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{})
Expand Down Expand Up @@ -585,3 +585,51 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) {
// retry.
require.NotZero(t, ds.Metrics().RangefeedRestartStuck.Count())
}

// TestMuxRangeCatchupScanQuotaReleased starts a mux rangefeed over the primary
// index span of table foo on a single-node cluster with catchup scan
// concurrency limited to 1. Every mux rangefeed event is rewritten into a
// RANGE_SPLIT retry error by a testing hook, and the test waits until
// numErrsToReturn such events have been observed before closing the feed.
//
// NOTE(review): presumably the wait below times out if a restarting range keeps
// holding its catchup scan reservation while trying to acquire a new one --
// confirm against the restart logic in the mux rangefeed implementation.
func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

ts := tc.Server(0)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
// Captured before the table is created and populated; used as the rangefeed
// start timestamp below.
startTime := ts.Clock().Now()

// Initial setup: only single catchup scan allowed.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.rangefeed.catchup_scan_concurrency = 1`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
)

fooDesc := desctestutils.TestingGetPublicTableDescriptor(
ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)

// This error causes rangefeed to restart after re-resolving spans, and causes
// catchup scan quota acquisition.
transientErrEvent := kvpb.RangeFeedEvent{
Error: &kvpb.RangeFeedError{
Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT)),
}}
// The hook installed below overwrites every event with transientErrEvent, so
// this callback is not expected to run; it panics if it does.
noValuesExpected := func(event kvcoord.RangeFeedMessage) {
panic("received value when none expected")
}
const numErrsToReturn = 100
// numErrors counts hook invocations; enoughErrors is closed once the count
// reaches numErrsToReturn.
var numErrors atomic.Int32
enoughErrors := make(chan struct{})
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, true,
kvcoord.TestingWithOnMuxEvent(func(event *kvpb.MuxRangeFeedEvent) {
// Replace the event payload with the retriable error.
event.RangeFeedEvent = transientErrEvent
// Add returns the updated count; comparing with == (rather than >=) makes
// sure the channel is closed exactly once.
if numErrors.Add(1) == numErrsToReturn {
close(enoughErrors)
}
}))
// Wait until the hook has fired numErrsToReturn times, then shut the feed down.
channelWaitWithTimeout(t, enoughErrors)
closeFeed()
}

0 comments on commit 7031b98

Please sign in to comment.