-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
spanconfig: introduce spanconfig.KVSubscriber #69614
spanconfig: introduce spanconfig.KVSubscriber #69614
Conversation
ed1705e
to
a5e850f
Compare
a5e850f
to
9372967
Compare
cbecf62
to
bced2ec
Compare
In cockroachdb#69172 we introduced a spanconfig.StoreReader interface to abstract away the gossiped system config span. We motivated that PR by teasing a future implementation of the same interface, an in-memory data structure to maintain a mapping between between spans and configs (powered through a view over system.span_configurations introduced in \cockroachdb#69047). This PR introduces just that. Intended (future) usages: - cockroachdb#69614 introduces the KVWatcher interface, listening in on system.span_configurations. The updates generated by it will be used to populate per-store instantiations of this data structure, with an eye towards providing a "drop-in" replacement of the gossiped system config span (conveniently implementing the sibling spanconfig.StoreReader interface). - cockroachdb#69661 introduces the SQLWatcher interface, listening in on changes to system.{descriptor,zones} and generating denormalized span config updates for every descriptor/zone config change. These updates will need to be diffed against a spanconfig.StoreWriter populated with the existing contents of KVAccessor to generate the "targeted" diffs KVAccessor expects. Release note: None
bced2ec
to
861be9c
Compare
861be9c
to
6ef1291
Compare
In cockroachdb#69172 we introduced a spanconfig.StoreReader interface to abstract away the gossiped system config span. We motivated that PR by teasing a future implementation of the same interface, an in-memory data structure to maintain a mapping between between spans and configs (powered through a view over system.span_configurations introduced in \cockroachdb#69047). This PR introduces just that. Intended (future) usages: - cockroachdb#69614 introduces the KVWatcher interface, listening in on system.span_configurations. The updates generated by it will be used to populate per-store instantiations of this data structure, with an eye towards providing a "drop-in" replacement of the gossiped system config span (conveniently implementing the sibling spanconfig.StoreReader interface). - cockroachdb#69661 introduces the SQLWatcher interface, listening in on changes to system.{descriptor,zones} and generating denormalized span config updates for every descriptor/zone config change. These updates will need to be diffed against a spanconfig.StoreWriter populated with the existing contents of KVAccessor to generate the "targeted" diffs KVAccessor expects. Release note: None
Grafted from cockroachdb#69269. This seems like a useful primitive for users of this library. We intend to use it in cockroachdb#69661 and cockroachdb#69614. Release note: None Co-authored-by: irfan sharif <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll (timidly) push back again against getting rid of lazy binding for the reasons outlined below, at best it's simplifying a nil check and a boolean and makes for what I think is a more awkward interface to mock out. The latter is something I'm trying to do in #71994, which is where the pushback stems from.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):
Now that you've pushed the retrying into the Subscribe call, would a mocked impl simply be able to return?
No, it'll still have to a blocking call, right? Unless we want Subscribe to fire off an async task internally -- which is more surprising for the many reasons discussed elsewhere.
I'm also not sure what lazily binding this handler gets us, considering we only ever allow a single handler.
The lazy binding is not related to having a single handler, it's to do with not wanting to bunch together code dealing with the lifecycle the subscription and code that wants to learn about updates. The mock impl considerations are another aspect -- what does it mean to mock the interface that's providing you a retry option?
I still don't really see the complexity around lazy binding, everything else in this package was more intricate and that's where all my bugs were when writing tests. Really, it's one a nil check to see if a handler is available and a single bool.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 186 at r27 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
This could use a comment
Added to lastFrontierTS.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 308 at r27 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Do you only need this in the else case?
Yea, that works, done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 317 at r27 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Might be worth calling out why we're using initialTS here as the timestamp to flush at, considering it's referencing behaviour in the
rangefeed
package.
Renamed it to initialScanTS instead.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 327 at r27 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Consider pulling this code out into a function so that you can call it both from here and above the very first time the handler is called.
Meh, it's two lines. I prefer the inline commentary.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 310 at r28 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
This could use a comment
Added below + at the type level.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 390 at r29 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
nit: clarify that only one handler is ever allowed?
Specified in the interface commentary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to chat about this offline, I may not be appreciating fully why we want to keep the lifecycle of subscription and learning about updates separate. I do think it buys us a bit more than simplifying a nil check though as it reduces the state this thing needs to track, which makes it easier to reason about.
The latter is something I'm trying to do in #71994, which is where the pushback stems from.
Are you referring to TestSpanConfigUpdateAppliedToReplica
here or something else? If that's the one, then I think you could make it work even with the interface suggestion below.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):
I agree, there's not much code complexity around the lazy binding. To me the complexity stems from the fact that it isn't completely obvious to me why these distinct phases exist (as a first time reader of this code).
I could buy that you don't want to bunch together the code that deals with the lifecycle of the subscription and the code that wants to learns about updates, but then I'd expect us to be registering/de-registering multiple handlers while maintaining the rangefeed. We currently register only one handler and it is never deregistered. Do we see that changing in the future? If not, is there a benefit to having this separation?
No, it'll still have to a blocking call, right? Unless we want Subscribe to fire off an async task internally -- which is more surprising for the many reasons discussed elsewhere.
I wasn't suggesting that Subscribe fire off an async task internally. (I think) earlier you had retry code in the server package that retried this thing in a goroutine. Now that the retry code has been moved inside the kvsubscriber
package, a mock implementation can simply no-op. I think I may have misunderstood what you meant above?
5b630b6
to
684bf0d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, TestSpanConfigUpdateAppliedToReplica from that this branch is right. I'm still not following your suggestion, can you show what that test would look like if we did this differently?
I may not be appreciating fully why we want to keep the lifecycle of subscription and learning about updates separate
Because I feel they're separate things that benefit from the separation. I fear we're going in circles, I don't really have reasons other than ones we've already discussed below. Does this feel like a hard blocker? If not, I'll #meal-meetups expense you a beer for the rubber stamp.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):
We currently register only one handler and it is never deregistered. Do we see that changing in the future?
I don't see it changing in the future, no. If it does I'm happy to revisit.
a mock implementation can simply no-op. I think I may have misunderstood what you meant above?
But we want the handler to be called in the same goroutine as Subscribe, so it has to be blocking -- I'm not sure what "simply no-op" means. I'd welcome a code example of the test we're discussing above.
3bf4a64
to
46538ef
Compare
Pulled the stopper async task back into Subscribe (I'd benefit from a rule of thumb for when we should vs. not), and introduced a tiny type handler struct {
initialized bool // tracks whether we need to invoke with a [min,max) span first
fn func(update roachpb.Span)
}
func (h handler) invoke(update roachpb.Span) {
if !h.initialized {
h.fn(keys.EverythingSpan)
h.initialized = true
if update.Equal(keys.EverythingSpan) {
return // we can opportunistically avoid re-invoking with the same update
}
}
h.fn(update)
} At the callers, it's then simply: for _, h := range handlers {
for _, ev := range events {
h.invoke(ev.(*bufferEvent).Update.Span)
}
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My comments are largely minor or superficial. This is looking good.
aab7691
to
3514e3c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor/cleanup comments from me as well.
Reviewed 1 of 4 files at r27, 1 of 9 files at r28.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):
// KVSubscriber is used to subscribe to global span configuration changes. It's // a concrete implementation of the spanconfig.KVSubscriber interface. //
It might be worth explicitly adding words around the lifecycle of this thing. Maybe a summary of what we discussed in yesterday's pod meeting to get here? Specifically, that we expect there to be a singleton KVSubscriber started per node that establishes and maintains the lifecycle of rangefeeds internally. It also allows all stores on that node to subscribe to to updates to particular spans by registering handlers.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 47 at r34 (raw file):
// updates and flushing them out en-masse in timestamp order when the rangefeed // frontier is bumped[2]. If the buffer overflows (as dictated by the memory // limit the KVSubscriber is instantiated with), the subscriber is wound down and
Should this say "subscriber retries" internally now?
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 50 at r34 (raw file):
// an appropriate error is returned to the caller. // // When running into the errors above, it's safe for the caller to re-subscribe
This needs an update
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 67 at r34 (raw file):
// // TODO(irfansharif): When swapping the old spanconfig.StoreReader for the new, // instead of informing callers with an everything [min,max) span, we could diff
nit: s/callers/registered handlers/
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 75 at r34 (raw file):
// getting split. When applying these updates, we need to make sure to // process the deletion event for S before processing S1...SN. // [2]: In our example above deleting the config for S and adding configs for S1...Nwe
Nwe?
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):
// lest we expose the intermediate state where the config for S was deleted // but the configs for S1...SN were not yet applied. // [3]: TODO(irfansharif): When tearing down the subscriber due to underlying errors,
Now that we're doing this internally, should we address this TODO? Or, if not, move it inline?
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 173 at r34 (raw file):
// the exported StoreReader will be up-to-date and continue to be // incrementally maintained. func (s *KVSubscriber) Subscribe(ctx context.Context) error {
In line with Andrew's comment about renaming OnSpanConfigUpdate
to Subscribe
, let's rename this thing to Start
?
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 199 at r34 (raw file):
Previously, ajwerner wrote…
nit: how do you feel about calling this
run
orrunRangefeed
?
- 1 on
Start
/run
pattern we have in other places as well.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 217 at r34 (raw file):
defer func() { mu.Lock() s.lastFrontierTS = mu.frontierTS
Should this be a s.lastFrontierTS.Forwad(mu.frontierTS)
instead?
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 375 at r34 (raw file):
Previously, ajwerner wrote…
nit: I could see renaming this method to
Subscribe
- 1
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):
// TestingSubscribeInner exports the inner subscription route for testing // purposes. func (s *KVSubscriber) TestingSubscribeInner(ctx context.Context) error {
Move this method to a testutils
file instead.
3514e3c
to
31fded2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What in this change obviates the need to do this?
This was a stand alone commit unrelated to anything else in the PR. It's actually just dead code like mentioned in the commit message. Was discussed in this thread: https://reviewable.io/reviews/cockroachdb/cockroach/69614#-MlLiZryEFeTuS3XXoq2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvserver/store.go, line 2010 at r36 (raw file):
Previously, ajwerner wrote…
We've got the
RSpan
now, may as well use it:!sp.ContainsKey(startKey)
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
It might be worth explicitly adding words around the lifecycle of this thing. Maybe a summary of what we discussed in yesterday's pod meeting to get here? Specifically, that we expect there to be a singleton KVSubscriber started per node that establishes and maintains the lifecycle of rangefeeds internally. It also allows all stores on that node to subscribe to to updates to particular spans by registering handlers.
I feel like how this thing is expecting to get used is not commentary well suited here. A lot of the text below talks about the lifecycle, and looking the uses it's (I hope) clear how it's being used -- "started at the node level once" and "subscribed to by each store".
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 47 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Should this say "subscriber retries" internally now?
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 50 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
This needs an update
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 75 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Nwe?
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Now that we're doing this internally, should we address this TODO? Or, if not, move it inline?
I'm fine with just leaving the TODO here.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 173 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
In line with Andrew's comment about renaming
OnSpanConfigUpdate
toSubscribe
, let's rename this thing toStart
?
Already done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 178 at r34 (raw file):
Previously, ajwerner wrote…
I was thinking something like:
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { const aWhile = 5 * time.Minute // arbitrary but much longer than a retry started := timeutil.Now() if err := s.run(ctx); err != nil { if errors.Is(err, context.Canceled) { return // we're done here } ranFor := timeutil.Since(start) if ranFor > aWhile { r.Reset() } log.Warningf(ctx, "spanconfig-kvsubscriber failed with %v after %v, retrying...", err, ranFor) continue }
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 199 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
- 1 on
Start
/run
pattern we have in other places as well.
Already done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 217 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Should this be a
s.lastFrontierTS.Forwad(mu.frontierTS)
instead?
Sure, done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 375 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
- 1
Already done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 188 at r36 (raw file):
Previously, ajwerner wrote…
what is the no-error return from
s.run
? Is that the context being canceled? It's not totally obvious. Perhaps add some more commentary and note that case on therun
comment?
Done (was already mentioned in run
).
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Move this method to a
testutils
file instead.
Does this apply? I'm exporting a package internal method.
31fded2
to
dcfa5c9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/spanconfig/spanconfig.go, line 51 at r37 (raw file):
// callers can subscribe to learn about what key spans may have seen a // configuration change. After learning about a span update through a callback // invocation, callers can consult the embedded StoreReader to retrieve an
s/callers/subscribers/ (in a couple of places below as well).
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I feel like how this thing is expecting to get used is not commentary well suited here. A lot of the text below talks about the lifecycle, and looking the uses it's (I hope) clear how it's being used -- "started at the node level once" and "subscribed to by each store".
IMO the details about the lifecycle are getting lost in some of the details below. I'd strongly encourage you to write about the two phases (start and subscribe) and why they exist, even if you do it in general terms rather than talking about specific callers :)
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I'm fine with just leaving the TODO here.
Okay. It's still worth updating the TODO to talk about internally retrying from the last saved checkpoint as opposed to surfacing it to the callers of this function.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 97 at r37 (raw file):
knobs *spanconfig.TestingKnobs subscribed int32 // accessed atomically
s/subscribed/started
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Does this apply? I'm exporting a package internal method.
I would've expected kvsubscriber_test.go
to have unit tests and testutils.go
to have wrappers around internal methods we want to export for testing purposes.
KVSubscriber presents a consistent[^1] snapshot of a spanconfig.StoreReader that's incrementally maintained with changes made to the global span configurations state. The maintenance happens transparently; callers can subscribe to learn about what key spans may have seen a configuration change. After learning about a span update, consulting the embedded StoreReader would retrieve an up-to-date[^2] config for it. When a callback is first installed, it's invoked with the [min,max) span -- a shorthand to indicate that subscribers should consult the StoreReader for all spans of interest. Subsequent updates are of the more incremental kind. It's possible that the span updates received are no-ops, i.e. consulting the StoreReader for the given span would retrieve the last config observed for the span[^2]. type KVSubscriber interface { StoreReader Subscribe(func(updated roachpb.Span)) } It's expected to Start-ed once, after which one or many subscribers can listen in for updates. Internally we maintain a rangefeed over the global store of span configurations (system.span_configurations), applying updates from it into an embedded spanconfig.Store. A read-only view of this data structure (spanconfig.StoreReader) is exposed as part of the KVSubscriber interface. Rangefeeds used as is don't offer any ordering guarantees with respect to updates made over non-overlapping keys, which is something we care about[^4]. For that reason we make use of a rangefeed buffer, accumulating raw rangefeed updates and flushing them out en-masse in timestamp order when the rangefeed frontier is bumped[^5]. If the buffer overflows (as dictated by the memory limit the KVSubscriber is instantiated with), the old rangefeed is wound down and a new one re-established. When running into the internal errors described above, it's safe for us to re-establish the underlying rangefeeds. When re-establishing a new rangefeed and populating a spanconfig.Store using the contents of the initial scan[3], we wish to preserve the existing spanconfig.StoreReader. Discarding it would entail either blocking all external readers until a new spanconfig.StoreReader was fully populated, or presenting an inconsistent view of the spanconfig.Store that's currently being populated. For new rangefeeds what we do then is route all updates from the initial scan to a fresh spanconfig.Store, and once the initial scan is done, swap at the source for the exported spanconfig.StoreReader. During the initial scan, concurrent readers would continue to observe the last spanconfig.StoreReader if any. After the swap, it would observe the more up-to-date source instead. Future incremental updates will also target the new source. When this source swap occurs, we inform the handler of the need to possibly refresh its view of all configs. This commit also wires up the KVSubscriber into KV stores, replacing the use of the gossiped system config span (possible given the StoreReader interface, only happens if a testing flag/env var is set). [^1]: The contents of the StoreReader at t1 corresponds exactly to the contents of the global span configuration state at t0 where t0 <= t1. If the StoreReader is read from at t2 where t2 > t1, it's guaranteed to observe a view of the global state at t >= t0. [^2]: For the canonical KVSubscriber implementation, this is typically the closed timestamp target duration. [^3]: The canonical KVSubscriber implementation internally re-establishes feeds when errors occur, possibly re-transmitting earlier updates (usually through a lazy [min,max) span) despite possibly not needing to. We could do a bit better and diff the two data structures, emitting only targeted updates. [^4]: For a given key k, it's config may be stored as part of a larger span S (where S.start <= k < S.end). It's possible for S to get deleted and replaced with sub-spans S1...SN in the same transaction if the span is getting split. When applying these updates, we need to make sure to process the deletion event for S before processing S1...SN. [^5]: In our example above deleting the config for S and adding configs for S1...SN, we want to make sure that we apply the full set of updates all at once -- lest we expose the intermediate state where the config for S was deleted but the configs for S1...SN were not yet applied. [^6]: When tearing down the subscriber due to underlying errors, we could also surface a checkpoint to use the next time the subscriber is established. That way we can avoid the full initial scan over the span configuration state and simply pick up where we left off with our existing spanconfig.Store. Release note: None
dcfa5c9
to
89f8aba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks y'all! I'm sure this felt like a doozy.
bors r+
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/spanconfig/spanconfig.go, line 51 at r37 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
s/callers/subscribers/ (in a couple of places below as well).
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
IMO the details about the lifecycle are getting lost in some of the details below. I'd strongly encourage you to write about the two phases (start and subscribe) and why they exist, even if you do it in general terms rather than talking about specific callers :)
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Okay. It's still worth updating the TODO to talk about internally retrying from the last saved checkpoint as opposed to surfacing it to the callers of this function.
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 97 at r37 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
s/subscribed/started
Done.
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
I would've expected
kvsubscriber_test.go
to have unit tests andtestutils.go
to have wrappers around internal methods we want to export for testing purposes.
It's not atypical to export test-only symbols in a _test package, which is why I did it this way (testutils.go would surface this symbol in the godoc and your editor unless it's testutils_test.go 🤷♀️).
Build succeeded: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 2 files at r29, 6 of 10 files at r35, 3 of 4 files at r37, 2 of 2 files at r38, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale)
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 336 at r38 (raw file):
events := buffer.Flush(ctx, frontierTS) s.mu.Lock()
It would have been nice to have added some commentary about this locking. It's pretty subtle, but as we've found, also very important. Maybe we can add a comment next time we're working around here?
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 418 at r38 (raw file):
} type handler struct {
Much nicer!
pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 428 at r38 (raw file):
h.initialized = true if update.Equal(keys.EverythingSpan) {
Out of curiosity, why can't we always return on this path? Do we ever need the second call to fn if we just called it with keys.EverythingSpan
?
Congrats on landing this. It came out nicely. |
KVSubscriber presents a consistent1 snapshot of a spanconfig.StoreReader that's incrementally maintained with changes made to the global span configurations state. The maintenance happens transparently; callers can subscribe to learn about what key spans may have seen a configuration change. After learning about a span update, consulting the embedded StoreReader would retrieve an up-to-date2 config for it.
When a callback is first installed, it's invoked with the [min,max) span -- a shorthand to indicate that callers should consult the StoreReader for all spans of interest. Subsequent updates are of the more incremental kind. It's possible that the span updates received are no-ops, i.e. consulting the StoreReader for the given span would retrieve the last config observed for the span2.
Internally we maintain a rangefeed over the global store of span configurations (system.span_configurations), applying updates from it into an embedded spanconfig.Store. A read-only view of this data structure (spanconfig.StoreReader) is exposed as part of the KVSubscriber interface. Rangefeeds used as is don't offer any ordering guarantees with respect to updates made over non-overlapping keys, which is something we care about3. For that reason we make use of a rangefeed buffer, accumulating raw rangefeed updates and flushing them out en-masse in timestamp order when the rangefeed frontier is bumped4. If the buffer overflows (as dictated by the memory limit the KVSubscriber is instantiated with), the subscriber is wound down and an appropriate error is returned to the caller.
When running into the errors above, it's safe for the caller to re-subscribe to effectively re-establish the underlying rangefeeds. When re-establishing a new rangefeed and populating a spanconfig.Store using the contents of the initial scan5, we wish to preserve the existing spanconfig.StoreReader. Discarding it would entail either blocking all external readers until a new spanconfig.StoreReader was fully populated, or presenting an inconsistent view of the spanconfig.Store that's currently being populated. For new rangefeeds what we do then is route all updates from the initial scan to a fresh spanconfig.Store, and once the initial scan is done, swap at the source for the exported spanconfig.StoreReader. During the initial scan, concurrent readers would continue to observe the last spanconfig.StoreReader if any. After the swap, it would observe the more up-to-date source instead. Future incremental updates will also target the new source. When this source swap occurs, we inform handlers of the need to possibly refresh their view of all configs.
This commit also wires up the KVSubscriber into KV stores, replacing the use of the gossiped system config span (possible given the StoreReader interface, only happens if a testing flag/env var is set).
Release note: None
Footnotes
The contents of the StoreReader at t1 corresponds exactly to the contents of the global span configuration state at t0 where t0 <= t1. If the StoreReader is read from at t2 where t2 > t1, it's guaranteed to observe a view of the global state at t >= t0. ↩
For the canonical KVSubscriber implementation, this is typically the closed timestamp target duration. ↩ ↩2
For a given key k, it's config may be stored as part of a larger span S (where S.start <= k < S.end). It's possible for S to get deleted and replaced with sub-spans S1...SN in the same transaction if the span is getting split. When applying these updates, we need to make sure to process the deletion event for S before processing S1...SN. ↩
In our example above deleting the config for S and adding configs for S1...Nwe want to make sure that we apply the full set of updates all at once -- lest we expose the intermediate state where the config for S was deleted but the configs for S1...SN were not yet applied. ↩
When tearing down the subscriber due to underlying errors, we could also surface a checkpoint to use the next time the subscriber is established. That way we can avoid the full initial scan over the span configuration state and simply pick up where we left off with our existing spanconfig.Store. ↩