Skip to content

Commit

Permalink
kvcoord: Fix error handling and retries in mux rangefeed client
Browse files Browse the repository at this point in the history
Fix error handling and retries when restarting rangefeeds.
A big difference between regular rangefeed, and mux rangefeed,
is regular rangefeed has a dedicated go routine per range.
This go routine is responsible for running a rangefeed, handling
its errors, management of state pertaining to the RPC call
(transport, retry information, routing information, etc), and
restarting rangefeed with backoff as needed.

Mux rangefeed, on the other hand, is not "loop" based.  Prior
to this PR, mux rangefeed, when it encountered a transient error,
would loose a lot of the restart state mentioned above.  For example,
it would loose the transport information, so that the restart would
run against the same node as before, resulting, potentially, in
busy loops.  Those busy loops (where the RPC call is restarted
against the same node/replica that just experienced an error), would
tend to make test flaky since they would take longer time to converge
to the state expected by the tests (such as `TestDecommission`) test.

This PR fixes this loss of state pertaining to single range restart
by associating this state with the long lived `activeMuxRangeFeed` state.

Fixes #96630
Fixes #100783
Informs #99631
Informs #101614

Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Apr 25, 2023
1 parent 1927969 commit c0a5c30
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 89 deletions.
214 changes: 138 additions & 76 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -72,12 +72,11 @@ func muxRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
start := timeutil.Now()
defer func() {
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
}()
}

Expand Down Expand Up @@ -126,20 +125,72 @@ type muxStreamOrError struct {
}

// activeMuxRangeFeed augments activeRangeFeed with additional state.
// This is a long-lived data structure representing the lifetime of a single
// range feed.
// The lifetime of a single range feed is as follows:
//
// ┌─* muxRangeFeed
// ┌───►├─► divideSpanOnRangeBoundaries
// │ │ Divide target span(s) on range boundaries
// │ ├─► startSingleRangeFeed
// │ │ Allocate activeMuxRangeFeed, acquire catchup scan quota
// │┌──►├─► activeMuxRangeFeed.start
// ││ │ Determine the first replica that's usable for running range feed
// ││ │ Establish MuxRangeFeed connection with the node
// ││ │ Start single rangefeed on that node
// ││ │ Assign unique "stream ID" which will be echoed back by range feed server
// ││ │
// ││┌─►├─► receiveEventsFromNode
// │││ │ Lookup activeMuxRangeFeed corresponding to the received event
// │││ ◊─► Handle event
// ││└──│── Maybe release catchup scan quota
// ││ │
// ││ ├─► OR Handle error
// ││ └─► restartActiveRangeFeeds
// ││ Determine if the error is fatal, if so terminate
// ││ Transient error can be retried, using retry/transport state
// │└────── stored in this structure (for example: try next replica)
// │ Some errors (range split) need to perform full lookup
// └─────── activeMuxRangeFeed is released, replaced by one or more new instances
type activeMuxRangeFeed struct {
*activeRangeFeed
token rangecache.EvictionToken
rSpan roachpb.RSpan
roachpb.ReplicaDescriptor
startAfter hlc.Timestamp

// cathchupRes is the catchup scan quota acquired upon the
// start of rangefeed.
// It is released when this stream receives first non-empty checkpoint
// (meaning: catchup scan completes).
catchupRes catchupAlloc

// State pertaining to execution of rangefeed call.
token rangecache.EvictionToken
transport Transport
}

