kvcoord: Fix error handling and retries in mux rangefeed client
Fix error handling and retries when restarting rangefeeds.
A big difference between regular rangefeed and mux rangefeed is that
regular rangefeed has a dedicated goroutine per range.  This goroutine
is responsible for running the rangefeed, handling its errors, managing
the state pertaining to the RPC call (transport, retry information,
routing information, etc.), and restarting the rangefeed with backoff
as needed.
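
Roughly, that per-range goroutine follows the pattern sketched below.
This is an illustrative sketch only, not the CockroachDB implementation;
`rangeState`, `runOneAttempt`, and the constants are invented names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// rangeState stands in for the per-range routing/transport/retry state
// that a dedicated rangefeed goroutine keeps across restarts.
type rangeState struct {
	span       string
	replicaIdx int           // next replica to try after an error
	backoff    time.Duration // current delay between attempts
}

// runOneAttempt is a stand-in for issuing the RangeFeed RPC against the
// currently selected replica; here it simply fails for the first two.
func runOneAttempt(_ context.Context, st *rangeState) error {
	if st.replicaIdx < 2 {
		return fmt.Errorf("transient error on replica %d", st.replicaIdx)
	}
	return nil
}

// rangeFeedLoop is the dedicated per-range goroutine body: it retries
// with backoff, and because the state survives across iterations each
// retry moves on to the next replica instead of the one that just failed.
func rangeFeedLoop(ctx context.Context, st *rangeState) error {
	for {
		if err := runOneAttempt(ctx, st); err == nil {
			fmt.Printf("rangefeed over %s established via replica %d\n",
				st.span, st.replicaIdx)
			return nil
		}
		st.replicaIdx++ // try a different replica on the next attempt
		select {
		case <-time.After(st.backoff):
			st.backoff *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	st := &rangeState{span: "r42", backoff: 10 * time.Millisecond}
	_ = rangeFeedLoop(context.Background(), st)
}
```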

Mux rangefeed, on the other hand, is not "loop" based.  Prior to this
PR, when mux rangefeed encountered a transient error, it would lose
much of the restart state mentioned above.  For example, it would lose
the transport information, so the restart would run against the same
node as before, potentially resulting in busy loops.  Those busy loops
(where the RPC call is restarted against the same node/replica that
just experienced an error) tend to make tests such as `TestDecommission`
flaky, since they take longer to converge to the state the tests expect.

This PR fixes this loss of per-range restart state by associating it
with the long-lived `activeMuxRangeFeed` structure.
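
In spirit, the fix looks like the sketch below (again hedged, with
invented names; `streamState`, `muxer`, and `onError` are not the real
types): retry/routing state lives on a long-lived per-stream object
keyed by stream ID, so a restart resumes from the next replica rather
than re-dialing the node that just failed.

```go
package main

import (
	"errors"
	"fmt"
)

// streamState loosely mirrors what this commit hangs off the long-lived
// activeMuxRangeFeed: the routing/transport/retry state needed to restart
// a single range without starting from scratch.
type streamState struct {
	span       string
	replicaIdx int // next replica to try; deliberately NOT reset on restart
	attempt    int
}

// muxer is a toy event-driven client: many ranges share one stream, and
// errors arrive as events keyed by stream ID rather than inside a
// per-range retry loop.
type muxer struct {
	streams map[int64]*streamState
}

// onError is the restart path.  Because the per-stream state is long
// lived, the retry advances to the next replica instead of re-dialing
// the node that just failed (the busy-loop behavior this commit removes).
func (m *muxer) onError(streamID int64, err error) {
	st, ok := m.streams[streamID]
	if !ok {
		return // stray event for a stream that was already released
	}
	st.attempt++
	st.replicaIdx++
	fmt.Printf("restarting %s: attempt=%d, next replica=%d (cause: %v)\n",
		st.span, st.attempt, st.replicaIdx, err)
}

func main() {
	m := &muxer{streams: map[int64]*streamState{
		1: {span: "r42"},
	}}
	m.onError(1, errors.New("node unavailable"))
	m.onError(1, errors.New("node unavailable"))
}
```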

Fixes cockroachdb#96630
Fixes cockroachdb#100783
Informs cockroachdb#99631
Informs cockroachdb#101614

Release note: None
Yevgeniy Miretskiy committed Apr 24, 2023
1 parent 1f3419e commit 6a7da03
Showing 2 changed files with 100 additions and 48 deletions.
147 changes: 100 additions & 47 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -72,12 +73,11 @@ func muxRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
start := timeutil.Now()
defer func() {
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
}()
}

@@ -126,18 +126,50 @@ type muxStreamOrError struct {
}

// activeMuxRangeFeed augments activeRangeFeed with additional state.
// This is a long-lived data structure representing the lifetime of a single
// range feed.
// The lifetime of a single range feed is as follows:
// |--->| divideSpanOnRangeBoundaries
// | | Divides target span(s) on range boundaries
// | |-> startSingleRangeFeed
// | | Allocates activeMuxRangeFeed, acquires catchup scan quota
// | |->|-> startActiveRangeFeed
// | | | Determine the first replica that's usable for running range feed.
// | | | Establish mux rangefeed connection with the node
// | | | Start single range feed on that node
// | | | Assign unique "stream ID" which will be echoed back by rangefeed server.
// | | |-> receiveEventsFromNode
// | | | Lookup activeMuxRangeFeed corresponding to the received event.
// | | |-- Handle event
// | | | OR Handle error
// | | |-> restartActiveRangeFeeds
// | | Determine if the error is fatal, if so terminate.
// | | Transient error can be retried, using retry/transport state
// | |------stored in this structure (for example: try next replica)
// | |------Some errors (range split) need to perform full lookup.
// |--------activeMuxRangefeed structure is released, and 1 or more new range feeds started.
type activeMuxRangeFeed struct {
*activeRangeFeed
token rangecache.EvictionToken
roachpb.ReplicaDescriptor
startAfter hlc.Timestamp
catchupRes catchupAlloc

// State pertaining to execution of rangefeed call, along with
// the state needed to manage retries.
retry retry.Retry
token rangecache.EvictionToken
transport Transport
}

func (a *activeMuxRangeFeed) release() {
a.activeRangeFeed.release()
if a.catchupRes != nil {
a.catchupRes.Release()
}
if a.transport != nil {
a.transport.Release()
a.transport = nil
}
}

// the "Send" portion of the kvpb.Internal_MuxRangeFeedClient
@@ -158,15 +190,6 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()

var releaseTransport func()
maybeReleaseTransport := func() {
if releaseTransport != nil {
releaseTransport()
releaseTransport = nil
}
}
defer maybeReleaseTransport()

// Before starting single rangefeed, acquire catchup scan quota.
catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
if err != nil {
@@ -179,7 +202,14 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
startAfter: startAfter,
catchupRes: catchupRes,
token: token,
retry: retry.StartWithCtx(ctx, m.ds.rpcRetryOptions),
}
return m.startActiveRangeFeed(ctx, stream)
}

func (m *rangefeedMuxer) startActiveRangeFeed(
ctx context.Context, stream *activeMuxRangeFeed,
) error {
streamID := atomic.AddInt64(&m.seqID, 1)

// stream ownership gets transferred (indicated by setting stream to nil) when
@@ -191,12 +221,13 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
}()

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); {
maybeReleaseTransport()

for stream.retry.Next() {
// If we've cleared the descriptor on failure, re-lookup.
if !token.Valid() {
var err error
if !stream.token.Valid() {
rs, err := keys.SpanAddr(stream.Span)
if err != nil {
return err
}
ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
@@ -205,38 +236,49 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
}
continue
}
token = ri
stream.token = ri
stream.transport.Release()
stream.transport = nil
}

// Establish a RangeFeed for a single Range.
log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
continue
if stream.transport == nil {
transport, err := newTransportForRange(ctx, stream.token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 1, "Failed to create transport for %s (err=%s) ", stream.token.String(), err)
continue
}
stream.transport = transport
}
releaseTransport = transport.Release

for !transport.IsExhausted() {
args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff)
args.Replica = transport.NextReplica()
for !stream.transport.IsExhausted() {
args := makeRangeFeedRequest(
stream.Span, stream.token.Desc().RangeID, m.cfg.overSystemTable, stream.startAfter, m.cfg.withDiff)
args.Replica = stream.transport.NextReplica()
args.StreamID = streamID

rpcClient, err := transport.NextInternalClient(ctx)
stream.ReplicaDescriptor = args.Replica
rpcClient, err := stream.transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
continue
}

log.VEventf(ctx, 1,
"MuxRangeFeed starting for span %s@%s (rangeID %d, replica %s, attempt %d)",
stream.Span, stream.startAfter, stream.token.Desc().RangeID, args.Replica, stream.retry.CurrentAttempt())

conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID)
if err != nil {
return err
if err == nil {
err = conn.startRangeFeed(streamID, stream, &args)
}

if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
log.VErrEventf(ctx, 0,
"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
if err != nil {
log.VErrEventf(ctx, 1,
"RPC error establishing mux rangefeed to r%d, replica %s: %s", args.RangeID, args.Replica, err)
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
return err
}
continue
}
// We successfully established rangefeed, so the responsibility
@@ -247,8 +289,10 @@ func (m *rangefeedMuxer) startSingleRangeFeed(

// If the transport is exhausted, we evict routing token and retry range
// resolution.
token.Evict(ctx)
token = rangecache.EvictionToken{}
stream.token.Evict(ctx)
stream.token = rangecache.EvictionToken{}
stream.transport.Release()
stream.transport = nil
}

return ctx.Err()
@@ -293,7 +337,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer restore()

if log.V(0) {
if log.V(1) {
log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
start := timeutil.Now()
defer func() {
@@ -339,7 +383,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
}

toRestart := ms.close()
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
}
return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
@@ -383,7 +427,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
// which one executes is a coin flip) and so it is possible that we may see
// additional event(s) arriving for a stream that is no longer active.
if active == nil {
if log.V(0) {
if log.V(1) {
log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
}
continue
@@ -467,12 +511,19 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
) error {
m.ds.metrics.RangefeedRestartRanges.Inc(1)

if log.V(0) {
log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
if log.V(1) {
log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor,
timeutil.Since(active.Resolved.GoTime()), reason)
}
active.setLastError(reason)
active.release()

doRelease := true
defer func() {
if doRelease {
active.release()
}
}()

errInfo, err := handleRangefeedError(ctx, reason)
if err != nil {
@@ -493,7 +544,9 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
if errInfo.resolveSpan {
return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed)
}
return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token)

doRelease = false // active stream ownership transferred to startActiveRangeFeed.
return m.startActiveRangeFeed(ctx, active)
}

// startRangeFeed initiates rangefeed for the specified request running
1 change: 0 additions & 1 deletion pkg/kv/kvserver/client_raft_test.go
@@ -3267,7 +3267,6 @@ HAVING

func TestDecommission(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 96630, "flaky test")
defer log.Scope(t).Close(t)

// Five nodes is too much to reliably run under testrace with our aggressive