Skip to content

Commit

Permalink
Merge #111312
Browse files Browse the repository at this point in the history
111312: kvcoord: Fix mux rangefeed startup deadlock r=miretskiy a=miretskiy

A previous PR, #110919, modified the rangefeed startup logic to rely on a rate limiter instead of a semaphore.

The issue exposed by the catchup scan rate limiter is that it allowed many more (100/sec vs 8 prior) catchup scans to be started. If the range resides on the local node, a local "bypass" RPC is created instead (rpc/context.go), and this RPC bypass is implemented via buffered channels.

The deadlock occurs when the client attempts to send a request to the server while holding the lock, but blocks (because the channel is full -- i.e. we have sent 128 outstanding requests), and the server blocks for the same reason, because the client mux goroutine is blocked attempting to acquire the lock to look up the stream for the rangefeed message.

The fix moves the state shared by the sender and the consumer -- namely the stream map -- outside of the sender lock.

This change also adds a test that, without the fix, reliably fails under deadlock detection.

Fixes #111111
Fixes #111165

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Sep 27, 2023
2 parents 635e2fe + 0be08e7 commit 3083a30
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 43 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ go_test(
"@com_github_cockroachdb_errors//errutil",
"@com_github_cockroachdb_redact//:redact",
"@com_github_golang_mock//gomock",
"@com_github_sasha_s_go_deadlock//:go-deadlock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
Expand Down
85 changes: 49 additions & 36 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ func muxRangeFeed(
type muxStream struct {
nodeID roachpb.NodeID

streams syncutil.IntMap // streamID -> *activeMuxRangeFeed

// mu must be held when starting rangefeed.
mu struct {
syncutil.Mutex
sender rangeFeedRequestSender
streams map[int64]*activeMuxRangeFeed
closed bool
sender rangeFeedRequestSender
closed bool
}
}

Expand Down Expand Up @@ -290,7 +291,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error

conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID)
if err == nil {
err = conn.startRangeFeed(streamID, s, &args)
err = conn.startRangeFeed(streamID, s, &args, m.cfg.knobs.beforeSendRequest)
}

if err != nil {
Expand Down Expand Up @@ -380,7 +381,6 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(

ms := muxStream{nodeID: nodeID}
ms.mu.sender = mux
ms.mu.streams = make(map[int64]*activeMuxRangeFeed)
if err := future.MustSet(stream, muxStreamOrError{stream: &ms}); err != nil {
return err
}
Expand Down Expand Up @@ -474,7 +474,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
if active.catchupRes != nil {
m.metrics.Errors.RangefeedErrorCatchup.Inc(1)
}
ms.deleteStream(event.StreamID)
ms.streams.Delete(event.StreamID)
// Restart rangefeed on another goroutine. Restart might be a bit
// expensive, particularly if we have to resolve span. We do not want
// to block receiveEventsFromNode for too long.
Expand Down Expand Up @@ -555,53 +555,66 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
// on this node connection. If no error returned, registers stream
// with this connection. Otherwise, stream is not registered.
func (c *muxStream) startRangeFeed(
streamID int64, stream *activeMuxRangeFeed, req *kvpb.RangeFeedRequest,
) error {
streamID int64, stream *activeMuxRangeFeed, req *kvpb.RangeFeedRequest, beforeSend func(),
) (retErr error) {
// NB: lock must be held for the duration of this method.
// The reasons for this are twofold:
// 1. Send calls must be protected against concurrent calls.
// 2. The muxStream may be in the process of restart -- that is receiveEventsFromNode just
// returned an error. When that happens, muxStream is closed, and all rangefeeds
// belonging to this muxStream are restarted. The lock here synchronizes with the close()
// call so that we either observe the fact that muxStream is closed when this method runs,
// or that the close waits until this call completes.
// Note also, the Send method may block. That's alright. If the call is blocked because
// the server side just returned an error, then, the send call should abort and cause an
// error to be returned, releasing the lock, and letting close proceed.
c.mu.Lock()
defer c.mu.Unlock()

// As soon as we issue Send below, the stream may return an event or an error that
// may be seen by the event consumer (receiveEventsFromNode).
// Therefore, we update streams map immediately, but undo this insert in case of an error,
// which is returned to the caller for retry.
c.streams.Store(streamID, unsafe.Pointer(stream))

defer func() {
if retErr != nil {
// undo stream registration.
c.streams.Delete(streamID)
}
}()

if c.mu.closed {
return net.ErrClosed
}

// Concurrent Send calls are not thread safe; thus Send calls must be
// synchronized.
if err := c.mu.sender.Send(req); err != nil {
return err
if beforeSend != nil {
beforeSend()
}

// As soon as we issue Send above, the stream may return an error that
// may be seen by the event consumer (receiveEventsFromNode).
// Therefore, we update streams map under the lock to ensure that the
// receiver will be able to observe this stream.
c.mu.streams[streamID] = stream
return nil
return c.mu.sender.Send(req)
}

func (c *muxStream) lookupStream(streamID int64) (a *activeMuxRangeFeed) {
c.mu.Lock()
a = c.mu.streams[streamID]
c.mu.Unlock()
return a
}

func (c *muxStream) deleteStream(streamID int64) {
c.mu.Lock()
delete(c.mu.streams, streamID)
c.mu.Unlock()
func (c *muxStream) lookupStream(streamID int64) *activeMuxRangeFeed {
v, ok := c.streams.Load(streamID)
if ok {
return (*activeMuxRangeFeed)(v)
}
return nil
}

// close closes mux stream returning the list of active range feeds.
func (c *muxStream) close() []*activeMuxRangeFeed {
func (c *muxStream) close() (toRestart []*activeMuxRangeFeed) {
// NB: lock must be held for the duration of this method to synchronize with startRangeFeed.
c.mu.Lock()
defer c.mu.Unlock()

c.mu.closed = true
toRestart := make([]*activeMuxRangeFeed, 0, len(c.mu.streams))
for _, a := range c.mu.streams {
toRestart = append(toRestart, a)
}
c.mu.streams = nil
c.mu.Unlock()

c.streams.Range(func(_ int64, v unsafe.Pointer) bool {
toRestart = append(toRestart, (*activeMuxRangeFeed)(v))
return true
})

return toRestart
}
Expand Down
47 changes: 41 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type rangeFeedConfig struct {
// captureMuxRangeFeedRequestSender is a callback invoked when mux
// rangefeed establishes connection to the node.
captureMuxRangeFeedRequestSender func(nodeID roachpb.NodeID, sender func(req *kvpb.RangeFeedRequest) error)
// beforeSendRequest is a mux rangefeed callback invoked prior to sending rangefeed request.
beforeSendRequest func()
}
}

Expand Down Expand Up @@ -973,6 +975,14 @@ func TestingWithMuxRangeFeedRequestSenderCapture(
})
}

// TestingWithBeforeSendRequest returns a test-only RangeFeedOption that
// installs fn as the beforeSendRequest knob of the rangefeed config, i.e. the
// callback invoked prior to sending a mux rangefeed request.
func TestingWithBeforeSendRequest(fn func()) RangeFeedOption {
	installKnob := func(cfg *rangeFeedConfig) {
		cfg.knobs.beforeSendRequest = fn
	}
	return optionFunc(installKnob)
}

// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics

Expand All @@ -983,10 +993,15 @@ type catchupScanRateLimiter struct {
}

func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
rl := &catchupScanRateLimiter{sv: sv}
rl.limit = getCatchupRateLimit(rl.sv)
rl.pacer = quotapool.NewRateLimiter("distSenderCatchupLimit", rl.limit, 0 /* smooth rate */)
return rl
const slowAcquisitionThreshold = 5 * time.Second
lim := getCatchupRateLimit(sv)
return &catchupScanRateLimiter{
sv: sv,
limit: lim,
pacer: quotapool.NewRateLimiter(
"distSenderCatchupLimit", lim, 0, /* smooth rate limit without burst */
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowCatchupScanAcquisition(slowAcquisitionThreshold))),
}
}

func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
Expand All @@ -1001,8 +1016,28 @@ func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
// Take opportunity to update limits if they have changed.
if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
rl.limit = lim
rl.pacer.UpdateLimit(lim, 0)
rl.pacer.UpdateLimit(lim, 0 /* smooth rate limit without burst */)
}

