kvcoord: Fix error handling and retries in mux rangefeed client
Fix error handling and retries when restarting rangefeeds.
A big difference between the regular rangefeed and the mux rangefeed
is that the regular rangefeed has a dedicated goroutine per range.
This goroutine is responsible for running the rangefeed, handling
its errors, managing the state pertaining to the RPC call
(transport, retry information, routing information, etc.), and
restarting the rangefeed with backoff as needed.
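
For illustration, here is a heavily simplified, hypothetical Go sketch of
that per-range loop; the names (`rangeFeedLoop`, `runOnReplica`, the replica
strings, the retry budget) are stand-ins, and the real logic lives in
`pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runOnReplica models the long-running rangefeed RPC against one replica; it
// returns when the stream terminates with an error.
func runOnReplica(ctx context.Context, replica string) error {
	return fmt.Errorf("transient error from %s", replica)
}

// rangeFeedLoop is the dedicated per-range goroutine: transport, retry, and
// routing state all live in its local variables, so every restart naturally
// moves on to the next replica with backoff.
func rangeFeedLoop(ctx context.Context, replicas []string) error {
	backoff := 50 * time.Millisecond // retry state, local to this goroutine
	next := 0                        // "transport" state, local to this goroutine
	for attempt := 0; attempt < 5 && ctx.Err() == nil; attempt++ {
		replica := replicas[next%len(replicas)]
		next++ // a failure moves on to the next replica, not the same one
		if err := runOnReplica(ctx, replica); err != nil {
			fmt.Printf("attempt %d on %s failed: %v; backing off\n", attempt, replica, err)
			time.Sleep(backoff)
			backoff *= 2
			continue
		}
		return nil // clean termination
	}
	if ctx.Err() != nil {
		return ctx.Err()
	}
	return errors.New("retry budget exhausted")
}

func main() {
	// The regular rangefeed client starts one such goroutine per range.
	_ = rangeFeedLoop(context.Background(), []string{"n1", "n2", "n3"})
}
```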

Mux rangefeed, on the other hand, is not "loop" based.  Prior
to this PR, when mux rangefeed encountered a transient error,
it would lose much of the restart state mentioned above.  For example,
it would lose the transport information, so the restart would
run against the same node as before, potentially resulting in
busy loops.  Those busy loops (where the RPC call is restarted
against the same node/replica that just experienced an error) would
tend to make tests flaky, since they take longer to converge
to the state expected by tests such as `TestDecommission`.

This PR fixes this loss of per-range restart state
by associating it with the long-lived `activeMuxRangeFeed` struct.
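
The sketch below illustrates the idea under simplified assumptions: the
struct name mirrors `activeMuxRangeFeed` from the diff, but the token and
transport types and the `start`/`run` signatures are illustrative stand-ins,
not the actual kvcoord API.

```go
package main

import (
	"errors"
	"fmt"
)

// evictionToken and transport are illustrative stand-ins for the kvcoord
// routing and transport types.
type evictionToken struct{ valid bool }

func (t evictionToken) Valid() bool { return t.valid }

type transport struct {
	replicas []string
	next     int
}

func (t *transport) IsExhausted() bool   { return t.next >= len(t.replicas) }
func (t *transport) NextReplica() string { r := t.replicas[t.next]; t.next++; return r }

// activeMuxRangeFeed keeps the restart state for the lifetime of one range
// feed, instead of recomputing it on every restart.
type activeMuxRangeFeed struct {
	token     evictionToken
	transport *transport
	attempt   int
}

// start is called for the initial attempt and re-entered on every restart.
// Because it reads and updates state stored on the receiver, a restart after
// a transient error continues with the next replica rather than re-dialing
// the node that just failed.
func (s *activeMuxRangeFeed) start(run func(replica string) error) error {
	for ; s.attempt < 10; s.attempt++ {
		if !s.token.Valid() {
			// Routing unknown or evicted: re-resolve the range and rebuild
			// the transport (lookup elided in this sketch).
			s.token = evictionToken{valid: true}
			s.transport = &transport{replicas: []string{"n1", "n2", "n3"}}
		}
		for !s.transport.IsExhausted() {
			replica := s.transport.NextReplica()
			if err := run(replica); err != nil {
				fmt.Printf("attempt %d: replica %s failed: %v\n", s.attempt, replica, err)
				continue
			}
			return nil
		}
		// Every replica was tried: evict routing and retry with a fresh lookup.
		s.token = evictionToken{}
	}
	return errors.New("retry budget exhausted")
}

func main() {
	s := &activeMuxRangeFeed{}
	// The first call fails over from n1 to n2; a later restart (e.g. one
	// triggered by the event-receiving goroutine) resumes on the same struct,
	// continuing with n3 instead of hammering the replica that just failed.
	_ = s.start(func(replica string) error {
		if replica == "n1" {
			return errors.New("transient error")
		}
		return nil
	})
	fmt.Println("a restart would resume with:", s.transport.replicas[s.transport.next:])
}
```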

Fixes cockroachdb#96630
Fixes cockroachdb#100783
Informs cockroachdb#99631
Informs cockroachdb#101614

Release note: None
Yevgeniy Miretskiy committed Apr 24, 2023
1 parent 3c23346 commit 0c0053f
Showing 2 changed files with 126 additions and 68 deletions.
193 changes: 126 additions & 67 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -72,12 +73,11 @@ func muxRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
start := timeutil.Now()
defer func() {
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
}()
}

@@ -126,20 +126,68 @@ type muxStreamOrError struct {
}

// activeMuxRangeFeed augments activeRangeFeed with additional state.
// This is a long-lived data structure representing the lifetime of a single
// range feed.
// The lifetime of a single range feed is as follows:
//
// ┌─* muxRangeFeed
// ┌───►├─► divideSpanOnRangeBoundaries
// │ │ Divide target span(s) on range boundaries
// │ ├─► startSingleRangeFeed
// │ │ Allocate activeMuxRangeFeed, acquire catchup scan quota
// │┌──►├─► activeMuxRangeFeed.start
// ││ │ Determine the first replica that's usable for running range feed
// ││ │ Establish MuxRangeFeed connection with the node
// ││ │ Start single rangefeed on that node
// ││ │ Assign unique "stream ID" which will be echoed back by range feed server
// ││ │
// ││┌─►├─► receiveEventsFromNode
// │││ │ Lookup activeMuxRangeFeed corresponding to the received event
// │││ ◊─► Handle event
// ││└──│── Maybe release catchup scan quota
// ││ │
// ││ ├─► OR Handle error
// ││ └─► restartActiveRangeFeeds
// ││ Determine if the error is fatal, if so terminate
// ││ Transient error can be retried, using retry/transport state
// │└────── stored in this structure (for example: try next replica)
// │ Some errors (range split) need to perform full lookup
// └─────── activeMuxRangeFeed is released, replaced by one or more new instances
type activeMuxRangeFeed struct {
*activeRangeFeed
token rangecache.EvictionToken
roachpb.ReplicaDescriptor
startAfter hlc.Timestamp
catchupRes catchupAlloc

// State pertaining to execution of rangefeed call, along with
// the state needed to manage retries.
retry retry.Retry
token rangecache.EvictionToken
transport Transport
}

func (a *activeMuxRangeFeed) release() {
a.activeRangeFeed.release()
if a.catchupRes != nil {
a.catchupRes.Release()
func (s *activeMuxRangeFeed) release() {
s.activeRangeFeed.release()
if s.catchupRes != nil {
s.catchupRes.Release()
}
if s.transport != nil {
s.transport.Release()
s.transport = nil
}
}

func (s *activeMuxRangeFeed) resetRouting(ctx context.Context, newToken rangecache.EvictionToken) {
if s.token.Valid() {
s.token.Evict(ctx)
}
if s.transport != nil {
s.transport.Release()
s.transport = nil
}
s.token = newToken
}

// the "Send" portion of the kvpb.Internal_MuxRangeFeedClient
type rangeFeedRequestSender interface {
Send(req *kvpb.RangeFeedRequest) error
@@ -158,15 +206,6 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()

var releaseTransport func()
maybeReleaseTransport := func() {
if releaseTransport != nil {
releaseTransport()
releaseTransport = nil
}
}
defer maybeReleaseTransport()

// Before starting single rangefeed, acquire catchup scan quota.
catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
if err != nil {
@@ -179,24 +218,30 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
startAfter: startAfter,
catchupRes: catchupRes,
token: token,
retry: retry.StartWithCtx(ctx, m.ds.rpcRetryOptions),
}
streamID := atomic.AddInt64(&m.seqID, 1)

// stream ownership gets transferred (indicated by setting stream to nil) when
// we successfully send request. If we fail to do so, cleanup.
defer func() {
if stream != nil {
stream.release()
}
}()
if err := stream.start(ctx, m); err != nil {
stream.release()
return err
}

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); {
maybeReleaseTransport()
return nil
}

// start begins execution of activeMuxRangeFeed.
// Backoff is applied if same stream restarts.
func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
streamID := atomic.AddInt64(&m.seqID, 1)

// Start a retry loop for sending the batch to the range.
for s.retry.Next() {
// If we've cleared the descriptor on failure, re-lookup.
if !token.Valid() {
var err error
if !s.token.Valid() {
rs, err := keys.SpanAddr(s.Span)
if err != nil {
return err
}
ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
@@ -205,50 +250,53 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
}
continue
}
token = ri
s.resetRouting(ctx, ri)
}

// Establish a RangeFeed for a single Range.
log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
continue
if s.transport == nil {
transport, err := newTransportForRange(ctx, s.token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 1, "Failed to create transport for %s (err=%s) ", s.token.String(), err)
continue
}
s.transport = transport
}
releaseTransport = transport.Release

for !transport.IsExhausted() {
args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff)
args.Replica = transport.NextReplica()
for !s.transport.IsExhausted() {
args := makeRangeFeedRequest(
s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff)
args.Replica = s.transport.NextReplica()
args.StreamID = streamID

rpcClient, err := transport.NextInternalClient(ctx)
s.ReplicaDescriptor = args.Replica
rpcClient, err := s.transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
continue
}

log.VEventf(ctx, 1,
"MuxRangeFeed starting for span %s@%s (rangeID %d, replica %s, attempt %d)",
s.Span, s.startAfter, s.token.Desc().RangeID, args.Replica, s.retry.CurrentAttempt())

conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID)
if err != nil {
return err
if err == nil {
err = conn.startRangeFeed(streamID, s, &args)
}

if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
log.VErrEventf(ctx, 0,
"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
if err != nil {
log.VErrEventf(ctx, 1,
"RPC error establishing mux rangefeed to r%d, replica %s: %s", args.RangeID, args.Replica, err)
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
return err
}
continue
}
// We successfully established rangefeed, so the responsibility
// for releasing the stream is transferred to the mux go routine.
stream = nil
return nil
}

// If the transport is exhausted, we evict routing token and retry range
// resolution.
token.Evict(ctx)
token = rangecache.EvictionToken{}
s.resetRouting(ctx, rangecache.EvictionToken{}) // Transport exhausted; reset and retry.
}

return ctx.Err()
@@ -293,7 +341,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer restore()

if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
start := timeutil.Now()
defer func() {
@@ -339,7 +387,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
}

toRestart := ms.close()
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
}
return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
@@ -383,7 +431,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
// which one executes is a coin flip) and so it is possible that we may see
// additional event(s) arriving for a stream that is no longer active.
if active == nil {
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
}
continue
@@ -467,22 +515,28 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
) error {
m.ds.metrics.RangefeedRestartRanges.Inc(1)

if log.V(0) {
log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
if log.V(1) {
log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor,
timeutil.Since(active.Resolved.GoTime()), reason)
}
active.setLastError(reason)
active.release()

doRelease := true
defer func() {
if doRelease {
active.release()
}
}()

errInfo, err := handleRangefeedError(ctx, reason)
if err != nil {
// If this is an error we cannot recover from, terminate the rangefeed.
return err
}

if errInfo.evict && active.token.Valid() {
active.token.Evict(ctx)
active.token = rangecache.EvictionToken{}
if errInfo.evict {
active.resetRouting(ctx, rangecache.EvictionToken{})
}

rs, err := keys.SpanAddr(active.Span)
Expand All @@ -493,7 +547,12 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
if errInfo.resolveSpan {
return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed)
}
return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token)

if err := active.start(ctx, m); err != nil {
return err
}
doRelease = false // active stream ownership transferred to start above.
return nil
}

// startRangeFeed initiates rangefeed for the specified request running
1 change: 0 additions & 1 deletion pkg/kv/kvserver/client_raft_test.go
@@ -3267,7 +3267,6 @@ HAVING

func TestDecommission(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 96630, "flaky test")
defer log.Scope(t).Close(t)

// Five nodes is too much to reliably run under testrace with our aggressive