diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index 3ddc2e98d072..a2b7ec182ac2 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/future"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -72,12 +73,11 @@ func muxRangeFeed(
 	catchupSem *limit.ConcurrentRequestLimiter,
 	eventCh chan<- RangeFeedMessage,
 ) (retErr error) {
-	// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
-	if log.V(0) {
+	if log.V(1) {
 		log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
 		start := timeutil.Now()
 		defer func() {
 			log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr)
 		}()
 	}
 
@@ -126,20 +126,68 @@ type muxStreamOrError struct {
 }
 
 // activeMuxRangeFeed augments activeRangeFeed with additional state.
+// This is a long-lived data structure that tracks a single range feed
+// over its entire lifetime.
+// That lifetime is as follows:
+//
+// ┌─* muxRangeFeed
+// ┌───►├─► divideSpanOnRangeBoundaries
+// │    │      Divide target span(s) on range boundaries
+// │    ├─► startSingleRangeFeed
+// │    │      Allocate activeMuxRangeFeed, acquire catchup scan quota
+// │┌──►├─► activeMuxRangeFeed.start
+// ││   │      Determine the first replica that's usable for running range feed
+// ││   │      Establish MuxRangeFeed connection with the node
+// ││   │      Start single rangefeed on that node
+// ││   │      Assign unique "stream ID" which will be echoed back by range feed server
+// ││   │
+// ││┌─►├─► receiveEventsFromNode
+// │││  │      Lookup activeMuxRangeFeed corresponding to the received event
+// │││  ◊─► Handle event
+// ││└──│── Maybe release catchup scan quota
+// ││   │
+// ││   ├─► OR Handle error
+// ││   └─► restartActiveRangeFeeds
+// ││          Determine if the error is fatal; if so, terminate
+// ││          Transient errors can be retried, using retry/transport state
+// │└──────    stored in this structure (for example: try next replica)
+// │           Some errors (e.g. range split) require a full range lookup
+// └───────    activeMuxRangeFeed is released, replaced by one or more new instances
 type activeMuxRangeFeed struct {
 	*activeRangeFeed
-	token rangecache.EvictionToken
+	roachpb.ReplicaDescriptor
 	startAfter hlc.Timestamp
 	catchupRes catchupAlloc
+
+	// State pertaining to the execution of the rangefeed call, along with
+	// the state needed to manage retries.
+	retry     retry.Retry
+	token     rangecache.EvictionToken
+	transport Transport
 }
 
-func (a *activeMuxRangeFeed) release() {
-	a.activeRangeFeed.release()
-	if a.catchupRes != nil {
-		a.catchupRes.Release()
+func (s *activeMuxRangeFeed) release() {
+	s.activeRangeFeed.release()
+	if s.catchupRes != nil {
+		s.catchupRes.Release()
+	}
+	if s.transport != nil {
+		s.transport.Release()
+		s.transport = nil
 	}
 }
 
+func (s *activeMuxRangeFeed) resetRouting(ctx context.Context, newToken rangecache.EvictionToken) {
+	if s.token.Valid() {
+		s.token.Evict(ctx)
+	}
+	if s.transport != nil {
+		s.transport.Release()
+		s.transport = nil
+	}
+	s.token = newToken
+}
+
 // the "Send" portion of the kvpb.Internal_MuxRangeFeedClient
 type rangeFeedRequestSender interface {
 	Send(req *kvpb.RangeFeedRequest) error
@@ -158,15 +206,6 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
 	// Bound the partial rangefeed to the partial span.
 	span := rs.AsRawSpanWithNoLocals()
 
-	var releaseTransport func()
-	maybeReleaseTransport := func() {
-		if releaseTransport != nil {
-			releaseTransport()
-			releaseTransport = nil
-		}
-	}
-	defer maybeReleaseTransport()
-
 	// Before starting single rangefeed, acquire catchup scan quota.
 	catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
 	if err != nil {
@@ -179,24 +218,30 @@
 		startAfter: startAfter,
 		catchupRes: catchupRes,
 		token:      token,
+		retry:      retry.StartWithCtx(ctx, m.ds.rpcRetryOptions),
 	}
-	streamID := atomic.AddInt64(&m.seqID, 1)
 
-	// stream ownership gets transferred (indicated by setting stream to nil) when
-	// we successfully send request. If we fail to do so, cleanup.
-	defer func() {
-		if stream != nil {
-			stream.release()
-		}
-	}()
+	if err := stream.start(ctx, m); err != nil {
+		stream.release()
+		return err
+	}
 
-	// Start a retry loop for sending the batch to the range.
-	for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); {
-		maybeReleaseTransport()
+	return nil
+}
 
+// start begins execution of activeMuxRangeFeed.
+// Backoff is applied if the same stream restarts.
+func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
+	streamID := atomic.AddInt64(&m.seqID, 1)
+
+	// Start a retry loop for establishing the rangefeed on the range.
+	for s.retry.Next() {
 		// If we've cleared the descriptor on failure, re-lookup.
-		if !token.Valid() {
-			var err error
+		if !s.token.Valid() {
+			rs, err := keys.SpanAddr(s.Span)
+			if err != nil {
+				return err
+			}
 			ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
 			if err != nil {
 				log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
@@ -205,50 +250,53 @@
 			}
 			continue
 		}
-			token = ri
+			s.resetRouting(ctx, ri)
 		}
 
 		// Establish a RangeFeed for a single Range.
- log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)", - span, startAfter, token.Desc().RangeID, r.CurrentAttempt()) - transport, err := newTransportForRange(ctx, token.Desc(), m.ds) - if err != nil { - log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err) - continue + if s.transport == nil { + transport, err := newTransportForRange(ctx, s.token.Desc(), m.ds) + if err != nil { + log.VErrEventf(ctx, 1, "Failed to create transport for %s (err=%s) ", s.token.String(), err) + continue + } + s.transport = transport } - releaseTransport = transport.Release - for !transport.IsExhausted() { - args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff) - args.Replica = transport.NextReplica() + for !s.transport.IsExhausted() { + args := makeRangeFeedRequest( + s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff) + args.Replica = s.transport.NextReplica() args.StreamID = streamID - - rpcClient, err := transport.NextInternalClient(ctx) + s.ReplicaDescriptor = args.Replica + rpcClient, err := s.transport.NextInternalClient(ctx) if err != nil { - log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err) + log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err) continue } + log.VEventf(ctx, 1, + "MuxRangeFeed starting for span %s@%s (rangeID %d, replica %s, attempt %d)", + s.Span, s.startAfter, s.token.Desc().RangeID, args.Replica, s.retry.CurrentAttempt()) + conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID) - if err != nil { - return err + if err == nil { + err = conn.startRangeFeed(streamID, s, &args) } - if err := conn.startRangeFeed(streamID, stream, &args); err != nil { - log.VErrEventf(ctx, 0, - "RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err) + if err != nil { + log.VErrEventf(ctx, 1, + "RPC error establishing mux rangefeed to r%d, replica %s: %s", args.RangeID, args.Replica, err) + if grpcutil.IsAuthError(err) { + // Authentication or authorization error. Propagate. + return err + } continue } - // We successfully established rangefeed, so the responsibility - // for releasing the stream is transferred to the mux go routine. - stream = nil return nil } - // If the transport is exhausted, we evict routing token and retry range - // resolution. - token.Evict(ctx) - token = rangecache.EvictionToken{} + s.resetRouting(ctx, rangecache.EvictionToken{}) // Transport exhausted; reset and retry. } return ctx.Err() @@ -293,7 +341,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx) defer restore() - if log.V(0) { + if log.V(1) { log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID) start := timeutil.Now() defer func() { @@ -339,7 +387,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( } toRestart := ms.close() - if log.V(0) { + if log.V(1) { log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart)) } return m.restartActiveRangeFeeds(ctx, recvErr, toRestart) @@ -383,7 +431,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode( // which one executes is a coin flip) and so it is possible that we may see // additional event(s) arriving for a stream that is no longer active. 
if active == nil { - if log.V(0) { + if log.V(1) { log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event) } continue @@ -467,12 +515,19 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( ) error { m.ds.metrics.RangefeedRestartRanges.Inc(1) - if log.V(0) { - log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", - active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason) + if log.V(1) { + log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v", + active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor, + timeutil.Since(active.Resolved.GoTime()), reason) } active.setLastError(reason) - active.release() + + doRelease := true + defer func() { + if doRelease { + active.release() + } + }() errInfo, err := handleRangefeedError(ctx, reason) if err != nil { @@ -480,9 +535,8 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( return err } - if errInfo.evict && active.token.Valid() { - active.token.Evict(ctx) - active.token = rangecache.EvictionToken{} + if errInfo.evict { + active.resetRouting(ctx, rangecache.EvictionToken{}) } rs, err := keys.SpanAddr(active.Span) @@ -493,7 +547,12 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( if errInfo.resolveSpan { return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed) } - return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token) + + if err := active.start(ctx, m); err != nil { + return err + } + doRelease = false // active stream ownership transferred to start above. + return nil } // startRangeFeed initiates rangefeed for the specified request running diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 99d0438a1edb..a73a287f3cce 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3267,7 +3267,6 @@ HAVING func TestDecommission(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 96630, "flaky test") defer log.Scope(t).Close(t) // Five nodes is too much to reliably run under testrace with our aggressive
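Note on resetRouting (dist_sender_mux_rangefeed.go above): the routing token and the transport are now evicted and released together, so a stale range descriptor can no longer stay paired with a transport that was built from it. Below is a minimal, self-contained Go sketch of that coupling; feed, transport, and the string token are illustrative stand-ins for activeMuxRangeFeed, kvcoord's Transport, and rangecache.EvictionToken, not code from this patch.

package main

import "fmt"

// transport stands in for kvcoord's Transport: a cursor over the replicas
// of one range, built from a specific range descriptor.
type transport struct {
	replicas []string
	next     int
}

func (t *transport) nextReplica() string {
	r := t.replicas[t.next]
	t.next++
	return r
}

func (t *transport) release() { fmt.Println("transport released") }

// feed mirrors the routing state kept by activeMuxRangeFeed: a token
// describing the range, and a transport derived from that token.
type feed struct {
	token     string // stand-in for rangecache.EvictionToken
	transport *transport
}

// resetRouting drops both pieces of state at once: installing a new token
// always forces a new transport on the next attempt.
func (f *feed) resetRouting(newToken string) {
	if f.transport != nil {
		f.transport.release()
		f.transport = nil
	}
	f.token = newToken
}

func main() {
	f := &feed{
		token:     "r64",
		transport: &transport{replicas: []string{"n1", "n2", "n3"}},
	}
	fmt.Println("trying replica", f.transport.nextReplica())
	// The descriptor may be stale: evict it and rebuild routing next attempt.
	f.resetRouting("")
	fmt.Println("token:", f.token, "transport is nil:", f.transport == nil)
}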
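Note on the retry loop in activeMuxRangeFeed.start: connection and stream-setup errors are now split into fatal and retryable classes, with grpcutil.IsAuthError treating authentication and authorization failures as fatal. The sketch below shows that split in isolation; isAuthError and dial are hypothetical stand-ins (only the standard gRPC status/codes API is assumed), not this patch's actual helpers.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isAuthError approximates grpcutil.IsAuthError: authentication and
// authorization failures will not succeed on retry against any replica,
// so the caller must propagate them instead of retrying.
func isAuthError(err error) bool {
	switch status.Code(err) {
	case codes.Unauthenticated, codes.PermissionDenied:
		return true
	default:
		return false
	}
}

// dial is a placeholder for establishing the mux connection and starting
// the per-range stream; it fails transiently on the first two attempts.
func dial(attempt int) error {
	if attempt < 2 {
		return status.Error(codes.Unavailable, "replica unavailable")
	}
	return nil
}

func main() {
	for attempt := 0; ; attempt++ {
		err := dial(attempt)
		if err == nil {
			fmt.Println("rangefeed established on attempt", attempt)
			return
		}
		if isAuthError(err) {
			fmt.Println("fatal, propagating:", err) // no retry can help
			return
		}
		fmt.Println("transient, retrying:", err) // e.g. try next replica
	}
}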
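Note on receiveEventsFromNode: stream restarts race with event delivery, so an event may arrive carrying a stream ID that is no longer registered, and such stray events are logged and dropped. A minimal sketch of the streamID demultiplexing, where streamRegistry is a hypothetical stand-in for the muxer's internal stream map:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// streamRegistry sketches the muxer's streamID -> active stream map. Each
// rangefeed gets a fresh ID from an atomic counter; the server echoes that
// ID on every event, letting one mux connection carry many rangefeeds.
type streamRegistry struct {
	seqID   int64
	mu      sync.Mutex
	streams map[int64]string // value stands in for *activeMuxRangeFeed
}

func (r *streamRegistry) register(name string) int64 {
	id := atomic.AddInt64(&r.seqID, 1)
	r.mu.Lock()
	defer r.mu.Unlock()
	r.streams[id] = name
	return id
}

func (r *streamRegistry) deliver(id int64, event string) {
	r.mu.Lock()
	active, ok := r.streams[id]
	r.mu.Unlock()
	if !ok {
		// Stray event: the stream restarted or was torn down concurrently.
		fmt.Printf("received stray event stream %d: %s\n", id, event)
		return
	}
	fmt.Printf("stream %d (%s): %s\n", id, active, event)
}

func main() {
	r := &streamRegistry{streams: map[int64]string{}}
	id := r.register("span /Table/1-/Table/2")
	r.deliver(id, "checkpoint")
	r.deliver(id+1, "checkpoint") // unknown ID, logged as stray
}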
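Note on restartActiveRangeFeed: the new doRelease flag is a deferred conditional release, so every early-error path frees the active stream, while a successful call to start transfers ownership and suppresses the cleanup. A self-contained sketch of the idiom; resource and restart are illustrative names, not from the CockroachDB tree:

package main

import "fmt"

// resource stands in for anything with an explicit release step, such as
// the catchup-scan quota held by an activeMuxRangeFeed.
type resource struct{ name string }

func (r *resource) release() { fmt.Println("released", r.name) }

// restart frees the resource on every error path, but transfers ownership
// to the successfully restarted stream otherwise, mirroring the doRelease
// flag in restartActiveRangeFeed.
func restart(r *resource, start func() error) error {
	doRelease := true
	defer func() {
		if doRelease {
			r.release()
		}
	}()

	if err := start(); err != nil {
		return err // deferred cleanup runs
	}
	doRelease = false // ownership transferred to the restarted stream
	return nil
}

func main() {
	_ = restart(&resource{name: "quota"}, func() error { return nil })
	_ = restart(&resource{name: "quota"}, func() error { return fmt.Errorf("boom") })
}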