From c27731fd7b721975a55fb7bfe1c86a7d2d9d0693 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Sun, 23 Apr 2023 10:41:54 -0400
Subject: [PATCH] kvcoord: Fix error handling and retries in mux rangefeed
 client

Fix error handling and retries when restarting rangefeeds.

A big difference between the regular rangefeed and the mux rangefeed is
that the regular rangefeed has a dedicated goroutine per range. This
goroutine is responsible for running the rangefeed, handling its errors,
managing the state pertaining to the RPC call (transport, retry
information, routing information, etc.), and restarting the rangefeed
with backoff as needed.

Mux rangefeed, on the other hand, is not "loop" based. Prior to this PR,
when mux rangefeed encountered a transient error, it would lose much of
the restart state mentioned above. For example, it would lose the
transport information, so the restart would run against the same node as
before, potentially resulting in busy loops. Those busy loops (where the
RPC call is restarted against the same node/replica that just experienced
an error) tended to make tests such as `TestDecommission` flaky, since
they took longer to converge to the state expected by the tests.

This PR fixes this loss of per-range restart state by associating that
state with the long-lived `activeMuxRangeFeed` structure.

Fixes #96630
Fixes #100783
Informs #99631
Informs #101614

Release note: None
---
 .../kvcoord/dist_sender_mux_rangefeed.go      | 209 ++++++++++++------
 pkg/kv/kvserver/client_raft_test.go           |   1 -
 2 files changed, 136 insertions(+), 74 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index 3ddc2e98d072..f7ddf27b3b0a 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -18,13 +18,13 @@ import (
 	"time"
 	"unsafe"
 
-	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/future"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -72,12 +72,11 @@ func muxRangeFeed(
 	catchupSem *limit.ConcurrentRequestLimiter,
 	eventCh chan<- RangeFeedMessage,
 ) (retErr error) {
-	// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
-	if log.V(0) {
+	if log.V(1) {
 		log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
 		start := timeutil.Now()
 		defer func() {
-				log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
+			log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
 		}()
 	}
 
@@ -126,20 +125,73 @@ type muxStreamOrError struct {
 }
 
 // activeMuxRangeFeed augments activeRangeFeed with additional state.
+// This is a long-lived data structure representing the lifetime of a single
+// range feed.
+// The lifetime of a single range feed is as follows:
+//
+// ┌─* muxRangeFeed
+// ┌───►├─► divideSpanOnRangeBoundaries
+// │    │     Divide target span(s) on range boundaries
+// │    ├─► startSingleRangeFeed
+// │    │     Allocate activeMuxRangeFeed, acquire catchup scan quota
+// │┌──►├─► activeMuxRangeFeed.start
+// ││   │     Determine the first replica that's usable for running range feed
+// ││   │     Establish MuxRangeFeed connection with the node
+// ││   │     Start single rangefeed on that node
+// ││   │     Assign unique "stream ID" which will be echoed back by range feed server
+// ││   │
+// ││┌─►├─► receiveEventsFromNode
+// │││  │     Lookup activeMuxRangeFeed corresponding to the received event
+// │││  ◊─► Handle event
+// ││└──│── Maybe release catchup scan quota
+// ││   │
+// ││   ├─► OR Handle error
+// ││   └─► restartActiveRangeFeeds
+// ││          Determine if the error is fatal, if so terminate
+// ││          Transient error can be retried, using retry/transport state
+// │└──────   stored in this structure (for example: try next replica)
+// │          Some errors (range split) need to perform full lookup
+// └───────   activeMuxRangeFeed is released, replaced by one or more new instances
 type activeMuxRangeFeed struct {
 	*activeRangeFeed
-	token      rangecache.EvictionToken
+	rSpan roachpb.RSpan
+	roachpb.ReplicaDescriptor
 	startAfter hlc.Timestamp
+
+	// catchupRes is the catchup scan quota acquired upon the
+	// start of the rangefeed.
+	// It is released when this stream receives the first non-empty checkpoint
+	// (meaning: catchup scan completes).
 	catchupRes catchupAlloc
+
+	// State pertaining to execution of rangefeed call, along with
+	// the state needed to manage retries.
+	token     rangecache.EvictionToken
+	transport Transport
 }
 
-func (a *activeMuxRangeFeed) release() {
-	a.activeRangeFeed.release()
-	if a.catchupRes != nil {
-		a.catchupRes.Release()
+func (s *activeMuxRangeFeed) release() {
+	s.activeRangeFeed.release()
+	if s.catchupRes != nil {
+		s.catchupRes.Release()
+	}
+	if s.transport != nil {
+		s.transport.Release()
+		s.transport = nil
 	}
 }
 
+func (s *activeMuxRangeFeed) resetRouting(ctx context.Context, newToken rangecache.EvictionToken) {
+	if s.token.Valid() {
+		s.token.Evict(ctx)
+	}
+	if s.transport != nil {
+		s.transport.Release()
+		s.transport = nil
+	}
+	s.token = newToken
+}
+
 // the "Send" portion of the kvpb.Internal_MuxRangeFeedClient
 type rangeFeedRequestSender interface {
 	Send(req *kvpb.RangeFeedRequest) error
@@ -158,15 +210,6 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
 	// Bound the partial rangefeed to the partial span.
 	span := rs.AsRawSpanWithNoLocals()
 
-	var releaseTransport func()
-	maybeReleaseTransport := func() {
-		if releaseTransport != nil {
-			releaseTransport()
-			releaseTransport = nil
-		}
-	}
-	defer maybeReleaseTransport()
-
 	// Before starting single rangefeed, acquire catchup scan quota.
 	catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
 	if err != nil {
@@ -176,28 +219,39 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
 	// Register active mux range feed.
 	stream := &activeMuxRangeFeed{
 		activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges),
+		rSpan:           rs,
 		startAfter:      startAfter,
 		catchupRes:      catchupRes,
 		token:           token,
 	}
-	streamID := atomic.AddInt64(&m.seqID, 1)
 
-	// stream ownership gets transferred (indicated by setting stream to nil) when
-	// we successfully send request. If we fail to do so, cleanup.
-	defer func() {
-		if stream != nil {
-			stream.release()
-		}
-	}()
+	if err := stream.start(ctx, m); err != nil {
+		stream.release()
+		return err
+	}
+
+	return nil
+}
+
+// start begins execution of activeMuxRangeFeed.
+// This method uses the routing and transport information associated with this active stream
+// to find the node that hosts the range replica, establish a MuxRangeFeed RPC stream
+// with that node, and then establish a rangefeed for the span on that node.
+// If the routing/transport information is not valid, a lookup is performed to refresh this
+// information.
+// Transient errors while establishing RPCs are retried with backoff.
+// Certain non-recoverable errors (such as grpcutil.IsAuthError(err)) are propagated to the
+// caller and will cause the whole rangefeed to terminate.
+// Upon successfully establishing the RPC stream, ownership of the activeMuxRangeFeed
+// is transferred to the node event loop goroutine (receiveEventsFromNode).
+func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
+	streamID := atomic.AddInt64(&m.seqID, 1)
 
 	// Start a retry loop for sending the batch to the range.
 	for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); {
-		maybeReleaseTransport()
-
 		// If we've cleared the descriptor on failure, re-lookup.
-		if !token.Valid() {
-			var err error
-			ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
+		if !s.token.Valid() {
+			ri, err := m.ds.getRoutingInfo(ctx, s.rSpan.Key, rangecache.EvictionToken{}, false)
 			if err != nil {
 				log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
 				if !rangecache.IsRangeLookupErrorRetryable(err) {
@@ -205,50 +259,53 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
 				}
 				continue
 			}
-			token = ri
+			s.resetRouting(ctx, ri)
 		}
 
 		// Establish a RangeFeed for a single Range.
-		log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
-			span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
-		transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
-		if err != nil {
-			log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
-			continue
+		if s.transport == nil {
+			transport, err := newTransportForRange(ctx, s.token.Desc(), m.ds)
+			if err != nil {
+				log.VErrEventf(ctx, 1, "Failed to create transport for %s (err=%s) ", s.token.String(), err)
+				continue
+			}
+			s.transport = transport
 		}
-		releaseTransport = transport.Release
 
-		for !transport.IsExhausted() {
-			args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff)
-			args.Replica = transport.NextReplica()
+		for !s.transport.IsExhausted() {
+			args := makeRangeFeedRequest(
+				s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff)
+			args.Replica = s.transport.NextReplica()
 			args.StreamID = streamID
-
-			rpcClient, err := transport.NextInternalClient(ctx)
+			s.ReplicaDescriptor = args.Replica
+			rpcClient, err := s.transport.NextInternalClient(ctx)
 			if err != nil {
-				log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
+				log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
 				continue
 			}
+			log.VEventf(ctx, 1,
+				"MuxRangeFeed starting for span %s@%s (rangeID %d, replica %s, attempt %d)",
+				s.Span, s.startAfter, s.token.Desc().RangeID, args.Replica, r.CurrentAttempt())
+
 			conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID)
-			if err != nil {
-				return err
+			if err == nil {
+				err = conn.startRangeFeed(streamID, s, &args)
 			}
-
-			if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
-				log.VErrEventf(ctx, 0,
-					"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
+			if err != nil {
+				log.VErrEventf(ctx, 1,
+					"RPC error establishing mux rangefeed to r%d, replica %s: %s", args.RangeID, args.Replica, err)
+				if grpcutil.IsAuthError(err) {
+					// Authentication or authorization error. Propagate.
+					return err
+				}
 				continue
 			}
-			// We successfully established rangefeed, so the responsibility
-			// for releasing the stream is transferred to the mux go routine.
-			stream = nil
 			return nil
 		}
 
-		// If the transport is exhausted, we evict routing token and retry range
-		// resolution.
-		token.Evict(ctx)
-		token = rangecache.EvictionToken{}
+		s.resetRouting(ctx, rangecache.EvictionToken{}) // Transport exhausted; reset and retry.
 	}
 
 	return ctx.Err()
 }
@@ -293,7 +350,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
 	ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
 	defer restore()
 
-	if log.V(0) {
+	if log.V(1) {
 		log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
 		start := timeutil.Now()
 		defer func() {
@@ -339,7 +396,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
 	}
 
 	toRestart := ms.close()
-	if log.V(0) {
+	if log.V(1) {
 		log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
 	}
 	return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
@@ -383,7 +440,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
 		// which one executes is a coin flip) and so it is possible that we may see
 		// additional event(s) arriving for a stream that is no longer active.
 		if active == nil {
-			if log.V(0) {
+			if log.V(1) {
 				log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
 			}
 			continue
@@ -467,12 +524,19 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
 ) error {
 	m.ds.metrics.RangefeedRestartRanges.Inc(1)
 
-	if log.V(0) {
-		log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
-			active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
+	if log.V(1) {
+		log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v",
+			active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor,
+			timeutil.Since(active.Resolved.GoTime()), reason)
 	}
 	active.setLastError(reason)
-	active.release()
+
+	doRelease := true
+	defer func() {
+		if doRelease {
+			active.release()
+		}
+	}()
 
 	errInfo, err := handleRangefeedError(ctx, reason)
 	if err != nil {
@@ -480,20 +544,19 @@
 		return err
 	}
-	if errInfo.evict && active.token.Valid() {
-		active.token.Evict(ctx)
-		active.token = rangecache.EvictionToken{}
+	if errInfo.evict {
+		active.resetRouting(ctx, rangecache.EvictionToken{})
 	}
 
-	rs, err := keys.SpanAddr(active.Span)
-	if err != nil {
-		return err
+	if errInfo.resolveSpan {
+		return divideSpanOnRangeBoundaries(ctx, m.ds, active.rSpan, active.startAfter, m.startSingleRangeFeed)
 	}
 
-	if errInfo.resolveSpan {
-		return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed)
+	if err := active.start(ctx, m); err != nil {
+		return err
 	}
 
-	return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token)
+	doRelease = false // active stream ownership transferred to start above.
+	return nil
 }
 
 // startRangeFeed initiates rangefeed for the specified request running
diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index 99d0438a1edb..a73a287f3cce 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -3267,7 +3267,6 @@ HAVING
 
 func TestDecommission(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	skip.WithIssue(t, 96630, "flaky test")
 	defer log.Scope(t).Close(t)
 
 	// Five nodes is too much to reliably run under testrace with our aggressive
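
For readers less familiar with the kvcoord internals, the following is a minimal, self-contained sketch of the idea the commit message describes. It is not part of the patch and does not use the real kvcoord APIs; replicaTransport and activeFeed are hypothetical stand-ins for Transport and activeMuxRangeFeed. The point it illustrates: when the transport (and, in the real code, the routing token) lives on a long-lived per-range struct, a restart advances to the next untried replica instead of busy-looping on the node that just failed.

// Illustrative sketch only (not from the patch): hypothetical types showing
// the pattern of keeping retry/transport state on a long-lived per-range
// struct across restarts.
package main

import (
	"context"
	"errors"
	"fmt"
)

// replicaTransport is a stand-in for kvcoord's Transport: an ordered list of
// candidate replicas consumed across attempts.
type replicaTransport struct {
	replicas []string
	next     int
}

func (t *replicaTransport) exhausted() bool { return t.next >= len(t.replicas) }

func (t *replicaTransport) nextReplica() string {
	r := t.replicas[t.next]
	t.next++
	return r
}

// activeFeed is a stand-in for activeMuxRangeFeed: it owns the transport for
// as long as the range feed lives, so a transient restart resumes from the
// next untried replica rather than re-dialing the one that just errored.
type activeFeed struct {
	span      string
	transport *replicaTransport
}

// start dials replicas until one accepts the feed or the transport is exhausted.
func (a *activeFeed) start(ctx context.Context, dial func(replica string) error) error {
	for !a.transport.exhausted() {
		replica := a.transport.nextReplica()
		if err := dial(replica); err != nil {
			fmt.Printf("replica %s failed: %v; trying next\n", replica, err)
			continue
		}
		fmt.Printf("feed for %s established on %s\n", a.span, replica)
		return nil
	}
	// In the real patch this is where resetRouting evicts the routing token and
	// the retry loop performs a fresh range descriptor lookup.
	return errors.New("all replicas exhausted; re-resolve routing and retry")
}

func main() {
	feed := &activeFeed{
		span:      "/Table/1{0-1}",
		transport: &replicaTransport{replicas: []string{"n1", "n2", "n3"}},
	}
	// n1 fails; because the transport lives on the feed, the restart moves on
	// to n2 instead of retrying n1 in a tight loop.
	_ = feed.start(context.Background(), func(replica string) error {
		if replica == "n1" {
			return errors.New("node draining")
		}
		return nil
	})
}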