diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go index 96570dbf69ca..542d260dda3d 100644 --- a/pkg/cli/zip_table_registry.go +++ b/pkg/cli/zip_table_registry.go @@ -599,6 +599,7 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{ "range_end", "resolved", "last_event_utc", + "catchup", }, }, "crdb_internal.feature_usage": { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 1170ebafb6fb..0986e38d35f5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -222,13 +222,6 @@ This counts the number of ranges with an active rangefeed that are performing ca Measurement: "Ranges", Unit: metric.Unit_COUNT, } - metaDistSenderRangefeedRestartStuck = metric.Metadata{ - Name: "distsender.rangefeed.restart_stuck", - Help: `Number of times a rangefeed was restarted due to not receiving ` + - `timely updates (kv.rangefeed.range_stuck_threshold cluster setting)`, - Measurement: "Count", - Unit: metric.Unit_COUNT, - } ) // CanSendToFollower is used by the DistSender to determine if it needs to look @@ -310,9 +303,7 @@ type DistSenderMetrics struct { type DistSenderRangeFeedMetrics struct { RangefeedRanges *metric.Gauge RangefeedCatchupRanges *metric.Gauge - RangefeedErrorCatchup *metric.Counter - RangefeedRestartRanges *metric.Counter - RangefeedRestartStuck *metric.Counter + Errors rangeFeedErrorCounters } func makeDistSenderMetrics() DistSenderMetrics { @@ -353,13 +344,87 @@ func makeDistSenderMetrics() DistSenderMetrics { return m } +// rangeFeedErrorCounters are various error related counters for rangefeed. +type rangeFeedErrorCounters struct { + RangefeedRestartRanges *metric.Counter + RangefeedErrorCatchup *metric.Counter + RetryErrors []*metric.Counter + Stuck *metric.Counter + SendErrors *metric.Counter + StoreNotFound *metric.Counter + NodeNotFound *metric.Counter + RangeNotFound *metric.Counter + RangeKeyMismatch *metric.Counter +} + +func makeRangeFeedErrorCounters() rangeFeedErrorCounters { + var retryCounters []*metric.Counter + for name := range kvpb.RangeFeedRetryError_Reason_value { + name = strings.TrimPrefix(name, "REASON_") + retryCounters = append(retryCounters, metric.NewCounter(metric.Metadata{ + Name: fmt.Sprintf("distsender.rangefeed.retry.%s", strings.ToLower(name)), + Help: fmt.Sprintf(`Number of ranges that encountered retryable %s error`, name), + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + })) + } + + retryMeta := func(name string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("distsender.rangefeed.retry.%s", strings.ReplaceAll(name, " ", "_")), + Help: fmt.Sprintf("Number of ranges that encountered retryable %s error", name), + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + } + } + + return rangeFeedErrorCounters{ + RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), + RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), + RetryErrors: retryCounters, + Stuck: metric.NewCounter(retryMeta("stuck")), + SendErrors: metric.NewCounter(retryMeta("send")), + StoreNotFound: metric.NewCounter(retryMeta("store not found")), + NodeNotFound: metric.NewCounter(retryMeta("node not found")), + RangeNotFound: metric.NewCounter(retryMeta("range not found")), + RangeKeyMismatch: metric.NewCounter(retryMeta("range key mismatch")), + } +} + +// GetRangeFeedRetryCounter returns retry reason counter for the specified reason. 
+// Use this method instead of direct counter access (since this method handles +// potential gaps in retry reason values). +func (c rangeFeedErrorCounters) GetRangeFeedRetryCounter( + reason kvpb.RangeFeedRetryError_Reason, +) *metric.Counter { + // Normally, retry reason values are contiguous. Gaps could be + // introduced if some retry reasons are retired (deletions are + // accomplished by reserving the enum value to prevent its re-use) and + // more reasons are added afterward. In that case we could no longer use + // the reason value as an index into retryCounters. Because this scenario + // is believed to be very unlikely, we forgo any fancy re-mapping schemes + // and instead opt for explicit handling. + switch reason { + case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, + kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT, + kvpb.RangeFeedRetryError_REASON_RANGE_MERGED, + kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, + kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, + kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER, + kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER, + kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED: + return c.RetryErrors[reason] + default: + panic(errors.AssertionFailedf("unknown retry reason %d", reason)) + } +} + +func (rangeFeedErrorCounters) MetricStruct() {} + func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics { return DistSenderRangeFeedMetrics{ RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + Errors: makeRangeFeedErrorCounters(), } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index 4c0e2105bd59..b05e740ab0a5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -95,7 +95,10 @@ func muxRangeFeed( m.metrics = cfg.knobs.metrics } - divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g) + m.g.GoCtx(func(ctx context.Context) error { + return divideAllSpansOnRangeBoundaries(ctx, spans, m.startSingleRangeFeed, ds, &m.g) + }) + return errors.CombineErrors(m.g.Wait(), ctx.Err()) } @@ -165,12 +168,6 @@ type activeMuxRangeFeed struct { roachpb.ReplicaDescriptor startAfter hlc.Timestamp - // catchupRes is the catchup scan quota acquired upon the - // start of rangefeed. - // It is released when this stream receives first non-empty checkpoint - // (meaning: catchup scan completes). - catchupRes catchupAlloc - // State pertaining to execution of rangefeed call. token rangecache.EvictionToken transport Transport @@ -218,7 +215,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed( // Register active mux range feed. stream := &activeMuxRangeFeed{ - activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics.RangefeedRanges), + activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics), rSpan: rs, startAfter: startAfter, token: token, @@ -409,7 +406,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( // make sure that the underlying error is not fatal. If it is, there is no // reason to restart each rangefeed, so just bail out.
- if _, err := handleRangefeedError(ctx, recvErr); err != nil { + if _, err := handleRangefeedError(ctx, m.metrics, recvErr); err != nil { // Regardless of an error, release any resources (i.e. metrics) still // being held by active stream. for _, s := range toRestart { @@ -467,9 +464,8 @@ func (m *rangefeedMuxer) receiveEventsFromNode( case *kvpb.RangeFeedCheckpoint: if t.Span.Contains(active.Span) { // If we see the first non-empty checkpoint, we know we're done with the catchup scan. - if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil { - active.catchupRes.Release() - active.catchupRes = nil + if active.catchupRes != nil { + active.releaseCatchupScan() } // Note that this timestamp means that all rows in the span with // writes at or before the timestamp have now been seen. The @@ -481,7 +477,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode( case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) if active.catchupRes != nil { - m.metrics.RangefeedErrorCatchup.Inc(1) + m.metrics.Errors.RangefeedErrorCatchup.Inc(1) } ms.deleteStream(event.StreamID) // Restart rangefeed on another goroutine. Restart might be a bit @@ -519,15 +515,12 @@ func (m *rangefeedMuxer) restartActiveRangeFeeds( func (m *rangefeedMuxer) restartActiveRangeFeed( ctx context.Context, active *activeMuxRangeFeed, reason error, ) error { - m.metrics.RangefeedRestartRanges.Inc(1) + m.metrics.Errors.RangefeedRestartRanges.Inc(1) active.setLastError(reason) // Release catchup scan reservation if any -- we will acquire another // one when we restart. - if active.catchupRes != nil { - active.catchupRes.Release() - active.catchupRes = nil - } + active.releaseCatchupScan() doRelease := true defer func() { @@ -536,7 +529,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( } }() - errInfo, err := handleRangefeedError(ctx, reason) + errInfo, err := handleRangefeedError(ctx, m.metrics, reason) if err != nil { // If this is an error we cannot recover from, terminate the rangefeed. return err diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 98d3d0eeb08d..31bc9221573d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -243,10 +242,37 @@ func (ds *DistSender) RangeFeedSpans( for { select { case sri := <-rangeCh: + // Bound the partial rangefeed to the partial span. + span := sri.rs.AsRawSpanWithNoLocals() + + // Register partial range feed with registry. We do this prior to acquiring + // catchup scan quota so that we have some observability into the ranges + // that are blocked, waiting for quota. + active := newActiveRangeFeed(span, sri.startAfter, rr, metrics) + + acquireStart := timeutil.Now() + if log.V(1) { + log.Infof(ctx, "RangeFeed starting for span %s@%s (quota acquisition)", span, sri.startAfter) + } + // Prior to spawning goroutine to process this feed, acquire catchup scan quota. + // This quota acquisition paces the rate of new goroutine creation. 
+ catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics) + if err != nil { + active.release() + return err + } + active.catchupRes = catchupRes + if log.V(1) { + log.Infof(ctx, "RangeFeed starting for span %s@%s (quota acquired in %s)", + span, sri.startAfter, timeutil.Since(acquireStart)) + } + // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, - sri.token, &catchupSem, rangeCh, eventCh, cfg, metrics) + defer active.release() + + return ds.partialRangeFeed(ctx, active, sri.rs, sri.startAfter, + sri.token, rangeCh, eventCh, cfg, metrics) }) case <-ctx.Done(): return ctx.Err() @@ -255,7 +281,9 @@ func (ds *DistSender) RangeFeedSpans( }) // Kick off the initial set of ranges. - divideAllSpansOnRangeBoundaries(spans, sendSingleRangeInfo(rangeCh), ds, &g) + g.GoCtx(func(ctx context.Context) error { + return divideAllSpansOnRangeBoundaries(ctx, spans, sendSingleRangeInfo(rangeCh), ds, &g) + }) return g.Wait() } @@ -264,8 +292,8 @@ func (ds *DistSender) RangeFeedSpans( // provided onRange function for each range. Resolution happens concurrently using provided // context group. func divideAllSpansOnRangeBoundaries( - spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, -) { + ctx context.Context, spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, +) error { // Sort input spans based on their start time -- older spans first. // Starting rangefeed over large number of spans is an expensive proposition, // since this involves initiating catch-up scan operation for each span. These @@ -280,17 +308,16 @@ func divideAllSpansOnRangeBoundaries( return spans[i].StartAfter.Less(spans[j].StartAfter) }) - for _, s := range spans { - func(stp SpanTimePair) { - g.GoCtx(func(ctx context.Context) error { - rs, err := keys.SpanAddr(stp.Span) - if err != nil { - return err - } - return divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange) - }) - }(s) + for _, stp := range spans { + rs, err := keys.SpanAddr(stp.Span) + if err != nil { + return err + } + if err := divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange); err != nil { + return err + } } + return nil } // RangeFeedContext is the structure containing arguments passed to @@ -305,13 +332,18 @@ type RangeFeedContext struct { // PartialRangeFeed structure describes the state of currently executing partial range feed. type PartialRangeFeed struct { - Span roachpb.Span - StartAfter hlc.Timestamp // exclusive + // The following fields are immutable and are initialized + // once the rangefeed for a range starts. + Span roachpb.Span + StartAfter hlc.Timestamp // exclusive + CreatedTime time.Time + + // Fields below are mutable. NodeID roachpb.NodeID RangeID roachpb.RangeID - CreatedTime time.Time LastValueReceived time.Time Resolved hlc.Timestamp + InCatchup bool NumErrs int LastErr error } @@ -343,7 +375,9 @@ func (r *rangeFeedRegistry) ForEachPartialRangefeed(fn ActiveRangeFeedIterFn) (i partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { active.Lock() defer active.Unlock() - return active.PartialRangeFeed + p := active.PartialRangeFeed + p.InCatchup = active.catchupRes != nil + return p } r.ranges.Range(func(k, v interface{}) bool { active := k.(*activeRangeFeed) @@ -358,7 +392,23 @@ func (r *rangeFeedRegistry) ForEachPartialRangefeed(fn ActiveRangeFeedIterFn) (i // activeRangeFeed is a thread safe PartialRangeFeed. 
type activeRangeFeed struct { + // release releases resources and updates metrics when the + // active rangefeed completes. release func() + + // catchupRes is the catchup scan quota acquired upon the + // start of the rangefeed. + // It is released when this stream receives the first non-empty checkpoint + // (meaning: the catchup scan completes). + // Safe to release multiple times. + catchupRes catchupAlloc + + // PartialRangeFeed contains information about this range, + // mostly for the purpose of exposing it to external + // observability tools (crdb_internal.active_range_feeds). + // This state is protected by the mutex to avoid data races. + // The mutex overhead is low in the common case, where a single + // goroutine (singleRangeFeed) mutates this data. syncutil.Mutex PartialRangeFeed } @@ -459,7 +509,10 @@ func divideSpanOnRangeBoundaries( // newActiveRangeFeed registers active rangefeed with rangefeedRegistry. // The caller must call active.release() in order to cleanup. func newActiveRangeFeed( - span roachpb.Span, startAfter hlc.Timestamp, rr *rangeFeedRegistry, c *metric.Gauge, + span roachpb.Span, + startAfter hlc.Timestamp, + rr *rangeFeedRegistry, + metrics *DistSenderRangeFeedMetrics, ) *activeRangeFeed { // Register partial range feed with registry. active := &activeRangeFeed{ @@ -469,27 +522,38 @@ func newActiveRangeFeed( CreatedTime: timeutil.Now(), }, } + active.release = func() { + active.releaseCatchupScan() rr.ranges.Delete(active) - c.Dec(1) + metrics.RangefeedRanges.Dec(1) } + rr.ranges.Store(active, nil) - c.Inc(1) + metrics.RangefeedRanges.Inc(1) return active } -// partialRangeFeed establishes a RangeFeed to the range specified by desc. It -// manages lifecycle events of the range in order to maintain the RangeFeed +// releaseCatchupScan releases the catchup scan allocation, if any. +// It is safe to call multiple times. +func (a *activeRangeFeed) releaseCatchupScan() { + if a.catchupRes != nil { + a.catchupRes.Release() + a.catchupRes = nil + } +} + +// partialRangeFeed establishes a RangeFeed to the range specified by the routing token. +// This method manages lifecycle events of the range in order to maintain the RangeFeed // connection; this may involve instructing higher-level functions to retry // this rangefeed, or subdividing the range further in the event of a split. func (ds *DistSender) partialRangeFeed( ctx context.Context, - rr *rangeFeedRegistry, + active *activeRangeFeed, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, - catchupSem *limit.ConcurrentRequestLimiter, rangeCh chan<- singleRangeInfo, eventCh chan<- RangeFeedMessage, cfg rangeFeedConfig, @@ -498,10 +562,6 @@ func (ds *DistSender) partialRangeFeed( // Bound the partial rangefeed to the partial span. span := rs.AsRawSpanWithNoLocals() - // Register partial range feed with registry. - active := newActiveRangeFeed(span, startAfter, rr, metrics.RangefeedRanges) - defer active.release() - // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { // If we've cleared the descriptor on a send failure, re-lookup.
@@ -523,29 +583,31 @@ log.Infof(ctx, "RangeFeed starting for range %d@%s (%s)", token.Desc().RangeID, startAfter, span) } - maxTS, err := ds.singleRangeFeed( - ctx, span, startAfter, token.Desc(), - catchupSem, eventCh, active.onRangeEvent, cfg, metrics) + maxTS, err := ds.singleRangeFeed(ctx, active, span, startAfter, token.Desc(), eventCh, cfg, metrics) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) if log.V(1) { - log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", - active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), err) + log.Infof(ctx, "RangeFeed %d@%s (%s) disconnected with last checkpoint %s ago: %v", + token.Desc().RangeID, active.StartAfter, active.Span, timeutil.Since(active.Resolved.GoTime()), err) } active.setLastError(err) - errInfo, err := handleRangefeedError(ctx, err) + errInfo, err := handleRangefeedError(ctx, metrics, err) if err != nil { return err } - metrics.RangefeedRestartRanges.Inc(1) if errInfo.evict { token.Evict(ctx) token = rangecache.EvictionToken{} } + if errInfo.resolveSpan { + // We must release the catchup scan reservation prior to attempting to + // re-resolve, since re-resolution will attempt to acquire one or more + // catchup scan reservations. + active.releaseCatchupScan() return divideSpanOnRangeBoundaries(ctx, ds, rs, startAfter, sendSingleRangeInfo(rangeCh)) } } @@ -560,7 +622,11 @@ type rangefeedErrorInfo struct { // handleRangefeedError handles an error that occurred while running rangefeed. // Returns rangefeedErrorInfo describing how the error should be handled for the // range. Returns an error if the entire rangefeed should terminate. -func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, error) { +func handleRangefeedError( + ctx context.Context, metrics *DistSenderRangeFeedMetrics, err error, +) (rangefeedErrorInfo, error) { + metrics.Errors.RangefeedRestartRanges.Inc(1) + if err == nil { return rangefeedErrorInfo{}, nil } @@ -569,11 +635,17 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e case errors.Is(err, io.EOF): // If we got an EOF, treat it as a signal to restart single range feed. return rangefeedErrorInfo{}, nil - case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) || - errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)): + case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)): + // These errors are likely to be unique to the replica that + // reported them, so no action is required before the next + // retry. + metrics.Errors.StoreNotFound.Inc(1) + return rangefeedErrorInfo{}, nil + case errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)): // These errors are likely to be unique to the replica that // reported them, so no action is required before the next // retry. + metrics.Errors.NodeNotFound.Inc(1) return rangefeedErrorInfo{}, nil case errors.Is(err, errRestartStuckRange): // Stuck ranges indicate a bug somewhere in the system. We are being // // The error contains the replica which we were waiting for.
log.Warningf(ctx, "restarting stuck rangefeed: %s", err) + metrics.Errors.Stuck.Inc(1) return rangefeedErrorInfo{evict: true}, nil - case IsSendError(err), errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)): + case IsSendError(err): + metrics.Errors.SendErrors.Inc(1) + return rangefeedErrorInfo{evict: true}, nil + case errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)): + metrics.Errors.RangeNotFound.Inc(1) return rangefeedErrorInfo{evict: true}, nil case errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)): + metrics.Errors.RangeKeyMismatch.Inc(1) return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil case errors.HasType(err, (*kvpb.RangeFeedRetryError)(nil)): var t *kvpb.RangeFeedRetryError if ok := errors.As(err, &t); !ok { return rangefeedErrorInfo{}, errors.AssertionFailedf("wrong error type: %T", err) } + metrics.Errors.GetRangeFeedRetryCounter(t.Reason).Inc(1) switch t.Reason { case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, @@ -659,10 +738,6 @@ func newTransportForRange( return ds.transportFactory(opts, ds.nodeDialer, replicas) } -// onRangeEventCb is invoked for each non-error range event. -// nodeID identifies the node ID which generated the event. -type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *kvpb.RangeFeedEvent) - // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and // rangeID. Request is constructed to watch event after specified timestamp, and // with optional diff. If the request corresponds to a system range, request @@ -725,12 +800,11 @@ func defaultStuckRangeThreshold(st *cluster.Settings) func() time.Duration { // request's timestamp if not checkpoints are seen. func (ds *DistSender) singleRangeFeed( ctx context.Context, + active *activeRangeFeed, span roachpb.Span, startAfter hlc.Timestamp, desc *roachpb.RangeDescriptor, - catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- RangeFeedMessage, - onRangeEvent onRangeEventCb, cfg rangeFeedConfig, metrics *DistSenderRangeFeedMetrics, ) (_ hlc.Timestamp, retErr error) { @@ -750,22 +824,6 @@ func (ds *DistSender) singleRangeFeed( } defer transport.Release() - // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take - // opportunity to update semaphore limit. - catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, catchupSem, metrics) - if err != nil { - return hlc.Timestamp{}, err - } - - finishCatchupScan := func() { - if catchupRes != nil { - catchupRes.Release() - catchupRes = nil - } - } - // cleanup catchup reservation in case of early termination. - defer finishCatchupScan() - stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, defaultStuckRangeThreshold(ds.st)) defer stuckWatcher.stop() @@ -820,7 +878,7 @@ func (ds *DistSender) singleRangeFeed( }); err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) if stuckWatcher.stuck() { - afterCatchUpScan := catchupRes == nil + afterCatchUpScan := active.catchupRes == nil return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, err @@ -841,8 +899,8 @@ func (ds *DistSender) singleRangeFeed( case *kvpb.RangeFeedCheckpoint: if t.Span.Contains(args.Span) { // If we see the first non-empty checkpoint, we know we're done with the catchup scan. 
- if !t.ResolvedTS.IsEmpty() && catchupRes != nil { - finishCatchupScan() + if active.catchupRes != nil { + active.releaseCatchupScan() } // Note that this timestamp means that all rows in the span with // writes at or before the timestamp have now been seen. The @@ -853,19 +911,19 @@ func (ds *DistSender) singleRangeFeed( } case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) - if catchupRes != nil { - metrics.RangefeedErrorCatchup.Inc(1) + if active.catchupRes != nil { + metrics.Errors.RangefeedErrorCatchup.Inc(1) } if stuckWatcher.stuck() { // When the stuck watcher fired, and the rangefeed call is local, // the remote might notice the cancellation first and return from // Recv with an error that we need to special-case here. - afterCatchUpScan := catchupRes == nil + afterCatchUpScan := active.catchupRes == nil return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, t.Error.GoError() } - onRangeEvent(args.Replica.NodeID, desc.RangeID, event) + active.onRangeEvent(args.Replica.NodeID, desc.RangeID, event) select { case eventCh <- msg: @@ -889,7 +947,6 @@ func handleStuckEvent( threshold time.Duration, m *DistSenderRangeFeedMetrics, ) error { - m.RangefeedRestartStuck.Inc(1) if afterCatchupScan { telemetry.Count("rangefeed.stuck.after-catchup-scan") } else { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index e8014eb13070..28be7891eafb 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -462,7 +462,7 @@ func TestRestartsStuckRangeFeeds(t *testing.T) { closeFeed() require.True(t, blockingClient.ctxCanceled) - require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().RangefeedRestartStuck.Count()) + require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().Errors.Stuck.Count()) } func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) { @@ -589,7 +589,7 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) { // NB: We really expect exactly 1 but with a 1s timeout, it's not inconceivable that // on a particularly slow CI machine some unrelated rangefeed could also catch the occasional // retry. - require.NotZero(t, ds.Metrics().RangefeedRestartStuck.Count()) + require.NotZero(t, ds.Metrics().Errors.Stuck.Count()) } func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) { @@ -684,7 +684,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) { // Upon shutdown, make sure the metrics have correct values. defer func() { require.EqualValues(t, 0, metrics.RangefeedRanges.Value()) - require.EqualValues(t, 0, metrics.RangefeedRestartStuck.Count()) + require.EqualValues(t, 0, metrics.Errors.Stuck.Count()) // We injected numRangesToRetry transient errors during catchup scan. // It is possible however, that we will observe key-mismatch error when restarting @@ -693,8 +693,8 @@ func TestRangeFeedMetricsManagement(t *testing.T) { // When iterating through the entire table span, we pick up correct version. // However, if we attempt to re-resolve single range, we may get incorrect/old // version that was cached. Thus, we occasionally see additional transient restarts. 
- require.GreaterOrEqual(t, metrics.RangefeedErrorCatchup.Count(), numRangesToRetry) - require.GreaterOrEqual(t, metrics.RangefeedRestartRanges.Count(), numRangesToRetry) + require.GreaterOrEqual(t, metrics.Errors.RangefeedErrorCatchup.Count(), numRangesToRetry) + require.GreaterOrEqual(t, metrics.Errors.RangefeedRestartRanges.Count(), numRangesToRetry) // Even though numCatchupToBlock ranges were blocked in the catchup scan phase, // the counter should be 0 once rangefeed is done. @@ -741,7 +741,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) { case *kvpb.RangeFeedCheckpoint: if checkpoint := t; checkpoint.Span.Contains(s) { if checkpoint.ResolvedTS.IsEmpty() { - return false, nil + return true, nil } // Skip any subsequent checkpoint if we previously arranged for diff --git a/pkg/kv/kvpb/errors.go b/pkg/kv/kvpb/errors.go index 798c5c9439fe..c3b6ad0f8b33 100644 --- a/pkg/kv/kvpb/errors.go +++ b/pkg/kv/kvpb/errors.go @@ -348,6 +348,7 @@ func init() { errors.RegisterTypeMigration(roachpbPath, "*roachpb.RefreshFailedError", &RefreshFailedError{}) errors.RegisterTypeMigration(roachpbPath, "*roachpb.MVCCHistoryMutationError", &MVCCHistoryMutationError{}) errors.RegisterTypeMigration(roachpbPath, "*roachpb.InsufficientSpaceError", &InsufficientSpaceError{}) + } // GoError returns a Go error converted from Error. If the error is a transaction diff --git a/pkg/kv/kvpb/errors.proto b/pkg/kv/kvpb/errors.proto index 9b52cf68b818..6dcfd3504ea9 100644 --- a/pkg/kv/kvpb/errors.proto +++ b/pkg/kv/kvpb/errors.proto @@ -592,6 +592,10 @@ message MergeInProgressError { // because of a range lifecycle event, and can be retried. message RangeFeedRetryError { // Reason specifies what caused the error. + // NB: reason names should be stable because they are + // exposed as rangefeed error counters. + // NB: *never* delete enum values; if retry reason needs to be retired, + // reserve reason value. enum Reason { // The replica was removed from its store. 
REASON_REPLICA_REMOVED = 0; diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 7f7298b470d7..18e694f8d3ef 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -7093,6 +7093,7 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_end STRING, resolved STRING, last_event_utc INT, + catchup BOOL, num_errs INT, last_err STRING );`, @@ -7124,6 +7125,7 @@ CREATE TABLE crdb_internal.active_range_feeds ( tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.EndKey)), tree.NewDString(rf.Resolved.AsOfSystemTime()), lastEvent, + tree.MakeDBool(tree.DBool(rf.InCatchup)), tree.NewDInt(tree.DInt(rf.NumErrs)), lastErr, ) diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog index d13f6fb74753..710e2e9a22e9 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog @@ -405,7 +405,7 @@ SELECT id, strip_volatile(descriptor) FROM crdb_internal.kv_catalog_descriptor O 4294967207 {"table": {"columns": [{"id": 1, "name": "id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "database_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "super_region_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "regions", "nullable": true, "type": {"arrayContents": {"family": "StringFamily", "oid": 25}, "arrayElemType": "StringFamily", "family": "ArrayFamily", "oid": 1009}}], "formatVersion": 3, "id": 4294967207, "name": "super_regions", "nextColumnId": 5, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967208 {"table": {"columns": [{"id": 1, "name": "name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "implemented", "nullable": true, "type": {"oid": 16}}], "formatVersion": 3, "id": 4294967208, "name": "pg_catalog_table_is_implemented", "nextColumnId": 3, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967209 {"table": {"columns": [{"id": 1, "name": "tenant_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "total_ru", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 3, "name": "total_read_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "total_read_requests", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 5, "name": "total_write_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "total_write_requests", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "total_sql_pod_seconds", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 8, "name": 
"total_pgwire_egress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "total_external_io_ingress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "total_external_io_egress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 11, "name": "total_kv_ru", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 12, "name": "total_cross_region_network_ru", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}], "formatVersion": 3, "id": 4294967209, "name": "tenant_usage_details", "nextColumnId": 13, "nextConstraintId": 1, "nextMutationId": 1, "primaryIndex": {"foreignKey": {}, "geoConfig": {}, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1", "viewQuery": "SELECT tenant_id, (j->>'rU')::FLOAT8 AS total_ru, (j->>'readBytes')::INT8 AS total_read_bytes, (j->>'readRequests')::INT8 AS total_read_requests, (j->>'writeBytes')::INT8 AS total_write_bytes, (j->>'writeRequests')::INT8 AS total_write_requests, (j->>'sqlPodsCpuSeconds')::FLOAT8 AS total_sql_pod_seconds, (j->>'pgwireEgressBytes')::INT8 AS total_pgwire_egress_bytes, (j->>'externalIOIngressBytes')::INT8 AS total_external_io_ingress_bytes, (j->>'externalIOIngressBytes')::INT8 AS total_external_io_ingress_bytes, (j->>'kvRU')::FLOAT8 AS total_kv_ru, (j->>'crossRegionNetworkRU')::FLOAT8 AS total_cross_region_network_ru FROM (SELECT tenant_id, crdb_internal.pb_to_json('cockroach.roachpb.TenantConsumption', total_consumption) AS j FROM system.tenant_usage WHERE instance_id = 0)"}} -4294967210 {"table": {"columns": [{"id": 1, "name": "id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "tags", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "startts", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "diff", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "node_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "range_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "created", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "range_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 9, "name": "range_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 10, "name": "resolved", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 11, "name": "last_event_utc", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "num_errs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 13, "name": "last_err", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967210, "name": "active_range_feeds", "nextColumnId": 14, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, 
"replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} +4294967210 {"table": {"columns": [{"id": 1, "name": "id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "tags", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "startts", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "diff", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "node_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "range_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "created", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "range_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 9, "name": "range_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 10, "name": "resolved", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 11, "name": "last_event_utc", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "catchup", "nullable": true, "type": {"oid": 16}}, {"id": 13, "name": "num_errs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 14, "name": "last_err", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967210, "name": "active_range_feeds", "nextColumnId": 15, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967211 {"table": {"columns": [{"id": 1, "name": "database_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "schema_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "role", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "for_all_roles", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "object_type", "type": {"family": "StringFamily", "oid": 25}}, {"id": 6, "name": "grantee", "type": {"family": "StringFamily", "oid": 25}}, {"id": 7, "name": "privilege_type", "type": {"family": "StringFamily", "oid": 25}}, {"id": 8, "name": "is_grantable", "nullable": true, "type": {"oid": 16}}], "formatVersion": 3, "id": 4294967211, "name": "default_privileges", "nextColumnId": 9, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967212 {"table": {"columns": [{"id": 1, "name": "region", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "zones", "type": {"arrayContents": {"family": "StringFamily", "oid": 25}, "arrayElemType": "StringFamily", "family": "ArrayFamily", "oid": 1009}}], "formatVersion": 3, "id": 4294967212, "name": "regions", "nextColumnId": 3, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, 
"primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967213 {"table": {"columns": [{"id": 1, "name": "trace_id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "node_id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 3, "name": "root_op_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "trace_str", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "jaeger_json", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967213, "indexes": [{"foreignKey": {}, "geoConfig": {}, "id": 2, "interleave": {}, "keyColumnDirections": ["ASC"], "keyColumnIds": [1], "keyColumnNames": ["trace_id"], "name": "cluster_inflight_traces_trace_id_idx", "partitioning": {}, "sharded": {}, "storeColumnIds": [2, 3, 4, 5], "storeColumnNames": ["node_id", "root_op_name", "trace_str", "jaeger_json"], "version": 3}], "name": "cluster_inflight_traces", "nextColumnId": 6, "nextConstraintId": 2, "nextIndexId": 3, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}