Skip to content

Commit

Permalink
rangefeed: add frontier visitor option to client
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed Mar 26, 2024
1 parent 7ff7978 commit cca034d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
20 changes: 20 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/span"
)

// Option configures a RangeFeed.
Expand All @@ -41,6 +42,7 @@ type config struct {
onUnrecoverableError OnUnrecoverableError
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
frontierVisitor FrontierSpanVisitor
onSSTable OnSSTable
onDeleteRange OnDeleteRange
extraPProfLabels []string
Expand Down Expand Up @@ -222,6 +224,24 @@ func WithOnFrontierAdvance(f OnFrontierAdvance) Option {
})
}

// VisitableFrontier is the subset of the span.Frontier interface required to
// inspect the content of the frontier.
type VisitableFrontier interface {
Entries(span.Operation)
}

// FrontierSpanVisitor is called when the FrontierSpanVisitTrigger requests the
// frontier be visited after a checkpoint.
type FrontierSpanVisitor func(ctx context.Context, advanced bool, frontier VisitableFrontier)

// WithFrontierSpanVisitor sets up a callback to optionally inspect the frontier
// after a checkpoint is processed.
func WithFrontierSpanVisitor(fn FrontierSpanVisitor) Option {
return optionFunc(func(c *config) {
c.frontierVisitor = fn
})
}

func initConfig(c *config, options []Option) {
*c = config{} // the default config is its zero value
for _, o := range options {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ func (f *RangeFeed) processEvents(
if advanced && f.onFrontierAdvance != nil {
f.onFrontierAdvance(ctx, frontier.Frontier())
}
if f.frontierVisitor != nil {
f.frontierVisitor(ctx, advanced, frontier)
}
case ev.SST != nil:
if f.onSSTable == nil {
return errors.AssertionFailedf(
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvclient/rangefeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func (f *RangeFeed) getSpansToScan(ctx context.Context) (func() []roachpb.Span,
return err
}
}
_, err := frontier.Forward(sp, f.initialTimestamp)
advanced, err := frontier.Forward(sp, f.initialTimestamp)
if f.frontierVisitor != nil {
f.frontierVisitor(ctx, advanced, frontier)
}

return err
}

Expand Down

0 comments on commit cca034d

Please sign in to comment.