Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
119324: kvfeed: add more code documentation to rangefeed client r=jayshrivastava,miraradeva,rharding6373 a=andyyang890

Epic: None

Release note: None


119404: workflows: add GitHub actions check generated code job r=rail a=rickystewart

Epic: CRDB-8308
Release note: None

Co-authored-by: Andy Yang <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
3 people committed Feb 20, 2024
3 parents 84ca65f + c5a616c + 558274e commit 1edea8c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
33 changes: 21 additions & 12 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type physicalFeedFactory interface {
Run(ctx context.Context, sink kvevent.Writer, cfg rangeFeedConfig) error
}

// rangeFeedConfig contains configuration options for creating a rangefeed.
// It provides an abstraction over the actual rangefeed API.
type rangeFeedConfig struct {
Frontier hlc.Timestamp
Spans []kvcoord.SpanTimePair
Expand All @@ -35,20 +37,27 @@ type rangeFeedConfig struct {
Knobs TestingKnobs
}

// rangefeedFactory is a function that creates and runs a rangefeed.
type rangefeedFactory func(
ctx context.Context,
spans []kvcoord.SpanTimePair,
eventC chan<- kvcoord.RangeFeedMessage,
eventCh chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error

// rangefeed tracks a running rangefeed and facilitates conversion from
// kvcoord.RangeFeedMessage's to kvevent.Event's.
type rangefeed struct {
// memBuf is the buffer that converted kvevent.Event's will be written to.
memBuf kvevent.Writer
cfg rangeFeedConfig
eventC chan kvcoord.RangeFeedMessage
knobs TestingKnobs
// eventCh is a receive-only channel corresponding to the send-only channel
// that the rangefeed uses to send event messages to.
eventCh <-chan kvcoord.RangeFeedMessage
knobs TestingKnobs
}

// Run implements the physicalFeedFactory interface.
func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rangeFeedConfig) error {
// To avoid blocking raft, RangeFeed puts all entries in a server side
// buffer. But to keep things simple, it's a small fixed-sized buffer. This
Expand All @@ -67,11 +76,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
// `SchemaFeed` is responsible for detecting and enforcing these , but the
// after-KVFeed buffer doesn't have access to any of this state. A cleanup is
// in order.
eventCh := make(chan kvcoord.RangeFeedMessage, 128)
feed := rangefeed{
memBuf: sink,
cfg: cfg,
eventC: make(chan kvcoord.RangeFeedMessage, 128),
knobs: cfg.Knobs,
memBuf: sink,
cfg: cfg,
eventCh: eventCh,
knobs: cfg.Knobs,
}
g := ctxgroup.WithContext(ctx)
g.GoCtx(feed.addEventsToBuffer)
Expand All @@ -90,18 +100,17 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
}

g.GoCtx(func(ctx context.Context) error {
return p(ctx, cfg.Spans, feed.eventC, rfOpts...)
return p(ctx, cfg.Spans, eventCh, rfOpts...)
})
return g.Wait()
}

// addEventsToBuffer consumes rangefeed events from `p.eventC`, transforms
// them to changfeed events and push onto `p.memBuf`.
// `p.memBuf`.
// addEventsToBuffer consumes rangefeed events from `p.eventCh`, transforms
// them to kvevent.Event's, and pushes them into `p.memBuf`.
func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
for {
select {
case e := <-p.eventC:
case e := <-p.eventCh:
switch t := e.GetValue().(type) {
case *kvpb.RangeFeedValue:
if p.cfg.Knobs.OnRangeFeedValue != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ go_test(
],
embed = [":sql"],
exec_properties = {
"test.Pool": "heavy",
"Pool": "heavy",
},
shard_count = 16,
deps = [
Expand Down

0 comments on commit 1edea8c

Please sign in to comment.