Skip to content

Commit

Permalink
kv: Include error information in crdb_internal.active_range_feeds
Browse files Browse the repository at this point in the history
Include error count, and the last error information in
`crdb_internal.active_range_feeds` table whenever rangefeed
disconnects due to an error.

Release justification: observability improvement.
Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Aug 25, 2022
1 parent 8888295 commit 07cb344
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ type PartialRangeFeed struct {
CreatedTime time.Time
LastValueReceived time.Time
Resolved hlc.Timestamp
NumErrs int
LastErr error
}

// ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure.
Expand Down Expand Up @@ -280,6 +282,14 @@ func (a *activeRangeFeed) onRangeEvent(
a.RangeID = rangeID
}

func (a *activeRangeFeed) setLastError(err error) {
a.Lock()
defer a.Unlock()
a.LastErr = errors.Wrapf(err, "disconnect at %s: checkpoint %s/-%s",
timeutil.Now().Format(time.RFC3339), a.Resolved, timeutil.Since(a.Resolved.GoTime()))
a.NumErrs++
}

// rangeFeedRegistry is responsible for keeping track of currently executing
// range feeds.
type rangeFeedRegistry struct {
Expand Down Expand Up @@ -389,6 +399,8 @@ func (ds *DistSender) partialRangeFeed(
startAfter.Forward(maxTS)

if err != nil {
active.setLastError(err)

if log.V(1) {
log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v",
span, timeutil.Since(startAfter.GoTime()), err)
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5664,7 +5664,9 @@ CREATE TABLE crdb_internal.active_range_feeds (
range_start STRING,
range_end STRING,
resolved STRING,
last_event_utc INT
last_event_utc INT,
num_errs INT,
last_err STRING
);`,
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
return p.execCfg.DistSender.ForEachActiveRangeFeed(
Expand All @@ -5675,6 +5677,12 @@ CREATE TABLE crdb_internal.active_range_feeds (
} else {
lastEvent = tree.NewDInt(tree.DInt(rf.LastValueReceived.UTC().UnixNano()))
}
var lastErr tree.Datum
if rf.LastErr == nil {
lastErr = tree.DNull
} else {
lastErr = tree.NewDString(rf.LastErr.Error())
}

return addRow(
tree.NewDInt(tree.DInt(rfCtx.ID)),
Expand All @@ -5688,6 +5696,8 @@ CREATE TABLE crdb_internal.active_range_feeds (
tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.EndKey)),
tree.NewDString(rf.Resolved.AsOfSystemTime()),
lastEvent,
tree.NewDInt(tree.DInt(rf.NumErrs)),
lastErr,
)
},
)
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/create_statements
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ CREATE TABLE crdb_internal.active_range_feeds (
range_start STRING NULL,
range_end STRING NULL,
resolved STRING NULL,
last_event_utc INT8 NULL
last_event_utc INT8 NULL,
num_errs INT8 NULL,
last_err STRING NULL
) CREATE TABLE crdb_internal.active_range_feeds (
id INT8 NULL,
tags STRING NULL,
Expand All @@ -55,7 +57,9 @@ CREATE TABLE crdb_internal.active_range_feeds (
range_start STRING NULL,
range_end STRING NULL,
resolved STRING NULL,
last_event_utc INT8 NULL
last_event_utc INT8 NULL,
num_errs INT8 NULL,
last_err STRING NULL
) {} {}
CREATE TABLE crdb_internal.backward_dependencies (
descriptor_id INT8 NULL,
Expand Down

0 comments on commit 07cb344

Please sign in to comment.