-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rangefeedbuffer: introduce a rangefeed buffer #71225
rangefeedbuffer: introduce a rangefeed buffer #71225
Conversation
de30374
to
d89057e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't had a thorough look yet, but I don't think we can use checkpoint events here as they may not encompass the entire rangefeed span. I think this needs to work on frontier timestamps instead.
Maybe we can change the buffer methods to:
func (b *Buffer) Add(ctx context.Context, ev *roachpb.RangeFeedValue) error
func (b *Buffer) Advance(ctx context.Context, TS hlc.Timestamp)
We can then use the OnFrontierAdvance
option from #71256 to carve out a pattern like:
r, err := f.RangeFeed(ctx, ...,
func(ctx context.Context, value *roachpb.RangeFeedValue) {
b.Add(ctx, value)
},
rangefeed.WithOnFrontierAdvance(func(ctx context.Context, ts hlc.Timestamp) {
b.Advance(ctx, ts)
}),
Wdyt?
Reviewable status: complete! 0 of 0 LGTMs obtained
I know, haha, that's what I was getting at here + discussed with you yesterday.
I see you've already typed up #71256, I'll rebase. |
d89057e
to
93d30ad
Compare
93d30ad
to
3858ebf
Compare
3858ebf
to
efc15c5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Quoted 4 lines of code…
value := ev.Val.Value if !value.IsPresent() { value = ev.Val.PrevValue }
what is this case?
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
type entry struct { *roachpb.RangeFeedEvent hlc.Timestamp
Why separate timestamp when it's already a field in the event? Function seems better.
Data structure question: we only care about the ordering when we go to deal with a frontier. It feels like it would be simpler to just sort.Sort
at Flush point and append during execution? That also allows us to end up freeing up the buffer after a burst and to avoid allocating a new one on flush. The flip side is if there are many flushes before a lot of data, you'll sort for each flush, but I really don't care about that case. The bigger reason to do it is that I think it's simpler. Consider:
type rangefeedEvents []*roachpb.RangefeedEvent
func (events rangefeedEvents) Len() int { return len(events) }
func (events rangefeedEvents) Swap(i, j int) { events[i], events[j] = events[j], events[i] }
func (events rangefeedEvents) Less(i, j int) bool { return eventTS(events[i].Timestamp).Less(eventTS(events[j])) }
func eventTS(ev *roachpb.RangefeedEvent) hlc.Timestamp { /* ... */ }
// Add adds the given rangefeed entry to the buffer.
func (b *Buffer) Add(ctx context.Context, ev *roachpb.RangeFeedEvent) error {
// ...
b.mu.Lock()
defer b.mu.Unlock()
if value.Timestamp.LessEq(b.mu.frontier) {
// If the rangefeed entry is at a timestamp less than or equal to our
// last known checkpoint, we don't need to record it.
return nil
}
if len(b.mu.events) > b.limit {
return ErrBufferLimitExceeded
}
b.mu.events = append(b.mu.events, ev)
return nil
}
func (b *Buffer) Flush(
ctx context.Context, frontier hlc.Timestamp,
) (events []*roachpb.RangeFeedEvent) {
b.mu.Lock()
defer b.mu.Unlock()
if frontier.Less(b.mu.frontier) {
log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier)
}
sort.Sort(rangefeedEvents(b.mu.events))
idx := sort.Search(len(b.mu.events), func(i int) bool {
return !eventTS(b.mu.events).Less(frontier)
})
events = b.mu.events[:idx]
b.mu.events = b.mu.events[idx:]
b.mu.frontier = frontier
return events
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
value := ev.Val.Value if !value.IsPresent() { value = ev.Val.PrevValue }
what is this case?
Just a way to extract a timestamp for a rangefeed value deletion event.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
Previously, ajwerner wrote…
Why separate timestamp when it's already a field in the event? Function seems better.
Data structure question: we only care about the ordering when we go to deal with a frontier. It feels like it would be simpler to just
sort.Sort
at Flush point and append during execution? That also allows us to end up freeing up the buffer after a burst and to avoid allocating a new one on flush. The flip side is if there are many flushes before a lot of data, you'll sort for each flush, but I really don't care about that case. The bigger reason to do it is that I think it's simpler. Consider:type rangefeedEvents []*roachpb.RangefeedEvent func (events rangefeedEvents) Len() int { return len(events) } func (events rangefeedEvents) Swap(i, j int) { events[i], events[j] = events[j], events[i] } func (events rangefeedEvents) Less(i, j int) bool { return eventTS(events[i].Timestamp).Less(eventTS(events[j])) } func eventTS(ev *roachpb.RangefeedEvent) hlc.Timestamp { /* ... */ } // Add adds the given rangefeed entry to the buffer. func (b *Buffer) Add(ctx context.Context, ev *roachpb.RangeFeedEvent) error { // ... b.mu.Lock() defer b.mu.Unlock() if value.Timestamp.LessEq(b.mu.frontier) { // If the rangefeed entry is at a timestamp less than or equal to our // last known checkpoint, we don't need to record it. return nil } if len(b.mu.events) > b.limit { return ErrBufferLimitExceeded } b.mu.events = append(b.mu.events, ev) return nil } func (b *Buffer) Flush( ctx context.Context, frontier hlc.Timestamp, ) (events []*roachpb.RangeFeedEvent) { b.mu.Lock() defer b.mu.Unlock() if frontier.Less(b.mu.frontier) { log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier) } sort.Sort(rangefeedEvents(b.mu.events)) idx := sort.Search(len(b.mu.events), func(i int) bool { return !eventTS(b.mu.events).Less(frontier) }) events = b.mu.events[:idx] b.mu.events = b.mu.events[idx:] b.mu.frontier = frontier return events }
Good idea, done for both. One think I'm not sure about is this bit:
events = b.mu.events[:idx]
b.mu.events = b.mu.events[idx:]
Are we creating an ever-growing slice here, discarding only the prefix when flushing out events lesseq than the frontier timestamp?
efc15c5
to
2d4a1e9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Just a way to extract a timestamp for a rangefeed value deletion event.
🤔 the delete should have a timestamp, no?
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Good idea, done for both. One think I'm not sure about is this bit:
events = b.mu.events[:idx] b.mu.events = b.mu.events[idx:]
Are we creating an ever-growing slice here, discarding only the prefix when flushing out events lesseq than the frontier timestamp?
When the slice re-allocs, it'll be a new backing array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
🤔 the delete should have a timestamp, no?
Yea, isn't it ev.Val.PrevValue.Timestamp? That's what this code block was doing.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
Previously, ajwerner wrote…
When the slice re-allocs, it'll be a new backing array.
Ack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Yea, isn't it ev.Val.PrevValue.Timestamp? That's what this code block was doing.
I don't think so. PrevValue will be empty unless you explicitly request it. It will also have the timestamp of the value that preceded the deletion.
Can you write a test that exercises the deletion path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 6 files at r2, 3 of 4 files at r3, 1 of 1 files at r5, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @irfansharif)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
I don't think so. PrevValue will be empty unless you explicitly request it. It will also have the timestamp of the value that preceded the deletion.
Can you write a test that exercises the deletion path?
I'm also confused by this. The Value
should have a timestamp even if it is a deletion tombstone.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
} func eventTS(ev *roachpb.RangeFeedEvent) hlc.Timestamp {
@arulajmani and I were talking yesterday about whether this structure could be used in the SQLWatcher
as well in place of the current eventBuffer
he introduced over there. That use would be a little different in that it is zipping together two rangefeeds, but it has a lot of the same requirements. We discussed whether this buffer should be made slightly more generic to accommodate other uses cases, essentially by replacing *roachpb.RangeFeedEvent
with a type Event interface { Timestamp() hlc.Timestamp }
.
We wanted to get your thoughts on that.
2d4a1e9
to
e0a4e2e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Hm, looks like I misunderstood the APIs here; wrote a test at the top-level rangefeed package spelling out what the two timestamps should look like when writing/overwriting/deleting keys.
It will also have the timestamp of the value that preceded the deletion.
Looks like this isn't true? I get an empty timestamp for the value preceding the deletion.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
@arulajmani and I were talking yesterday about whether this structure could be used in the
SQLWatcher
as well in place of the currenteventBuffer
he introduced over there. That use would be a little different in that it is zipping together two rangefeeds, but it has a lot of the same requirements. We discussed whether this buffer should be made slightly more generic to accommodate other uses cases, essentially by replacing*roachpb.RangeFeedEvent
with atype Event interface { Timestamp() hlc.Timestamp }
.We wanted to get your thoughts on that.
I don't mind the idea of hiding away the even through an interface. I imagine then in the SQLWatcher the events we'd store would be the decoded descriptor IDs (decoded as per the right table's schema), which feels cleaner than juggling two instances of this buffer; made that change.
There's little in this library that's now "rangefeed" specific, though I expect it to mostly be used in conjunction with it. Suggestions for where it should be placed or what it should be named?
ac785d0
to
b7a0e52
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
PrevValue will be empty unless you explicitly request it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
In TestRangefeedValueTimestamps I'm specifying
rangefeed.WithDiff()
, I'm seeing the previous values just fine -- the timestamps however are empty.
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 632 to 644 in 0417041
var prevVal roachpb.Value | |
if prevValue != nil { | |
prevVal.RawBytes = prevValue | |
} | |
var event roachpb.RangeFeedEvent | |
event.MustSetValue(&roachpb.RangeFeedValue{ | |
Key: key, | |
Value: roachpb.Value{ | |
RawBytes: value, | |
Timestamp: timestamp, | |
}, | |
PrevValue: prevVal, | |
}) |
Doesn't look like we set a prev-value timestamp above, unlike during the initial scan:
cockroach/pkg/kv/kvclient/rangefeed/rangefeed.go
Lines 258 to 278 in fc803ee
func (f *RangeFeed) maybeRunInitialScan( | |
ctx context.Context, n *log.EveryN, r *retry.Retry, | |
) (canceled bool) { | |
if !f.withInitialScan { | |
return false // canceled | |
} | |
scan := func(kv roachpb.KeyValue) { | |
v := roachpb.RangeFeedValue{ | |
Key: kv.Key, | |
Value: kv.Value, | |
} | |
// Mark the data as occurring at the initial timestamp, which is the | |
// timestamp at which it was read. | |
v.Value.Timestamp = f.initialTimestamp | |
// Supply the value from the scan as also the previous value to avoid | |
// indicating that the value was previously deleted. | |
if f.withDiff { | |
v.PrevValue = v.Value | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 632 to 644 in 0417041
var prevVal roachpb.Value if prevValue != nil { prevVal.RawBytes = prevValue } var event roachpb.RangeFeedEvent event.MustSetValue(&roachpb.RangeFeedValue{ Key: key, Value: roachpb.Value{ RawBytes: value, Timestamp: timestamp, }, PrevValue: prevVal, }) Doesn't look like we set a prev-value timestamp above, unlike during the initial scan:
cockroach/pkg/kv/kvclient/rangefeed/rangefeed.go
Lines 258 to 278 in fc803ee
func (f *RangeFeed) maybeRunInitialScan( ctx context.Context, n *log.EveryN, r *retry.Retry, ) (canceled bool) { if !f.withInitialScan { return false // canceled } scan := func(kv roachpb.KeyValue) { v := roachpb.RangeFeedValue{ Key: kv.Key, Value: kv.Value, } // Mark the data as occurring at the initial timestamp, which is the // timestamp at which it was read. v.Value.Timestamp = f.initialTimestamp // Supply the value from the scan as also the previous value to avoid // indicating that the value was previously deleted. if f.withDiff { v.PrevValue = v.Value }
Ah, guess we didn't expose the PrevValue timestamp. That was probably smart from an API contract perspective.
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 625 to 627 in efb1a95
func (p *Processor) publishValue( | |
ctx context.Context, key roachpb.Key, timestamp hlc.Timestamp, value, prevValue []byte, | |
) { |
More commentary on
cockroach/pkg/roachpb/api.proto
Lines 2384 to 2388 in efb1a95
// prev_value is only populated if both: | |
// 1. with_diff was passed in the corresponding RangeFeedRequest. | |
// 2. the key-value was present and not a deletion tombstone before | |
// this event. | |
Value prev_value = 3 [(gogoproto.nullable) = false]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
Ah, guess we didn't expose the PrevValue timestamp. That was probably smart from an API contract perspective.
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 625 to 627 in efb1a95
func (p *Processor) publishValue( ctx context.Context, key roachpb.Key, timestamp hlc.Timestamp, value, prevValue []byte, ) { More commentary on
probably wouldn't hurt.cockroach/pkg/roachpb/api.proto
Lines 2384 to 2388 in efb1a95
// prev_value is only populated if both: // 1. with_diff was passed in the corresponding RangeFeedRequest. // 2. the key-value was present and not a deletion tombstone before // this event. Value prev_value = 3 [(gogoproto.nullable) = false];
Probably also worth reconciling those behaviors. That certainly was not intentional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I don't mind the idea of hiding away the even through an interface. I imagine then in the SQLWatcher the events we'd store would be the decoded descriptor IDs (decoded as per the right table's schema), which feels cleaner than juggling two instances of this buffer; made that change.
There's little in this library that's now "rangefeed" specific, though I expect it to mostly be used in conjunction with it. Suggestions for where it should be placed or what it should be named?
Another thing that has come up in offline discussions with both @irfansharif and @nvanbenschoten, and I'd like to get more thoughts on here, is how we feel about generalizing this even further so that this can track multiple frontier timestamps. Specifically for the SQLWatcher, this would allow us to push the "combined frontier timestamp" into the buffer (as opposed to tracking it in a layer above). We could extend this interface a bit to make this work.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 75 at r6 (raw file):
// recorded (expected to monotonically increase), and future events with // timestamps less than or equal to it are discarded. func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) {
In the EventBuffer from the SQLWatcher
PR we were de-duplicating events from the buffer when flushing. Is that something we want to do here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TFTRs so far! Do we think this is close to getting merged? I want to build on top of this for the TODOs I've left over on #69614.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
Probably also worth reconciling those behaviors. That certainly was not intentional.
Added a commit.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Another thing that has come up in offline discussions with both @irfansharif and @nvanbenschoten, and I'd like to get more thoughts on here, is how we feel about generalizing this even further so that this can track multiple frontier timestamps. Specifically for the SQLWatcher, this would allow us to push the "combined frontier timestamp" into the buffer (as opposed to tracking it in a layer above). We could extend this interface a bit to make this work.
I'm 👎 on it, especially if you can simply add as buffer events as decoded IDs. There are no other instances (to my knowledge) where we're maintaining multiple (n=2) rangefeeds to keep track of multiple frontier timestamps, so that sounds overgeneralized. What do you imagine that API would look like? Would it give us much over something like the following:
type ... struct {
rangefeedbuffer.Buffer // accumulates descpb.IDs
zonesFrontierTS, descriptorFrontierTS hlc.Timestamp
}
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 75 at r6 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
In the EventBuffer from the
SQLWatcher
PR we were de-duplicating events from the buffer when flushing. Is that something we want to do here as well?
I had a TODO here, but decided ultimately against it -- easy enough to do at the caller + difficult now that we're taking in an opaque Event interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 7 of 7 files at r6, 4 of 4 files at r7, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @irfansharif)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I'm 👎 on it, especially if you can simply add as buffer events as decoded IDs. There are no other instances (to my knowledge) where we're maintaining multiple (n=2) rangefeeds to keep track of multiple frontier timestamps, so that sounds overgeneralized. What do you imagine that API would look like? Would it give us much over something like the following:
type ... struct { rangefeedbuffer.Buffer // accumulates descpb.IDs zonesFrontierTS, descriptorFrontierTS hlc.Timestamp }
+1 to what Irfan said.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 85 at r6 (raw file):
// Accumulate all events with timestamps <= the given timestamp in sorted // order. sort.Sort(b.mu.events)
I think that this is going to heap allocate because you have the interface implemented on a value. If you implement the interface on *events
and then pass sort.Sort(&b.mu.events)
, you can avoid that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 7 files at r6.
Reviewable status: complete! 2 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
+1 to what Irfan said.
Had written this earlier but forgot to push publish. Seems like we're in agreement:
Leave generalizing refactors for later, when they’re obvious or better understood?
In both cockroachdb#69614 and cockroachdb#69661 we find ourselves reaching for a thin, memory-bounded buffer to sit on top a rangefeed. We want to be able to accumulate raw rangefeed events, and when the rangefeed frontier is bumped, flush everything out en-masse in timestamp sorted order. Package rangefeedbuffer introduces such a datastructure. If we're past the configured memory limit before having observed a frontier bump, we have little recourse -- since we receive checkpoints and rangefeed values on the same stream, we're waiting for a future checkpoint to flush out the accumulated memory. Given this, we simply error out to the caller (expecting them to re-establish the rangefeed). Release note: None Co-authored-by: Arul Ajmani <[email protected]>
99c5df4
to
83acbc5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️ Thanks y'all!
bors r+
Reviewable status: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @arulajmani and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 85 at r6 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
I think that this is going to heap allocate because you have the interface implemented on a value. If you implement the interface on
*events
and then passsort.Sort(&b.mu.events)
, you can avoid that.
TIL, you're right:
$ go build -gcflags '-m' github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer
...
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go:86:16: b.mu.events escapes to heap
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 85 at r6 (raw file):
Previously, irfansharif (irfan sharif) wrote…
TIL, you're right:
$ go build -gcflags '-m' github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer ... pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go:86:16: b.mu.events escapes to heap
Done.
an equivalent choice which maybe is worse is to keep the slice for the struct field and cast like: sort.Sort((*events)(&b.mu.events))
. Also, I don't think you need to define the methods on the pointer to defeat the allocation as the pointer inherits the method set of the non-pointer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
What do you imagine that API would look like? Would it give us much over something like the following ...
We could structure it such that tracking of the frontier timestamp tracking is pushed inside this buffer, instead of the caller having to do it at the layer above. What I had in mind was something like:
type Event interface {
Timestamp() hlc.Timestamp
Type() int // key into the array that tracks frontier timestamps
FrontierAdvance() bool // true if the event corresponds to the frontier being advanced.
}
This would allow the SQLWatcher to construct different event types based on where the event was generated at (system.descriptors or system.zones) and not have to individually track the frontiers of these rangefeeds.
All that being said, looks like this isn't super popular, so I'm fine this being structured as is.
CI flaked on #70220. |
Build failed: |
There was a discrepancy with whether or not the previous contained the timestamp of when the previous value was recorded. With the initial scan, we populated the timestamp of the initial scan itself. With regular diffs, we omitted the timestamp. This made for confusing semantics (well, confusing at least for this author). Release note: None
83acbc5
to
d93e7cf
Compare
CI + bazel infra flake: #71679, retrying. bors r+ |
Build succeeded: |
When unmarshaling descriptor protos into their specific types, we want to pass in the MVCC timestamp at which that descriptor was read. Given we receive these protos through the surrounding rangefeed, we want to use the rangefeed event timestamp. We we erroneously using the timestamp found on the rangefeed event's "previous value", which the API guarantees will be the zero timestamp. (This tripped us up before; we added some commentary + tests in cockroachdb#71225 for the rangefeed library to make this clearer.) Release note: None
When unmarshaling descriptor protos into their specific types, we want to pass in the MVCC timestamp at which that descriptor was read. Given we receive these protos through the surrounding rangefeed, we want to use the rangefeed event timestamp. We we erroneously using the timestamp found on the rangefeed event's "previous value", which the API guarantees will be the zero timestamp. (This tripped us up before; we added some commentary + tests in cockroachdb#71225 for the rangefeed library to make this clearer.) Release note: None
73439: spanconfig/sqlwatcher: use the right mvcc timestamp r=irfansharif a=irfansharif When unmarshaling descriptor protos into their specific types, we want to pass in the MVCC timestamp at which that descriptor was read. Given we receive these protos through the surrounding rangefeed, we want to use the rangefeed event timestamp. We we erroneously using the timestamp found on the rangefeed event's "previous value", which the API guarantees will be the zero timestamp. (This tripped us up before; we added some commentary + tests in #71225 for the rangefeed library to make this clearer.) Release note: None Co-authored-by: irfan sharif <[email protected]>
Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It
accumulates raw rangefeed events1, which can be flushed out in timestamp
sorted order en-masse whenever the rangefeed frontier is bumped. If we
accumulate more events than the limit allows for, we error out to the caller.
We need such a thing in both #69614 and #69661.
Release note: None
First commit is from #71256. Co-authored-by: Arul Ajmani [email protected].
Footnotes
Rangefeed error events are propagated to the caller, checkpoint events
are discarded. ↩