
Commit

Merge pull request #102319 from miretskiy/backport23.1.0-98980-102094
release-23.1.0: kvcoord: Fix error handling and retries in mux rangefeed client
miretskiy authored Apr 26, 2023
2 parents f52cc70 + 9bd89b6 commit d2fbccc
Showing 6 changed files with 159 additions and 89 deletions.
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -166,6 +166,12 @@ This counts the number of ranges with an active rangefeed that are performing catchup scan
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartRanges = metric.Metadata{
Name: "distsender.rangefeed.restart_ranges",
Help: `Number of ranges that were restarted due to transient errors`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartStuck = metric.Metadata{
Name: "distsender.rangefeed.restart_stuck",
Help: `Number of times a rangefeed was restarted due to not receiving ` +
@@ -240,6 +246,7 @@ type DistSenderMetrics struct {
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
@@ -261,6 +268,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
}
for i := range m.MethodCounts {
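To make the metric change above concrete without pulling in the whole pkg/util/metric package, here is a minimal, self-contained sketch of the same pattern: declare counter metadata once, materialize it in the DistSender metrics struct, and bump it on every range-level restart. The counter type below is a hypothetical stand-in, not CockroachDB's metric.Counter.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter is a tiny stand-in for metric.Counter: a named, monotonically
// increasing value.
type counter struct {
	name, help string
	value      int64
}

func (c *counter) Inc(delta int64) { atomic.AddInt64(&c.value, delta) }
func (c *counter) Count() int64    { return atomic.LoadInt64(&c.value) }

// distSenderMetrics mirrors the shape of DistSenderMetrics in this hunk:
// one counter per interesting rangefeed event.
type distSenderMetrics struct {
	RangefeedRestartRanges *counter
}

func makeDistSenderMetrics() distSenderMetrics {
	return distSenderMetrics{
		RangefeedRestartRanges: &counter{
			name: "distsender.rangefeed.restart_ranges",
			help: "Number of ranges that were restarted due to transient errors",
		},
	}
}

func main() {
	m := makeDistSenderMetrics()
	// Every per-range restart caused by a transient error bumps the counter,
	// mirroring m.ds.metrics.RangefeedRestartRanges.Inc(1) later in the diff.
	for i := 0; i < 3; i++ {
		m.RangefeedRestartRanges.Inc(1)
	}
	fmt.Printf("%s = %d\n", m.RangefeedRestartRanges.name, m.RangefeedRestartRanges.Count())
}
```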
216 changes: 140 additions & 76 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -18,13 +18,13 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -72,12 +72,11 @@ func muxRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
start := timeutil.Now()
defer func() {
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
}()
}

@@ -126,20 +125,72 @@ type muxStreamOrError struct {
}

// activeMuxRangeFeed augments activeRangeFeed with additional state.
// This is a long-lived data structure representing the lifetime of a single
// range feed.
// The lifetime of a single range feed is as follows:
//
// ┌─* muxRangeFeed
// ┌───►├─► divideSpanOnRangeBoundaries
// │ │ Divide target span(s) on range boundaries
// │ ├─► startSingleRangeFeed
// │ │ Allocate activeMuxRangeFeed, acquire catchup scan quota
// │┌──►├─► activeMuxRangeFeed.start
// ││ │ Determine the first replica that's usable for running range feed
// ││ │ Establish MuxRangeFeed connection with the node
// ││ │ Start single rangefeed on that node
// ││ │ Assign unique "stream ID" which will be echoed back by range feed server
// ││ │
// ││┌─►├─► receiveEventsFromNode
// │││ │ Lookup activeMuxRangeFeed corresponding to the received event
// │││ ◊─► Handle event
// ││└──│── Maybe release catchup scan quota
// ││ │
// ││ ├─► OR Handle error
// ││ └─► restartActiveRangeFeeds
// ││ Determine if the error is fatal, if so terminate
// ││ Transient error can be retried, using retry/transport state
// │└────── stored in this structure (for example: try next replica)
// │ Some errors (range split) need to perform full lookup
// └─────── activeMuxRangeFeed is released, replaced by one or more new instances
type activeMuxRangeFeed struct {
*activeRangeFeed
token rangecache.EvictionToken
rSpan roachpb.RSpan
roachpb.ReplicaDescriptor
startAfter hlc.Timestamp

// catchupRes is the catchup scan quota acquired upon the
// start of rangefeed.
// It is released when this stream receives the first non-empty checkpoint
// (meaning: catchup scan completes).
catchupRes catchupAlloc

// State pertaining to execution of rangefeed call.
token rangecache.EvictionToken
transport Transport
}

func (a *activeMuxRangeFeed) release() {
a.activeRangeFeed.release()
if a.catchupRes != nil {
a.catchupRes.Release()
func (s *activeMuxRangeFeed) release() {
s.activeRangeFeed.release()
if s.catchupRes != nil {
s.catchupRes.Release()
}
if s.transport != nil {
s.transport.Release()
s.transport = nil
}
}

func (s *activeMuxRangeFeed) resetRouting(ctx context.Context, newToken rangecache.EvictionToken) {
if s.token.Valid() {
s.token.Evict(ctx)
}
if s.transport != nil {
s.transport.Release()
s.transport = nil
}
s.token = newToken
}

// the "Send" portion of the kvpb.Internal_MuxRangeFeedClient
type rangeFeedRequestSender interface {
Send(req *kvpb.RangeFeedRequest) error
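The new resetRouting/release pair above is the crux of the transport lifecycle fix: routing state and the transport dialed from it are now dropped together, so a retry cannot reuse a transport that was built for an evicted descriptor. Below is a self-contained illustration of that contract, using hypothetical stand-ins rather than the real rangecache.EvictionToken and Transport types.

```go
package main

import (
	"context"
	"fmt"
)

// evictionToken and transport are hypothetical stand-ins for
// rangecache.EvictionToken and kvcoord.Transport.
type evictionToken struct{ valid bool }

func (t *evictionToken) Valid() bool           { return t.valid }
func (t *evictionToken) Evict(context.Context) { t.valid = false }

type transport struct{}

func (t *transport) Release() {}

// activeStream mimics the routing-related fields of activeMuxRangeFeed.
type activeStream struct {
	token     evictionToken
	transport *transport
}

// resetRouting mirrors the method added in this hunk: evict the stale token,
// release the transport that was dialed from it, and install the new token.
// Passing a zero token forces the retry loop to do a fresh range lookup.
func (s *activeStream) resetRouting(ctx context.Context, newToken evictionToken) {
	if s.token.Valid() {
		s.token.Evict(ctx)
	}
	if s.transport != nil {
		s.transport.Release()
		s.transport = nil
	}
	s.token = newToken
}

func main() {
	s := &activeStream{token: evictionToken{valid: true}, transport: &transport{}}
	// e.g. after exhausting every replica of the current descriptor:
	s.resetRouting(context.Background(), evictionToken{})
	fmt.Printf("token valid: %v, transport: %v\n", s.token.Valid(), s.transport)
}
```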
@@ -158,15 +209,6 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()

var releaseTransport func()
maybeReleaseTransport := func() {
if releaseTransport != nil {
releaseTransport()
releaseTransport = nil
}
}
defer maybeReleaseTransport()

// Before starting single rangefeed, acquire catchup scan quota.
catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
if err != nil {
@@ -176,79 +218,93 @@
// Register active mux range feed.
stream := &activeMuxRangeFeed{
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges),
rSpan: rs,
startAfter: startAfter,
catchupRes: catchupRes,
token: token,
}
streamID := atomic.AddInt64(&m.seqID, 1)

// stream ownership gets transferred (indicated by setting stream to nil) when
// we successfully send request. If we fail to do so, cleanup.
defer func() {
if stream != nil {
stream.release()
}
}()
if err := stream.start(ctx, m); err != nil {
stream.release()
return err
}

return nil
}

// start begins execution of activeMuxRangeFeed.
// This method uses the routing and transport information associated with this active stream,
// to find the node that hosts range replica, establish MuxRangeFeed RPC stream
// with the node, and then establish rangefeed for the span with that node.
// If the routing/transport information is not valid, performs a lookup to refresh this
// information.
// Transient errors while establishing RPCs are retried with backoff.
// Certain non-recoverable errors (such as grpcutil.IsAuthError) are propagated to the
// caller and will cause the whole rangefeed to terminate.
// Upon successfully establishing RPC stream, the ownership of the activeMuxRangeFeed
// gets transferred to the node event loop go routine (receiveEventsFromNode).
func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
streamID := atomic.AddInt64(&m.seqID, 1)

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); {
maybeReleaseTransport()

// If we've cleared the descriptor on failure, re-lookup.
if !token.Valid() {
var err error
ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
if !s.token.Valid() {
ri, err := m.ds.getRoutingInfo(ctx, s.rSpan.Key, rangecache.EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
if !rangecache.IsRangeLookupErrorRetryable(err) {
return err
}
continue
}
token = ri
s.resetRouting(ctx, ri)
}

// Establish a RangeFeed for a single Range.
log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
continue
if s.transport == nil {
transport, err := newTransportForRange(ctx, s.token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 1, "Failed to create transport for %s (err=%s) ", s.token.String(), err)
continue
}
s.transport = transport
}
releaseTransport = transport.Release

for !transport.IsExhausted() {
args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff)
args.Replica = transport.NextReplica()
for !s.transport.IsExhausted() {
args := makeRangeFeedRequest(
s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff)
args.Replica = s.transport.NextReplica()
args.StreamID = streamID

rpcClient, err := transport.NextInternalClient(ctx)
s.ReplicaDescriptor = args.Replica
rpcClient, err := s.transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
continue
}

log.VEventf(ctx, 1,
"MuxRangeFeed starting for span %s@%s (rangeID %d, replica %s, attempt %d)",
s.Span, s.startAfter, s.token.Desc().RangeID, args.Replica, r.CurrentAttempt())

conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID)
if err != nil {
return err
if err == nil {
err = conn.startRangeFeed(streamID, s, &args)
}

if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
log.VErrEventf(ctx, 0,
"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
if err != nil {
log.VErrEventf(ctx, 1,
"RPC error establishing mux rangefeed to r%d, replica %s: %s", args.RangeID, args.Replica, err)
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
return err
}
continue
}
// We successfully established rangefeed, so the responsibility
// for releasing the stream is transferred to the mux go routine.
stream = nil
return nil
}

// If the transport is exhausted, we evict routing token and retry range
// resolution.
token.Evict(ctx)
token = rangecache.EvictionToken{}
s.resetRouting(ctx, rangecache.EvictionToken{}) // Transport exhausted; reset and retry.
}

return ctx.Err()
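Before this change, the transport was created and released inside the retry loop of startSingleRangeFeed; now routing and transport live on the stream itself, are reused across retries, and only an exhausted transport forces a fresh range lookup. The following is a condensed, runnable sketch of that control flow, with hypothetical stand-ins for the DistSender machinery (retry options, range lookup, and RPC dialing are all faked).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Sentinel errors standing in for grpcutil.IsAuthError and any retryable
// RPC failure.
var (
	errAuth      = errors.New("auth error")
	errTransient = errors.New("transient RPC error")
)

type replica struct{ id int }

// transport is a stand-in for kvcoord.Transport: an ordered list of replicas
// to try for the current range descriptor.
type transport struct {
	replicas []replica
	next     int
}

func (t *transport) IsExhausted() bool { return t.next >= len(t.replicas) }
func (t *transport) NextReplica() replica {
	r := t.replicas[t.next]
	t.next++
	return r
}

// stream mimics the routing/transport state kept on activeMuxRangeFeed.
type stream struct {
	routingValid bool
	transport    *transport
}

// start follows the shape of activeMuxRangeFeed.start above: refresh routing
// only when needed, build the transport once, walk its replicas, and reset
// routing only after every replica has been tried. Auth errors terminate the
// whole rangefeed; transient errors move on to the next replica.
func (s *stream) start(ctx context.Context, dial func(replica) error) error {
	for attempt := 0; attempt < 3; attempt++ { // retry.StartWithCtx in the real code
		if !s.routingValid {
			s.routingValid = true // pretend the range lookup succeeded
		}
		if s.transport == nil {
			s.transport = &transport{replicas: []replica{{1}, {2}, {3}}}
		}
		for !s.transport.IsExhausted() {
			r := s.transport.NextReplica()
			err := dial(r)
			if err == nil {
				return nil // ownership transfers to the mux event loop
			}
			if errors.Is(err, errAuth) {
				return err // non-recoverable: propagate to the caller
			}
			// Transient: fall through and try the next replica.
		}
		// All replicas failed: drop routing and transport, re-resolve next time.
		s.routingValid = false
		s.transport = nil
	}
	return ctx.Err()
}

func main() {
	dials := 0
	err := (&stream{}).start(context.Background(), func(r replica) error {
		dials++
		if dials < 2 {
			return errTransient
		}
		return nil
	})
	fmt.Printf("dials: %d, err: %v\n", dials, err)
}
```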
@@ -261,7 +317,7 @@ func (m *rangefeedMuxer) establishMuxConnection(
ptr, exists := m.muxClients.LoadOrStore(int64(nodeID), unsafe.Pointer(future.Make[muxStreamOrError]()))
muxClient := (*future.Future[muxStreamOrError])(ptr)
if !exists {
// Start mux rangefeed go routine responsible for receiving MuxRangeFeedEvents.
// Start mux rangefeed goroutine responsible for receiving MuxRangeFeedEvents.
m.g.GoCtx(func(ctx context.Context) error {
return m.startNodeMuxRangeFeed(ctx, client, nodeID, muxClient)
})
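establishMuxConnection (above) guarantees at most one MuxRangeFeed stream per node: the first caller whose LoadOrStore actually stores starts the receiver goroutine, and every other caller blocks on the shared future. Here is a stand-alone sketch of that deduplication pattern using sync.Map and a channel-backed future in place of the real util/future package.

```go
package main

import (
	"fmt"
	"sync"
)

// muxConn stands in for the per-node MuxRangeFeed client stream.
type muxConn struct{ nodeID int }

// future is a minimal single-assignment cell that readers can block on,
// a stand-in for util/future.Future.
type future struct {
	once sync.Once
	done chan struct{}
	conn *muxConn
}

func newFuture() *future { return &future{done: make(chan struct{})} }

func (f *future) set(c *muxConn) {
	f.once.Do(func() {
		f.conn = c
		close(f.done)
	})
}

func (f *future) get() *muxConn {
	<-f.done
	return f.conn
}

type muxer struct {
	clients sync.Map // nodeID -> *future
}

// connect returns the shared mux connection for nodeID. Only the first
// caller starts the receiver goroutine; everyone else waits on the future.
func (m *muxer) connect(nodeID int) *muxConn {
	f, loaded := m.clients.LoadOrStore(nodeID, newFuture())
	fut := f.(*future)
	if !loaded {
		go func() {
			// Real code: dial the node, then run receiveEventsFromNode.
			fut.set(&muxConn{nodeID: nodeID})
		}()
	}
	return fut.get()
}

func main() {
	var m muxer
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Printf("node 1 conn: %p\n", m.connect(1))
		}()
	}
	wg.Wait()
}
```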
@@ -287,13 +343,13 @@
) (retErr error) {
ctx = logtags.AddTag(ctx, "mux_n", nodeID)
// Add "generation" number to the context so that log messages and stacks can
// differentiate between multiple instances of mux rangefeed Go routine
// differentiate between multiple instances of mux rangefeed goroutine
// (this can happen when one was shutdown, then re-established).
ctx = logtags.AddTag(ctx, "gen", atomic.AddInt64(&m.seqID, 1))
ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer restore()

if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
start := timeutil.Now()
defer func() {
@@ -323,7 +379,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
if recvErr := m.receiveEventsFromNode(ctx, mux, stuckWatcher, &ms); recvErr != nil {
// Clear out this client, and restart all streams on this node.
// Note: there is a race here where we may delete this muxClient, while
// another go routine loaded it. That's fine, since we would not
// another goroutine loaded it. That's fine, since we would not
// be able to send new request on this stream anymore, and we'll retry
// against another node.
m.muxClients.Delete(int64(nodeID))
@@ -339,7 +395,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
}

toRestart := ms.close()
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
}
return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
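When receiveEventsFromNode fails, the diff no longer tears down the whole rangefeed: the node's cached mux client is deleted, its stream registry is closed, and every still-registered stream is handed to the restart path individually. The sketch below illustrates that hand-off with hypothetical types; the real code uses muxStream.close and restartActiveRangeFeeds.

```go
package main

import "fmt"

// nodeStreams is a stand-in for muxStream: the per-range streams currently
// multiplexed over a single node's connection, keyed by stream ID.
type nodeStreams struct {
	nodeID  int
	streams map[int64]string // streamID -> span (illustrative only)
}

// close detaches every stream from the failed connection and returns them so
// the caller can restart each one individually.
func (ns *nodeStreams) close() map[int64]string {
	toRestart := ns.streams
	ns.streams = nil
	return toRestart
}

// restartAll mirrors restartActiveRangeFeeds: each stream is retried on its
// own; only non-recoverable errors would terminate the whole rangefeed.
func restartAll(reason error, toRestart map[int64]string) {
	for id, span := range toRestart {
		fmt.Printf("restarting stream %d (%s) after: %v\n", id, span, reason)
	}
}

func main() {
	ns := &nodeStreams{nodeID: 1, streams: map[int64]string{
		101: "/Table/100-/Table/101",
		102: "/Table/101-/Table/102",
	}}
	restartAll(fmt.Errorf("mux to node %d failed", ns.nodeID), ns.close())
}
```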
@@ -383,7 +439,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
// which one executes is a coin flip) and so it is possible that we may see
// additional event(s) arriving for a stream that is no longer active.
if active == nil {
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
}
continue
@@ -465,33 +521,41 @@
func (m *rangefeedMuxer) restartActiveRangeFeed(
ctx context.Context, active *activeMuxRangeFeed, reason error,
) error {
if log.V(0) {
log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
m.ds.metrics.RangefeedRestartRanges.Inc(1)

if log.V(1) {
log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor,
timeutil.Since(active.Resolved.GoTime()), reason)
}
active.setLastError(reason)
active.release()

doRelease := true
defer func() {
if doRelease {
active.release()
}
}()

errInfo, err := handleRangefeedError(ctx, reason)
if err != nil {
// If this is an error we cannot recover from, terminate the rangefeed.
return err
}

if errInfo.evict && active.token.Valid() {
active.token.Evict(ctx)
active.token = rangecache.EvictionToken{}
if errInfo.evict {
active.resetRouting(ctx, rangecache.EvictionToken{})
}

rs, err := keys.SpanAddr(active.Span)
if err != nil {
return err
if errInfo.resolveSpan {
return divideSpanOnRangeBoundaries(ctx, m.ds, active.rSpan, active.startAfter, m.startSingleRangeFeed)
}

if errInfo.resolveSpan {
return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed)
if err := active.start(ctx, m); err != nil {
return err
}
return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token)
doRelease = false // active stream ownership transferred to start above.
return nil
}

// startRangeFeed initiates rangefeed for the specified request running
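The restart path above is where the error-handling fix lands: every restart bumps the new counter, handleRangefeedError classifies the failure, routing is evicted or the span re-divided only when required, and otherwise the same stream is restarted in place, with release() deferred unless ownership was handed back to start(). Below is a condensed, self-contained sketch of that decision flow; errInfo and the stream type here are hypothetical, and handleRangefeedError itself is not shown in this diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errInfo mirrors the classification returned by handleRangefeedError (not
// part of this diff): fatal errors terminate the rangefeed, evict drops
// cached routing, resolveSpan re-divides the span on range boundaries.
type errInfo struct {
	fatal       bool
	evict       bool
	resolveSpan bool
}

// stream is a hypothetical stand-in for activeMuxRangeFeed.
type stream struct {
	span     string
	released bool
}

func (s *stream) release()      { s.released = true }
func (s *stream) resetRouting() {}

func (s *stream) start(ctx context.Context) error { return nil }

// restartActiveRangeFeed condenses the decision flow added above, including
// the doRelease ownership rule: the stream is released unless start()
// successfully took ownership of it again.
func restartActiveRangeFeed(ctx context.Context, s *stream, reason error, info errInfo) error {
	doRelease := true
	defer func() {
		if doRelease {
			s.release()
		}
	}()

	if info.fatal {
		return reason // non-recoverable: propagate and terminate the rangefeed
	}
	if info.evict {
		s.resetRouting()
	}
	if info.resolveSpan {
		// Real code: divideSpanOnRangeBoundaries + startSingleRangeFeed per piece.
		fmt.Println("re-dividing", s.span, "on range boundaries")
		return nil
	}
	if err := s.start(ctx); err != nil {
		return err
	}
	doRelease = false // ownership transferred back to the mux event loop
	return nil
}

func main() {
	s := &stream{span: "/Table/100"}
	err := restartActiveRangeFeed(context.Background(), s,
		errors.New("transient send error"), errInfo{evict: true})
	fmt.Printf("err: %v, released: %v\n", err, s.released)
}
```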