kvflowcontrol,raftlog: interfaces for replication control
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeable with large index backfills (#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of #79215, settling on #83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of #95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect follower write work to no longer bypass admission control,
which should address the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this
proto{col,buf}/interface-only commit and the previous raft log encoding
commit, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, which is held at the node level and holds
     all kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per
     store we're sending raft traffic to and tenant we're sending it
     for).
   - kvflowcontrol.Handle, which will be held at the replica level (only
     on replicas that are both leaseholder and raft leader), and will be
     used to interface with the node-level kvflowcontrol.Controller.
     When replicating log entries, these replicas choose the log
     position (term+index) the data is to end up at, and use this handle
     to track the token deductions on a per log position basis. Later,
     when freeing up tokens (after being informed of said log entries
     being admitted on the receiving end of the stream), they do so by
     specifying the log position up to which all deducted tokens are
     freed.

   type Controller interface {
     Admit(admissionpb.WorkPriority, ...Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
     ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
     TrackLowWater(kvflowcontrolpb.RaftLogPosition, Stream)
     Close()
   }
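
The per-log-position bookkeeping described for kvflowcontrol.Handle can be pictured with a toy tracker. This is only a sketch under assumptions, not code from this commit: the tracker type, its fields, and the plain int64 token count are hypothetical, and it ignores priorities and multiple streams.

```go
package main

import "fmt"

// logPosition mirrors the (term, index) pair of RaftLogPosition.
type logPosition struct{ term, index uint64 }

// less orders positions by term first, then by index.
func (p logPosition) less(o logPosition) bool {
	if p.term != o.term {
		return p.term < o.term
	}
	return p.index < o.index
}

// deduction records how many tokens were deducted for one log position.
type deduction struct {
	pos    logPosition
	tokens int64
}

// tracker is a hypothetical stand-in for the bookkeeping a Handle might do
// for a single stream.
type tracker struct {
	available int64
	tracked   []deduction // kept in increasing log position order
}

// deductTokensFor deducts tokens and remembers the log position they were
// deducted for. Callers are assumed to pass increasing positions.
func (t *tracker) deductTokensFor(pos logPosition, tokens int64) {
	t.available -= tokens
	t.tracked = append(t.tracked, deduction{pos, tokens})
}

// returnTokensUpto frees every deduction at a position <= upto and reports
// how many tokens were returned.
func (t *tracker) returnTokensUpto(upto logPosition) int64 {
	var returned int64
	i := 0
	for ; i < len(t.tracked); i++ {
		if upto.less(t.tracked[i].pos) {
			break
		}
		returned += t.tracked[i].tokens
	}
	t.tracked = t.tracked[i:]
	t.available += returned
	return returned
}

func main() {
	t := &tracker{available: 100}
	t.deductTokensFor(logPosition{1, 10}, 20)
	t.deductTokensFor(logPosition{1, 11}, 30)
	t.deductTokensFor(logPosition{2, 12}, 5)
	fmt.Println(t.available)                            // 45
	fmt.Println(t.returnTokensUpto(logPosition{1, 11})) // 50
	fmt.Println(t.available)                            // 95
}
```

Returning tokens by "up to" position means a single message from the receiving end can free many deductions at once.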

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, which wants
   to eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in each
   raft entry (admission_origin_node from above) to know where to send
   these to, which in turn will release flow tokens that were acquired
   when replicating the original log entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) are waiting to be returned. The interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }
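
To make the dispatch and coalescing idea concrete, here is a rough in-memory sketch. It is not the implementation from this commit: the simplified types are hypothetical, coalescing by keeping only the highest log position per (range, priority, store) is an assumption based on the "up to" semantics above, and having PendingDispatchFor drain the pending set is likewise an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

type nodeID int32

type logPosition struct{ term, index uint64 }

func (p logPosition) less(o logPosition) bool {
	if p.term != o.term {
		return p.term < o.term
	}
	return p.index < o.index
}

// admitted is a simplified stand-in for AdmittedRaftLogEntries.
type admitted struct {
	rangeID  int64
	priority int32
	storeID  uint64
	upTo     logPosition
}

// key identifies dispatches that can be coalesced with one another.
type key struct {
	rangeID  int64
	priority int32
	storeID  uint64
}

// dispatch is a hypothetical in-memory take on the Dispatch interface.
type dispatch struct {
	pending map[nodeID]map[key]logPosition
}

func newDispatch() *dispatch {
	return &dispatch{pending: map[nodeID]map[key]logPosition{}}
}

// Dispatch records an admission for the given node, keeping only the
// highest log position per key.
func (d *dispatch) Dispatch(n nodeID, a admitted) {
	if d.pending[n] == nil {
		d.pending[n] = map[key]logPosition{}
	}
	k := key{a.rangeID, a.priority, a.storeID}
	if cur, ok := d.pending[n][k]; !ok || cur.less(a.upTo) {
		d.pending[n][k] = a.upTo
	}
}

// PendingDispatch lists the nodes with pending dispatches, in sorted order.
func (d *dispatch) PendingDispatch() []nodeID {
	nodes := make([]nodeID, 0, len(d.pending))
	for n := range d.pending {
		nodes = append(nodes, n)
	}
	sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
	return nodes
}

// PendingDispatchFor returns and clears (an assumption) the pending
// dispatches for a node, e.g. to piggyback them on outgoing raft traffic.
func (d *dispatch) PendingDispatchFor(n nodeID) []admitted {
	var out []admitted
	for k, pos := range d.pending[n] {
		out = append(out, admitted{k.rangeID, k.priority, k.storeID, pos})
	}
	delete(d.pending, n)
	return out
}

func main() {
	d := newDispatch()
	d.Dispatch(2, admitted{rangeID: 7, upTo: logPosition{1, 10}})
	d.Dispatch(2, admitted{rangeID: 7, upTo: logPosition{1, 15}})
	d.Dispatch(1, admitted{rangeID: 9, upTo: logPosition{1, 3}})
	fmt.Println(d.PendingDispatch())     // [1 2]
	fmt.Println(d.PendingDispatchFor(2)) // [{7 0 0 {1 15}}]
	fmt.Println(d.PendingDispatch())     // [1]
}
```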

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a
   prefix byte that informs decoding routines how to interpret the
   subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry section below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.
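
The prefix-byte scheme can be illustrated with a small sketch. The byte values and helper names below are hypothetical and chosen only for illustration (the real encodings and the rest of the entry layout are not shown in this message); the point is that a decoder that does not know a prefix has to reject the entry, which is why the new encodings need a version gate.

```go
package main

import (
	"errors"
	"fmt"
)

// entryEncoding is the prefix byte of an encoded raft log entry.
type entryEncoding byte

// Hypothetical prefix byte values; the real values may differ.
const (
	standardWithoutAC   entryEncoding = 0
	sideloadedWithoutAC entryEncoding = 1
	standardWithAC      entryEncoding = 2
	sideloadedWithAC    entryEncoding = 3
)

// usesAdmissionControl reports whether the entry carries AC metadata.
func (e entryEncoding) usesAdmissionControl() bool {
	return e == standardWithAC || e == sideloadedWithAC
}

// isSideloaded reports whether the entry came with sideloaded data.
func (e entryEncoding) isSideloaded() bool {
	return e == sideloadedWithoutAC || e == sideloadedWithAC
}

// encode prepends the prefix byte to the marshaled command.
func encode(enc entryEncoding, payload []byte) []byte {
	out := make([]byte, 0, 1+len(payload))
	out = append(out, byte(enc))
	return append(out, payload...)
}

// decode splits off the prefix byte and rejects prefixes it does not know.
func decode(data []byte) (entryEncoding, []byte, error) {
	if len(data) == 0 {
		return 0, nil, errors.New("empty entry")
	}
	enc := entryEncoding(data[0])
	if enc > sideloadedWithAC {
		return 0, nil, fmt.Errorf("unknown entry encoding %d", data[0])
	}
	return enc, data[1:], nil
}

func main() {
	data := encode(standardWithAC, []byte("cmd"))
	enc, payload, err := decode(data)
	fmt.Println(enc.usesAdmissionControl(), enc.isSideloaded(), string(payload), err)
	// prints: true false cmd <nil>
}
```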

6. An unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData)
   to use replication admission control. Since we're using a different
   prefix byte for raft commands (see EntryEncodings above), one not
   recognized in earlier CRDB versions, we need explicit versioning.

7. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in the underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission pri. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
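
The non-blocking "virtual work item" flow might look roughly like the sketch below. The implementation is left for a future PR, so everything here is an assumption: the storeQueue type, the ordering (higher priority first, then earlier create time), and the token accounting are illustrative only, and per-tenant heaps are omitted.

```go
package main

import (
	"container/heap"
	"fmt"
)

// workItem is a hypothetical virtual work item for one raft log entry.
type workItem struct {
	priority   int32
	createTime int64
	originNode int32
	index      uint64 // raft log index of the entry
	size       int64  // IO tokens the entry needs
}

// workHeap orders items by priority (higher first), then create time
// (earlier first). This ordering is an assumption.
type workHeap []workItem

func (h workHeap) Len() int { return len(h) }
func (h workHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].createTime < h[j].createTime
}
func (h workHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *workHeap) Push(x any)   { *h = append(*h, x.(workItem)) }
func (h *workHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// storeQueue stands in for a per-store work queue.
type storeQueue struct {
	items    workHeap
	admitted func(workItem) // called once an item is admitted
}

// admitRaftEntry enqueues the item and returns immediately, without
// waiting for tokens.
func (q *storeQueue) admitRaftEntry(it workItem) { heap.Push(&q.items, it) }

// grant dequeues items while the newly available tokens cover them and
// returns the tokens left over.
func (q *storeQueue) grant(tokens int64) int64 {
	for q.items.Len() > 0 && tokens >= q.items[0].size {
		it := heap.Pop(&q.items).(workItem)
		tokens -= it.size
		q.admitted(it)
	}
	return tokens
}

func main() {
	q := &storeQueue{admitted: func(it workItem) {
		fmt.Printf("admitted index %d, informing node %d\n", it.index, it.originNode)
	}}
	q.admitRaftEntry(workItem{priority: 0, createTime: 100, originNode: 1, index: 5, size: 10})
	q.admitRaftEntry(workItem{priority: 10, createTime: 200, originNode: 2, index: 6, size: 10})
	q.admitRaftEntry(workItem{priority: 0, createTime: 50, originNode: 1, index: 4, size: 10})
	fmt.Println("tokens left:", q.grant(25))
}
```

In this sketch the admitted callback is where the origin node would be told about the admission, for example by handing the log position to the dispatch mechanism from (4).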

Release note: None
irfansharif committed Jan 25, 2023
1 parent 4bcb8c7 commit 9ef9864
Showing 26 changed files with 893 additions and 96 deletions.
3 changes: 2 additions & 1 deletion .github/CODEOWNERS
@@ -222,7 +222,8 @@
 /pkg/kv/kvserver/gc/ @cockroachdb/kv-prs
 /pkg/kv/kvserver/idalloc/ @cockroachdb/kv-prs
 /pkg/kv/kvserver/intentresolver/ @cockroachdb/kv-prs
-/pkg/kv/kvserver/kvadmission/ @cockroachdb/kv-prs
+/pkg/kv/kvserver/kvadmission/ @cockroachdb/admission-control
+/pkg/kv/kvserver/kvflowcontrol/ @cockroachdb/admission-control
 /pkg/kv/kvserver/kvserverbase/ @cockroachdb/kv-prs
 /pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs
 /pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
@@ -1205,6 +1205,8 @@ GO_TARGETS = [
     "//pkg/kv/kvserver/intentresolver:intentresolver",
     "//pkg/kv/kvserver/intentresolver:intentresolver_test",
     "//pkg/kv/kvserver/kvadmission:kvadmission",
+    "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
+    "//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol",
     "//pkg/kv/kvserver/kvserverbase:kvserverbase",
     "//pkg/kv/kvserver/kvserverpb:kvserverpb",
     "//pkg/kv/kvserver/kvstorage:kvstorage",
@@ -2587,6 +2589,8 @@ GET_X_DATA_TARGETS = [
     "//pkg/kv/kvserver/idalloc:get_x_data",
     "//pkg/kv/kvserver/intentresolver:get_x_data",
     "//pkg/kv/kvserver/kvadmission:get_x_data",
+    "//pkg/kv/kvserver/kvflowcontrol:get_x_data",
+    "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
     "//pkg/kv/kvserver/kvserverbase:get_x_data",
     "//pkg/kv/kvserver/kvserverpb:get_x_data",
     "//pkg/kv/kvserver/kvstorage:get_x_data",
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
@@ -29,6 +29,7 @@ PROTOBUF_SRCS = [
     "//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto",
     "//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
     "//pkg/kv/kvserver/concurrency/poison:poison_go_proto",
+    "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto",
     "//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto",
     "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto",
     "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto",
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_pebble//:pebble",
+        "@io_etcd_go_raft_v3//raftpb",
     ],
 )

25 changes: 18 additions & 7 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble"
+	"go.etcd.io/raft/v3/raftpb"
 )

 // elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted
@@ -81,6 +82,14 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting(
 	true,
 )

+// ProvisionedBandwidth set a value of the provisioned
+// bandwidth for each store in the cluster.
+var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
+	settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
+	"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
+		"for each store. It can be over-ridden on a per-store basis using the --store flag",
+	0).WithPublic()
+
 // Controller provides admission control for the KV layer.
 type Controller interface {
 	// AdmitKVWork must be called before performing KV work.
@@ -108,6 +117,9 @@ type Controller interface {
 	// replicated to a raft follower, that have not been subject to admission
 	// control.
 	FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes)
+	// AdmitRaftEntry informs admission control of a raft log entry being
+	// written to storage.
+	AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
 }

 // TenantWeightProvider can be periodically asked to provide the tenant
@@ -394,13 +406,12 @@ func (n *controllerImpl) FollowerStoreWriteBytes(
 		followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo)
 }

-// ProvisionedBandwidth set a value of the provisioned
-// bandwidth for each store in the cluster.
-var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
-	settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
-	"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
-		"for each store. It can be over-ridden on a per-store basis using the --store flag",
-	0).WithPublic()
+// AdmitRaftEntry implements the Controller interface.
+func (n *controllerImpl) AdmitRaftEntry(
+	roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry,
+) {
+	panic("unimplemented")
+}

 // FollowerStoreWriteBytes captures stats about writes done to a store by a
 // replica that is not the leaseholder. These are used for admission control.
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/BUILD.bazel
@@ -0,0 +1,19 @@
+load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "kvflowcontrol",
+    srcs = [
+        "doc.go",
+        "kvflowcontrol.go",
+    ],
+    importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
+        "//pkg/roachpb",
+        "//pkg/util/admission/admissionpb",
+    ],
+)
+
+get_x_data(name = "get_x_data")