return rl.pacer.WaitN(ctx, 1)
}

// logSlowCatchupScanAcquisition builds a quotapool.SlowAcquisitionFunc for the
// catchup scan quota. Unlike quotapool.LogSlowAcquisition, it throttles its
// output: a slow acquisition is reported only when the log.Every limiter
// created with loggingMinInterval permits it, which reduces log spam.
//
// The function returned by the callback logs the completion of the
// acquisition, but only if the corresponding slow-acquisition warning was
// itself logged.
func logSlowCatchupScanAcquisition(loggingMinInterval time.Duration) quotapool.SlowAcquisitionFunc {
	throttle := log.Every(loggingMinInterval)

	return func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) func() {
		if !throttle.ShouldLog() {
			// This slow acquisition is not reported, so neither is its completion.
			return func() {}
		}

		log.Warningf(ctx, "have been waiting %s attempting to acquire catchup scan quota",
			timeutil.Since(start))
		return func() {
			log.Infof(ctx, "acquired catchup quota after %s", timeutil.Since(start))
		}
	}
}
64 changes: 64 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/sasha-s/go-deadlock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -258,6 +259,9 @@ func channelWaitWithTimeout(t *testing.T, ch chan struct{}) {
if util.RaceEnabled {
timeOut *= 10
}
if syncutil.DeadlockEnabled {
timeOut = 2 * deadlock.Opts.DeadlockTimeout
}
select {
case <-ch:
case <-time.After(timeOut):
Expand Down Expand Up @@ -1052,3 +1056,63 @@ func TestMuxRangeFeedCanCloseStream(t *testing.T) {
// Mux rangefeed should retry, and thus we expect frontier to keep advancing.
}
}

// TestMuxRangeFeedDoesNotDeadlockWithLocalStreams verifies mux rangefeed does not
// deadlock when running against many local ranges. Local ranges use the local RPC
// bypass (rpc/context.go), which uses buffered channels for client/server streaming
// RPC communication.
//
// The test is a no-op unless deadlock detection is enabled.
func TestMuxRangeFeedDoesNotDeadlockWithLocalStreams(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	if !syncutil.DeadlockEnabled {
		t.Log("skipping test: it requires deadlock detection enabled.")
		return
	}

	// Lower syncutil deadlock timeout. The option is process-global, so the
	// previous value is put back when the test exits instead of leaking the
	// override into tests that run afterwards.
	prevDeadlockTimeout := deadlock.Opts.DeadlockTimeout
	defer func() { deadlock.Opts.DeadlockTimeout = prevDeadlockTimeout }()
	deadlock.Opts.DeadlockTimeout = 2 * time.Minute

	// Make deadlock more likely: use unbuffered channel.
	defer rpc.TestingSetLocalStreamChannelBufferSize(0)()

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
	})
	defer tc.Stopper().Stop(ctx)

	ts := tc.Server(0).ApplicationLayer()
	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

	// Enable rangefeeds and create the table the rangefeed will watch.
	sqlDB.ExecMultiple(t,
		`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
		`SET CLUSTER SETTING kv.closed_timestamp.target_duration='100ms'`,
		`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
		`CREATE TABLE foo (key INT PRIMARY KEY)`,
	)

	// Captured before the inserts so that the rangefeed observes all of them.
	startFrom := ts.Clock().Now()

	// Insert 1000 rows, and split them into many ranges.
	sqlDB.ExecMultiple(t,
		`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
		`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 20))`,
	)

	fooDesc := desctestutils.TestingGetPublicTableDescriptor(
		ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
	fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)

	allSeen, onValue := observeNValues(1000)
	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startFrom, onValue, true,
		kvcoord.WithMuxRangeFeed(),
		kvcoord.TestingWithBeforeSendRequest(func() {
			// Prior to sending rangefeed request, block for just a bit
			// to make deadlock more likely.
			time.Sleep(100 * time.Millisecond)
		}),
	)
	defer closeFeed()
	channelWaitWithTimeout(t, allSeen)
}
14 changes: 13 additions & 1 deletion pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"math"
"net"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -1258,6 +1259,17 @@ type pipe struct {
errC chan error
}

// localStreamChannelBufferSize is the buffer size of the channel used to
// connect local streaming RPCs. It is accessed atomically.
var localStreamChannelBufferSize int64 = 128

// TestingSetLocalStreamChannelBufferSize overrides the channel buffer size
// used for local streaming RPCs. It returns a function that restores the value
// that was in effect immediately before the override.
func TestingSetLocalStreamChannelBufferSize(s int64) func() {
	// Swap installs the new value and captures the previous one in a single
	// atomic step; a separate load followed by a store could capture a stale
	// value if another override ran in between.
	old := atomic.SwapInt64(&localStreamChannelBufferSize, s)
	return func() { atomic.StoreInt64(&localStreamChannelBufferSize, old) }
}

// makePipe creates a pipe and return it as its two ends.
//
// assignPtr is a function that implements *dst = *src for the type of the
Expand All @@ -1267,7 +1279,7 @@ type pipe struct {
// (i.e. interface{}) way.
func makePipe(assignPtr func(dst interface{}, src interface{})) (pipeWriter, pipeReader) {
p := &pipe{
respC: make(chan interface{}, 128),
respC: make(chan interface{}, atomic.LoadInt64(&localStreamChannelBufferSize)),
errC: make(chan error, 1),
}
w := pipeWriter{pipe: p}
Expand Down

0 comments on commit 3083a30

Please sign in to comment.