kvflowcontrol,raftlog: interfaces for replication control #95637
Conversation
Force-pushed efe25ac to 43c1c48
Force-pushed d5aa826 to f29a8e5
This mostly looks fine. I did not look at the encoding/decoding changes.
Reviewed 12 of 33 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif)
pkg/kv/kvserver/kvflowcontrol/doc.go
line 16 at r1 (raw file):
// into KV, write a "life of a replicated proposal" timeline here for flow-token
// interactions. Talk about how range splits/merges interact and how we ensure
// now flow tokens are leaked or double returned. Talk also about snapshots, log
ensure flow tokens are not leaked or double returned.
pkg/kv/kvserver/kvflowcontrol/doc.go
line 55 at r1 (raw file):
// raft log entries that were admitted below raft. We use the origin node
// encoded in raft entry (RaftAdmissionMeta.AdmissionOriginNode) to know
// where to send these to. This information used on the origin node to
nit: is used
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 42 at r1 (raw file):
// of size) over the specified streams. This is a blocking operation;
// requests wait until there are flow tokens available.
Admit(admissionpb.WorkPriority, ...Stream)
worth adding CreateTime as a parameter here, even though the callee will ignore it, since ideally we should eventually use it.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 67 at r1 (raw file):
// stream), it's done so by specifying the log position up to which we free up
// all deducted tokens. See kvflowcontrolpb.AdmittedRaftLogEntries for more
// details.
So the Handle implementation knows the underlying Streams and will call into the Controller? Is there a Handle for multiple Streams, or does a Handle refer to exactly one Stream?
I am guessing the latter since the same interface is being used for admission above raft, token deduction below raft and for returning tokens, and returning tokens can only be done for individual Streams.
Do the Controller methods need to be public? I was expecting that the rest of the KV code would only need to talk to Handle.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 76 at r1 (raw file):
// priority to members of the raft group, and tracks it with respect to the
// specific raft log position it's expecting it to end up in. Requests are
// assumed to have been Admit()-ed first.
Though I suppose we will allow requests to bypass Admit, e.g. regular priority ones for the initial deployment?
Worth saying that this is non-blocking.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 96 at r1 (raw file):
// that attempts to return them (on hearing about AdmittedRaftLogEntries
// replicated under the earlier lease), we discard the attempts.
TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
I'm puzzled by the Stream parameter, since I was assuming there is one Stream per Handle.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 124 at r1 (raw file):
type DispatchReader interface {
  PendingDispatch() []roachpb.NodeID
  PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
does PendingDispatchFor remove these from the pending list? If no, when do they get removed? If yes, what if the underlying gRPC stream broke before these were successfully sent?
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 26 at r1 (raw file):
message RaftAdmissionMeta {
  // AdmissionPriority of the command (maps to admission.WorkPriority); used
  // within a tenant below-raft for replication admission control.
same question about using a single byte.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 28 at r1 (raw file):
  // within a tenant below-raft for replication admission control.
  int32 admission_priority = 18;
  // AdmissionCreateTime is equivalent to Time.UnixNano() from the creation time
... at the creation time ...
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 54 at r1 (raw file):
// - But if we had to "stitch in" the origin node ID once received off of
//   the transport, or tie together raft entries with their origin node IDs
//   by through some other way (the raft library only wants to "step"
nit: IDs through some ...
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 57 at r1 (raw file):
// through message type we can't so easily annotate), we'd have to do a
// fair bit of state tracking.
// If it's still too costly, we could rip all this out and coarsen
We could also choose to include this information only for raft entries that will not bypass admission control. The bypassing ones (which for the near future will include all normal priority ones, which are the ones likely to be small and for which this overhead may matter) can bypass token subtraction (and later addition) on the sender. Though ideally it would be nice if we did the token tracking for these bypassing ones too, so we can look at a system and understand that we are losing performance isolation because of this bypass.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 95 at r1 (raw file):
// with lower terms get admitted first, or with lower indexes within the same
// term). So the value here implies admission of all entries that sort before
// and have the same priority.
We are doing this "prefix admission" logic to ease our token return story in the presence of failures. But by doing this at the granularity of admission_priority, are we reopening the complexity? There are 256 different priority levels, and if we are worried that a message has been lost we need to return the highest admitted RaftLogPosition for each of the priorities. We could coarsen this to regular and elastic.
Though there is still some difficulty. If the node returning the tokens restarts it does not know whether the stream has seen both regular and elastic traffic in the past and that it has not returned some elastic tokens. I suspect we need the capability for the node to say I am returning all tokens (across both elastic and regular) up to some RaftLogPosition, because I have lost state. It's fine to do this with two such AdmittedRaftLogEntries messages, one for regular and one for elastic.
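A minimal sketch of the regular/elastic coarsening suggested here, assuming a simple priority threshold; the actual admissionpb definitions may bucket priorities differently, so treat the names and cutoff below as illustrative only.

package workclasssketch

import "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"

type workClass int8

const (
	regularWorkClass workClass = iota // latency-sensitive foreground work
	elasticWorkClass                  // throughput-oriented background/bulk work
)

// workClassFromPri collapses the 256 priority levels into two classes,
// so token returns only need to be tracked per class rather than per
// priority.
func workClassFromPri(pri admissionpb.WorkPriority) workClass {
	if pri < admissionpb.NormalPri {
		return elasticWorkClass
	}
	return regularWorkClass
}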
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 104 at r1 (raw file):
// handshake protocol at the start, where the client identified itself by its
// NodeID. That way the origin replica receiving this information can infer
// identify the StoreID where this work was done (since we we never store
nit: can infer the StoreID
pkg/kv/kvserver/kvserverpb/proposer_kv.proto
line 365 at r1 (raw file):
// AdmissionPriority of the command (maps to admission.WorkPriority); used
// within a tenant below-raft for replication admission control.
admissionpb.WorkPriority is an int8. Why not use a byte here instead of this varint encoded int32?
pkg/kv/kvserver/kvserverpb/proposer_kv.proto
line 368 at r1 (raw file):
int32 admission_priority = 18;
// AdmissionCreateTime is equivalent to Time.UnixNano() from the creation time
nit: ... at the creation time ...
(from can make someone think this is a duration)
Force-pushed f29a8e5 to b06939c
TFTR.
I did not look at the encoding/decoding changes.
@tbg, do you think you can review the changes to pkg/kv/kvserver/raftlog? And the few callsites. They should (hopefully) be straightforward. The new encodings are not used anywhere, yet, but you could look at:
- BenchmarkRaftAdmissionMetaOverhead that explains how it's going to be used;
- Uses of DecodeRaftAdmissionMeta in the prototype branch ([prototype] *: replication admission control #93102) to see how it will get used below raft;
- (r *Replica) propose in replica_raft.go in the prototype branch ([prototype] *: replication admission control #93102) to see how it will get used above raft.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @sumeerbhola)
pkg/kv/kvserver/kvflowcontrol/doc.go
line 16 at r1 (raw file):
Previously, sumeerbhola wrote…
ensure flow tokens are not leaked or double returned.
Done.
pkg/kv/kvserver/kvflowcontrol/doc.go
line 55 at r1 (raw file):
Previously, sumeerbhola wrote…
nit: is used
Done.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 42 at r1 (raw file):
Previously, sumeerbhola wrote…
worth adding CreateTime as a parameter here, even though the callee will ignore it, since ideally we should eventually use it.
Done.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 67 at r1 (raw file):
So the Handle implementation knows the underlying Streams and will call into the Controller?
Yes.
Is there a Handle for multiple Streams, or does a Handle refer to exactly one Stream?
Single kvflowcontrol.Handle for multiple kvflowcontrol.Streams. Since it's held at the replica level, it knows about all the destination replicas the per-range replication traffic is bound for.
since the same interface is being used for admission above raft,
For write work admission above raft, we're re-using kvadmission.Controller.AdmitKVWork, but if relevant cluster settings + version gate let us, we'll look up the right kvflowcontrol.Handle first (basically looking up the replica the batch request refers to by ba.RangeID) and call kvflowcontrol.Handle.Admit, which in turn calls kvflowcontrol.Controller.Admit for relevant flows[*].
I am guessing the latter since [Handle] is being used for [...] token deduction below raft and for returning tokens
The kvflowcontrol.Handle is not used for token deduction below raft. That goes through kvadmission.Controller.AdmitRaftEntry(). Below raft we're not doing flow token deductions, only IO tokens, so kvflowcontrol.Handle is not involved. For returning flow tokens below raft, we'll go through kvflowcontrol.Dispatch. If you're talking about "dispatching" to the local node, i.e. where no RPC piggybacking is possible/needed, the kvflowcontrol.Dispatch implementation, I was assuming, would have a fast path to immediately call kvflowcontrol.Controller.ReturnTokens() on the same node.
and returning tokens can only be done for individual Streams.
Yes, which is why kvflowcontrol.Controller methods explicitly specify the kvflowcontrol.Stream(s) we're operating over.
Do the [kvflowcontrol.]Controller methods need to be public?
See comments above, not everything will go through kvflowcontrol.Handle, so it'll need to be accessible to the kvadmission package.
[*]: This TODO is somewhat relevant here:
// TODO(irfansharif): We might need the ability to "disable" specific
// streams/corresponding token buckets when there are failures or
// replication to a specific store is paused due to follower-pausing.
// That'll have to show up between the Handler and the Controller somehow.
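A rough sketch, assuming the shapes discussed above (one Handle per range held on the leaseholder/leader, fanning out to one Stream per destination replica, all backed by a node-level Controller). Exact signatures evolved during this review; the Stream and Tokens placeholders stand in for the PR's own types, and the real definitions live in kvflowcontrol.go.

package kvflowcontrolsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

// Stream and Tokens stand in for kvflowcontrol.Stream and
// kvflowcontrol.Tokens (placeholders for this sketch).
type Stream struct{ /* tenant ID + destination store */ }
type Tokens int64

// Controller is node-level. Its methods name explicit Streams since
// token accounting is per-stream; it stays exported because the
// kvadmission package also needs to reach it (not everything goes
// through Handle).
type Controller interface {
	// Admit blocks until there are flow tokens available for the stream.
	Admit(context.Context, admissionpb.WorkPriority, Stream) error
	// DeductTokens and ReturnTokens are non-blocking.
	DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream)
	ReturnTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream)
}

// Handle is held per-replica by the range initiating replication
// traffic and manages multiple Streams underneath, delegating to the
// Controller for each of them.
type Handle interface {
	Admit(context.Context, admissionpb.WorkPriority) error
	DeductTokensFor(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
	// ReturnTokensUpto frees all deducted tokens up to the given log
	// position for the named stream.
	ReturnTokensUpto(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
	// TrackLowWater discards later token returns at or below this
	// position for the stream (e.g. after lease transfers).
	TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
}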
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 76 at r1 (raw file):
Though I suppose we will allow requests to bypass Admit, e.g. regular priority ones for the initial deployment?
Yes. If you're thinking we would do the whole "deduct/return flow tokens but don't block on kvflowcontrol.Controller.Admit()" as part of rolling this out/shaking out bugs, that sounds good to me. I don't think it's worth mentioning this in these interface comments.
Worth saying that this is non-blocking.
Done.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 96 at r1 (raw file):
Previously, sumeerbhola wrote…
I'm puzzled by the Stream parameter, since I was assuming there is one Stream per Handle.
Multiple streams per handle, see comment above. I also updated the interface comment to be explicit:
// [..] Handles are held on replicas initiating
// replication traffic, i.e. are both the leaseholder and raft leader, and
// manage multiple Streams (one per active replica) underneath.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 124 at r1 (raw file):
does PendingDispatchFor remove these from the pending list? If no, when do they get removed?
It removes them, yes.
If yes, what if the underlying gRPC stream broke before these were successfully sent?
The node on the other end is expected to react to the stream breaking by freeing up all held tokens. And ignoring later attempts to return these tokens if the stream was re-established ~instantly. Was hoping to avoid any sort of acknowledgement/handshake over flow token returns. Of course, if the stream breaks because the receiver node has crashed, we'll need to react to it similarly (returning flow tokens and also ignoring this specific stream's flow token bucket when looking to Admit). I'll flesh out some more specifics with node failures in subsequent PRs. There's a comment above that also talks about node failures.
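Illustrative only: one way a raft transport sender might drain pending flow-token returns and piggyback them onto an outbound message, per the semantics described above (PendingDispatchFor removes entries from the pending list; a broken stream is handled by the receiver freeing all held tokens rather than by re-sending). The carrier message type below is hypothetical, not something this PR defines.

package dispatchsketch

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// DispatchReader mirrors the interface quoted above.
type DispatchReader interface {
	PendingDispatch() []roachpb.NodeID
	PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
}

// outboundMsg is a stand-in for whatever raft transport message the
// admitted-entries payload ends up riding on.
type outboundMsg struct {
	AdmittedRaftLogEntries []kvflowcontrolpb.AdmittedRaftLogEntries
}

func maybeAttachAdmittedEntries(d DispatchReader, dest roachpb.NodeID, msg *outboundMsg) {
	// Draining here means these entries won't be re-sent; if the gRPC
	// stream breaks before delivery, the other end reacts by freeing
	// all tokens it holds, so nothing is leaked.
	msg.AdmittedRaftLogEntries = append(msg.AdmittedRaftLogEntries, d.PendingDispatchFor(dest)...)
}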
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 26 at r1 (raw file):
Previously, sumeerbhola wrote…
same question about using a single byte.
See below.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 28 at r1 (raw file):
Previously, sumeerbhola wrote…
... at the creation time ...
Done.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 54 at r1 (raw file):
Previously, sumeerbhola wrote…
nit: IDs through some ...
Done.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 57 at r1 (raw file):
We could also choose to include this information only for raft entries that will not bypass admission control.
Planning to do just this. On a per-request basis we'll decide whether it's subject to replication admission control based on some cluster settings (that disable replication admission control completely, only for regular requests, or enable it completely). If we're not using replication admission control for a request, we'll encode these raft entries using EntryEncoding{Standard,Sideloaded}WithoutAC which will not include this information.
Though ideally it would be nice if we do the token tracking for these bypassing ones so we can look at a system and understand that we are losing performance isolation because of this bypass.
We can consider a cluster setting that allows flow token tracking but does not actually block on kvflowcontrol.Controller.Admit(). Added a bullet point to #95563 that says the following:
Also support a mode where we do end-to-end flow control token tracking but don't actually block at admit time due to lack of requisite flow tokens. It'll let us look at production systems and understand that we are losing performance isolation due to a lack of flow control.
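A minimal sketch of the per-request decision described a few lines above: pick a raft entry encoding depending on whether replication admission control applies. The mode values and helper name here are hypothetical; the real cluster settings and the EntryEncoding{Standard,Sideloaded}{With,Without}AC constants are defined in this PR and its follow-ups.

package encodingsketch

type flowControlMode int

const (
	flowControlDisabled    flowControlMode = iota // no replication admission control
	flowControlElasticOnly                        // only elastic (bulk) work subject to it
	flowControlEnabled                            // all work subject to it
)

// usesReplicationAC decides whether a proposal's raft entry should be
// encoded with admission metadata (the WithAC encodings) or without.
func usesReplicationAC(mode flowControlMode, elastic bool) bool {
	switch mode {
	case flowControlEnabled:
		return true
	case flowControlElasticOnly:
		return elastic
	default:
		return false
	}
}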
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 95 at r1 (raw file):
We could coarsen this to regular and elastic.
Good idea. Originally wanted to avoid leaking "work classes" at this level, but the complexity is not worth it given the kvflowcontrol.Controller will be segmenting flow tokens by work classes anyway. Moved + exported an admissionpb.WorkClass (we'd need to eventually anyway, for our data structures). If later we're ok with this complexity, it's just a proto change (none of this is stateful).
I suspect we need the capability for the node to say I am returning all tokens (across both elastic and regular) up to some RaftLogPosition, because I have lost state. It's fine to do this with two such AdmittedRaftLogEntries messages, one for regular and one for elastic.
Yes, we could do with two such AdmittedRaftLogEntries messages. I was hoping however to not have to involve the restarted node at all. I was hoping for the origin node waiting for AdmittedRaftLogEntries to react to the node failure (failed liveness heartbeat, or even a bumped liveness epoch) by freeing up all held flow tokens. This needs some further specification and is something I haven't prototyped yet. So I'll either find time to prototype it soon and be specific, or get to it as part of the TODO I added to #95563 that says:
- [ ] Support and add tests to make sure we don't leak flow tokens, or return them repeatedly, in the face of node failures, gRPC streams breaking (including intermittently), reproposals, snapshots, log truncations, splits, merges, lease transfers, leadership transfers, raft membership changing, follower pausing, prolonged leaseholder != leader, etc.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 104 at r1 (raw file):
Previously, sumeerbhola wrote…
nit: can infer the StoreID
Done.
pkg/kv/kvserver/kvserverpb/proposer_kv.proto
line 365 at r1 (raw file):
Previously, sumeerbhola wrote…
admissionpb.WorkPriority is an int8. Why not use a byte here instead of this varint encoded int32?
Noob question: how does one use a single byte field in proto3? I don't see it listed here: https://developers.google.com/protocol-buffers/docs/proto3#scalar. Using an int32, I think, is efficient. With varint int32 encoding it'll take 1 byte for values up to 127, which is math.MaxInt8.
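A quick check of the claim above, using only the stdlib: a varint-encoded value stays at one byte up to 127 (math.MaxInt8) and grows to two bytes at 128. Protobuf's int32 wire format uses the same 7-bits-per-byte varint scheme for non-negative values, plus a one-byte field tag.

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	fmt.Println(binary.PutUvarint(buf, math.MaxInt8))   // prints 1 (one byte)
	fmt.Println(binary.PutUvarint(buf, math.MaxInt8+1)) // prints 2 (two bytes)
}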
pkg/kv/kvserver/kvserverpb/proposer_kv.proto
line 368 at r1 (raw file):
Previously, sumeerbhola wrote…
nit: ... at the creation time ...
(from can make someone think this is a duration)
Done.
Force-pushed b06939c to 56d1ecd
Reviewed 2 of 33 files at r1, 1 of 14 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif and @tbg)
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 67 at r1 (raw file):
This could use some ascii art to show how these components fit together.
and returning tokens can only be done for individual Streams.
Yes, which is why kvflowcontrol.Controller methods explicitly specify the kvflowcontrol.Stream(s) we're operating over.
I'm still confused about ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition) in Handle. We are not returning tokens for all streams at the same time. Why does this exist?
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 124 at r1 (raw file):
Previously, irfansharif (irfan sharif) wrote…
does PendingDispatchFor remove these from the pending list? If no, when do they get removed?
It removes them, yes.
If yes, what if the underlying gRPC stream broke before these were successfully sent?
The node on the other end is expected to react to the stream breaking by freeing up all held tokens. And ignoring later attempts to return these tokens if the stream was re-established ~instantly. Was hoping to avoid any sort of acknowledgement/handshake over flow token returns. Of course, if the stream breaks because the receiver node has crashed, we'll need to react to it similarly (returning flow tokens and also ignoring this specific stream's flow token bucket when looking to Admit). I'll flesh out some more specifics with node failures in subsequent PRs. There's a comment above that also talks about node failures.
Can you write a code comment about the protocol in kvflowcontrol/doc.go. The protocol comment can ignore the code abstractions that are already documented there -- would just like a discussion about gRPC streams, messages that flow on them, failure modes.
pkg/kv/kvserver/kvserverpb/proposer_kv.proto
line 365 at r1 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Noob question: how does one use a single byte field in proto3? I don't see it listed here: https://developers.google.com/protocol-buffers/docs/proto3#scalar. Using an int32, I think, is efficient. With varint int32 encoding it'll take 1 byte for values up to 127, which is math.MaxInt8.
Never mind, I'm getting forgetful about protos. Even proto2 did not have a single byte type.
Could you send the raft encoding changes as a separate PR? I glanced over it and I don't think you'll need to make any larger changes but I'd like to give it some attention in isolation and this is difficult in such a big PR.
I looked at the prototype PR's propose method and it was what I expected.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif and @sumeerbhola)
pkg/clusterversion/cockroach_versions.go
line 408 at r2 (raw file):
//
// TODO(irfansharif): Actually use this.
V23_1UseEncodingWithBelowRaftAdmissionData
You'll get away with it, but technically you can only introduce this setting when the code actually supports it. Imagine this PR running in a mixed-version cluster with a 23.1 binary. 23.1 would assume this PR can do replication admission control but it can't.
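A hedged sketch of how the gate would eventually be consulted before emitting the new encodings (the gate exists in this PR; the check itself is deferred, per the discussion above). It assumes the standard clusterversion pattern; the proposal-side wiring is hypothetical.

package gatesketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// useBelowRaftACEncoding reports whether every node in the cluster is
// running a binary that understands the new prefix bytes.
func useBelowRaftACEncoding(ctx context.Context, st *cluster.Settings) bool {
	return st.Version.IsActive(ctx, clusterversion.V23_1UseEncodingWithBelowRaftAdmissionData)
}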
Part of cockroachdb#95563. Predecessor to cockroachdb#95637.

This commit introduces two new encodings for raft log entries, EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, EntryEncoding{Standard,Sideloaded}[^1], to indicate whether the entry came with sideloaded data[^2]. Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll "admit the work without blocking", which is further explained in cockroachdb#95637.

The decision to use replication admission control happens above raft, on a per-entry basis. If using replication admission control, AC-specific metadata will be plumbed down as part of the marshaled raft command. This too is explained in cockroachdb#95637, specifically, the 'RaftAdmissionMeta' section. This commit then adds an unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData) to use replication admission control. Since we're using a different prefix byte for raft commands (see EntryEncodings above), one not recognized in earlier CRDB versions, we need explicit versioning. We add it out of development convenience -- adding version gates is most prone to merge conflicts. We expect to use it shortly, before alpha/beta cuts.

[^1]: Now renamed to EntryEncoding{Standard,Sideloaded}WithoutAC.
[^2]: These are typically AddSSTs, the storage for which is treated differently for performance reasons.

Release note: None
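An illustrative sketch of the prefix-byte dispatch this commit message describes: the first byte of a raft entry's payload selects the encoding, and the new "WithAC" variants additionally carry RaftAdmissionMeta ahead of the marshaled command. The constant values and names below are placeholders, not the actual raftlog definitions.

package raftlogsketch

import "errors"

type entryEncoding byte

const (
	standardWithoutAC entryEncoding = iota
	sideloadedWithoutAC
	standardWithAC
	sideloadedWithAC
)

// peekEncoding reads the prefix byte of an entry's payload.
func peekEncoding(data []byte) (entryEncoding, error) {
	if len(data) == 0 {
		return 0, errors.New("empty raft entry")
	}
	switch enc := entryEncoding(data[0]); enc {
	case standardWithoutAC, sideloadedWithoutAC, standardWithAC, sideloadedWithAC:
		return enc, nil
	default:
		return 0, errors.New("unrecognized raft entry encoding")
	}
}

// subjectToReplicationAC reports whether decoding should expect
// RaftAdmissionMeta before the command payload.
func subjectToReplicationAC(enc entryEncoding) bool {
	return enc == standardWithAC || enc == sideloadedWithAC
}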
Force-pushed 56d1ecd to 9ef9864
Could you send the raft encoding changes as a separate PR?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @sumeerbhola and @tbg)
pkg/clusterversion/cockroach_versions.go
line 408 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
You'll get away with it, but technically you can only introduce this setting when the code actually supports it. Imagine this PR running in a mixed-version cluster with a 23.1 binary. 23.1 would assume this PR can do replication admission control but it can't.
Yea, I just always end up catching merge conflicts in this file and this file alone, so I find it easier to just reserve my slot. I'm assuming I'll wire this up before any alpha/beta cuts, so these mixed-version incompatibilities don't come up.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 67 at r1 (raw file):
This could use some ascii art to show how these components fit together.
Done.
I'm still confused about ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition) in Handle
That's an oversight, meant to include a Stream in that signature. Done.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 124 at r1 (raw file):
Previously, sumeerbhola wrote…
Can you write a code comment about the protocol in kvflowcontrol/doc.go. The protocol comment can ignore the code abstractions that are already documented there -- would just like a discussion about gRPC streams, messages that flow on them, failure modes.
Done. But I think my description is imprecise and/or too high-level. I tried typing out some other interactions too which are plagued by similar difficulties. Could you take a look and improve my understanding? For most of them I was hoping to learn through writing tests + fixing them in the prototype, which I haven't done yet.
Force-pushed 9ef9864 to 2c9d16f
Reviewed 2 of 24 files at r9.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale)
pkg/kv/kvserver/kvflowcontrol/doc.go
line 177 at r8 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Added the following as a footnote, to explain the differences between this and I2+I3a. They're both similar in that we're effectively not going through Admit() though for [^7] we're tracking flow token deductions, which can be helpful for observability and also it being used somehow within (replay?) flow control.
// [^7]: When a node is crashed, instead of ignoring the underlying flow token
//       buckets, an alternative is to DeductTokens without going through Admit
//       (which blocks until flow tokens > 0). That way if the node comes back
//       up and catches up via replay (and not snapshot) we have accounted for
//       the load being placed on it. Though similar to I3a, the risk there is
//       that if start going through Admit() as soon as the node comes back up,
//       the tokens will still be negative and writes will stall. Whether we (i)
//       DeductTokens but wait post node-restart lag before going through
//       Admit(), or (ii) don't DeductTokens (Admit() is rendered a no-op),
//       we're being somewhat optimistic, which is fine.
looks good.
pkg/kv/kvserver/kvflowcontrol/doc.go
line 276 at r8 (raw file):
+1 to the description changes
We still have the possibility of the receiver perpetually dropping those messages and the sender not finding out.
That is ok in that if the receiver repeatedly drops then ideally we should not be returning those tokens. The concern is that something is dropped somewhere and there will be no retries, and the sender does not know that this has happened.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 44 at r8 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I ended up plumbing a context. I also just got rid of the variadic Stream parameter and made it single-Stream oriented instead, which will make writing Handle.Admit code easier.
Looks good.
Given Admit doesn't deduct tokens I suppose there is no concern that replica1 calls Admit(stream1), Admit(stream2) and replica2 calls Admit(stream2), Admit(stream1) and cause a deadlock. We can of course also call in sorted stream order just to be defensive.
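A tiny sketch of the defensive ordering suggested here: admit against streams in a deterministic (sorted) order so no two handles ever wait on each other's streams in opposite orders. The stream type and its comparison key are placeholders, not this PR's definitions.

package admitordersketch

import (
	"context"
	"sort"
)

type stream struct{ storeID int32 } // stand-in for kvflowcontrol.Stream

type admitter interface {
	Admit(context.Context, stream) error // blocking, per-stream
}

// admitAll waits on each stream's flow tokens in sorted store order.
func admitAll(ctx context.Context, a admitter, streams []stream) error {
	sort.Slice(streams, func(i, j int) bool { return streams[i].storeID < streams[j].storeID })
	for _, s := range streams {
		if err := a.Admit(ctx, s); err != nil {
			return err
		}
	}
	return nil
}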
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 72 at r8 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I'll keep this as is for a slight preference for single word names, even when unreasonable. If it helps I spent far too long asking ChatGPT for answers.
:)
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 84 at r8 (raw file):
This might all end up being unnecessary, so I'll rip things out later.
Sounds good.
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 113 at r8 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I'll keep it as Dispatch for now since we're not exactly returning the tokens directly (or at least kvflowcontrol.Tokens doesn't appear anywhere in the signature). This type feels more like a general outbox for "KV flow control". I'll ask ChatGPT for more names.
Ack
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto
line 29 at r8 (raw file):
See the older comment below. I think we need the token return to be per workClass to avoid this need to iterate. It's ok to have this asymmetry where the sender of the raft command is specifying the priority and the token returns are per work class.
We are doing this "prefix admission" logic to ease our token return story in the presence of failures. But by doing this at the granularity of admission_priority, are we reopening the complexity? There are 256 different priority levels, and if we are worried that a message has been lost we need to return the highest admitted RaftLogPosition for each of the priorities. We could coarsen this to regular and elastic.
Though there is still some difficulty. If the node returning the tokens restarts it does not know whether the stream has seen both regular and elastic traffic in the past and that it has not returned some elastic tokens. I suspect we need the capability for the node to say I am returning all tokens (across both elastic and regular) up to some RaftLogPosition, because I have lost state. It's fine to do this with two such AdmittedRaftLogEntries messages, one for regular and one for elastic.
Part of cockroachdb#95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller:

  // AdmitRaftEntry informs admission control of a raft log entry being
  // written to storage (for the given tenant, the specific range, and
  // on the named store).
  AdmitRaftEntry(
    context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry,
  )

This serves as the integration point for log entries received below raft right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueue a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improve accuracy for the underlying linear models (that map between accounted-for writes and observed L0 growth, using it to inform IO token generation rates).

For each of the arguments above:
- The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation).
- The roachpb.StoreID is used to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1].
- We pass in the roachpb.RangeID on behalf of which work is being admitted. This, alongside the raftpb.Entry.{Term,Index} for the replicated write, uniquely identifies where the write is to end up. We use these identifiers to:
  - Return flow tokens on the origin node[^1][^2].
  - In WorkQueue ordering -- for replicated writes below-raft, we ignore CreateTime/epoch-LIFO, and instead sort by priority and, within a priority, sort by log position.
- For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above.
- Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs.

We use the above to populate the following fields on a per-(replicated write) work basis:

  // ReplicatedWorkInfo groups everything needed to admit replicated
  // writes, done so asynchronously below-raft as part of replication
  // admission control.
  type ReplicatedWorkInfo struct {
    RangeID     roachpb.RangeID
    Origin      roachpb.NodeID
    LogPosition LogPosition
    Ingested    bool
  }

Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface which was to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue:

  // onAdmittedReplicatedWork is used to intercept the
  // point-of-admission for replicated writes.
  type onAdmittedReplicatedWork interface {
    admittedReplicatedWork(
      tenantID roachpb.TenantID,
      pri admissionpb.WorkPriority,
      rwi ReplicatedWorkInfo,
      requestedTokens int64,
    )
  }

[^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in cockroachdb#95637.
[^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in cockroachdb#95637. Token deductions and returns are tied to raft log positions.
[^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in cockroachdb#95748.
[^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in cockroachdb#95637.

  message RaftAdmissionMeta {
    int32 admission_priority = ...;
    int64 admission_create_time = ...;
    int32 admission_origin_node = ...;
  }

Release note: None
Part of cockroachdb#95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage (for the given tenant, the specific range, and // on the named store). AdmitRaftEntry( context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry. ) This serves as the integration point for log entries received below raft right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueues a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improves accuracy for the underlying linear models (that map between accounted-for writes and observed L0 growth, using it to inform IO token generation rates). For each of the arguments above: - The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation). - The roachpb.StoreID to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1]. - We pass in the roachpb.RangeID on behalf of which work is being admitted. This, along side the raftpb.Entry.{Term,Index} for the replicated write uniquely identifies where the write is to end up. We use these identifiers to: - Return flow tokens on the origin node[^1][^2]. - In WorkQueue ordering -- for replicated writes below-raft, we ignore CreateTime/epoch-LIFO, and instead sort by priority and within a priority, sort by log position. - For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above. - Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs. We use the above to populate the following fields on a per-(replicated write)work basis: // ReplicatedWorkInfo groups everything needed to admit replicated // writes, done so asynchronously below-raft as part of replication // admission control. type ReplicatedWorkInfo struct { RangeID roachpb.RangeID Origin roachpb.NodeID LogPosition LogPosition Ingested bool } Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface which was to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue: // onAdmittedReplicatedWork is used to intercept the // point-of-admission for replicated writes. 
type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, pri admissionpb.WorkPriority, rwi ReplicatedWorkInfo, requestedTokens int64, ) } [^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in cockroachdb#95637. [^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in cockroachdb#95637. Token deductions and returns are tied to raft log positions. [^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in cockroachdb#95748. [^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in cockroachdb#95637. message RaftAdmissionMeta { int32 admission_priority = ...; int64 admission_create_time = ...; int32 admission_origin_node = ...; } Release note: None
Part of cockroachdb#95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage (for the given tenant, the specific range, and // on the named store). AdmitRaftEntry( context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry. ) This serves as the integration point for log entries received below raft right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueues a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improves accuracy for the underlying linear models (that map between accounted-for writes and observed L0 growth, using it to inform IO token generation rates). For each of the arguments above: - The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation). - The roachpb.StoreID to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1]. - We pass in the roachpb.RangeID on behalf of which work is being admitted. This, along side the raftpb.Entry.{Term,Index} for the replicated write uniquely identifies where the write is to end up. We use these identifiers to: - Return flow tokens on the origin node[^1][^2]. - In WorkQueue ordering -- for replicated writes below-raft, we ignore CreateTime/epoch-LIFO, and instead sort by priority and within a priority, sort by log position. - For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above. - Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs. We use the above to populate the following fields on a per-(replicated write)work basis: // ReplicatedWorkInfo groups everything needed to admit replicated // writes, done so asynchronously below-raft as part of replication // admission control. type ReplicatedWorkInfo struct { RangeID roachpb.RangeID Origin roachpb.NodeID LogPosition LogPosition Ingested bool } Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface which was to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue: // onAdmittedReplicatedWork is used to intercept the // point-of-admission for replicated writes. 
type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, pri admissionpb.WorkPriority, rwi ReplicatedWorkInfo, requestedTokens int64, ) } [^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in cockroachdb#95637. [^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in cockroachdb#95637. Token deductions and returns are tied to raft log positions. [^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in cockroachdb#95748. [^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in cockroachdb#95637. message RaftAdmissionMeta { int32 admission_priority = ...; int64 admission_create_time = ...; int32 admission_origin_node = ...; } Release note: None
Part of cockroachdb#95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage (for the given tenant, the specific range, and // on the named store). AdmitRaftEntry( context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry. ) This serves as the integration point for log entries received below raft right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueues a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improves accuracy for the underlying linear models (that map between accounted-for writes and observed L0 growth, using it to inform IO token generation rates). For each of the arguments above: - The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation). - The roachpb.StoreID to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1]. - We pass in the roachpb.RangeID on behalf of which work is being admitted. This, along side the raftpb.Entry.{Term,Index} for the replicated write uniquely identifies where the write is to end up. We use these identifiers to: - Return flow tokens on the origin node[^1][^2]. - In WorkQueue ordering -- for replicated writes below-raft, we ignore CreateTime/epoch-LIFO, and instead sort by priority and within a priority, sort by log position. - For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above. - Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs. We use the above to populate the following fields on a per-(replicated write)work basis: // ReplicatedWorkInfo groups everything needed to admit replicated // writes, done so asynchronously below-raft as part of replication // admission control. type ReplicatedWorkInfo struct { RangeID roachpb.RangeID Origin roachpb.NodeID LogPosition LogPosition Ingested bool } Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface which was to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue: // onAdmittedReplicatedWork is used to intercept the // point-of-admission for replicated writes. 
type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, pri admissionpb.WorkPriority, rwi ReplicatedWorkInfo, requestedTokens int64, ) } [^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in cockroachdb#95637. [^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in cockroachdb#95637. Token deductions and returns are tied to raft log positions. [^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in cockroachdb#95748. [^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in cockroachdb#95637. message RaftAdmissionMeta { int32 admission_priority = ...; int64 admission_create_time = ...; int32 admission_origin_node = ...; } Release note: None
Part of cockroachdb#95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage (for the given tenant, the specific range, and // on the named store). AdmitRaftEntry( context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry. ) This serves as the integration point for log entries received below raft right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueues a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improves accuracy for the underlying linear models (that map between accounted-for writes and observed L0 growth, using it to inform IO token generation rates). For each of the arguments above: - The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation). - The roachpb.StoreID to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1]. - We pass in the roachpb.RangeID on behalf of which work is being admitted. This, along side the raftpb.Entry.{Term,Index} for the replicated write uniquely identifies where the write is to end up. We use these identifiers to: - Return flow tokens on the origin node[^1][^2]. - In WorkQueue ordering -- for replicated writes below-raft, we ignore CreateTime/epoch-LIFO, and instead sort by priority and within a priority, sort by log position. - For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above. - Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs. We use the above to populate the following fields on a per-(replicated write)work basis: // ReplicatedWorkInfo groups everything needed to admit replicated // writes, done so asynchronously below-raft as part of replication // admission control. type ReplicatedWorkInfo struct { RangeID roachpb.RangeID Origin roachpb.NodeID LogPosition LogPosition Ingested bool } Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface which was to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue: // onAdmittedReplicatedWork is used to intercept the // point-of-admission for replicated writes. 
type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, pri admissionpb.WorkPriority, rwi ReplicatedWorkInfo, requestedTokens int64, ) } [^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in cockroachdb#95637. [^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in cockroachdb#95637. Token deductions and returns are tied to raft log positions. [^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in cockroachdb#95748. [^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in cockroachdb#95637. message RaftAdmissionMeta { int32 admission_priority = ...; int64 admission_create_time = ...; int32 admission_origin_node = ...; } Release note: None
Part of cockroachdb#95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage (for the given tenant, the specific range, and // on the named store). AdmitRaftEntry( context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry. ) This serves as the integration point for log entries received below raft right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueues a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improves accuracy for the underlying linear models (that map between accounted-for writes and observed L0 growth, using it to inform IO token generation rates). For each of the arguments above: - The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation). - The roachpb.StoreID to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1]. - We pass in the roachpb.RangeID on behalf of which work is being admitted. This, along side the raftpb.Entry.{Term,Index} for the replicated write uniquely identifies where the write is to end up. We use these identifiers to: - Return flow tokens on the origin node[^1][^2]. - In WorkQueue ordering -- for replicated writes below-raft, we ignore CreateTime/epoch-LIFO, and instead sort by priority and within a priority, sort by log position. - For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above. - Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs. We use the above to populate the following fields on a per-(replicated write)work basis: // ReplicatedWorkInfo groups everything needed to admit replicated // writes, done so asynchronously below-raft as part of replication // admission control. type ReplicatedWorkInfo struct { RangeID roachpb.RangeID Origin roachpb.NodeID LogPosition LogPosition Ingested bool } Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface which was to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue: // onAdmittedReplicatedWork is used to intercept the // point-of-admission for replicated writes. 
type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, pri admissionpb.WorkPriority, rwi ReplicatedWorkInfo, requestedTokens int64, ) } [^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in cockroachdb#95637. [^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in cockroachdb#95637. Token deductions and returns are tied to raft log positions. [^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in cockroachdb#95748. [^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in cockroachdb#95637. message RaftAdmissionMeta { int32 admission_priority = ...; int64 admission_create_time = ...; int32 admission_origin_node = ...; } Release note: None
97599: admission: support non-blocking {Store,}WorkQueue.Admit() r=irfansharif a=irfansharif

Part of #95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller:

```go
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage (for the given tenant, the specific range, and
// on the named store).
AdmitRaftEntry(
  context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry,
)
```

This serves as the integration point for log entries received below raft, right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueue a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improve accuracy for the underlying linear models (that map between accounted-for writes and observed L0 growth, using it to inform IO token generation rates).

For each of the arguments above:

- The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation).
- The roachpb.StoreID is used to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1].
- We pass in the roachpb.RangeID on behalf of which work is being admitted. This, alongside the raftpb.Entry.{Term,Index} for the replicated write, uniquely identifies where the write is to end up. We use these identifiers to return flow tokens on the origin node[^1][^2].
- For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above.
- Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs.

We use the above to populate the following fields on a per-(replicated write) work basis:

```go
// ReplicatedWorkInfo groups everything needed to admit replicated
// writes, done so asynchronously below-raft as part of replication
// admission control.
type ReplicatedWorkInfo struct {
  RangeID     roachpb.RangeID
  Origin      roachpb.NodeID
  LogPosition LogPosition
  Ingested    bool
}
```

Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface, which was there to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue:

```go
// onAdmittedReplicatedWork is used to intercept the
// point-of-admission for replicated writes.
type onAdmittedReplicatedWork interface {
  admittedReplicatedWork(
    tenantID roachpb.TenantID,
    pri admissionpb.WorkPriority,
    rwi ReplicatedWorkInfo,
    requestedTokens int64,
  )
}
```

[^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in #95637.
[^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in #95637. Token deductions and returns are tied to raft log positions.
[^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in #95748.
[^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in #95637.

Release note: None

98419: clusterversion: add a gate for new system privileges r=jayshrivastava a=rafiss

A 22.2/23.1 mixed version cluster cannot handle new system privileges well. This commit gates their usage and adds a test. Without this gate, the included test would fail and users would not be able to log in to nodes running on the old binary.

Epic: None
Release note: None

98495: settingswatcher: version guard support for clusters bootstrapped at old versions r=JeffSwenson a=JeffSwenson

When a cluster is bootstrapping, the sql server is initialized before the cluster version is populated in the DB. Previously, the version guard utility was unable to handle this state if the version is older than the maxVersion used to initialize the version guard. Now, the versionGuard handles this bootstrapping state by falling back on the in-memory cluster version.

Part of #94843

Release note: none

Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Jeff <[email protected]>
104699: kvserver: fix clearrange/* tests r=irfansharif a=irfansharif

Fixes #104696. Fixes #104697. Fixes #104698. Part of #98703.

In 072c16d (added as part of #95637) we re-worked the locking structure around the RaftTransport's per-RPC class level send queues. When new send queues are instantiated or old ones deleted, we now also maintain the kvflowcontrol connection tracker, so such maintenance now needs to happen while holding a kvflowcontrol mutex. When rebasing #95637 onto master, we accidentally included earlier queue deletion code without holding the appropriate mutex. Queue deletions now happened twice, which made it possible to hit a RaftTransport assertion about expecting the right send queue to already exist. Specifically, the following sequence was possible:

- `(*RaftTransport).SendAsync` is invoked, observes no queue for `<nodeid,class>`, creates it, and tracks it in the queues map.
- It invokes an async worker W1 to process that send queue through `(*RaftTransport).startProcessNewQueue`. The async worker is responsible for clearing the tracked queue in the queues map once done.
- W1 expects to find the tracked queue in the queues map, finds it, proceeds.
- W1 is done processing. On its way out, W1 clears `<nodeid,class>` from the queues map the first time.
- `(*RaftTransport).SendAsync` is invoked by another goroutine, observes no queue for `<nodeid,class>`, creates it, and tracks it in the queues map.
- It invokes an async worker W2 to process that send queue through `(*RaftTransport).startProcessNewQueue`. The async worker is responsible for clearing the tracked queue in the queues map once done.
- W1 blindly clears the `<nodeid,class>` raft send queue the second time.
- W2 expects to find the queue in the queues map, but doesn't, and fatals.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
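To make the ownership rule behind this fix concrete, here is a deliberately reduced sketch of the intended pattern: the worker that registered a send queue is the only one that removes it, exactly once, under the same mutex that guards the rest of the queue bookkeeping. The type and field names below are illustrative assumptions, not the actual RaftTransport code.

```go
type queueKey struct {
	nodeID int64
	class  int // RPC class
}

type transport struct {
	mu     sync.Mutex
	queues map[queueKey]chan []byte // simplified stand-in for a send queue
}

// getOrCreateQueue registers a queue for key if none exists, reporting
// whether the caller is the one that created it (and so owns its cleanup).
func (t *transport) getOrCreateQueue(key queueKey) (q chan []byte, created bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if q, ok := t.queues[key]; ok {
		return q, false
	}
	q = make(chan []byte, 128)
	t.queues[key] = q
	return q, true
}

// deleteQueue is called exactly once, by the worker that created the
// queue, on its way out. Doing the delete under t.mu, and only from the
// owning worker, prevents a later worker registered for the same key
// from having its queue cleared out from underneath it.
func (t *transport) deleteQueue(key queueKey) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.queues, key)
}
```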
Follower replication work, today, is not subject to admission control. It consumes IO tokens without waiting, which both (i) does not prevent the LSM from being inverted, and (ii) can cause priority inversion where low-pri follower write work ends up causing IO token exhaustion, which in turn causes throughput and latency impact for high-pri non-follower write work on that same store. This latter behavior was especially noticeable with large index backfills (#82556) where >2/3rds of write traffic on stores could be follower work for large AddSSTs, causing IO token exhaustion for regular write work being proposed on those stores.
We last looked at this problem as part of #79215, settling on #83851 which pauses replication traffic to stores close to exceeding their IO overload threshold (data that's periodically gossiped). In large index backfill experiments we found this to help slightly, but it's still a coarse and imperfect solution -- we're deliberately causing under-replication instead of being able to shape the rate of incoming writes for low-pri work closer to the origin.
As part of #95563 we're introducing machinery for "replication admission control" -- end-to-end flow control for replication traffic. With it we expect to no longer need to bypass follower write work in admission control and solve the issues mentioned above. Some small degree of familiarity with the design is assumed below. In this first, proto{col,buf}/interface-only PR, we introduce:
1. Package `kvflowcontrol{,pb}`, which will provide flow control for replication traffic in KV. It will be part of the integration layer between KV and admission control. In it we have a few central interfaces:
   - `kvflowcontrol.Controller`, held at the node-level, which holds all `flowcontrol.Tokens` for each `flowcontrol.Stream` (one per store we're sending raft traffic to and tenant we're sending it for).
   - `kvflowcontrol.Handle`, which will be held at the replica-level (only on those that are both leaseholder and raft leader) and will be used to interface with the node-level `kvflowcontrol.Controller`. When replicating log entries, these replicas choose the log position (term+index) the data is to end up at, and use this handle to track the token deductions on a per log position basis. Later, when freeing up tokens (after being informed of said log entries being admitted on the receiving end of the stream), it's done by specifying the log position up to which we free up all deducted tokens (see the handle sketch after this list).
2. `kvflowcontrolpb.RaftAdmissionMeta` and relevant encoding/decoding routines. `RaftAdmissionMeta` is 'embedded' within a `kvserverpb.RaftCommand` and includes the necessary AC metadata on a per raft entry basis (`admission_priority`, `admission_create_time`, `admission_origin_node`). Entries that contain this metadata will make use of the AC-specific raft log entry encodings described in 5. below. The AC metadata is decoded below-raft when looking to admit the write work. Also included is the node where this command originated, which wants to eventually learn of this command's admission.
3. `kvflowcontrolpb.AdmittedRaftLogEntries`, which now features in `kvserverpb.RaftMessageRequest`, the unit of what's sent back-and-forth between two nodes over their two uni-directional raft transport streams. `AdmittedRaftLogEntries`, just like raft heartbeats, is coalesced information about all raft log entries that were admitted below raft. We'll use the origin node encoded in the raft entry (`admission_origin_node` from 2. above) to know where to send these to. This information is used on the origin node to release flow tokens that were acquired when replicating the original log entries.
4. `kvflowcontrol.Dispatch`, which is used to dispatch information about admitted raft log entries (see 3. above) to specific nodes where (i) said entries originated, (ii) flow tokens were deducted and (iii) are waiting to be returned. The interface is also used to read pending dispatches, which will be used in the raft transport layer when looking to piggyback information on traffic already bound to specific nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we allow querying for all long-overdue dispatches. The interface looks roughly like the sketch following this list.
5. Two new encodings for raft log entries, `EntryEncoding{Standard,Sideloaded}WithAC`. Raft log entries have a prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, `EntryEncoding{Standard,Sideloaded}` (now renamed to `EntryEncoding{Standard,Sideloaded}WithoutAC`), to indicate whether the entry came with sideloaded data (these are typically AddSSTs, the storage for which is treated differently for performance). Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll admit the work without blocking.
6. An unused version gate (`V23_1UseEncodingWithBelowRaftAdmissionData`) to use replication admission control. Since we're using a different prefix byte for raft commands (point 5. above), one not recognized in earlier CRDB versions, we need explicit versioning.
7. `AdmitRaftEntry`, on the `kvadmission.Controller` interface. We'll use this as the integration point for log entries received below raft, right as they're being written to storage. This will be non-blocking since we'll be below raft in the `raft.Ready()` loop, and will effectively enqueue a "virtual" work item in the underlying `StoreWorkQueue` mediating store IO. This virtual work item is what later gets dequeued once the store granter informs the work queue of newly available IO tokens. For standard work queue ordering, our work item needs to include the create time and admission priority. The tenant ID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation); the store ID to find the right store work queue on multi-store nodes. The `raftpb.Entry` encodes within it its origin node (see 2. above), which is used post-admission to inform the right node of said admission. The signature looks like the sketch following this list.
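For point 1, a rough reconstruction of what the per-replica handle could look like. `ReturnTokensUpto` and `DeductTokensFor` are named in the footnotes quoted earlier in this thread; the exact signatures and the `Tokens` type here are assumptions, not the PR's final API.

```go
// Handle (sketch) is held per-replica on the leaseholder+raft leader,
// and fronts the node-level Controller for that range's streams.
type Handle interface {
	// DeductTokensFor deducts flow tokens for the given priority, tracking
	// the deduction against the raft log position the entry will occupy.
	DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
	// ReturnTokensUpto returns all tokens deducted for log positions less
	// than or equal to the given one (driven by AdmittedRaftLogEntries
	// received from below-raft admission on the remote store).
	ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
}
```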
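For point 4, the interface sketch referenced there. The code block that originally accompanied this point didn't survive into this text, so the method set below is reconstructed from the surrounding description (dispatching to a node, reading pending dispatches) and should be read as approximate.

```go
// Dispatch (sketch) is used to dispatch information about admitted raft
// log entries to specific nodes, and to read pending dispatches.
type Dispatch interface {
	// Dispatch buffers up the given admitted-entries information, bound
	// for the named node (where the flow tokens were originally deducted).
	Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
	// PendingDispatch lists the nodes we have pending dispatches for,
	// including long-overdue ones that never got piggybacked.
	PendingDispatch() []roachpb.NodeID
	// PendingDispatchFor reads (and clears) pending dispatches bound for
	// the given node, for piggybacking onto raft transport traffic.
	PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
}
```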
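And for point 7, the `AdmitRaftEntry` hook. The signature below is quoted from the commit message earlier in this thread (the enclosing `kvadmission.Controller` interface is elided):

```go
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage (for the given tenant, the specific range, and
// on the named store). It's non-blocking.
AdmitRaftEntry(
	context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry,
)
```

A caller below raft, processing a `raft.Ready()`, might invoke it roughly as follows; this is illustrative only, and the variable names (`admissionController`, `rd`, `tenantID`, `storeID`, `rangeID`) are assumed, not taken from the PR:

```go
// Sketch: admit each entry about to be appended to the raft log,
// without blocking the raft.Ready() loop.
for _, entry := range rd.Entries {
	admissionController.AdmitRaftEntry(ctx, tenantID, storeID, rangeID, entry)
}
```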
Here's how the various pieces fit together: