From cca034dda3c81f8c97dd7a90e3a2d400c2db26a8 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 22 Mar 2024 13:37:17 +0000 Subject: [PATCH] rangefeed: add frontier visitor option to client Release note: none. Epic: none. --- pkg/kv/kvclient/rangefeed/config.go | 20 ++++++++++++++++++++ pkg/kv/kvclient/rangefeed/rangefeed.go | 3 +++ pkg/kv/kvclient/rangefeed/scanner.go | 6 +++++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 38ef66c4c6e7..9d83262d76b8 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/span" ) // Option configures a RangeFeed. @@ -41,6 +42,7 @@ type config struct { onUnrecoverableError OnUnrecoverableError onCheckpoint OnCheckpoint onFrontierAdvance OnFrontierAdvance + frontierVisitor FrontierSpanVisitor onSSTable OnSSTable onDeleteRange OnDeleteRange extraPProfLabels []string @@ -222,6 +224,24 @@ func WithOnFrontierAdvance(f OnFrontierAdvance) Option { }) } +// VisitableFrontier is the subset of the span.Frontier interface required to +// inspect the content of the frontier. +type VisitableFrontier interface { + Entries(span.Operation) +} + +// FrontierSpanVisitor is called when the FrontierSpanVisitTrigger requests the +// frontier be visited after a checkpoint. +type FrontierSpanVisitor func(ctx context.Context, advanced bool, frontier VisitableFrontier) + +// WithFrontierSpanVisitor sets up a callback to optionally inspect the frontier +// after a checkpoint is processed. +func WithFrontierSpanVisitor(fn FrontierSpanVisitor) Option { + return optionFunc(func(c *config) { + c.frontierVisitor = fn + }) +} + func initConfig(c *config, options []Option) { *c = config{} // the default config is its zero value for _, o := range options { diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 679b79bc0729..9455037f901b 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -372,6 +372,9 @@ func (f *RangeFeed) processEvents( if advanced && f.onFrontierAdvance != nil { f.onFrontierAdvance(ctx, frontier.Frontier()) } + if f.frontierVisitor != nil { + f.frontierVisitor(ctx, advanced, frontier) + } case ev.SST != nil: if f.onSSTable == nil { return errors.AssertionFailedf( diff --git a/pkg/kv/kvclient/rangefeed/scanner.go b/pkg/kv/kvclient/rangefeed/scanner.go index 7583065a4a84..3202f251f3fb 100644 --- a/pkg/kv/kvclient/rangefeed/scanner.go +++ b/pkg/kv/kvclient/rangefeed/scanner.go @@ -112,7 +112,11 @@ func (f *RangeFeed) getSpansToScan(ctx context.Context) (func() []roachpb.Span, return err } } - _, err := frontier.Forward(sp, f.initialTimestamp) + advanced, err := frontier.Forward(sp, f.initialTimestamp) + if f.frontierVisitor != nil { + f.frontierVisitor(ctx, advanced, frontier) + } + return err }