func (a *activeMuxRangeFeed) release() {
a.activeRangeFeed.release()
if a.catchupRes != nil {
a.catchupRes.Release()
func (s *activeMuxRangeFeed) release() {
s.activeRangeFeed.release()
if s.catchupRes != nil {
s.catchupRes.Release()
}
if s.transport != nil {
s.transport.Release()
s.transport = nil
}
}

func (s *activeMuxRangeFeed) resetRouting(ctx context.Context, newToken rangecache.EvictionToken) {
if s.token.Valid() {
s.token.Evict(ctx)
}
if s.transport != nil {
s.transport.Release()
s.transport = nil
}
s.token = newToken
}

// the "Send" portion of the kvpb.Internal_MuxRangeFeedClient
type rangeFeedRequestSender interface {
Send(req *kvpb.RangeFeedRequest) error
Expand All @@ -158,15 +209,6 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()

var releaseTransport func()
maybeReleaseTransport := func() {
if releaseTransport != nil {
releaseTransport()
releaseTransport = nil
}
}
defer maybeReleaseTransport()

// Before starting single rangefeed, acquire catchup scan quota.
catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
if err != nil {
Expand All @@ -176,79 +218,93 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
// Register active mux range feed.
stream := &activeMuxRangeFeed{
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges),
rSpan: rs,
startAfter: startAfter,
catchupRes: catchupRes,
token: token,
}
streamID := atomic.AddInt64(&m.seqID, 1)

// stream ownership gets transferred (indicated by setting stream to nil) when
// we successfully send request. If we fail to do so, cleanup.
defer func() {
if stream != nil {
stream.release()
}
}()
if err := stream.start(ctx, m); err != nil {
stream.release()
return err
}

return nil
}

// start begins execution of activeMuxRangeFeed.
// This method uses the routine and transport information associated with this active stream,
// to find the node that hosts range replica, establish MuxRangeFeed RPC stream
// with the node, and then establish rangefeed for the span with that node.
// If the routine/transport information are not valid, performs lookup to refresh this
// information.
// Transient errors while establishing RPCs are retried with backoff.
// Certain non-recoverable errors (such as grpcutil.IsAuthError) are propagated to the
// caller and will cause the whole rangefeed to terminate.
// Upon successfully establishing RPC stream, the ownership of the activeMuxRangeFeed
// gets transferred to the node event loop go routine (receiveEventsFromNode).
func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
streamID := atomic.AddInt64(&m.seqID, 1)

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); {
maybeReleaseTransport()

// If we've cleared the descriptor on failure, re-lookup.
if !token.Valid() {
var err error
ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
if !s.token.Valid() {
ri, err := m.ds.getRoutingInfo(ctx, s.rSpan.Key, rangecache.EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
if !rangecache.IsRangeLookupErrorRetryable(err) {
return err
}
continue
}
token = ri
s.resetRouting(ctx, ri)
}

// Establish a RangeFeed for a single Range.
log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
continue
if s.transport == nil {
transport, err := newTransportForRange(ctx, s.token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 1, "Failed to create transport for %s (err=%s) ", s.token.String(), err)
continue
}
s.transport = transport
}
releaseTransport = transport.Release

for !transport.IsExhausted() {
args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff)
args.Replica = transport.NextReplica()
for !s.transport.IsExhausted() {
args := makeRangeFeedRequest(
s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff)
args.Replica = s.transport.NextReplica()
args.StreamID = streamID

rpcClient, err := transport.NextInternalClient(ctx)
s.ReplicaDescriptor = args.Replica
rpcClient, err := s.transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
continue
}

log.VEventf(ctx, 1,
"MuxRangeFeed starting for span %s@%s (rangeID %d, replica %s, attempt %d)",
s.Span, s.startAfter, s.token.Desc().RangeID, args.Replica, r.CurrentAttempt())

conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID)
if err != nil {
return err
if err == nil {
err = conn.startRangeFeed(streamID, s, &args)
}

if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
log.VErrEventf(ctx, 0,
"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
if err != nil {
log.VErrEventf(ctx, 1,
"RPC error establishing mux rangefeed to r%d, replica %s: %s", args.RangeID, args.Replica, err)
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
return err
}
continue
}
// We successfully established rangefeed, so the responsibility
// for releasing the stream is transferred to the mux go routine.
stream = nil
return nil
}

// If the transport is exhausted, we evict routing token and retry range
// resolution.
token.Evict(ctx)
token = rangecache.EvictionToken{}
s.resetRouting(ctx, rangecache.EvictionToken{}) // Transport exhausted; reset and retry.
}

return ctx.Err()
Expand All @@ -261,7 +317,7 @@ func (m *rangefeedMuxer) establishMuxConnection(
ptr, exists := m.muxClients.LoadOrStore(int64(nodeID), unsafe.Pointer(future.Make[muxStreamOrError]()))
muxClient := (*future.Future[muxStreamOrError])(ptr)
if !exists {
// Start mux rangefeed go routine responsible for receiving MuxRangeFeedEvents.
// Start mux rangefeed goroutine responsible for receiving MuxRangeFeedEvents.
m.g.GoCtx(func(ctx context.Context) error {
return m.startNodeMuxRangeFeed(ctx, client, nodeID, muxClient)
})
Expand All @@ -287,13 +343,13 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
) (retErr error) {
ctx = logtags.AddTag(ctx, "mux_n", nodeID)
// Add "generation" number to the context so that log messages and stacks can
// differentiate between multiple instances of mux rangefeed Go routine
// differentiate between multiple instances of mux rangefeed goroutine
// (this can happen when one was shutdown, then re-established).
ctx = logtags.AddTag(ctx, "gen", atomic.AddInt64(&m.seqID, 1))
ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer restore()

if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
start := timeutil.Now()
defer func() {
Expand Down Expand Up @@ -323,7 +379,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
if recvErr := m.receiveEventsFromNode(ctx, mux, stuckWatcher, &ms); recvErr != nil {
// Clear out this client, and restart all streams on this node.
// Note: there is a race here where we may delete this muxClient, while
// another go routine loaded it. That's fine, since we would not
// another goroutine loaded it. That's fine, since we would not
// be able to send new request on this stream anymore, and we'll retry
// against another node.
m.muxClients.Delete(int64(nodeID))
Expand All @@ -339,7 +395,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
}

toRestart := ms.close()
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
}
return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
Expand Down Expand Up @@ -383,7 +439,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
// which one executes is a coin flip) and so it is possible that we may see
// additional event(s) arriving for a stream that is no longer active.
if active == nil {
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
}
continue
Expand Down Expand Up @@ -467,33 +523,39 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
) error {
m.ds.metrics.RangefeedRestartRanges.Inc(1)

if log.V(0) {
log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
if log.V(1) {
log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor,
timeutil.Since(active.Resolved.GoTime()), reason)
}
active.setLastError(reason)
active.release()

doRelease := true
defer func() {
if doRelease {
active.release()
}
}()

errInfo, err := handleRangefeedError(ctx, reason)
if err != nil {
// If this is an error we cannot recover from, terminate the rangefeed.
return err
}

if errInfo.evict && active.token.Valid() {
active.token.Evict(ctx)
active.token = rangecache.EvictionToken{}
if errInfo.evict {
active.resetRouting(ctx, rangecache.EvictionToken{})
}

rs, err := keys.SpanAddr(active.Span)
if err != nil {
return err
if errInfo.resolveSpan {
return divideSpanOnRangeBoundaries(ctx, m.ds, active.rSpan, active.startAfter, m.startSingleRangeFeed)
}

if errInfo.resolveSpan {
return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed)
if err := active.start(ctx, m); err != nil {
return err
}
return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token)
doRelease = false // active stream ownership transferred to start above.
return nil
}

// startRangeFeed initiates rangefeed for the specified request running
Expand Down
Loading

0 comments on commit c0a5c30

Please sign in to comment.