kvcoord: Improve rangefeed observability.
Improve rangefeed observability by introducing new counters:
 * `distsender.rangefeed.retry.<reason>`: counter keeping track of the
   number of ranges that encountered a retryable error of a particular type
   (e.g. slow consumer, range split, etc.); see the sketch below.
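
For reference, the per-reason counter names are derived from the
RangeFeedRetryError_Reason enum names. A minimal standalone sketch of that
derivation (illustrative only; it mirrors makeRangeFeedErrorCounters in the
diff and assumes the repository's pkg/kv/kvpb import path):

    package main

    import (
        "fmt"
        "strings"

        "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
    )

    func main() {
        // Each enum name, e.g. REASON_SLOW_CONSUMER, becomes a metric named
        // distsender.rangefeed.retry.slow_consumer.
        for name := range kvpb.RangeFeedRetryError_Reason_value {
            name = strings.TrimPrefix(name, "REASON_")
            fmt.Printf("distsender.rangefeed.retry.%s\n", strings.ToLower(name))
        }
    }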

Fixes cockroachdb#98842

Release note (ops change): Improve rangefeed observability by adding
additional metrics indicating the reason for rangefeed restart.

# Conflicts:
#	pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Yevgeniy Miretskiy authored and wenyihu6 committed Jul 23, 2024
1 parent e3581e3 commit 2d763d1
Showing 5 changed files with 111 additions and 26 deletions.
91 changes: 78 additions & 13 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -173,13 +173,6 @@ This counts the number of ranges with an active rangefeed that are performing ca
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartStuck = metric.Metadata{
Name: "distsender.rangefeed.restart_stuck",
Help: `Number of times a rangefeed was restarted due to not receiving ` +
`timely updates (kv.rangefeed.range_stuck_threshold cluster setting)`,
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
)

// CanSendToFollower is used by the DistSender to determine if it needs to look
@@ -263,9 +256,7 @@ type DistSenderMetrics struct {
type DistSenderRangeFeedMetrics struct {
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
Errors rangeFeedErrorCounters
}

func makeDistSenderMetrics() DistSenderMetrics {
@@ -300,13 +291,87 @@ func makeDistSenderMetrics() DistSenderMetrics {
return m
}

// rangeFeedErrorCounters are various error-related counters for rangefeed.
type rangeFeedErrorCounters struct {
RangefeedRestartRanges *metric.Counter
RangefeedErrorCatchup *metric.Counter
RetryErrors []*metric.Counter
Stuck *metric.Counter
SendErrors *metric.Counter
StoreNotFound *metric.Counter
NodeNotFound *metric.Counter
RangeNotFound *metric.Counter
RangeKeyMismatch *metric.Counter
}

func makeRangeFeedErrorCounters() rangeFeedErrorCounters {
var retryCounters []*metric.Counter
for name := range kvpb.RangeFeedRetryError_Reason_value {
name = strings.TrimPrefix(name, "REASON_")
retryCounters = append(retryCounters, metric.NewCounter(metric.Metadata{
Name: fmt.Sprintf("distsender.rangefeed.retry.%s", strings.ToLower(name)),
Help: fmt.Sprintf(`Number of ranges that encountered retryable %s error`, name),
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}))
}

retryMeta := func(name string) metric.Metadata {
return metric.Metadata{
Name: fmt.Sprintf("distsender.rangefeed.retry.%s", strings.ReplaceAll(name, " ", "_")),
Help: fmt.Sprintf("Number of ranges that encountered retryable %s error", name),
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
}

return rangeFeedErrorCounters{
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RetryErrors: retryCounters,
Stuck: metric.NewCounter(retryMeta("stuck")),
SendErrors: metric.NewCounter(retryMeta("send")),
StoreNotFound: metric.NewCounter(retryMeta("store not found")),
NodeNotFound: metric.NewCounter(retryMeta("node not found")),
RangeNotFound: metric.NewCounter(retryMeta("range not found")),
RangeKeyMismatch: metric.NewCounter(retryMeta("range key mismatch")),
}
}

// GetRangeFeedRetryCounter returns the retry counter for the specified reason.
// Use this method instead of accessing RetryErrors directly, since it handles
// potential gaps in the retry reason values.
func (c rangeFeedErrorCounters) GetRangeFeedRetryCounter(
reason kvpb.RangeFeedRetryError_Reason,
) *metric.Counter {
// Normally, retry reason values are contiguous. Gaps could be introduced if
// some retry reasons are retired (deletions are accomplished by reserving the
// enum value to prevent its re-use) and more reasons are added afterwards. In
// that case, the reason value could no longer be used as an index into
// retryCounters. Because this scenario is believed to be very unlikely, we
// forego any fancy re-mapping schemes and instead opt for explicit handling.
switch reason {
case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED,
kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
kvpb.RangeFeedRetryError_REASON_RANGE_MERGED,
kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT,
kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER,
kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER,
kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED:
return c.RetryErrors[reason]
default:
panic(errors.AssertionFailedf("unknown retry reason %d", reason))
}
}

func (rangeFeedErrorCounters) MetricStruct() {}

func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics {
return DistSenderRangeFeedMetrics{
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
Errors: makeRangeFeedErrorCounters(),
}
}

4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -468,7 +468,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
case *kvpb.RangeFeedError:
log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError())
if active.catchupRes != nil {
m.metrics.RangefeedErrorCatchup.Inc(1)
m.metrics.Errors.RangefeedErrorCatchup.Inc(1)
}
ms.deleteStream(event.StreamID)
// Restart rangefeed on another goroutine. Restart might be a bit
@@ -506,7 +506,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeeds(
func (m *rangefeedMuxer) restartActiveRangeFeed(
ctx context.Context, active *activeMuxRangeFeed, reason error,
) error {
m.metrics.RangefeedRestartRanges.Inc(1)
m.metrics.Errors.RangefeedRestartRanges.Inc(1)
active.setLastError(reason)

// Release catchup scan reservation if any -- we will acquire another
28 changes: 22 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -541,7 +541,11 @@ type rangefeedErrorInfo struct {
// handleRangefeedError handles an error that occurred while running rangefeed.
// Returns rangefeedErrorInfo describing how the error should be handled for the
// range. Returns an error if the entire rangefeed should terminate.
func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, error) {
func handleRangefeedError(
ctx context.Context, metrics *DistSenderRangeFeedMetrics, err error,
) (rangefeedErrorInfo, error) {
metrics.Errors.RangefeedRestartRanges.Inc(1)

if err == nil {
return rangefeedErrorInfo{}, nil
}
@@ -550,11 +554,17 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
case errors.Is(err, io.EOF):
// If we got an EOF, treat it as a signal to restart single range feed.
return rangefeedErrorInfo{}, nil
case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) ||
errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)):
case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)):
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
metrics.Errors.StoreNotFound.Inc(1)
return rangefeedErrorInfo{}, nil
case errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)):
// These errors are likely to be unique to the replica that
// reported them, so no action is required before the next
// retry.
metrics.Errors.NodeNotFound.Inc(1)
return rangefeedErrorInfo{}, nil
case errors.Is(err, errRestartStuckRange):
// Stuck ranges indicate a bug somewhere in the system. We are being
@@ -564,16 +574,23 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
//
// The error contains the replica which we were waiting for.
log.Warningf(ctx, "restarting stuck rangefeed: %s", err)
metrics.Errors.Stuck.Inc(1)
return rangefeedErrorInfo{evict: true}, nil
case IsSendError(err):
metrics.Errors.SendErrors.Inc(1)
return rangefeedErrorInfo{evict: true}, nil
case IsSendError(err), errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)):
case errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)):
metrics.Errors.RangeNotFound.Inc(1)
return rangefeedErrorInfo{evict: true}, nil
case errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)):
metrics.Errors.RangeKeyMismatch.Inc(1)
return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil
case errors.HasType(err, (*kvpb.RangeFeedRetryError)(nil)):
var t *kvpb.RangeFeedRetryError
if ok := errors.As(err, &t); !ok {
return rangefeedErrorInfo{}, errors.AssertionFailedf("wrong error type: %T", err)
}
metrics.Errors.GetRangeFeedRetryCounter(t.Reason).Inc(1)
switch t.Reason {
case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED,
kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT,
@@ -834,7 +851,7 @@ func (ds *DistSender) singleRangeFeed(
case *kvpb.RangeFeedError:
log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError())
if catchupRes != nil {
metrics.RangefeedErrorCatchup.Inc(1)
metrics.Errors.RangefeedErrorCatchup.Inc(1)
}
if stuckWatcher.stuck() {
// When the stuck watcher fired, and the rangefeed call is local,
@@ -869,7 +886,6 @@ func handleStuckEvent(
threshold time.Duration,
m *DistSenderRangeFeedMetrics,
) error {
m.RangefeedRestartStuck.Inc(1)
if afterCatchupScan {
telemetry.Count("rangefeed.stuck.after-catchup-scan")
} else {
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -460,7 +460,7 @@ func TestRestartsStuckRangeFeeds(t *testing.T) {
closeFeed()

require.True(t, blockingClient.ctxCanceled)
require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().RangefeedRestartStuck.Count())
require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().Errors.Stuck.Count())
}

func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) {
@@ -587,7 +587,7 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) {
// NB: We really expect exactly 1 but with a 1s timeout, it's not inconceivable that
// on a particularly slow CI machine some unrelated rangefeed could also catch the occasional
// retry.
require.NotZero(t, ds.Metrics().RangefeedRestartStuck.Count())
require.NotZero(t, ds.Metrics().Errors.Stuck.Count())
}

func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
@@ -682,7 +682,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
// Upon shutdown, make sure the metrics have correct values.
defer func() {
require.EqualValues(t, 0, metrics.RangefeedRanges.Value())
require.EqualValues(t, 0, metrics.RangefeedRestartStuck.Count())
require.EqualValues(t, 0, metrics.Errors.Stuck.Count())

// We injected numRangesToRetry transient errors during catchup scan.
// It is possible however, that we will observe key-mismatch error when restarting
@@ -691,8 +691,8 @@
// When iterating through the entire table span, we pick up correct version.
// However, if we attempt to re-resolve single range, we may get incorrect/old
// version that was cached. Thus, we occasionally see additional transient restarts.
require.GreaterOrEqual(t, metrics.RangefeedErrorCatchup.Count(), numRangesToRetry)
require.GreaterOrEqual(t, metrics.RangefeedRestartRanges.Count(), numRangesToRetry)
require.GreaterOrEqual(t, metrics.Errors.RangefeedErrorCatchup.Count(), numRangesToRetry)
require.GreaterOrEqual(t, metrics.Errors.RangefeedRestartRanges.Count(), numRangesToRetry)

// Even though numCatchupToBlock ranges were blocked in the catchup scan phase,
// the counter should be 0 once rangefeed is done.
4 changes: 4 additions & 0 deletions pkg/kv/kvpb/errors.proto
@@ -548,6 +548,10 @@ message MergeInProgressError {
// because of a range lifecycle event, and can be retried.
message RangeFeedRetryError {
// Reason specifies what caused the error.
// NB: reason names should be stable because they are
// exposed as rangefeed error counters.
// NB: *never* delete enum values; if a retry reason needs to be retired,
// reserve its value instead.
enum Reason {
// The replica was removed from its store.
REASON_REPLICA_REMOVED = 0;

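For readers following the metric relocation: counters that previously hung
directly off the DistSender rangefeed metrics (e.g. RangefeedRestartStuck) now
live under the Errors field, and per-reason retry counters are reached through
GetRangeFeedRetryCounter, which tolerates gaps in the enum values. A hedged
sketch of a test-style helper using only identifiers that appear in the diff
above (the helper itself is illustrative, not part of the commit):

package kvcoord_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/stretchr/testify/require"
)

// assertRangefeedRetryCounters reads the relocated counters on a DistSender
// obtained from a running test server, mirroring the assertions above.
func assertRangefeedRetryCounters(t *testing.T, ds *kvcoord.DistSender) {
	m := ds.Metrics()
	// Aggregate restart counter, bumped once per handled rangefeed error.
	require.GreaterOrEqual(t, m.Errors.RangefeedRestartRanges.Count(), int64(0))
	// Stuck-rangefeed counter, formerly RangefeedRestartStuck.
	require.GreaterOrEqual(t, m.Errors.Stuck.Count(), int64(0))
	// Per-reason retry counter, resolved through the gap-tolerant lookup.
	split := m.Errors.GetRangeFeedRetryCounter(kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT)
	require.GreaterOrEqual(t, split.Count(), int64(0))
}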