diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 5c8036fe810a..6d5c0db22246 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -222,7 +222,8 @@ /pkg/kv/kvserver/gc/ @cockroachdb/kv-prs /pkg/kv/kvserver/idalloc/ @cockroachdb/kv-prs /pkg/kv/kvserver/intentresolver/ @cockroachdb/kv-prs -/pkg/kv/kvserver/kvadmission/ @cockroachdb/kv-prs +/pkg/kv/kvserver/kvadmission/ @cockroachdb/admission-control +/pkg/kv/kvserver/kvflowcontrol/ @cockroachdb/admission-control /pkg/kv/kvserver/kvserverbase/ @cockroachdb/kv-prs /pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs /pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 57e7afe17925..f4b8f4df8e0e 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1214,6 +1214,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/intentresolver:intentresolver", "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/kvadmission:kvadmission", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol", "//pkg/kv/kvserver/kvserverbase:kvserverbase", "//pkg/kv/kvserver/kvserverpb:kvserverpb", "//pkg/kv/kvserver/kvstorage:kvstorage", @@ -2607,6 +2609,8 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/idalloc:get_x_data", "//pkg/kv/kvserver/intentresolver:get_x_data", "//pkg/kv/kvserver/kvadmission:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", "//pkg/kv/kvserver/kvserverpb:get_x_data", "//pkg/kv/kvserver/kvstorage:get_x_data", diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 88e7e8d1ecb3..fdeda998c619 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -30,6 +30,7 @@ PROTOBUF_SRCS = [ "//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto", "//pkg/kv/kvserver/concurrency/lock:lock_go_proto", "//pkg/kv/kvserver/concurrency/poison:poison_go_proto", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto", "//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto", diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 44573b5e43fc..3c8953754572 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//:pebble", + "@io_etcd_go_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index c9d4a98b30cb..92fb01f8a832 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "go.etcd.io/raft/v3/raftpb" ) // elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted @@ -81,6 +82,14 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting( true, ) +// ProvisionedBandwidth set a value of the provisioned +// bandwidth for each store in the cluster. 
+var ProvisionedBandwidth = settings.RegisterByteSizeSetting( + settings.SystemOnly, "kvadmission.store.provisioned_bandwidth", + "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ + "for each store. It can be over-ridden on a per-store basis using the --store flag", + 0).WithPublic() + // Controller provides admission control for the KV layer. type Controller interface { // AdmitKVWork must be called before performing KV work. @@ -108,6 +117,9 @@ type Controller interface { // replicated to a raft follower, that have not been subject to admission // control. FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes) + // AdmitRaftEntry informs admission control of a raft log entry being + // written to storage. + AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) } // TenantWeightProvider can be periodically asked to provide the tenant @@ -394,13 +406,12 @@ func (n *controllerImpl) FollowerStoreWriteBytes( followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo) } -// ProvisionedBandwidth set a value of the provisioned -// bandwidth for each store in the cluster. -var ProvisionedBandwidth = settings.RegisterByteSizeSetting( - settings.SystemOnly, "kvadmission.store.provisioned_bandwidth", - "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ - "for each store. It can be over-ridden on a per-store basis using the --store flag", - 0).WithPublic() +// AdmitRaftEntry implements the Controller interface. +func (n *controllerImpl) AdmitRaftEntry( + roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry, +) { + panic("unimplemented") +} // FollowerStoreWriteBytes captures stats about writes done to a store by a // replica that is not the leaseholder. These are used for admission control. diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel new file mode 100644 index 000000000000..04ef54d4893d --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -0,0 +1,19 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "kvflowcontrol", + srcs = [ + "doc.go", + "kvflowcontrol.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/roachpb", + "//pkg/util/admission/admissionpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go new file mode 100644 index 000000000000..f4b1a0ed64a3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -0,0 +1,355 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontrol + +// This package contains machinery for "replication admission control" -- +// end-to-end flow control for replication traffic. It's part of the integration +// layer between KV and admission control. There are a few components, in and +// out of this package. The [{l,r}] annotations refer to the figure +// below. +// +// A. 
The central interfaces/types in this package are: +// - kvflowcontrol.Controller, held at the node-level and holds all available +// kvflowcontrol.Tokens for each kvflowcontrol.Stream. +// - kvflowcontrol.Tokens represent the finite capacity of a given stream, +// expressed in bytes we're looking to replicate over the given stream. +// The tokens we deduct are determined post-evaluation, after [3]. +// - kvflowcontrol.Stream models the stream over which we replicate data +// traffic, transmission for which we regulate using flow control. It's +// segmented by the specific store the traffic is bound for, and also the +// tenant driving it. +// - kvflowcontrol.Handle is held at the replica-level (only on those who are +// both leaseholder and raft leader), and is used to interface with +// the node-level kvflowcontrol.Controller. When replicating log entries, +// these replicas choose the log position (term+index) the data is to end +// up at, and use this handle to track the token deductions on a per log +// position basis, see [4]. After being informed of these log entries being +// admitted by the receiving end of the kvflowcontrol.Stream, it frees up +// the tokens. +// +// B. kvflowcontrolpb.RaftAdmissionMeta, embedded within each +// kvserverpb.RaftCommand, includes all necessary information for below-raft +// IO admission control. Also included is the node where this command +// originated, who wants to eventually learn of this command's admission. +// Entries that contain this metadata make use of AC-specific raft log entry +// encodings described below. They're passed through the [5r] and [5l]. +// +// C. kvflowcontrolpb.AdmittedRaftLogEntries, piggybacked as part of +// kvserverpb.RaftMessageRequest[^1] (see [7r] + [9r] below), contains +// coalesced information about all raft log entries that were admitted below +// raft. We use the origin node encoded in raft entry +// (RaftAdmissionMeta.AdmissionOriginNode) to know where to send these to. +// This information is used on the origin node to release flow tokens that +// were acquired when replicating the original log entries. This is [10r] and +// [8l'] below. +// +// D. kvflowcontrol.Dispatch is used to dispatch information about +// admitted raft log entries (AdmittedRaftLogEntries) to the specific nodes +// where (i) said entries originated, (ii) flow tokens were deducted and +// (iii) are waiting to be returned. The interface is also used to read +// pending dispatches, which will be used in the raft transport layer when +// looking to piggyback information on traffic already bound to specific +// nodes; see [8]. Since timely dispatching (read: piggybacking) is not +// guaranteed, we allow querying for all long-overdue dispatches. +// +// E. We use specific encodings for raft log entries that contain AC data: +// EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have prefix +// byte that informs decoding routines how to interpret the subsequent bytes. +// Since we don't want to decode anything if the command is not subject to +// replication admission control, the encoding type is a convenient place to +// capture how a specific entry is to be considered. +// - The decision to use replication admission control happens above raft +// (using cluster settings, version gates), and AC-specific metadata is +// plumbed down as part of the marshaled raft command (RaftAdmissionMeta). +// +// F. 
AdmitRaftEntry, on the kvadmission.Controller is the integration +// point for log entries received below raft right as they're being written +// to storage. This is [6] in the figure below; it's non-blocking since we're +// below raft in the raft.Ready() loop. It effectively enqueues a "virtual" +// work item in underlying StoreWorkQueue mediating store IO. This virtual +// work item is what later gets dequeued once the store granter informs the +// work queue of newly available IO tokens. For standard work queue ordering, +// our work item needs to include the CreateTime and AdmissionPriority. The +// tenant ID is plumbed to find the right tenant heap to queue it under (for +// inter-tenant isolation); the store ID to find the right store work queue +// on multi-store nodes. Since the raftpb.Entry encodes within it its origin +// node (AdmissionOriginNode), it's used post-admission to dispatch to the +// right node; see [7'] below. +// +// --- +// +// Here's how the various pieces fit together: +// +// [1] Admit +// ○ +// │ +// │ +// ┌───────────────▼───────────────┬┬┐ ┌───────────────────────────────────┐ +// │ Replica (proposer) │││ │ kvflowcontrol.Controller │ +// │ ┌──────────────────────┬┬○────┼┼┼ [2] Admit ────────────▶ ┌───────────────────────────────┐ │ +// │ │ │││ │││ │ │ admissionpb.RegularWorkClass │ │ +// │ │ kvflowcontrol.Handle ││○────┼┼┼ [4] DeductTokens ─────▶ │┌──────────────────────┬┬┐ │ │ +// │ │ │││ │││ │ ││ kvflowcontrol.Stream │││ │ │ +// │ └─────────────▲──────○─┴┴○────┼┼┼ [10r] ReturnTokens ───▶ │└──────────────────────┴┴┘ │ │ +// │ ╲ ╱ │││ │ └───────────────────────────────┘ │ +// │ ╲ ╱ │││ │ ┌───────────────────────────────┐ │ +// │ [3] Evaluate │││ │ │ admissionpb.ElasticsWorkClass │ │ +// │ │││ │ │┌──────────────────────┬┬┐ │ │ +// │ │││ │ ││ kvflowcontrol.Stream │││ │ │ +// │ │││ │ │└──────────────────────┴┴┘ │ │ +// │ │││ │ └───────────────────────────────┘ │ +// └○────▲────────────────○───────▲┴┴┘ └──────────────────────▲────────────┘ +// │ │ │ +// │ │ │ │ +// │ │ │ +// [5l] MsgApp │ [5r] MsgApp(s) [9r] MsgAppResp(s) [8l'] ReturnTokens +// │ │ + kvflowcontrolpb.AdmittedRaftLogEntries │ (using kvflowcontrol.Handle) +// │ │ │ │ +// ┌────┴───────┴────┐ ┌───────────○────────────┐ +// │ │ │ RaftTransport ◀──── [8] PendingDispatchFor ────▶ kvflowcontrol.Dispatch │ +// └────┬───────┬────┘ └───────────▲────────────┘ +// │ │ │ │ │ +// │ │ │ +// │ │ │ │ │ +// │ │ [7'] Dispatch +// │ │ │ │ │ +// [7l] MsgAppResp │ [7r] MsgAppResp(s) │ +// │ │ │ │ │ +// ┌─▼────○────────────────▼───────○──┬┬┐ ┌───────────○────────────┐ +// │ Store │││ │ kvadmission.Controller │ +// │ ○┼┼───── [6] AdmitRaftEntry ───────▶ │ +// │ │││ │ │ +// │ │││ │ ┌─────────────────┴┬┬┐ +// │ │││ │ │ StoreWorkQueue │││ +// │ │││ │ ├──────────────────┼┼┤ +// └──────────────────────────────────┴┴┘ └──────┤ Granter │││ +// └──────────────────┴┴┘ +// +// Notation: +// - The top-half of the figure is above raft, the bottom half is below-raft. +// - The paths marked as [l] denote paths taken for the locally held +// store and [r] for remotely held stores where raft traffic crosses an +// RPC boundary (using the RaftTransport). The MsgApp and MsgAppResp shown in +// [5l] and [7l] don't actually exist -- the raft library short circuits +// things, but we include it for symmetry. [8l'] is a fast-path we use whereby +// we return flow tokens for locally admitted raft log entries without going +// through the RaftTransport. +// - The paths marked with ['] happen concurrently. 
In the diagram +// above, [7'] happens concurrently with [7r]; we're trying to show that a +// subsequent MsgAppResps may end up carrying AdmittedRaftLogEntries from +// earlier. [9r] shows this piggybacking. +// - Stacked boxes (with " │││" on the right hand side) indicate that there are +// multiple of a kind. Like multiple stores, multiple StoreWorkQueues, +// kvflowcontrol.Streams, etc. +// +// --- +// +// There are various interactions to consider: +// +// I1. What happens if the RaftTransport gRPC stream[^2] breaks? +// - When reading pending dispatches to piggyback onto outbound raft messages, +// we're removing it[^3] from the underlying list. So if the stream breaks +// after having done so, we're possibly leaking flow tokens at the origin +// node. Doing some sort of handshake/2PC for every return seems fraught, so +// we do something simpler: return all held tokens[^4] for a given +// kvflowcontrol.Stream when the underlying transport breaks. We need to +// ensure that if the stream re-connects we're not doubly returning flow +// tokens, for which we use the low water mark[^5] on each stream. +// +// I2. What happens if a node crashes? +// - We don't rely on it to return kvflowcontrolpb.AdmittedRaftLogEntries, and +// don't want the origin node to leak flow tokens. Here too we react +// similar to the gRPC stream breakage described above. We don't worry about +// double returns if the node never restarts, and if it does, we'll treat it +// similar to I3a below. +// - If the node containing store S2 crashed, we don't want to deduct flow +// tokens when proposing writes to (now under-replicated) ranges with replicas +// on S2. kvflowcontrol.Controller is made aware of the streams that are to be +// 'ignored' for flow token purposes, and informs callers when they attempt to +// deduct[^6], who in turn avoid tracking the (non-)deduction. +// - See I3a and [^7] for options on what to do when the node comes back up. +// +// I3. What happens if a follower store is paused, as configured by +// admission.kv.pause_replication_io_threshold? +// - Proposing replicas are aware of paused followers and don't deduct flow +// tokens for those corresponding streams (we're not issuing traffic over +// those replication streams so deducting flow tokens is meaningless). When +// the follower store is no longer paused, and the replica on that store +// is sufficiently caught up, only then do we start deducting flow tokens for +// it. +// +// I3a. Why do we need to wait for the previously paused replica to be caught +// up (via log entries, snapshots) to the raft log position found on a +// quorum of replicas? What about replicas that have fallen behind by +// being on recently crashed nodes? +// - If we didn't, we risk write stalls to the range (and others) due to flow +// tokens deducted for that store. Take the following example where we start +// deducting flow tokens as soon as the follower store is unpaused. +// - If the quorum is at , and the now-no-longer-paused +// replica at , i.e. lagging by L entries. +// - As we start proposing commands to , +// , ..., we're deducting flow tokens for the +// previously-paused follower at indexes > I. +// - These flow tokens that will only be released once the +// now-no-longer-paused follower stores those entries to its raft log and +// also admits them. +// - To do so, it has to first catch up to and admit entries from +// , , ..., , +// which could take a while. 
+// - We don't want to stall quorum writes at (if going up +// to I+i depletes the available flow tokens) because the previously +// paused follower here has not yet caught up. +// - When the previously-paused replica starts storing log entries from +// , , ..., and admitting them, it's +// going to try and return flow tokens for them. This is because those +// entries are encoded to use replication admission control. +// - We don't want to add these tokens to the stream's bucket since we +// didn't make corresponding deductions at propose time (the follower was +// paused then). +// - In this case, whenever we start tracking this follower's stream, i.e. +// after it's sufficiently caught up to the quorum raft log position, +// we'll ensure it's low water mark is at this position to ignore these +// invalid returns. +// +// I4. What happens when a replica gets caught up via raft snapshot? Or needs to +// get caught up via snapshot due to log truncation at the leader? +// - If a replica's fallen far enough behind the quorum with respect to its log +// position, we don't deduct flow tokens for its stream, similar to I3a. The +// same is true for if we've truncated our log ahead of what it's stored in +// its raft log. If it's just far behind on a non-truncated raft log, it's +// still receiving replication traffic through raft generated MsgApps for +// the older entries, but we don't deduct flow tokens for it. +// - When the replica gets caught up via snapshot, similar to I3a, given it's +// caught up to the quorum raft log position, we can start deducting flow +// tokens for its replication stream going forward. +// +// I5. What happens when the leaseholder and/or the raft leader changes? When +// the raft leader is not the same as the leaseholder? +// - The per-replica kvflowcontrol.Handle is tied to the lifetime of a +// leaseholder replica having raft leadership. When leadership is lost, or the +// lease changes hands, we release all held flow tokens. +// - Avoiding double returns on subsequent AdmittedRaftLogEntries for these +// already released flow tokens is easier for raft leadership changes since +// there's a term change, and all earlier/stale AdmittedRaftLogEntries with +// the lower term can be discarded. We do a similar thing for leases -- when +// being granted a lease, the low water mark in kvflowcontrol.Handle is at +// least as high as the command that transferred the lease. +// +// I6. What happens during replica GC? +// - It's unlikely that a replica gets GC-ed without first going through the +// leaseholder/raft leadership transition described in I5. Regardless, we'll +// free up all held flow tokens. If a replica were to be re-added (through +// membership change) and become raft leader + range leaseholder, and hear +// about stale AdmittedRaftLogEntries from earlier, they'd be ignored due to +// the low water mark on kvflowcontrol.Handle. +// +// I7. What happens during re-proposals? +// - Flow token deductions are tied to the first raft log position we propose +// at. We don't deduct flow tokens again for reproposed commands at higher log +// positions. Ditto for command reproposals at apply-time (if we violated +// MLAI). +// +// I8. What happens if the proposal is dropped from the raft transport's send +// queue? What if it gets dropped from some receive queue? +// - As described in I7, we're binding the proposal to the first log position we +// try to propose at. 
If the node-level raft transport's send queue is full, +// and we drop the proposal, we'll end up reproposing it at a higher index +// without deducting flow tokens again. Free-ing up the originally held tokens +// will only occur when any entry at a higher log position gets admitted (not +// necessarily the reproposed command). So we're relying on at least future +// attempt to make it to the follower's raft log. +// - If a replica abandons a proposal that its deducted tokens for (i.e. it'll +// no longer repropose it), we'll need to free up those tokens. That +// proposal may have never made it to the follower's logs (if they were +// dropped from the raft transport's send/receive queues for example). +// - Perhaps another way to think about safety (no token leaks) and liveness +// (eventual token returns) is that on the sender side, we should only +// deduct tokens for a proposal once actually transmitting it over the +// relevant, reliable gRPC stream (and reacting to it breaking as described +// in I1 above). On the receiver, since this proposal/message can be dropped +// due to full receive queues within the raft transport, we should either +// signal back to the sender that it return corresponding tokens (and +// re-acquire on another attempt), or that it simply free up all tokens for +// this replication stream. This could also apply to dropped messages on the +// sender side queue, where we just free up all held tokens. +// - If messages containing the entry gets dropped from the raft transport +// receive queue, we rely on raft re-transmitting said entries. Similar to +// above, we're relying on the logical admission of some entry with log +// position equal or higher to free up the deducted tokens. +// - Given AdmittedRaftLogEntries travel over the RaftTransport stream, and +// dropping them could cause flow token leakage, we guarantee delivery (aside +// from stream breaking, which is covered in I1) by only piggybacking on +// requests that exit the send queue, and returning flow tokens[^8] before +// potentially dropping the stream message due to full receive queues. +// +// I9. What happens during range splits and merges? +// - During merges when the RHS raft group is deleted, we'll free up all held +// flow tokens. Any subsequent AdmittedRaftLogEntries will be dropped since +// tokens only get returned through the kvflowcontrol.Handle identified by the +// RangeID, which for the RHS, no longer exists. +// - During splits, the LHS leaseholder+raft leader will construct its own +// kvflowcontrol.Handle, which will handle flow tokens for subsequent +// proposals. +// +// I10. What happens when replicas are added/removed from the raft group? +// - The leader+leaseholder replica is aware of replicas being added/removed +// replicas, and starts and stops deducting flow tokens for the relevant +// streams accordingly. If a replica is removed, we free up all flow tokens +// held for its particular stream. The low water mark for that particular +// stream is set to the raft log position of the command removing the replica, +// so stale AdmittedRaftLogEntries messages can be discarded. +// +// --- +// +// [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent +// back-and-forth between two nodes over their two uni-directional raft +// transport streams. +// [^2]: Over which we're dispatching kvflowcontrolpb.AdmittedRaftLogEntries. +// [^3]: kvflowcontrol.DispatchReader implementations do this as part of +// PendingDispatchFor. 
+// [^4]: Using DeductedTokensUpto + ReturnAllTokensUpto on +// kvflowcontrol.Handler. +// [^5]: Using ReturnAllTokensUpto on kvflowcontrol.Handler. +// [^6]: DeductTokens on kvflowcontrol.Controller returns whether the deduction +// was done. +// [^7]: When a node is crashed, instead of ignoring the underlying flow token +// buckets, an alternative is to DeductTokens without going through Admit +// (which blocks until flow tokens > 0). That way if the node comes back +// up and catches up via replay (and not snapshot) we have accounted for +// the load being placed on it. Though similar to I3a, the risk there is +// that if start going through Admit() as soon as the node comes back up, +// the tokens will still be negative and writes will stall. Whether we (i) +// DeductTokens but wait post node-restart lag before going through +// Admit(), or (ii) don't DeductTokens (Admit() is rendered a no-op), +// we're being somewhat optimistic, which is fine. +// [^8]: Using ReturnTokensUpto on kvflowcontrol.Handle. +// +// TODO(irfansharif): These descriptions are too high-level, imprecise and +// possibly wrong. Fix that. After implementing these interfaces and integrating +// it into KV, write tests for each one of them and document the precise +// interactions/integration points. It needs to be distilled to crisper +// invariants. The guiding principle seems to be 'only grab flow tokens when +// actively replicating a proposal along specific streams', which excludes +// dead/paused/lagging/pre-split RHS/non-longer-group-member replicas, and +// explains why we only do it on replicas that are both leaseholder and leader. +// It also explains why we don't re-deduct on reproposals, or try to intercept +// raft-initiated re-transmissions. For each of these scenarios, we know when +// not to deduct flow tokens. If we observe getting into the scenarios, we +// simply free up all held tokens and safeguard against a subsequent double +// returns, relying entirely on low water marks or RangeIDs not being re-used. +// - When framing invariants, talk about how we're achieving safety (no token +// leaks, no double returns) and liveness (eventual token returns). +// - Other than I8 above, are there cases where the sender has deducted tokens +// and something happens on the receiver/sender/sender-receiver stream that: +// (a) doesn't cause the sender to "return all tokens", i.e. it's relying on +// the receiver to send messages to return tokens up to some point, and +// (b) the receiver has either not received the message for which we've +// deducted tokens, or forgotten about it. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go new file mode 100644 index 000000000000..68f2cfa7b3a0 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -0,0 +1,138 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package kvflowcontrol provides flow control for replication traffic in KV. +// It's part of the integration layer between KV and admission control. 
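+//
+// A rough, hypothetical sketch of proposer-side usage (the handle methods are
+// defined below; the variable names are illustrative only):
+//
+//	handle.Admit(ctx, pri, createTime)            // blocks until flow tokens are available
+//	handle.DeductTokensFor(ctx, pri, pos, tokens) // non-blocking; tracked per log position
+//	// ... later, on learning of below-raft admission up to some position:
+//	handle.ReturnTokensUpto(ctx, pri, admittedUpto, stream)
+//
+// See doc.go for an end-to-end overview of how this package integrates with
+// raft and below-raft admission control.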
+package kvflowcontrol + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" +) + +// Stream models the stream over which we replicate data traffic, the +// transmission for which we regulate using flow control. It's segmented by the +// specific store the traffic is bound for and the tenant driving it. Despite +// the underlying link/transport being shared across tenants, modeling streams +// on a per-tenant basis helps provide inter-tenant isolation. +type Stream struct { + TenantID roachpb.TenantID + StoreID roachpb.StoreID +} + +// Tokens represent the finite capacity of a given stream, expressed in bytes +// for data we're looking to replicate. Use of replication streams are +// predicated on tokens being available. +type Tokens uint64 + +// Controller provides flow control for replication traffic in KV, held at the +// node-level. +type Controller interface { + // Admit seeks admission to replicate data, regardless of size, for work + // with the given priority, create-time, and over the given stream. This + // blocks until there are flow tokens available. + Admit(context.Context, admissionpb.WorkPriority, time.Time, Stream) error + // DeductTokens deducts (without blocking) flow tokens for replicating work + // with given priority over the given stream. Requests are expected to + // have been Admit()-ed first. + DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream) (deducted bool) + // ReturnTokens returns flow tokens for the given stream. These tokens are + // expected to have been deducted earlier with the same priority provided + // here. + ReturnTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream) + + // TODO(irfansharif): We might need the ability to "disable" specific + // streams/corresponding token buckets when there are failures or + // replication to a specific store is paused due to follower-pausing. + // That'll have to show up between the Handler and the Controller somehow. + // See I2, I3a and [^7] in kvflowcontrol/doc.go. +} + +// Handle is used to interface with replication flow control; it's typically +// backed by a node-level kvflowcontrol.Controller. Handles are held on replicas +// initiating replication traffic, i.e. are both the leaseholder and raft +// leader, and manage multiple Streams (one per active replica) underneath. +// +// When replicating log entries, these replicas choose the log position +// (term+index) the data is to end up at, and use this handle to track the token +// deductions on a per log position basis. When informed of admitted log entries +// on the receiving end of the stream, we free up tokens by specifying the +// highest log position up to which we've admitted (below-raft admission, for a +// given priority, takes log position into account -- see +// kvflowcontrolpb.AdmittedRaftLogEntries for more details). +type Handle interface { + // Admit seeks admission to replicate data, regardless of size, for work + // with the given priority and create-time. This blocks until there are + // flow tokens available. + Admit(context.Context, admissionpb.WorkPriority, time.Time) + // DeductTokensFor deducts (without blocking) flow tokens for replicating + // work with given priority to members of the raft group. 
The deduction,
+	// if successful, is tracked with respect to the specific raft log position
+	// it's expected to end up in. Requests are assumed to have been
+	// Admit()-ed first.
+	DeductTokensFor(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
+	// DeductedTokensUpto returns the highest log position for which we've
+	// deducted flow tokens, over the given stream.
+	DeductedTokensUpto(context.Context, Stream) kvflowcontrolpb.RaftLogPosition
+	// ReturnTokensUpto returns all previously deducted tokens of a given
+	// priority for all log positions less than or equal to the one specified.
+	// It does so for the specific stream. Once returned, subsequent attempts to
+	// return tokens up to the same position or lower are no-ops.
+	ReturnTokensUpto(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
+	// ReturnAllTokensUpto is like ReturnTokensUpto but does so across all
+	// priorities.
+	//
+	// NB: This is used when a replica on the other end of a stream gets caught
+	// up via snapshot (say, after a log truncation), where we then don't expect
+	// dispatches for the individual AdmittedRaftLogEntries between what it
+	// admitted last and its latest RaftLogPosition. Another use is during
+	// successive lease changes (out and back) within the same raft term -- we
+	// want to both free up tokens from when we lost the lease, and also ensure
+	// that attempts to return them (on hearing about AdmittedRaftLogEntries
+	// replicated under the earlier lease) are discarded.
+	ReturnAllTokensUpto(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
+	// Close closes the handle and returns all held tokens back to the
+	// underlying controller. Typically used when the replica loses its lease
+	// and/or raft leadership, or ends up getting GC-ed (if it's being
+	// rebalanced, merged away, etc).
+	Close(context.Context)
+}
+
+// Dispatch is used (i) to dispatch information about admitted raft log entries
+// to specific nodes, and (ii) to read pending dispatches.
+type Dispatch interface {
+	DispatchWriter
+	DispatchReader
+}
+
+// DispatchWriter is used to dispatch information about admitted raft log
+// entries to specific nodes (typically where said entries originated, where
+// flow tokens were deducted and are waiting to be returned).
+type DispatchWriter interface {
+	Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
+}
+
+// DispatchReader is used to read pending dispatches. It's used in the raft
+// transport layer when looking to piggyback information on traffic already
+// bound to specific nodes. It's also used when timely dispatching (read:
+// piggybacking) has not taken place.
+//
+// NB: PendingDispatchFor is expected to remove dispatches from the pending
+// list. If the gRPC stream we're sending it over happens to break, we drop
+// these dispatches. The node waiting on these dispatches is expected to react
+// to the stream breaking by freeing up all held tokens.
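+//
+// A rough sketch of read-side usage in the raft transport (hypothetical
+// caller code, for illustration only):
+//
+//	for _, nodeID := range reader.PendingDispatch() {
+//		entries := reader.PendingDispatchFor(nodeID)
+//		// Piggyback entries onto the next RaftMessageRequest bound for nodeID.
+//	}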
+type DispatchReader interface { + PendingDispatch() []roachpb.NodeID + PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel new file mode 100644 index 000000000000..0a2fd87eda43 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel @@ -0,0 +1,35 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "kvflowcontrolpb_proto", + srcs = ["kvflowcontrol.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"], +) + +go_proto_library( + name = "kvflowcontrolpb_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + proto = ":kvflowcontrolpb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", # keep + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "kvflowcontrolpb", + srcs = ["raft_log_position.go"], + embed = [":kvflowcontrolpb_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + visibility = ["//visibility:public"], + deps = ["@com_github_cockroachdb_redact//:redact"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto new file mode 100644 index 000000000000..aa0589b5e085 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto @@ -0,0 +1,119 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.kv.kvserver.kvflowcontrol.kvflowcontrolpb; +option go_package = "kvflowcontrolpb"; + +import "gogoproto/gogo.proto"; + +// RaftAdmissionMeta contains information used by admission control for the +// select raft commands that use replication admission control. It contains a +// subset of the fields in kvserverpb.RaftCommand to selectively decode +// state[1]. When marshaling a RaftCommand, we willfully include this data in +// the prefix of the marshaled byte buffer. Information about whether this data +// is present is captured in the first byte of the encoded raft proposal -- see +// raftlog.EntryEncoding. +// +// [1]: The field tags and types must be kept identical with what's found there. +message RaftAdmissionMeta { + // AdmissionPriority of the command (maps to admission.WorkPriority); used + // within a tenant below-raft for replication admission control. + int32 admission_priority = 18; + // AdmissionCreateTime is equivalent to Time.UnixNano() at the creation time + // of the request, or a parent request, for which this command is a part of. 
+ // It's used within a tenant below-raft for replication admission control; see + // admission.WorkInfo.CreateTime for details. + int64 admission_create_time = 19; + // AdmissionOriginNode captures where this raft command originated. It's used + // to inform said node of this raft command's (virtual) admission in order for + // it to release flow tokens for subsequent commands. + int32 admission_origin_node = 20 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + + // TODO(irfansharif): If the {marshaling,unmarshaling} performance overhead + // proves costly, we could: + // - For Admission{Priority,CreateTime}, pack them within a single int64 by + // using 8 bits for the priority (we're using an int8 in Go code) and the + // remaining bits for the create timestamp with lower resolution. + // - For AdmissionOriginNodeID, we could re-work the MultiRaft streaming RPCs + // to include upfront, during stream setup, which node the subsequent + // RaftMessageRequests are coming from. But this awkward to do with our + // current code layering: + // - We want to find out on a per raftpb.Entry level where it came from, and + // to do it once raft.Ready() tells to persist said entry into our raft log. + // - We're currently encoding this data in the raft entry itself, at the + // sender, so it's easy to decode at the right place in + // raft-ready-handling loop. + // - But if we had to "stitch in" the origin node ID once received off of + // the transport, or tie together raft entries with their origin node IDs + // through some other way (the raft library only wants to "step" through + // message type we can't so easily annotate), we'd have to do a fair bit + // of state tracking. + // If it's still too costly, we could rip all this out and coarsen + // intra-tenant ordering with respect to Admission{Priority,CreateTime}. We + // could instead introduce a WorkQueue-like ordering at the origin where + // requests wait for flow tokens for every it + // intends to write to. Below raft we could live with just side-loaded + // proposals being marked as admissionpb.BulkNormalPri. Origin-side ordering + // would work ok for epoch-LIFO. The coarseness comes from this re-ordering + // only happening on individual origin nodes. + // + // TODO(irfansharif): Get rid of this TODO block after simple performance + // benchmarks (say, `cockroach workload run kv` with high concurrency and + // small write sizes). The ideas above are too complicated. +} + +// AdmittedRaftLogEntries represents a set of raft log entries that were +// admitted below raft. These are identified by: +// - the range ID (there's one per raft group); +// - the admission priority of all said entries; +// - the (inclusive) raft log position up-to-which we've admitted entries; +// - the store ID on which these raft logs were admitted. +// +// This is used as part replication admission control to release, at the origin, +// the specific flow tokens acquired when replicating these log entries along +// this particular "replication stream" (i.e. flowing to a particular store, +// remote or otherwise). +message AdmittedRaftLogEntries { + // RangeID of the raft group these entries belong to. This is the range on + // whose behalf work was admitted. + int64 range_id = 1 [(gogoproto.customname) = "RangeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; + + // AdmissionPriority of all admitted entries (maps to admissionpb.WorkPriority). 
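+  // Entries admitted at different priorities are reported through separate
+  // AdmittedRaftLogEntries messages.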
+  int32 admission_priority = 2;
+
+  // UpToRaftLogPosition (inclusive) of the highest entry that was admitted.
+  // Within a given priority, admission takes place in raft log order (i.e.
+  // entries with lower terms get admitted first, or with lower indexes within
+  // the same term). So the value here implies admission of all entries that
+  // sort before and have the same priority.
+  RaftLogPosition up_to_raft_log_position = 3 [(gogoproto.nullable) = false];
+
+  // StoreID on which this raft log entry was admitted.
+  //
+  // TODO(irfansharif): We can avoid sending this for every logically admitted
+  // message if the raft transport stream we were sending it on had some
+  // handshake protocol at the start, where the client identified itself by its
+  // NodeID. That way the origin replica receiving this information can infer
+  // the StoreID where this work was done, since we never store multiple
+  // replicas of a range on the same {single,multi}-store node.
+  uint64 store_id = 4 [(gogoproto.customname) = "StoreID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"];
+}
+
+// RaftLogPosition is a point on the raft log, identified by a term and an
+// index.
+message RaftLogPosition {
+  option (gogoproto.goproto_stringer) = false;
+
+  uint64 term = 1;
+  uint64 index = 2;
+}
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go
new file mode 100644
index 000000000000..c147618928fb
--- /dev/null
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go
@@ -0,0 +1,42 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvflowcontrolpb
+
+import "github.com/cockroachdb/redact"
+
+func (p *RaftLogPosition) String() string {
+	return redact.StringWithoutMarkers(p)
+}
+
+// SafeFormat implements the redact.SafeFormatter interface.
+func (p *RaftLogPosition) SafeFormat(w redact.SafePrinter, _ rune) {
+	w.Printf("position=%d/%d", p.Term, p.Index)
+}
+
+// Equal returns whether the two raft log positions are identical.
+func (p *RaftLogPosition) Equal(o RaftLogPosition) bool {
+	return p.Term == o.Term && p.Index == o.Index
+}
+
+// Less returns whether one raft log position is less than the other. Those
+// with lower terms sort first, and barring that, those with lower indexes.
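+// For example, position=4/20 sorts before position=5/2: terms are compared
+// first, and indexes only break ties between equal terms.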
+func (p *RaftLogPosition) Less(o RaftLogPosition) bool { + if p.Term != o.Term { + return p.Term < o.Term + } + return p.Index < o.Index +} + +// LessEq returns whether one raft log position is less than or equal to the +// other +func (p *RaftLogPosition) LessEq(o RaftLogPosition) bool { + return p.Less(o) || p.Equal(o) +} diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index c7259fa8dc0a..f2bf110a9e7c 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -34,6 +34,7 @@ proto_library( strip_import_prefix = "/pkg", visibility = ["//visibility:public"], deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto", "//pkg/kv/kvserver/readsummary/rspb:rspb_proto", "//pkg/roachpb:roachpb_proto", @@ -55,6 +56,7 @@ go_proto_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/closedts/ctpb", # keep + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/roachpb", diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index eb9e7cbd4c73..89acafa75709 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -358,6 +358,23 @@ message RaftCommand { // from" the proposer. map trace_data = 16; + // Fields used below-raft for replication admission control. See + // kvflowcontrolpb.RaftAdmissionMeta for how this data is selectively decoded. + // The field tags and types must be kept identical with what's found there. + + // AdmissionPriority of the command (maps to admission.WorkPriority); used + // within a tenant below-raft for replication admission control. + int32 admission_priority = 18; + // AdmissionCreateTime is equivalent to Time.UnixNano() at the creation time + // of the request (or a parent request) for which this command is a part of. + // It's used within a tenant below-raft for replication admission control; see + // admission.WorkInfo.CreateTime for details. + int64 admission_create_time = 19; + // AdmissionOriginNode captures where this raft command originated. It's used + // to inform said node of this raft command's (virtual) admission in order for + // it to release flow tokens for subsequent commands. + int32 admission_origin_node = 20 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + reserved 1, 2, 10001 to 10014; } diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4ec5d0288aed..229af24b873d 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -18,6 +18,7 @@ import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; +import "kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto"; import "raft/v3/raftpb/raft.proto"; import "gogoproto/gogo.proto"; import "util/tracing/tracingpb/recorded_span.proto"; @@ -91,6 +92,10 @@ message RaftMessageRequest { repeated RaftHeartbeat heartbeats = 6 [(gogoproto.nullable) = false]; repeated RaftHeartbeat heartbeat_resps = 7 [(gogoproto.nullable) = false]; + // AdmittedRaftLogEntries is coalesced information about all raft log entries + // that were admitted below raft. 
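+  // They piggyback on raft transport traffic already bound for the node where
+  // the corresponding entries originated and where flow tokens are waiting to
+  // be returned; see kvflowcontrol/doc.go for details.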
+ repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedRaftLogEntries admitted_raft_log_entries = 11 [(gogoproto.nullable) = false]; + reserved 10; } diff --git a/pkg/kv/kvserver/raftlog/BUILD.bazel b/pkg/kv/kvserver/raftlog/BUILD.bazel index 710e68417394..628b21ea93d4 100644 --- a/pkg/kv/kvserver/raftlog/BUILD.bazel +++ b/pkg/kv/kvserver/raftlog/BUILD.bazel @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/kv/kvserver/apply", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", @@ -29,6 +30,7 @@ go_library( go_test( name = "raftlog_test", srcs = [ + "encoding_test.go", "entry_bench_test.go", "entry_test.go", "iter_bench_test.go", @@ -38,12 +40,15 @@ go_test( embed = [":raftlog"], deps = [ "//pkg/keys", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/admission/admissionpb", "//pkg/util/hlc", + "//pkg/util/humanizeutil", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", diff --git a/pkg/kv/kvserver/raftlog/encoding.go b/pkg/kv/kvserver/raftlog/encoding.go index bf312d57f233..e0427b51ec9c 100644 --- a/pkg/kv/kvserver/raftlog/encoding.go +++ b/pkg/kv/kvserver/raftlog/encoding.go @@ -13,7 +13,9 @@ package raftlog import ( "fmt" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) // EntryEncoding enumerates the encodings used in CockroachDB for raftpb.Entry's @@ -148,3 +150,23 @@ func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase b[0] = enc.prefixByte() copy(b[1:], commandID) } + +// DecodeRaftAdmissionMeta decodes admission control metadata from a +// raftpb.Entry.Data. Expects an EntryEncoding{Standard,Sideloaded}WithAC +// encoding. +func DecodeRaftAdmissionMeta(data []byte) (kvflowcontrolpb.RaftAdmissionMeta, error) { + prefix := data[0] + if !(prefix == entryEncodingStandardWithACPrefixByte || prefix == entryEncodingSideloadedWithACPrefixByte) { + panic(fmt.Sprintf("invalid encoding: prefix %v", prefix)) + } + + // TODO(irfansharif): If the decoding overhead is noticeable, we can write a + // custom decoder and rely on the encoding for raft admission data being + // present at the start of the marshaled raft command. This could speed it + // up slightly. + var raftAdmissionMeta kvflowcontrolpb.RaftAdmissionMeta + if err := protoutil.Unmarshal(data[1+RaftCommandIDLen:], &raftAdmissionMeta); err != nil { + return kvflowcontrolpb.RaftAdmissionMeta{}, err + } + return raftAdmissionMeta, nil +} diff --git a/pkg/kv/kvserver/raftlog/encoding_test.go b/pkg/kv/kvserver/raftlog/encoding_test.go new file mode 100644 index 000000000000..0e5734d9d5bd --- /dev/null +++ b/pkg/kv/kvserver/raftlog/encoding_test.go @@ -0,0 +1,107 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package raftlog + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3/raftpb" +) + +// BenchmarkRaftAdmissionMetaOverhead measures the overhead of encoding/decoding +// raft metadata, compared to not doing it. It's structured similar to how raft +// command data is encoded + decoded end-to-end, including the optional +// below-raft admission control data, where steps (2) and (4) below are only +// done for proposals subject to below-raft admission. +// +// name old time/op new time/op delta +// RaftAdmissionMetaOverhead/bytes=1.0_KiB,raft-ac-10 1.30µs ± 1% 1.70µs ± 1% +30.43% (p=0.008 n=5+5) +// RaftAdmissionMetaOverhead/bytes=256_KiB,raft-ac-10 51.6µs ± 4% 50.6µs ± 5% ~ (p=0.421 n=5+5) +// RaftAdmissionMetaOverhead/bytes=512_KiB,raft-ac-10 91.9µs ± 4% 91.2µs ± 5% ~ (p=1.000 n=5+5) +// RaftAdmissionMetaOverhead/bytes=1.0_MiB,raft-ac-10 148µs ± 4% 151µs ± 5% ~ (p=0.095 n=5+5) +// RaftAdmissionMetaOverhead/bytes=2.0_MiB,raft-ac-10 290µs ± 3% 292µs ± 1% ~ (p=0.151 n=5+5) +func BenchmarkRaftAdmissionMetaOverhead(b *testing.B) { + defer log.Scope(b).Close(b) + + const KiB = 1 << 10 + const MiB = 1 << 20 + + for _, withRaftAdmissionMeta := range []bool{false, true} { + for _, bytes := range []int64{1 * KiB, 256 * KiB, 512 * KiB, 1 * MiB, 2 * MiB} { + var raftAdmissionMetaLen int + var raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta + entryEnc := EntryEncodingStandardWithoutAC + + raftCmd := mkRaftCommand(100, int(bytes), int(bytes+200)) + marshaledRaftCmd, err := protoutil.Marshal(raftCmd) + require.NoError(b, err) + + if withRaftAdmissionMeta { + raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ + AdmissionPriority: int32(admissionpb.BulkNormalPri), + AdmissionCreateTime: 18581258253, + } + raftAdmissionMetaLen = raftAdmissionMeta.Size() + entryEnc = EntryEncodingStandardWithAC + } + + encodingBuf := make([]byte, RaftCommandPrefixLen+raftAdmissionMeta.Size()+len(marshaledRaftCmd)) + raftEnt := Entry{ + Entry: raftpb.Entry{ + Term: 1, + Index: 1, + Type: raftpb.EntryNormal, + Data: encodingBuf, + }, + } + + b.Run(fmt.Sprintf("bytes=%s,raft-ac=%t", humanizeutil.IBytes(bytes), withRaftAdmissionMeta), + func(b *testing.B) { + for i := 0; i < b.N; i++ { + // 1. Encode the raft command prefix. + EncodeRaftCommandPrefix(encodingBuf[:RaftCommandPrefixLen], entryEnc, "deadbeef") + + // 2. If using below-raft admission, encode the raft + // metadata right after the command prefix. + if withRaftAdmissionMeta { + _, err = protoutil.MarshalTo( + raftAdmissionMeta, + encodingBuf[RaftCommandPrefixLen:RaftCommandPrefixLen+raftAdmissionMetaLen], + ) + require.NoError(b, err) + } + + // 3. Marshal the rest of the command. + _, err = protoutil.MarshalTo(raftCmd, encodingBuf[RaftCommandPrefixLen+raftAdmissionMetaLen:]) + require.NoError(b, err) + + // 4. If using below-raft admission, decode the raft + // metadata. + if withRaftAdmissionMeta { + _, err = DecodeRaftAdmissionMeta(encodingBuf) + require.NoError(b, err) + } + + // 5. Decode the entire raft command. 
+ require.NoError(b, raftEnt.load()) + } + }, + ) + } + } +} diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index d9a035428f0a..e8db8e8be5ae 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -131,6 +131,7 @@ package admission import ( "time" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" ) @@ -273,7 +274,7 @@ type granterWithIOTokens interface { setAvailableElasticDiskBandwidthTokensLocked(tokens int64) // getDiskTokensUsedAndResetLocked returns the disk bandwidth tokens used // since the last such call. - getDiskTokensUsedAndResetLocked() [numWorkClasses]int64 + getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64 // setAdmittedDoneModelsLocked supplies the models to use when // storeWriteDone is called, to adjust token consumption. Note that these // models are not used for token adjustment at admission time -- that is @@ -320,7 +321,7 @@ type CPULoadListener interface { // storeRequester is used to abstract *StoreWorkQueue for testing. type storeRequester interface { requesterClose - getRequesters() [numWorkClasses]requester + getRequesters() [admissionpb.NumWorkClasses]requester getStoreAdmissionStats() storeAdmissionStats setStoreRequestEstimates(estimates storeRequestEstimates) } diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index f856ed977e41..cd5f4a80dd0a 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -52,6 +52,44 @@ var WorkPriorityDict = map[WorkPriority]string{ HighPri: "high-pri", } +// WorkClass represents the class of work, which is defined entirely by its +// WorkPriority. Namely, everything less than NormalPri is defined to be +// "Elastic", while everything above and including NormalPri is considered +// "Regular. +type WorkClass int8 + +const ( + // RegularWorkClass is for work corresponding to workloads that are + // throughput and latency sensitive. + RegularWorkClass WorkClass = iota + // ElasticWorkClass is for work corresponding to workloads that can handle + // reduced throughput, possibly by taking longer to finish a workload. It is + // not latency sensitive. + ElasticWorkClass + // NumWorkClasses is the number of work classes. + NumWorkClasses +) + +// WorkClassFromPri translates a WorkPriority to its given WorkClass. +func WorkClassFromPri(pri WorkPriority) WorkClass { + class := RegularWorkClass + if pri < NormalPri { + class = ElasticWorkClass + } + return class +} + +func (w WorkClass) String() string { + switch w { + case RegularWorkClass: + return "regular" + case ElasticWorkClass: + return "elastic" + default: + return "" + } +} + // Prevent the linter from emitting unused warnings. var _ = LowPri var _ = TTLLowPri diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 49b179806346..c66c5c6da919 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -156,13 +156,13 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo // This is IO work, so override the usesTokens value. opts.usesTokens = true // TODO(sumeer): add per-store WorkQueue state for debug.zip and db console. 
- granters := [numWorkClasses]granterWithStoreWriteDone{ + granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ &kvStoreTokenChildGranter{ - workClass: regularWorkClass, + workClass: admissionpb.RegularWorkClass, parent: kvg, }, &kvStoreTokenChildGranter{ - workClass: elasticWorkClass, + workClass: admissionpb.ElasticWorkClass, parent: kvg, }, } @@ -170,8 +170,8 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, granters, sgc.settings, sgc.workQueueMetrics, opts) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() - kvg.regularRequester = requesters[regularWorkClass] - kvg.elasticRequester = requesters[elasticWorkClass] + kvg.regularRequester = requesters[admissionpb.RegularWorkClass] + kvg.elasticRequester = requesters[admissionpb.ElasticWorkClass] coord.granters[KVWork] = kvg coord.ioLoadListener = &ioLoadListener{ storeID: storeID, @@ -330,7 +330,7 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, granters [numWorkClasses]granterWithStoreWriteDone, + _ log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index acae136a3f97..49da834c509f 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -272,19 +272,6 @@ func (tg *tokenGranter) tryGrantLocked(grantChainID grantChainID) grantResult { return res } -type workClass int8 - -const ( - // regularWorkClass is for work corresponding to workloads that are - // throughput and latency sensitive. - regularWorkClass workClass = iota - // elasticWorkClass is for work corresponding to workloads that can handle - // reduced throughput, possibly by taking longer to finish a workload. It is - // not latency sensitive. - elasticWorkClass - numWorkClasses -) - // kvStoreTokenGranter implements granterWithLockedCalls. It is used for // grants to KVWork to a store, that is limited by IO tokens. It encapsulates // two granter-requester pairs, for the two workClasses. The granter in these @@ -317,7 +304,7 @@ type kvStoreTokenGranter struct { // Disk bandwidth tokens. elasticDiskBWTokensAvailable int64 - diskBWTokensUsed [numWorkClasses]int64 + diskBWTokensUsed [admissionpb.NumWorkClasses]int64 // Estimation models. l0WriteLM, l0IngestLM, ingestLM tokensLinearModel @@ -329,7 +316,7 @@ var _ granterWithIOTokens = &kvStoreTokenGranter{} // kvStoreTokenChildGranter handles a particular workClass. Its methods // pass-through to the parent after adding the workClass as a parameter. type kvStoreTokenChildGranter struct { - workClass workClass + workClass admissionpb.WorkClass parent *kvStoreTokenGranter } @@ -368,13 +355,13 @@ func (cg *kvStoreTokenChildGranter) storeWriteDone( return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo) } -func (sg *kvStoreTokenGranter) tryGet(workClass workClass, count int64) bool { +func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool { return sg.coord.tryGet(KVWork, count, int8(workClass)) } // tryGetLocked implements granterWithLockedCalls. 
func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grantResult { - wc := workClass(demuxHandle) + wc := admissionpb.WorkClass(demuxHandle) // NB: ideally if regularRequester.hasWaitingRequests() returns true and // wc==elasticWorkClass we should reject this request, since it means that // more important regular work is waiting. However, we rely on the @@ -383,13 +370,13 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant // elasticWorkClass is when the queue is empty, this case should be rare // (and not cause a performance isolation failure). switch wc { - case regularWorkClass: + case admissionpb.RegularWorkClass: if sg.availableIOTokens > 0 { sg.subtractTokens(count, false) sg.diskBWTokensUsed[wc] += count return grantSuccess } - case elasticWorkClass: + case admissionpb.ElasticWorkClass: if sg.elasticDiskBWTokensAvailable > 0 && sg.availableIOTokens > 0 { sg.elasticDiskBWTokensAvailable -= count sg.subtractTokens(count, false) @@ -400,31 +387,31 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant return grantFailLocal } -func (sg *kvStoreTokenGranter) returnGrant(workClass workClass, count int64) { +func (sg *kvStoreTokenGranter) returnGrant(workClass admissionpb.WorkClass, count int64) { sg.coord.returnGrant(KVWork, count, int8(workClass)) } // returnGrantLocked implements granterWithLockedCalls. func (sg *kvStoreTokenGranter) returnGrantLocked(count int64, demuxHandle int8) { - wc := workClass(demuxHandle) + wc := admissionpb.WorkClass(demuxHandle) // Return count tokens to the "IO tokens". sg.subtractTokens(-count, false) - if wc == elasticWorkClass { + if wc == admissionpb.ElasticWorkClass { // Return count tokens to the elastic disk bandwidth tokens. sg.elasticDiskBWTokensAvailable += count } sg.diskBWTokensUsed[wc] -= count } -func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass workClass, count int64) { +func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkClass, count int64) { sg.coord.tookWithoutPermission(KVWork, count, int8(workClass)) } // tookWithoutPermissionLocked implements granterWithLockedCalls. func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHandle int8) { - wc := workClass(demuxHandle) + wc := admissionpb.WorkClass(demuxHandle) sg.subtractTokens(count, false) - if wc == elasticWorkClass { + if wc == admissionpb.ElasticWorkClass { sg.elasticDiskBWTokensAvailable -= count } sg.diskBWTokensUsed[wc] += count @@ -462,7 +449,7 @@ func (sg *kvStoreTokenGranter) tryGrantLocked(grantChainID grantChainID) grantRe // First try granting to regular requester. for wc := range sg.diskBWTokensUsed { req := sg.regularRequester - if workClass(wc) == elasticWorkClass { + if admissionpb.WorkClass(wc) == admissionpb.ElasticWorkClass { req = sg.elasticRequester } if req.hasWaitingRequests() { @@ -519,7 +506,7 @@ func (sg *kvStoreTokenGranter) setAvailableElasticDiskBandwidthTokensLocked(toke } // getDiskTokensUsedAndResetLocked implements granterWithIOTokens. -func (sg *kvStoreTokenGranter) getDiskTokensUsedAndResetLocked() [numWorkClasses]int64 { +func (sg *kvStoreTokenGranter) getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64 { result := sg.diskBWTokensUsed for i := range sg.diskBWTokensUsed { sg.diskBWTokensUsed[i] = 0 @@ -538,7 +525,7 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModelsLocked( // storeWriteDone implements granterWithStoreWriteDone. 
func (sg *kvStoreTokenGranter) storeWriteDone( - wc workClass, originalTokens int64, doneInfo StoreWorkDoneInfo, + wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { // Normally, we follow the structure of a foo() method calling into a foo() // method on the GrantCoordinator, which then calls fooLocked() on the @@ -558,7 +545,7 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.coord.mu.Lock() exhaustedFunc := func() bool { return sg.availableIOTokens <= 0 || - (wc == elasticWorkClass && sg.elasticDiskBWTokensAvailable <= 0) + (wc == admissionpb.ElasticWorkClass && sg.elasticDiskBWTokensAvailable <= 0) } wasExhausted := exhaustedFunc() actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes) @@ -568,7 +555,7 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.subtractTokens(additionalL0TokensNeeded, false) actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes) additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens - if wc == elasticWorkClass { + if wc == admissionpb.ElasticWorkClass { sg.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded } sg.diskBWTokensUsed[wc] += additionalDiskBWTokensNeeded diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index 33a1f66d0456..3c4b5d019399 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -109,9 +109,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, granters [numWorkClasses]granterWithStoreWriteDone, + ambientCtx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { - makeTestRequester := func(wc workClass) *testRequester { + makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ workKind: KVWork, granter: granters[wc], @@ -120,18 +120,18 @@ func TestGranterBasic(t *testing.T) { returnValueFromGranted: 0, } switch wc { - case regularWorkClass: + case admissionpb.RegularWorkClass: req.additionalID = "-regular" - case elasticWorkClass: + case admissionpb.ElasticWorkClass: req.additionalID = "-elastic" } return req } req := &storeTestRequester{} - req.requesters[regularWorkClass] = makeTestRequester(regularWorkClass) - req.requesters[elasticWorkClass] = makeTestRequester(elasticWorkClass) - requesters[KVWork] = req.requesters[regularWorkClass] - requesters[numWorkKinds] = req.requesters[elasticWorkClass] + req.requesters[admissionpb.RegularWorkClass] = makeTestRequester(admissionpb.RegularWorkClass) + req.requesters[admissionpb.ElasticWorkClass] = makeTestRequester(admissionpb.ElasticWorkClass) + requesters[KVWork] = req.requesters[admissionpb.RegularWorkClass] + requesters[numWorkKinds] = req.requesters[admissionpb.ElasticWorkClass] return req }, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, @@ -287,15 +287,15 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, granters [numWorkClasses]granterWithStoreWriteDone, + ctx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { - reqReg := makeRequesterFunc(ctx, KVWork, 
granters[regularWorkClass], settings, metrics, opts) - reqElastic := makeRequesterFunc(ctx, KVWork, granters[elasticWorkClass], settings, metrics, opts) + reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) + reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} - str.requesters[regularWorkClass] = reqReg.(*testRequester) - str.requesters[regularWorkClass].additionalID = "-regular" - str.requesters[elasticWorkClass] = reqElastic.(*testRequester) - str.requesters[elasticWorkClass].additionalID = "-elastic" + str.requesters[admissionpb.RegularWorkClass] = reqReg.(*testRequester) + str.requesters[admissionpb.RegularWorkClass].additionalID = "-regular" + str.requesters[admissionpb.ElasticWorkClass] = reqElastic.(*testRequester) + str.requesters[admissionpb.ElasticWorkClass].additionalID = "-elastic" return str }, } @@ -393,13 +393,13 @@ func (tr *testRequester) continueGrantChain() { } type storeTestRequester struct { - requesters [numWorkClasses]*testRequester + requesters [admissionpb.NumWorkClasses]*testRequester } var _ storeRequester = &storeTestRequester{} -func (str *storeTestRequester) getRequesters() [numWorkClasses]requester { - var rv [numWorkClasses]requester +func (str *storeTestRequester) getRequesters() [admissionpb.NumWorkClasses]requester { + var rv [admissionpb.NumWorkClasses]requester for i := range str.requesters { rv[i] = str.requesters[i] } diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index 23c89ad9714c..1f05b1695caf 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -405,8 +405,8 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics io.mu.Unlock() io.aux.diskBW.intervalLSMInfo = intervalLSMInfo{ incomingBytes: int64(cumLSMIncomingBytes) - int64(cumDiskBW.incomingLSMBytes), - regularTokensUsed: diskTokensUsed[regularWorkClass], - elasticTokensUsed: diskTokensUsed[elasticWorkClass], + regularTokensUsed: diskTokensUsed[admissionpb.RegularWorkClass], + elasticTokensUsed: diskTokensUsed[admissionpb.ElasticWorkClass], } if metrics.DiskStats.ProvisionedBandwidth > 0 { io.elasticDiskBWTokens = io.diskBandwidthLimiter.computeElasticTokens(ctx, diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index 7d664c12210c..a62b14052e1c 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" @@ -151,11 +152,11 @@ func TestIOLoadListener(t *testing.T) { if d.HasArg("disk-bw-tokens-used") { var regularTokensUsed, elasticTokensUsed int d.ScanArgs(t, "disk-bw-tokens-used", ®ularTokensUsed, &elasticTokensUsed) - kvGranter.diskBandwidthTokensUsed[regularWorkClass] = int64(regularTokensUsed) - kvGranter.diskBandwidthTokensUsed[elasticWorkClass] = int64(elasticTokensUsed) + kvGranter.diskBandwidthTokensUsed[admissionpb.RegularWorkClass] = int64(regularTokensUsed) + kvGranter.diskBandwidthTokensUsed[admissionpb.ElasticWorkClass] = 
int64(elasticTokensUsed) } else { - kvGranter.diskBandwidthTokensUsed[regularWorkClass] = 0 - kvGranter.diskBandwidthTokensUsed[elasticWorkClass] = 0 + kvGranter.diskBandwidthTokensUsed[admissionpb.RegularWorkClass] = 0 + kvGranter.diskBandwidthTokensUsed[admissionpb.ElasticWorkClass] = 0 } var printOnlyFirstTick bool if d.HasArg("print-only-first-tick") { @@ -337,7 +338,7 @@ var _ storeRequester = &testRequesterForIOLL{} func (r *testRequesterForIOLL) close() {} -func (r *testRequesterForIOLL) getRequesters() [numWorkClasses]requester { +func (r *testRequesterForIOLL) getRequesters() [admissionpb.NumWorkClasses]requester { panic("unimplemented") } @@ -352,7 +353,7 @@ func (r *testRequesterForIOLL) setStoreRequestEstimates(estimates storeRequestEs type testGranterWithIOTokens struct { buf strings.Builder allTokensUsed bool - diskBandwidthTokensUsed [numWorkClasses]int64 + diskBandwidthTokensUsed [admissionpb.NumWorkClasses]int64 } var _ granterWithIOTokens = &testGranterWithIOTokens{} @@ -370,7 +371,7 @@ func (g *testGranterWithIOTokens) setAvailableElasticDiskBandwidthTokensLocked(t tokensForTokenTickDurationToString(tokens)) } -func (g *testGranterWithIOTokens) getDiskTokensUsedAndResetLocked() [numWorkClasses]int64 { +func (g *testGranterWithIOTokens) getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64 { return g.diskBandwidthTokensUsed } @@ -410,8 +411,8 @@ func (g *testGranterNonNegativeTokens) setAvailableElasticDiskBandwidthTokensLoc require.LessOrEqual(g.t, int64(0), tokens) } -func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndResetLocked() [numWorkClasses]int64 { - return [numWorkClasses]int64{} +func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndResetLocked() [admissionpb.NumWorkClasses]int64 { + return [admissionpb.NumWorkClasses]int64{} } func (g *testGranterNonNegativeTokens) setAdmittedDoneModelsLocked( diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 35e20f995806..be9b4fabc3d4 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -1644,10 +1644,10 @@ type StoreWriteWorkInfo struct { // StoreWorkQueue is responsible for admission to a store. type StoreWorkQueue struct { - q [numWorkClasses]WorkQueue + q [admissionpb.NumWorkClasses]WorkQueue // Only calls storeWriteDone. The rest of the interface is used by // WorkQueue. - granters [numWorkClasses]granterWithStoreWriteDone + granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone mu struct { syncutil.RWMutex estimates storeRequestEstimates @@ -1662,7 +1662,7 @@ type StoreWorkHandle struct { tenantID roachpb.TenantID // The writeTokens acquired by this request. Must be > 0. writeTokens int64 - workClass workClass + workClass admissionpb.WorkClass admissionEnabled bool } @@ -1680,10 +1680,7 @@ func (q *StoreWorkQueue) Admit( ctx context.Context, info StoreWriteWorkInfo, ) (handle StoreWorkHandle, err error) { // For now, we compute a workClass based on priority. - wc := regularWorkClass - if info.Priority < admissionpb.NormalPri { - wc = elasticWorkClass - } + wc := admissionpb.WorkClassFromPri(info.Priority) h := StoreWorkHandle{ tenantID: info.TenantID, workClass: wc, @@ -1732,7 +1729,7 @@ func (q *StoreWorkQueue) BypassedWorkDone(workCount int64, doneInfo StoreWorkDon q.updateStoreAdmissionStats(uint64(workCount), doneInfo, true) // Since we have no control over such work, we choose to count it as // regularWorkClass. 
- _ = q.granters[regularWorkClass].storeWriteDone(0, doneInfo) + _ = q.granters[admissionpb.RegularWorkClass].storeWriteDone(0, doneInfo) } // StatsToIgnore is called for range snapshot ingestion -- see the comment in @@ -1767,8 +1764,8 @@ func (q *StoreWorkQueue) SetTenantWeights(tenantWeights map[uint64]uint32) { } // getRequesters implements storeRequester. -func (q *StoreWorkQueue) getRequesters() [numWorkClasses]requester { - var result [numWorkClasses]requester +func (q *StoreWorkQueue) getRequesters() [admissionpb.NumWorkClasses]requester { + var result [admissionpb.NumWorkClasses]requester for i := range q.q { result[i] = &q.q[i] } @@ -1795,7 +1792,7 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, - granters [numWorkClasses]granterWithStoreWriteDone, + granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index 6b97e7509af0..0296c442a7ad 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -454,13 +454,13 @@ func TestPriorityStates(t *testing.T) { }) } -func tryScanWorkClass(t *testing.T, d *datadriven.TestData) workClass { - wc := regularWorkClass +func tryScanWorkClass(t *testing.T, d *datadriven.TestData) admissionpb.WorkClass { + wc := admissionpb.RegularWorkClass if d.HasArg("elastic") { var b bool d.ScanArgs(t, "elastic", &b) if b { - wc = elasticWorkClass + wc = admissionpb.ElasticWorkClass } } return wc @@ -487,7 +487,7 @@ func TestStoreWorkQueueBasic(t *testing.T) { } } defer closeFn() - var tg [numWorkClasses]*testGranter + var tg [admissionpb.NumWorkClasses]*testGranter var wrkMap workMap var buf builderWithMu var st *cluster.Settings @@ -495,7 +495,7 @@ func TestStoreWorkQueueBasic(t *testing.T) { q.mu.Lock() defer q.mu.Unlock() return fmt.Sprintf("regular workqueue: %s\nelastic workqueue: %s\nstats:%+v\nestimates:%+v", - q.q[regularWorkClass].String(), q.q[elasticWorkClass].String(), q.mu.stats, + q.q[admissionpb.RegularWorkClass].String(), q.q[admissionpb.ElasticWorkClass].String(), q.mu.stats, q.mu.estimates) } @@ -506,18 +506,18 @@ func TestStoreWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg[regularWorkClass] = &testGranter{name: " regular", buf: &buf} - tg[elasticWorkClass] = &testGranter{name: " elastic", buf: &buf} + tg[admissionpb.RegularWorkClass] = &testGranter{name: " regular", buf: &buf} + tg[admissionpb.ElasticWorkClass] = &testGranter{name: " elastic", buf: &buf} opts := makeWorkQueueOptions(KVWork) opts.usesTokens = true opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), - [numWorkClasses]granterWithStoreWriteDone{tg[regularWorkClass], tg[elasticWorkClass]}, + [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, st, metrics, opts).(*StoreWorkQueue) - tg[regularWorkClass].r = q.getRequesters()[regularWorkClass] - tg[elasticWorkClass].r = q.getRequesters()[elasticWorkClass] + tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] + tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap() return ""