From ef0e5caa4424468a8107bb6100c38e86e8c7306a Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 23 Aug 2023 11:51:00 -0400
Subject: [PATCH] kvcoord: Pace rangefeed client goroutine creation

Acquire catchup scan quota prior to goroutine creation in order to pace
the goroutine creation rate.

This change results in smooth growth in goroutine count, which reduces
pressure on the goroutine scheduler and, in turn, the impact on SQL
latency during changefeed startup.

This change also improves observability in the rangefeed client by
introducing new counters:

* `distsender.rangefeed.retry.<reason>`: counters keeping track of the
  number of ranges that encountered a retryable error of a particular
  type (e.g. slow consumer, range split, etc).

Observability is also enhanced by adding a `catchup` column to the
`crdb_internal.active_range_feeds` virtual table, indicating whether the
range is currently in catchup scan mode.

Fixes #98842

Release note (enterprise change): Pace rangefeed goroutine creation rate
to improve scheduler latency. Improve observability by adding additional
metrics indicating the reason for rangefeed restart, as well as an
additional column in the `crdb_internal.active_range_feeds` table
indicating whether the range is currently in catchup scan mode.
---
 pkg/cli/zip_table_registry.go                 |   1 +
 pkg/kv/kvclient/kvcoord/dist_sender.go        |  91 +++++++--
 .../kvcoord/dist_sender_mux_rangefeed.go      |  29 ++-
 .../kvclient/kvcoord/dist_sender_rangefeed.go | 186 +++++++++++-------
 .../kvcoord/dist_sender_rangefeed_test.go     |  10 +-
 pkg/kv/kvpb/errors.go                         |   1 +
 pkg/kv/kvpb/errors.proto                      |   4 +
 pkg/sql/crdb_internal.go                      |   2 +
 .../testdata/logic_test/crdb_internal_catalog |   2 +-
 pkg/util/metric/registry.go                   |   9 +-
 pkg/util/metric/registry_test.go              |  10 +-
 11 files changed, 225 insertions(+), 120 deletions(-)

diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go
index 48ab766e4bfb..94df7308e95e 100644
--- a/pkg/cli/zip_table_registry.go
+++ b/pkg/cli/zip_table_registry.go
@@ -599,6 +599,7 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
 			"range_end",
 			"resolved",
 			"last_event_utc",
+			"catchup",
 		},
 	},
 	"crdb_internal.feature_usage": {
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index b6203424d82c..450b7ad05372 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -222,13 +222,6 @@ This counts the number of ranges with an active rangefeed that are performing ca
 		Measurement: "Ranges",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaDistSenderRangefeedRestartStuck = metric.Metadata{
-		Name: "distsender.rangefeed.restart_stuck",
-		Help: `Number of times a rangefeed was restarted due to not receiving ` +
-			`timely updates (kv.rangefeed.range_stuck_threshold cluster setting)`,
-		Measurement: "Count",
-		Unit:        metric.Unit_COUNT,
-	}
 )
 
 // CanSendToFollower is used by the DistSender to determine if it needs to look
@@ -310,9 +303,7 @@ type DistSenderMetrics struct {
 type DistSenderRangeFeedMetrics struct {
 	RangefeedRanges        *metric.Gauge
 	RangefeedCatchupRanges *metric.Gauge
-	RangefeedErrorCatchup  *metric.Counter
-	RangefeedRestartRanges *metric.Counter
-	RangefeedRestartStuck  *metric.Counter
+	Errors                 rangeFeedErrorCounters
 }
 
 func makeDistSenderMetrics() DistSenderMetrics {
@@ -353,13 +344,87 @@ func makeDistSenderMetrics() DistSenderMetrics {
 	return m
 }
 
+// rangeFeedErrorCounters are various error-related counters for rangefeed.
+type rangeFeedErrorCounters struct {
+	RangefeedRestartRanges *metric.Counter
+	RangefeedErrorCatchup  *metric.Counter
+	RetryErrors            []*metric.Counter
+	Stuck                  *metric.Counter
+	SendErrors             *metric.Counter
+	StoreNotFound          *metric.Counter
+	NodeNotFound           *metric.Counter
+	RangeNotFound          *metric.Counter
+	RangeKeyMismatch       *metric.Counter
+}
+
+func makeRangeFeedErrorCounters() rangeFeedErrorCounters {
+	retryCounters := make([]*metric.Counter, len(kvpb.RangeFeedRetryError_Reason_value))
+	for name, idx := range kvpb.RangeFeedRetryError_Reason_value {
+		name = strings.TrimPrefix(name, "REASON_")
+		retryCounters[idx] = metric.NewCounter(metric.Metadata{
+			Name:        fmt.Sprintf("distsender.rangefeed.retry.%s", strings.ToLower(name)),
+			Help:        fmt.Sprintf(`Number of ranges that encountered retryable %s error`, name),
+			Measurement: "Ranges",
+			Unit:        metric.Unit_COUNT,
+		})
+	}
+
+	retryMeta := func(name string) metric.Metadata {
+		return metric.Metadata{
+			Name:        fmt.Sprintf("distsender.rangefeed.retry.%s", strings.ReplaceAll(name, " ", "_")),
+			Help:        fmt.Sprintf("Number of ranges that encountered retryable %s error", name),
+			Measurement: "Ranges",
+			Unit:        metric.Unit_COUNT,
+		}
+	}
+
+	return rangeFeedErrorCounters{
+		RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
+		RangefeedErrorCatchup:  metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
+		RetryErrors:            retryCounters,
+		Stuck:                  metric.NewCounter(retryMeta("stuck")),
+		SendErrors:             metric.NewCounter(retryMeta("send error")),
+		StoreNotFound:          metric.NewCounter(retryMeta("store not found")),
+		NodeNotFound:           metric.NewCounter(retryMeta("node not found")),
+		RangeNotFound:          metric.NewCounter(retryMeta("range not found")),
+		RangeKeyMismatch:       metric.NewCounter(retryMeta("range key mismatch")),
+	}
+}
+
+// GetRangeFeedRetryCounter returns the retry counter for the specified retry reason.
+// Use this method instead of direct counter access (since this method handles
+// potential gaps in retry reason values).
+func (c rangeFeedErrorCounters) GetRangeFeedRetryCounter(
+	reason kvpb.RangeFeedRetryError_Reason,
+) *metric.Counter {
+	// Normally, retry reason values are contiguous. Gaps could be introduced if
+	// some retry reasons are retired (deletion is accomplished by reserving the
+	// enum value to prevent its re-use) and more reasons are added afterwards.
+	// In that case, the reason value can no longer be used as an index into
+	// retryCounters. Because this scenario is believed to be very unlikely, we
+	// forgo any fancy re-mapping schemes and instead opt for explicit handling.
+ switch reason { + case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, + kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT, + kvpb.RangeFeedRetryError_REASON_RANGE_MERGED, + kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, + kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, + kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER, + kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER, + kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED: + return c.RetryErrors[reason] + default: + panic(errors.AssertionFailedf("unknown retry reason %d", reason)) + } +} + +func (rangeFeedErrorCounters) MetricStruct() {} + func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics { return DistSenderRangeFeedMetrics{ RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + Errors: makeRangeFeedErrorCounters(), } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index 4c0e2105bd59..f763717689db 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -95,7 +95,10 @@ func muxRangeFeed( m.metrics = cfg.knobs.metrics } - divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g) + m.g.GoCtx(func(ctx context.Context) error { + return divideAllSpansOnRangeBoundaries(ctx, spans, m.startSingleRangeFeed, ds, &m.g) + }) + return errors.CombineErrors(m.g.Wait(), ctx.Err()) } @@ -165,12 +168,6 @@ type activeMuxRangeFeed struct { roachpb.ReplicaDescriptor startAfter hlc.Timestamp - // catchupRes is the catchup scan quota acquired upon the - // start of rangefeed. - // It is released when this stream receives first non-empty checkpoint - // (meaning: catchup scan completes). - catchupRes catchupAlloc - // State pertaining to execution of rangefeed call. token rangecache.EvictionToken transport Transport @@ -218,7 +215,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed( // Register active mux range feed. stream := &activeMuxRangeFeed{ - activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics.RangefeedRanges), + activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics), rSpan: rs, startAfter: startAfter, token: token, @@ -409,7 +406,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( // make sure that the underlying error is not fatal. If it is, there is no // reason to restart each rangefeed, so just bail out. - if _, err := handleRangefeedError(ctx, recvErr); err != nil { + if _, err := handleRangefeedError(ctx, m.metrics, recvErr); err != nil { // Regardless of an error, release any resources (i.e. metrics) still // being held by active stream. for _, s := range toRestart { @@ -468,8 +465,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode( if t.Span.Contains(active.Span) { // If we see the first non-empty checkpoint, we know we're done with the catchup scan. if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil { - active.catchupRes.Release() - active.catchupRes = nil + active.releaseCatchupScan() } // Note that this timestamp means that all rows in the span with // writes at or before the timestamp have now been seen. 
The @@ -481,7 +477,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode( case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) if active.catchupRes != nil { - m.metrics.RangefeedErrorCatchup.Inc(1) + m.metrics.Errors.RangefeedErrorCatchup.Inc(1) } ms.deleteStream(event.StreamID) // Restart rangefeed on another goroutine. Restart might be a bit @@ -519,15 +515,12 @@ func (m *rangefeedMuxer) restartActiveRangeFeeds( func (m *rangefeedMuxer) restartActiveRangeFeed( ctx context.Context, active *activeMuxRangeFeed, reason error, ) error { - m.metrics.RangefeedRestartRanges.Inc(1) + m.metrics.Errors.RangefeedRestartRanges.Inc(1) active.setLastError(reason) // Release catchup scan reservation if any -- we will acquire another // one when we restart. - if active.catchupRes != nil { - active.catchupRes.Release() - active.catchupRes = nil - } + active.releaseCatchupScan() doRelease := true defer func() { @@ -536,7 +529,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( } }() - errInfo, err := handleRangefeedError(ctx, reason) + errInfo, err := handleRangefeedError(ctx, m.metrics, reason) if err != nil { // If this is an error we cannot recover from, terminate the rangefeed. return err diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 0dc209f7bb02..76b72dd9ab24 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -228,10 +227,37 @@ func (ds *DistSender) RangeFeedSpans( for { select { case sri := <-rangeCh: + // Bound the partial rangefeed to the partial span. + span := sri.rs.AsRawSpanWithNoLocals() + + // Register partial range feed with registry. We do this prior to acquiring + // catchup scan quota so that we have some observability into the ranges + // that are blocked, waiting for quota. + active := newActiveRangeFeed(span, sri.startAfter, rr, metrics) + + acquireStart := timeutil.Now() + if log.V(1) { + log.Infof(ctx, "RangeFeed starting for span %s@%s (quota acquisition)", span, sri.startAfter) + } + // Prior to spawning goroutine to process this feed, acquire catchup scan quota. + // This quota acquisition paces the rate of new goroutine creation. + catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics) + if err != nil { + active.release() + return err + } + active.catchupRes = catchupRes + if log.V(1) { + log.Infof(ctx, "RangeFeed starting for span %s@%s (quota acquired in %s)", + span, sri.startAfter, timeutil.Since(acquireStart)) + } + // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, - sri.token, &catchupSem, rangeCh, eventCh, cfg, metrics) + defer active.release() + + return ds.partialRangeFeed(ctx, active, sri.rs, sri.startAfter, + sri.token, rangeCh, eventCh, cfg, metrics) }) case <-ctx.Done(): return ctx.Err() @@ -240,7 +266,9 @@ func (ds *DistSender) RangeFeedSpans( }) // Kick off the initial set of ranges. 
- divideAllSpansOnRangeBoundaries(spans, sendSingleRangeInfo(rangeCh), ds, &g) + g.GoCtx(func(ctx context.Context) error { + return divideAllSpansOnRangeBoundaries(ctx, spans, sendSingleRangeInfo(rangeCh), ds, &g) + }) return g.Wait() } @@ -249,8 +277,8 @@ func (ds *DistSender) RangeFeedSpans( // provided onRange function for each range. Resolution happens concurrently using provided // context group. func divideAllSpansOnRangeBoundaries( - spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, -) { + ctx context.Context, spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, +) error { // Sort input spans based on their start time -- older spans first. // Starting rangefeed over large number of spans is an expensive proposition, // since this involves initiating catch-up scan operation for each span. These @@ -265,17 +293,16 @@ func divideAllSpansOnRangeBoundaries( return spans[i].StartAfter.Less(spans[j].StartAfter) }) - for _, s := range spans { - func(stp SpanTimePair) { - g.GoCtx(func(ctx context.Context) error { - rs, err := keys.SpanAddr(stp.Span) - if err != nil { - return err - } - return divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange) - }) - }(s) + for _, stp := range spans { + rs, err := keys.SpanAddr(stp.Span) + if err != nil { + return err + } + if err := divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange); err != nil { + return err + } } + return nil } // RangeFeedContext is the structure containing arguments passed to @@ -297,6 +324,7 @@ type PartialRangeFeed struct { CreatedTime time.Time LastValueReceived time.Time Resolved hlc.Timestamp + InCatchup bool NumErrs int LastErr error } @@ -311,10 +339,12 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr const continueIter = true const stopIter = false - partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { + partialRangeFeed := func(active *activeRangeFeed) (p PartialRangeFeed) { active.Lock() defer active.Unlock() - return active.PartialRangeFeed + p = active.PartialRangeFeed + p.InCatchup = active.catchupRes != nil + return p } ds.activeRangeFeeds.Range(func(k, v interface{}) bool { @@ -336,6 +366,13 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr // activeRangeFeed is a thread safe PartialRangeFeed. type activeRangeFeed struct { release func() + + // catchupRes is the catchup scan quota acquired upon the + // start of rangefeed. + // It is released when this stream receives first non-empty checkpoint + // (meaning: catchup scan completes). + catchupRes catchupAlloc + syncutil.Mutex PartialRangeFeed } @@ -436,7 +473,10 @@ func divideSpanOnRangeBoundaries( // newActiveRangeFeed registers active rangefeed with rangefeedRegistry. // The caller must call active.release() in order to cleanup. func newActiveRangeFeed( - span roachpb.Span, startAfter hlc.Timestamp, rr *rangeFeedRegistry, c *metric.Gauge, + span roachpb.Span, + startAfter hlc.Timestamp, + rr *rangeFeedRegistry, + metrics *DistSenderRangeFeedMetrics, ) *activeRangeFeed { // Register partial range feed with registry. 
active := &activeRangeFeed{ @@ -445,28 +485,39 @@ func newActiveRangeFeed( StartAfter: startAfter, CreatedTime: timeutil.Now(), }, - release: func() { - rr.ranges.Delete(active) - c.Dec(1) - }, } + + active.release = func() { + active.releaseCatchupScan() + rr.ranges.Delete(active) + metrics.RangefeedRanges.Dec(1) + } + rr.ranges.Store(active, nil) - c.Inc(1) + metrics.RangefeedRanges.Inc(1) return active } +// releaseCatchupScan releases catchup scan allocation, if any. +// safe to call multiple times. +func (a *activeRangeFeed) releaseCatchupScan() { + if a.catchupRes != nil { + a.catchupRes.Release() + a.catchupRes = nil + } +} + // partialRangeFeed establishes a RangeFeed to the range specified by desc. It // manages lifecycle events of the range in order to maintain the RangeFeed // connection; this may involve instructing higher-level functions to retry // this rangefeed, or subdividing the range further in the event of a split. func (ds *DistSender) partialRangeFeed( ctx context.Context, - rr *rangeFeedRegistry, + active *activeRangeFeed, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, - catchupSem *limit.ConcurrentRequestLimiter, rangeCh chan<- singleRangeInfo, eventCh chan<- RangeFeedMessage, cfg rangeFeedConfig, @@ -475,10 +526,6 @@ func (ds *DistSender) partialRangeFeed( // Bound the partial rangefeed to the partial span. span := rs.AsRawSpanWithNoLocals() - // Register partial range feed with registry. - active := newActiveRangeFeed(span, startAfter, rr, metrics.RangefeedRanges) - defer active.release() - // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { // If we've cleared the descriptor on a send failure, re-lookup. @@ -500,29 +547,31 @@ func (ds *DistSender) partialRangeFeed( log.Infof(ctx, "RangeFeed starting for range %d@%s (%s)", token.Desc().RangeID, startAfter, span) } - maxTS, err := ds.singleRangeFeed( - ctx, span, startAfter, token.Desc(), - catchupSem, eventCh, active.onRangeEvent, cfg, metrics) + maxTS, err := ds.singleRangeFeed(ctx, active, span, startAfter, token.Desc(), eventCh, cfg, metrics) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) if log.V(1) { - log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", - active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), err) + log.Infof(ctx, "RangeFeed %d@%s (%s) disconnected with last checkpoint %s ago: %v", + token.Desc().RangeID, active.StartAfter, active.Span, timeutil.Since(active.Resolved.GoTime()), err) } active.setLastError(err) - errInfo, err := handleRangefeedError(ctx, err) + errInfo, err := handleRangefeedError(ctx, metrics, err) if err != nil { return err } - metrics.RangefeedRestartRanges.Inc(1) if errInfo.evict { token.Evict(ctx) token = rangecache.EvictionToken{} } + if errInfo.resolveSpan { + // We must release catchup scan reservation prior to attempt to + // re-resolve since this will attempt to acquire 1 or more catchup + // scan reservations. + active.releaseCatchupScan() return divideSpanOnRangeBoundaries(ctx, ds, rs, startAfter, sendSingleRangeInfo(rangeCh)) } } @@ -537,7 +586,11 @@ type rangefeedErrorInfo struct { // handleRangefeedError handles an error that occurred while running rangefeed. // Returns rangefeedErrorInfo describing how the error should be handled for the // range. Returns an error if the entire rangefeed should terminate. 
-func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, error) { +func handleRangefeedError( + ctx context.Context, metrics *DistSenderRangeFeedMetrics, err error, +) (rangefeedErrorInfo, error) { + metrics.Errors.RangefeedRestartRanges.Inc(1) + if err == nil { return rangefeedErrorInfo{}, nil } @@ -546,11 +599,17 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e case errors.Is(err, io.EOF): // If we got an EOF, treat it as a signal to restart single range feed. return rangefeedErrorInfo{}, nil - case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) || - errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)): + case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)): // These errors are likely to be unique to the replica that // reported them, so no action is required before the next // retry. + metrics.Errors.StoreNotFound.Inc(1) + return rangefeedErrorInfo{}, nil + case errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)): + // These errors are likely to be unique to the replica that + // reported them, so no action is required before the next + // retry. + metrics.Errors.NodeNotFound.Inc(1) return rangefeedErrorInfo{}, nil case errors.Is(err, errRestartStuckRange): // Stuck ranges indicate a bug somewhere in the system. We are being @@ -560,16 +619,23 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e // // The error contains the replica which we were waiting for. log.Warningf(ctx, "restarting stuck rangefeed: %s", err) + metrics.Errors.Stuck.Inc(1) return rangefeedErrorInfo{evict: true}, nil - case IsSendError(err), errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)): + case IsSendError(err): + metrics.Errors.SendErrors.Inc(1) + return rangefeedErrorInfo{evict: true}, nil + case errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)): + metrics.Errors.RangeNotFound.Inc(1) return rangefeedErrorInfo{evict: true}, nil case errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)): + metrics.Errors.RangeKeyMismatch.Inc(1) return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil case errors.HasType(err, (*kvpb.RangeFeedRetryError)(nil)): var t *kvpb.RangeFeedRetryError if ok := errors.As(err, &t); !ok { return rangefeedErrorInfo{}, errors.AssertionFailedf("wrong error type: %T", err) } + metrics.Errors.GetRangeFeedRetryCounter(t.Reason).Inc(1) switch t.Reason { case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, @@ -636,10 +702,6 @@ func newTransportForRange( return ds.transportFactory(opts, ds.nodeDialer, replicas) } -// onRangeEventCb is invoked for each non-error range event. -// nodeID identifies the node ID which generated the event. -type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *kvpb.RangeFeedEvent) - // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and // rangeID. Request is constructed to watch event after specified timestamp, and // with optional diff. If the request corresponds to a system range, request @@ -702,12 +764,11 @@ func defaultStuckRangeThreshold(st *cluster.Settings) func() time.Duration { // request's timestamp if not checkpoints are seen. 
func (ds *DistSender) singleRangeFeed( ctx context.Context, + active *activeRangeFeed, span roachpb.Span, startAfter hlc.Timestamp, desc *roachpb.RangeDescriptor, - catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- RangeFeedMessage, - onRangeEvent onRangeEventCb, cfg rangeFeedConfig, metrics *DistSenderRangeFeedMetrics, ) (_ hlc.Timestamp, retErr error) { @@ -727,22 +788,6 @@ func (ds *DistSender) singleRangeFeed( } defer transport.Release() - // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take - // opportunity to update semaphore limit. - catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, catchupSem, metrics) - if err != nil { - return hlc.Timestamp{}, err - } - - finishCatchupScan := func() { - if catchupRes != nil { - catchupRes.Release() - catchupRes = nil - } - } - // cleanup catchup reservation in case of early termination. - defer finishCatchupScan() - stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, defaultStuckRangeThreshold(ds.st)) defer stuckWatcher.stop() @@ -797,7 +842,7 @@ func (ds *DistSender) singleRangeFeed( }); err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) if stuckWatcher.stuck() { - afterCatchUpScan := catchupRes == nil + afterCatchUpScan := active.catchupRes == nil return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, err @@ -818,8 +863,8 @@ func (ds *DistSender) singleRangeFeed( case *kvpb.RangeFeedCheckpoint: if t.Span.Contains(args.Span) { // If we see the first non-empty checkpoint, we know we're done with the catchup scan. - if !t.ResolvedTS.IsEmpty() && catchupRes != nil { - finishCatchupScan() + if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil { + active.releaseCatchupScan() } // Note that this timestamp means that all rows in the span with // writes at or before the timestamp have now been seen. The @@ -830,19 +875,19 @@ func (ds *DistSender) singleRangeFeed( } case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) - if catchupRes != nil { - metrics.RangefeedErrorCatchup.Inc(1) + if active.catchupRes != nil { + metrics.Errors.RangefeedErrorCatchup.Inc(1) } if stuckWatcher.stuck() { // When the stuck watcher fired, and the rangefeed call is local, // the remote might notice the cancellation first and return from // Recv with an error that we need to special-case here. 
- afterCatchUpScan := catchupRes == nil + afterCatchUpScan := active.catchupRes == nil return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, t.Error.GoError() } - onRangeEvent(args.Replica.NodeID, desc.RangeID, event) + active.onRangeEvent(args.Replica.NodeID, desc.RangeID, event) select { case eventCh <- msg: @@ -866,7 +911,6 @@ func handleStuckEvent( threshold time.Duration, m *DistSenderRangeFeedMetrics, ) error { - m.RangefeedRestartStuck.Inc(1) if afterCatchupScan { telemetry.Count("rangefeed.stuck.after-catchup-scan") } else { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 2452db21890b..f84a6f57dabf 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -460,7 +460,7 @@ func TestRestartsStuckRangeFeeds(t *testing.T) { closeFeed() require.True(t, blockingClient.ctxCanceled) - require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().RangefeedRestartStuck.Count()) + require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().Errors.Stuck.Count()) } func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) { @@ -587,7 +587,7 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) { // NB: We really expect exactly 1 but with a 1s timeout, it's not inconceivable that // on a particularly slow CI machine some unrelated rangefeed could also catch the occasional // retry. - require.NotZero(t, ds.Metrics().RangefeedRestartStuck.Count()) + require.NotZero(t, ds.Metrics().Errors.Stuck.Count()) } func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) { @@ -682,7 +682,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) { // Upon shutdown, make sure the metrics have correct values. defer func() { require.EqualValues(t, 0, metrics.RangefeedRanges.Value()) - require.EqualValues(t, 0, metrics.RangefeedRestartStuck.Count()) + require.EqualValues(t, 0, metrics.Errors.Stuck.Count()) // We injected numRangesToRetry transient errors during catchup scan. // It is possible however, that we will observe key-mismatch error when restarting @@ -691,8 +691,8 @@ func TestRangeFeedMetricsManagement(t *testing.T) { // When iterating through the entire table span, we pick up correct version. // However, if we attempt to re-resolve single range, we may get incorrect/old // version that was cached. Thus, we occasionally see additional transient restarts. - require.GreaterOrEqual(t, metrics.RangefeedErrorCatchup.Count(), numRangesToRetry) - require.GreaterOrEqual(t, metrics.RangefeedRestartRanges.Count(), numRangesToRetry) + require.GreaterOrEqual(t, metrics.Errors.RangefeedErrorCatchup.Count(), numRangesToRetry) + require.GreaterOrEqual(t, metrics.Errors.RangefeedRestartRanges.Count(), numRangesToRetry) // Even though numCatchupToBlock ranges were blocked in the catchup scan phase, // the counter should be 0 once rangefeed is done. 
diff --git a/pkg/kv/kvpb/errors.go b/pkg/kv/kvpb/errors.go index 798c5c9439fe..c3b6ad0f8b33 100644 --- a/pkg/kv/kvpb/errors.go +++ b/pkg/kv/kvpb/errors.go @@ -348,6 +348,7 @@ func init() { errors.RegisterTypeMigration(roachpbPath, "*roachpb.RefreshFailedError", &RefreshFailedError{}) errors.RegisterTypeMigration(roachpbPath, "*roachpb.MVCCHistoryMutationError", &MVCCHistoryMutationError{}) errors.RegisterTypeMigration(roachpbPath, "*roachpb.InsufficientSpaceError", &InsufficientSpaceError{}) + } // GoError returns a Go error converted from Error. If the error is a transaction diff --git a/pkg/kv/kvpb/errors.proto b/pkg/kv/kvpb/errors.proto index 9b52cf68b818..6dcfd3504ea9 100644 --- a/pkg/kv/kvpb/errors.proto +++ b/pkg/kv/kvpb/errors.proto @@ -592,6 +592,10 @@ message MergeInProgressError { // because of a range lifecycle event, and can be retried. message RangeFeedRetryError { // Reason specifies what caused the error. + // NB: reason names should be stable because they are + // exposed as rangefeed error counters. + // NB: *never* delete enum values; if retry reason needs to be retired, + // reserve reason value. enum Reason { // The replica was removed from its store. REASON_REPLICA_REMOVED = 0; diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 249c81fe3ba7..b7f0a7fd5bfe 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -7099,6 +7099,7 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_end STRING, resolved STRING, last_event_utc INT, + catchup BOOL, num_errs INT, last_err STRING );`, @@ -7130,6 +7131,7 @@ CREATE TABLE crdb_internal.active_range_feeds ( tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.EndKey)), tree.NewDString(rf.Resolved.AsOfSystemTime()), lastEvent, + tree.MakeDBool(tree.DBool(rf.InCatchup)), tree.NewDInt(tree.DInt(rf.NumErrs)), lastErr, ) diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog index d13f6fb74753..710e2e9a22e9 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog @@ -405,7 +405,7 @@ SELECT id, strip_volatile(descriptor) FROM crdb_internal.kv_catalog_descriptor O 4294967207 {"table": {"columns": [{"id": 1, "name": "id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "database_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "super_region_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "regions", "nullable": true, "type": {"arrayContents": {"family": "StringFamily", "oid": 25}, "arrayElemType": "StringFamily", "family": "ArrayFamily", "oid": 1009}}], "formatVersion": 3, "id": 4294967207, "name": "super_regions", "nextColumnId": 5, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967208 {"table": {"columns": [{"id": 1, "name": "name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "implemented", "nullable": true, "type": {"oid": 16}}], "formatVersion": 3, "id": 4294967208, "name": "pg_catalog_table_is_implemented", "nextColumnId": 3, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, 
"primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967209 {"table": {"columns": [{"id": 1, "name": "tenant_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "total_ru", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 3, "name": "total_read_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "total_read_requests", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 5, "name": "total_write_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "total_write_requests", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "total_sql_pod_seconds", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 8, "name": "total_pgwire_egress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "total_external_io_ingress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "total_external_io_egress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 11, "name": "total_kv_ru", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 12, "name": "total_cross_region_network_ru", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}], "formatVersion": 3, "id": 4294967209, "name": "tenant_usage_details", "nextColumnId": 13, "nextConstraintId": 1, "nextMutationId": 1, "primaryIndex": {"foreignKey": {}, "geoConfig": {}, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1", "viewQuery": "SELECT tenant_id, (j->>'rU')::FLOAT8 AS total_ru, (j->>'readBytes')::INT8 AS total_read_bytes, (j->>'readRequests')::INT8 AS total_read_requests, (j->>'writeBytes')::INT8 AS total_write_bytes, (j->>'writeRequests')::INT8 AS total_write_requests, (j->>'sqlPodsCpuSeconds')::FLOAT8 AS total_sql_pod_seconds, (j->>'pgwireEgressBytes')::INT8 AS total_pgwire_egress_bytes, (j->>'externalIOIngressBytes')::INT8 AS total_external_io_ingress_bytes, (j->>'externalIOIngressBytes')::INT8 AS total_external_io_ingress_bytes, (j->>'kvRU')::FLOAT8 AS total_kv_ru, (j->>'crossRegionNetworkRU')::FLOAT8 AS total_cross_region_network_ru FROM (SELECT tenant_id, crdb_internal.pb_to_json('cockroach.roachpb.TenantConsumption', total_consumption) AS j FROM system.tenant_usage WHERE instance_id = 0)"}} -4294967210 {"table": {"columns": [{"id": 1, "name": "id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "tags", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "startts", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "diff", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "node_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "range_id", 
"nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "created", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "range_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 9, "name": "range_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 10, "name": "resolved", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 11, "name": "last_event_utc", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "num_errs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 13, "name": "last_err", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967210, "name": "active_range_feeds", "nextColumnId": 14, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} +4294967210 {"table": {"columns": [{"id": 1, "name": "id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "tags", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "startts", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "diff", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "node_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "range_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "created", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "range_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 9, "name": "range_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 10, "name": "resolved", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 11, "name": "last_event_utc", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "catchup", "nullable": true, "type": {"oid": 16}}, {"id": 13, "name": "num_errs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 14, "name": "last_err", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967210, "name": "active_range_feeds", "nextColumnId": 15, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967211 {"table": {"columns": [{"id": 1, "name": "database_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "schema_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "role", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "for_all_roles", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "object_type", 
"type": {"family": "StringFamily", "oid": 25}}, {"id": 6, "name": "grantee", "type": {"family": "StringFamily", "oid": 25}}, {"id": 7, "name": "privilege_type", "type": {"family": "StringFamily", "oid": 25}}, {"id": 8, "name": "is_grantable", "nullable": true, "type": {"oid": 16}}], "formatVersion": 3, "id": 4294967211, "name": "default_privileges", "nextColumnId": 9, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967212 {"table": {"columns": [{"id": 1, "name": "region", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "zones", "type": {"arrayContents": {"family": "StringFamily", "oid": 25}, "arrayElemType": "StringFamily", "family": "ArrayFamily", "oid": 1009}}], "formatVersion": 3, "id": 4294967212, "name": "regions", "nextColumnId": 3, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967213 {"table": {"columns": [{"id": 1, "name": "trace_id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "node_id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 3, "name": "root_op_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "trace_str", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "jaeger_json", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967213, "indexes": [{"foreignKey": {}, "geoConfig": {}, "id": 2, "interleave": {}, "keyColumnDirections": ["ASC"], "keyColumnIds": [1], "keyColumnNames": ["trace_id"], "name": "cluster_inflight_traces_trace_id_idx", "partitioning": {}, "sharded": {}, "storeColumnIds": [2, 3, 4, 5], "storeColumnNames": ["node_id", "root_op_name", "trace_str", "jaeger_json"], "version": 3}], "name": "cluster_inflight_traces", "nextColumnId": 6, "nextConstraintId": 2, "nextIndexId": 3, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} diff --git a/pkg/util/metric/registry.go b/pkg/util/metric/registry.go index 9e5e82337a49..0986cdef0c51 100644 --- a/pkg/util/metric/registry.go +++ b/pkg/util/metric/registry.go @@ -112,7 +112,7 @@ func (r *Registry) AddMetricStruct(metricStruct interface{}) { } switch vfield.Kind() { - case reflect.Array: + case reflect.Array, reflect.Slice: for i := 0; i < vfield.Len(); i++ { velem := vfield.Index(i) telemName := fmt.Sprintf("%s[%d]", tname, i) @@ -253,12 +253,7 @@ func checkFieldCanBeSkipped( } switch fieldType.Kind() { - case reflect.Slice: - if isMetricType(fieldType.Elem()) { - panicHandler(context.Background(), - "expected array, found slice instead for field %s (%s)", qualifiedFieldName, 
fieldType) - } - case reflect.Array: + case reflect.Array, reflect.Slice: checkFieldCanBeSkipped(skipReason, fieldName, fieldType.Elem(), parentType) case reflect.Struct: containsMetrics := false diff --git a/pkg/util/metric/registry_test.go b/pkg/util/metric/registry_test.go index d6da103265c6..6c9ed8f354b4 100644 --- a/pkg/util/metric/registry_test.go +++ b/pkg/util/metric/registry_test.go @@ -256,14 +256,14 @@ func TestRegistryPanicsWhenAddingUnexportedMetrics(t *testing.T) { }, ) - // Metric slice is a bug since registry only supports arrays. + // Panics when we have a mix of exported and unexported metrics. require.PanicsWithValue(t, - "expected array, found slice instead for field .unexportedGaugeSlice ([]*metric.Gauge)", + unexportedErr(unnamedStructName, "unexportedSlice"), func() { r.AddMetricStruct(struct { - ExportedArray [1]*Counter - unexportedGaugeSlice []*Gauge - }{[1]*Counter{c}, []*Gauge{g}}) + ExportedGauge *Gauge + unexportedSlice []*Counter + }{g, []*Counter{c}}) }, )
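
Illustrative sketch (not part of the patch, and not CockroachDB code): the
pacing idea above is to acquire catch-up scan quota *before* spawning each
per-range goroutine, so goroutine creation proceeds no faster than earlier
catch-up scans release quota. The standalone Go program below shows that
shape using only standard-library primitives; names such as `catchupQuota`,
`rangeWork`, and `startRangeFeed` are hypothetical stand-ins for the patch's
`limit.ConcurrentRequestLimiter` and rangefeed machinery.

// pacing_sketch.go -- illustrative only.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// catchupQuota is a tiny counting semaphore standing in for the
// limit.ConcurrentRequestLimiter used by the patch.
type catchupQuota chan struct{}

func (q catchupQuota) acquire(ctx context.Context) error {
	select {
	case q <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (q catchupQuota) release() { <-q }

// rangeWork stands in for a single-range feed: a catch-up phase followed by
// a steady-state phase (elided here).
type rangeWork struct{ id int }

// startRangeFeed simulates the catch-up scan and releases quota once it is
// done, which is what allows the next goroutine to be created. The patch
// releases quota on the first non-empty checkpoint instead.
func startRangeFeed(w rangeWork, quota catchupQuota) {
	time.Sleep(10 * time.Millisecond)
	quota.release()
	fmt.Printf("range %d: catch-up done, entering steady state\n", w.id)
}

func main() {
	ctx := context.Background()
	quota := make(catchupQuota, 4) // at most 4 concurrent catch-up scans

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		w := rangeWork{id: i}
		// Acquire quota *before* spawning the goroutine: goroutine creation is
		// paced by how quickly earlier catch-up scans complete, rather than
		// creating all goroutines up front and letting them block inside.
		if err := quota.acquire(ctx); err != nil {
			break
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			startRangeFeed(w, quota)
		}()
	}
	wg.Wait()
}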