From 2611320aba28e3eaca601f6e3614c73562a80af5 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Mon, 19 Dec 2022 10:19:55 -0500 Subject: [PATCH 1/6] go.mod: bump Pebble to 6ca7956d84c1 ``` 6ca7956d db: tweak NextPrefix semantics in prefix-iteration mode 7fced9a5 metamorphic: fix NextPrefix integration 33377d59 metamorphic: fix NextPrefix integration ``` Release note: none --- DEPS.bzl | 6 +++--- build/bazelutil/distdir_files.bzl | 2 +- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 562bb1358fbf..a565c43e84a3 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1485,10 +1485,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "0401364025128ab1d3d592293d0c3e1109eb6b50f6c30bb948c5743e7200e9e1", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20221212215000-0893071d8a52", + sha256 = "8e17511f72ac42694d17f2fc293a484a6f243d0ea6bb592120ca9424bce3dbfb", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20221219140626-6ca7956d84c1", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221212215000-0893071d8a52.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221219140626-6ca7956d84c1.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index d591ccba0b60..9742697db933 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -193,7 +193,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/google-api-go-client/com_github_cockroachdb_google_api_go_client-v0.80.1-0.20221117193156-6a9f7150cb93.zip": "b3378c579f4f4340403038305907d672c86f615f8233118a8873ebe4229c4f39", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20211118104740-dabe8e521a4f.zip": "1972c3f171f118add3fd9e64bcea6cbb9959a3b7fa0ada308e8a7310813fea74", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221212215000-0893071d8a52.zip": "0401364025128ab1d3d592293d0c3e1109eb6b50f6c30bb948c5743e7200e9e1", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20221219140626-6ca7956d84c1.zip": "8e17511f72ac42694d17f2fc293a484a6f243d0ea6bb592120ca9424bce3dbfb", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.3.zip": "7778b1e4485e4f17f35e5e592d87eb99c29e173ac9507801d000ad76dd0c261e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/sentry-go/com_github_cockroachdb_sentry_go-v0.6.1-cockroachdb.2.zip": "fbb2207d02aecfdd411b1357efe1192dbb827959e36b7cab7491731ac55935c9", diff --git a/go.mod b/go.mod index 66159f8dd102..dd4fef1d9d7f 100644 --- a/go.mod +++ b/go.mod 
@@ -112,7 +112,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.19.0 github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f - github.com/cockroachdb/pebble v0.0.0-20221212215000-0893071d8a52 + github.com/cockroachdb/pebble v0.0.0-20221219140626-6ca7956d84c1 github.com/cockroachdb/redact v1.1.3 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index 943c0cfd4430..a26a5ca244f2 100644 --- a/go.sum +++ b/go.sum @@ -474,8 +474,8 @@ github.com/cockroachdb/gostdlib v1.19.0/go.mod h1:+dqqpARXbE/gRDEhCak6dm0l14AaTy github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74= github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= -github.com/cockroachdb/pebble v0.0.0-20221212215000-0893071d8a52 h1:kT8xHygytiJSQpqKv01NpGMG3ZdTlCjr9lW5K5RDh+k= -github.com/cockroachdb/pebble v0.0.0-20221212215000-0893071d8a52/go.mod h1:JsehdjcR1QgLZkqBeYrbVdE3cdxbdrycA/PN+Cg+RNw= +github.com/cockroachdb/pebble v0.0.0-20221219140626-6ca7956d84c1 h1:blSuAUKDTqzi+l1WP0RvB2uh4AZYLuAKUm7Xc6e2G68= +github.com/cockroachdb/pebble v0.0.0-20221219140626-6ca7956d84c1/go.mod h1:JsehdjcR1QgLZkqBeYrbVdE3cdxbdrycA/PN+Cg+RNw= github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd h1:KFOt5I9nEKZgCnOSmy8r4Oykh8BYQO8bFOTgHDS8YZA= From e49ed8984cc4243a5de89bfeb775a95e88a69e52 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Mon, 19 Dec 2022 10:28:14 -0500 Subject: [PATCH 2/6] scripts: adapt bump-pebble.sh for removal of vendor Remove all references to the vendor directory in the bump-pebble script. Release note: None Epic: None --- scripts/bump-pebble.sh | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/scripts/bump-pebble.sh b/scripts/bump-pebble.sh index 94547c335cc3..0064712dfeeb 100755 --- a/scripts/bump-pebble.sh +++ b/scripts/bump-pebble.sh @@ -72,8 +72,7 @@ popd COCKROACH_BRANCH="$USER/pebble-${BRANCH}-${NEW_SHA:0:12}" -# Pull in the Pebble module at the desired SHA and rebuild the vendor -# directory. +# Pull in the Pebble module at the desired SHA. pushd "$COCKROACH_DIR" go get "github.com/cockroachdb/pebble@${NEW_SHA}" go mod tidy @@ -83,10 +82,9 @@ popd pushd "$COCKROACH_DIR" ./dev generate bazel --mirror git add go.mod go.sum DEPS.bzl build/bazelutil/distdir_files.bzl -git add vendor git branch -D "$COCKROACH_BRANCH" || true git checkout -b "$COCKROACH_BRANCH" -git commit -m "vendor: bump Pebble to ${NEW_SHA:0:12} +git commit -m "go.mod: bump Pebble to ${NEW_SHA:0:12} $COMMITS From 99a5df94bed1803b2089d6b4e2c7984e65bf0a4f Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Thu, 12 Nov 2020 14:01:09 -0500 Subject: [PATCH 3/6] rfc: closed timestamps v2 RFC We need a more flexible closed timestamps mechanism because the Non-blocking Transactions project will use two different closed timestamps policies for different ranges. The existing mechanism closes a single timestamps for all the ranges for which a node is the leaseholder, which is not good enough. 
Besides, the existing mechanism is quite complex. The alternative proposed by this RFC seems significantly simpler. Release note: None --- docs/RFCS/20191108_closed_timestamps_v2.md | 581 +++++++++++++++++++++ 1 file changed, 581 insertions(+) create mode 100644 docs/RFCS/20191108_closed_timestamps_v2.md diff --git a/docs/RFCS/20191108_closed_timestamps_v2.md b/docs/RFCS/20191108_closed_timestamps_v2.md new file mode 100644 index 000000000000..e4217bad3e99 --- /dev/null +++ b/docs/RFCS/20191108_closed_timestamps_v2.md @@ -0,0 +1,581 @@ +- Feature Name: Log-Driven Closed Timestamps +- Status: implemented +- Start Date: 2020-11-13 +- Authors: Andrei Matei, Nathan VanBenschoten +- RFC PR: [56675](https://github.com/cockroachdb/cockroach/pull/56675) +- Cockroach Issue: None + +# Summary + +This RFC discusses changing the way in which closed timestamps are published +(i.e. communicated from leaseholders to followers). It argues for replacing the +current system, where closed timestamps are tracked per store and communicated +out of band from replication, and migrating to a hybrid scheme where, for +“active” ranges, closed timestamps are piggy-backed on Raft commands, and +"inactive" ranges continue to use a separate (but simpler) out-of-band/coalesced +mechanism. + +This RFC builds on, and contrasts itself, with the current system. Familiarity +with [the status +quo](https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20180603_follower_reads.md) +is assumed. + +# Motivation + +The Non-blocking Transactions project requires us to have at least two +categories of ranges with different policies for closing timestamps. The current +system does not allow this flexibility. Even regardless of the new need, having +a per-range closed timestamp was already something we've repeatedly wanted. The +existing system has also proven to be subtle and hard to understand; this +proposal seems significantly simpler which would be a big win in itself. + +## Simplicity + +- The existing closed timestamp subsystem is too hard to understand. This +proposal will make it more straightforward by inverting the relationship between +closed timestamps and log entries. Instead of closed timestamp information +pointing to log entries, log entries will contain closedts information. +- Closed timestamps will no longer be tied to liveness epochs. Currently, +followers need to reconcile the closed timestamps they receive with the current +lease (i.e. does the sender still own the lease for each of the ranges it's +publishing closed timestamps for?); a closed timestamp only makes sense when +coupled with a particular lease. This proposal gets rid of that; closed +timestamps would simply be associated with log positions. The association of +closed timestamps to leases has resulted in [a subtle bug](https://github.com/cockroachdb/cockroach/issues/48553). + - This is allowed by basing everything off of applied state (i.e. commands + that the leaseholder knows have applied / will apply) because command + application already takes into account lease validity using lease sequences. +- The Raft log becomes the transport for closed timestamps on active ranges, eliminating +the need for a separate transport. The separate transport remains for inactive +(inactive is pretty similar to quiesced) ranges, but it will be significantly +simpler because: + - The log positions communicated by the side-transport always refer to + applied state, not prospective state. 
The fact that currently they might
+    refer to prospective state (from the perspective of the follower) is a
+    significant source of complexity.
+  - The side-transport only deals with ranges without in-flight proposals.
+  - The side-transport would effectively increase the closed timestamp of the
+    last applied Raft entry without going through consensus.
+  - The entries coming from the side-transport don't need to be stored
+    separately upon reception, eliminating the need for
+    `pkg/kv/kvserver/closedts/storage`.
+- No ["Lagging followers"
+problem](https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20180603_follower_reads.md#lagging-followers).
+Closed timestamps are immediately actionable by followers receiving them
+(because they refer to state the followers have just applied, not to prospective
+state).
+- The proposal avoids closed timestamp stalls when the MLAI is never reached due
+to a failed proposal and then quiescence.
+- [Disabling closed timestamps during
+merges](https://github.com/cockroachdb/cockroach/pull/50265) becomes much
+simpler. The current solution is right on the line between really clever and
+horrible - because updates for many ranges are bundled together, we rely on
+publishing a funky closed timestamp with a LAI that can only be reached if the
+merge fails.
+- Propagating the closed timestamps to the RHS on splits becomes easier.
+Currently, this happens below Raft - each replica independently figures out an
+`initialMaxClosed` value suitable for the new range. With this proposal, the
+RHS's initial closed timestamp will simply be the closed ts carried by the
+command corresponding to the `EndTxn` with the `SplitTrigger`.
+- We can stop resetting the timestamp cache when a new lease is applied, and
+instead simply rely on the closed timestamp check. Currently, we need both; the
+closed timestamp check is not sufficient because the lease's start timestamp is
+closed asynchronously wrt applying the lease. In this proposal, leases will
+naturally carry (implicitly) their start as a closed timestamp.
+
+
+## Flexibility
+
+- The proposal makes the closed timestamp entirely a per-range attribute,
+eliminating the confusing relationship between the closed timestamp and the
+store owning the lease for a range. Each range evolves independently,
+eliminating cross-range dependencies (i.e. a request that's slow to evaluate no
+longer holds up the closing of timestamps for other ranges) and letting us set
+different policies for different ranges. In particular, for the Non-blocking
+Transactions project, we'll have ranges with a policy of closing out *future*
+timestamps.
+- Ranges with expiration-based leases can now start to use closed timestamps
+too. Currently, only epoch-based leases are compatible with closed timestamps,
+because a closed timestamp is tied to a lease epoch.
+- Closed timestamps will become durable and won't disappear across node
+restarts, because they'll be part of the Range Applied State key.
+  - This is important for [recovery from insufficient
+    quorum](https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20180603_follower_reads.md#recovery-from-insufficient-quorum).
+    With the existing mechanism, it's unclear how a consistent closed timestamp
+    can be determined across multiple ranges. With the new proposal, it becomes
+    trivial.
+- The proposal will make it easier for follower reads to wait when they can't be
+immediately satisfied, instead of being rejected and forwarded to the
+leaseholder immediately.
We don't currently have a way for a follower read that
+fails due to the follower's closed timestamp to queue up and wait for the
+follower to catch up. This would be difficult to implement today because of how
+closed timestamp entries are only understandable in the context of a lease.
+Keeping these separate and having a clear, centralized location where a
+replica's closed timestamp increases would make this kind of queue much easier
+to build.
+
+# Design
+
+A concept fundamental to this proposal is that closed timestamps become
+associated with Raft commands and their LAIs. Each command carries a closed
+timestamp that says: whoever applies this command *c1* with closed ts *ts1* is
+free to serve reads at timestamps <= *ts1* on the respective range because,
+given that *c1* applied, all further applying commands perform writes at
+timestamps > *ts1*.
+
+Closed timestamps carried by commands that ultimately are rejected are
+inconsequential. This is a subtle but important difference from the existing
+infrastructure. Currently, an update says something like "if a follower gets
+this update (epoch,MLAI), and if, at the time when it gets it, the sender is
+still the leaseholder, then the receiver will be free to use this closed
+timestamp as soon as replication catches up to MLAI". The current proposal does
+away with the lease part - the receiver no longer concerns itself with the lease
+status. Referencing leases in the current system is necessary because, at the
+time when the provider sends a closed timestamp update, that update references a
+command that may or may not ultimately apply. If it doesn't apply, then the
+epoch is used by the receiver to discern that the update is not actionable. In
+this proposal, leases no longer play a role because updates refer to committed
+commands, not to uncertain commands.
+
+
+### Splits and Merges
+
+The Right-Hand Side (RHS) of a split will start with the closed timestamp carried
+by the command that creates the range (i.e. the `EndTxn` with the `SplitTrigger`).
+We've considered choosing an initial closed timestamp for the RHS during
+evaluation and putting it in the `SplitTrigger`. The problem with that is that
+it'd allow the closed timestamp to regress for the RHS once the split is
+applied, which would allow follower reads to use inconsistent snapshots across
+ranges. Consider:
+1. Say we're splitting the range `a-c` into `a-b` and `b-c`. The `AdminSplit` is
+   evaluated. The `SplitTrigger` is computed to contain a closed ts = `10`.
+2. After the `AdminSplit` is evaluated, but before the corresponding command
+   (`c1`) is sequenced by the proposal buffer, another request evaluates and its
+   proposal gets sequenced with a closed ts of `20`.
+3. `c1` applies. Then the split applies on some, but not all, replicas. So now,
+   the replicas that have applied the split have a closed ts of `20` for `a-b` and
+   a closed ts of `10` for `b-c`. The replicas that have not applied it have a
+   closed ts of `20` for `a-c`.
+4. A transaction now writes at ts `15` to keys `b` and, say, `x`. Say that the
+   follower that hasn't yet applied the split continues to not apply it for a while,
+   and thus also doesn't apply the write to `b`.
+5. Now a read is performed across `b` and `x` @ `16`. The read on `b` can be
+   served by a non-split follower because it's below (what it believes to be) the
+   relevant closed timestamp.
This means that the read might fail to see the write in question,
+   which is no good - in particular because the read could see the write on `x`.
+
+For merges, the closed timestamp after the merge is applied will be that of
+the LHS. This technically allows for a regression in the RHS's closed timestamp
+(i.e. the RHS might have been frozen at a time when its closed timestamp is 20
+and the merge is completed when the closed ts of the LHS is 10), but this will
+be inconsequential because writes to the old RHS keyrange between 10 and 20 will
+be disallowed by the timestamp cache: we'll initialize the timestamp cache of
+the RHS keyrange to the freeze time (currently we only muck with the RHS's
+timestamp cache if the leases for the LHS and RHS were owned by different
+stores).
+
+### LAIs vs. Raft log indexes
+
+Like the current implementation, this proposal would operate in terms of LAIs,
+not of Raft log indexes. Each command will carry a closed timestamp,
+guaranteeing that further *applied* commands write at higher timestamps. We
+can't perfectly control the order in which commands are appended to the log
+because proposing at index *i* does not mean that the entry cannot be appended
+to the log at a higher position in situations where the leaseholder (i.e. the
+proposer) is different from the Raft leader. This kind of reordering of commands
+in the log means that, even though the proposer increases the closed timestamps
+monotonically, the log entries aren't always monotonic. However, commands that
+do apply still have monotonic closed timestamps because reordered commands don't
+apply (courtesy of the LAI protection).
+
+The fact that a log entry's closed timestamp is only valid if the entry passes
+the LAI check indicates that closed timestamps conceptually live in the lease
+domain, not in the log domain, and so we think of closed timestamps as being
+associated with a LAI, not with a log index position. Another consideration is
+the fact that there are Raft entries that don't correspond to evaluated requests
+(things like leader elections and quiescence messages). These entries won't
+carry a closed timestamp.
+
+The LAI vs log index distinction is inconsequential in the case of entries
+carrying their own closed timestamp, but plays a role in the design of the
+side-transport: that transport needs to identify commands somehow, and it will
+do so by LAI.
+
+We'll now first describe how closed timestamps are propagated, and then we'll
+discuss how timestamps are closed.
+
+## Closed timestamp transport and storage
+
+For "active" ranges, closed timestamps will be carried by Raft commands. Each
+proposal carries the range's closed timestamp (at the time of the proposal). If
+the proposal is accepted, it will apply on each replica. At application time,
+each replica will thus become aware of a new closed timestamp. The one exception
+is lease requests (including lease transfers): these proposals will not carry a
+closed timestamp explicitly. Instead, the lease start time will act as the
+closed timestamp associated with that proposal. This allows us to get rid of the
+reset of the tscache that we currently perform when a new lease is applied;
+writes can simply check against the closed timestamp.
+
+One thing to note is that closed ts are monotonic below Raft, so when applying
+commands a replica can blindly overwrite its known closed timestamp.
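+
+To make this concrete, here is a rough sketch of the idea using deliberately
+simplified, hypothetical types (a bare integer `timestamp`, a toy `raftCommand`
+and `replicaState`); these do not correspond to the actual CockroachDB structs:
+
+```go
+// timestamp stands in for an HLC timestamp.
+type timestamp int64
+
+type raftCommand struct {
+	maxLeaseIndex   uint64
+	closedTimestamp *timestamp // nil for lease requests, which use the lease start
+	// ... write batch, evaluation results, etc.
+}
+
+type replicaState struct {
+	leaseAppliedIndex uint64
+	closedTimestamp   timestamp
+}
+
+// apply sketches the apply-time handling: the proposer assigns closed
+// timestamps monotonically and reordered commands fail the lease-index check,
+// so an applying replica can blindly overwrite its known closed timestamp
+// rather than computing a maximum.
+func (s *replicaState) apply(cmd raftCommand) {
+	if cmd.maxLeaseIndex <= s.leaseAppliedIndex {
+		return // rejected; the command's closed timestamp is inconsequential
+	}
+	s.leaseAppliedIndex = cmd.maxLeaseIndex
+	if cmd.closedTimestamp != nil {
+		s.closedTimestamp = *cmd.closedTimestamp // blind overwrite, no max()
+	}
+	// ... apply the write batch and persist the applied state.
+}
+```
+
+The actual design additionally persists this value as part of the range's
+applied state, as described below.
+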
+Another thing to note is that, while a range is active, the close-frequency
+parameter of closed timestamps (i.e. the `kv.closed_timestamp.close_fraction`
+cluster setting) doesn't matter. Closed timestamp publishing is more continuous,
+rather than the discrete increments dictated by the current, periodic closing.
+The periodic closing will still happen for inactive ranges, though.
+
+Each replica will write the closed timestamp it knows about in a new field of
+the `RangeStateAppliedKey` - the range-local key maintaining the applied LAI
+and the range stats, among others. This key is already being written to on every
+command application (because of the stats). Snapshots naturally include the
+`RangeStateAppliedKey`, so new replicas will be initialized with a closed
+timestamp. Splits will initialize the RHS by propagating the RHS's closed
+timestamp through the `SplitTrigger` structure (which btw will be a lot cleaner
+than the way in which we currently prevent closed ts regressions on the RHS - a
+below-Raft mechanism).
+
+We've talked about "active/inactive" ranges, but haven't defined them. A range
+is inactive at a point in time if there are no in-flight write requests for it
+(i.e. no writes being evaluated at the moment and no proposals in-flight). The
+leaseholder can verify this condition. If there's no lease, then no replica can
+verify the condition by itself, which will result in the range not advancing its
+closed timestamp (that's also the current situation); the next request will
+cause a lease acquisition, at which point timestamps will start being closed
+again. When a leaseholder detects a range to be inactive, it can choose to bump
+its closed timestamp however it wants as long as it stays below the lease
+expiration (as long as it promises to not accept new writes below it). Each node
+will periodically scan its ranges for inactive ones (in reality, it will only
+consider ranges that have been inactive for a little while) and, when detecting
+a new one, it will move it over to the set of inactive ranges for which closed
+timestamps are published periodically through a side-transport (see below). An
+exception is a subsumed range, which cannot have its closed timestamp advanced
+(because that might cause an effective closed ts regression once the LHS of the
+respective merge takes over the key space).
+
+When the leaseholder for an inactive range starts evaluating a write request,
+the range becomes active again, and so goes back to having its closed timestamps
+transported by Raft proposals.
+
+The inactivity notion is similar to quiescence, but otherwise distinct from it
+(there's no reason to tie the two). Quiescence is initiated on each range's Raft
+ticker (ticking every 200ms); we may or may not reuse this for detecting
+inactivity. 200ms might prove too high a duration for non-blocking ranges,
+where this duration needs to be factored in when deciding how far in the future
+a non-blocking txn writes.
+
+### Side-transport for inactive ranges
+
+As hinted above, each node will maintain a set of inactive ranges for which
+timestamps are to be closed periodically and communicated to followers through a
+dedicated streaming RPC. Each node will have a background goroutine scanning for
+newly-inactive ranges to add to the set, and ranges for which the lease is not
+owned any more to remove from the set (updates for a range are only sent for
+timestamps at which the sender owns a valid lease). Writes remove ranges from
+the set, as do lease expirations. Periodically, the node closes new timestamps
+for all the inactive ranges. The exact timestamp that is closed depends on each
+individual range's policy, but all ranges with the same policy are bumped to the
+same timestamp (for compression purposes, see below). For "normal" ranges, that
+timestamp will be `now() - kv.closed_timestamp.target_duration`.
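+
+As a rough sketch of the sender-side tick, reusing the toy `timestamp` type
+from the earlier sketch (the `closedTSPolicy`, `rangeInfo`, `groupUpdate` and
+`targetFor` names are made up for illustration and are not the actual API):
+
+```go
+// closedTSPolicy determines how far behind (or, for non-blocking ranges, ahead
+// of) real time a range's closed timestamp trails; one group is formed per policy.
+type closedTSPolicy int
+
+type rangeInfo struct {
+	rangeID int64
+	lai     uint64 // last applied LAI; the closed timestamp refers to this entry
+	policy  closedTSPolicy
+}
+
+type groupUpdate struct {
+	policy   closedTSPolicy
+	closedTS timestamp
+	members  []rangeInfo
+}
+
+// tick runs periodically on each node and produces one update per policy,
+// covering all inactive ranges whose lease this node holds.
+func tick(now timestamp, inactive []rangeInfo, targetFor func(closedTSPolicy) timestamp) []groupUpdate {
+	groups := map[closedTSPolicy]*groupUpdate{}
+	for _, r := range inactive {
+		g, ok := groups[r.policy]
+		if !ok {
+			g = &groupUpdate{policy: r.policy, closedTS: now - targetFor(r.policy)}
+			groups[r.policy] = g
+		}
+		g.members = append(g.members, r)
+	}
+	out := make([]groupUpdate, 0, len(groups))
+	for _, g := range groups {
+		out = append(out, *g)
+	}
+	return out
+}
+```
+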
+Once closed, these timestamps need to be communicated to the followers of the
+respective ranges, and associated with Raft entries. We don't want to
+communicate them using the normal mechanism (Raft messages) because we don't
+want to send messages on otherwise inactive ranges - in particular, such
+messages would unquiesce the range - thus defeating the point of quiescence. So,
+we'll use another mechanism: a streaming RPC between each provider node and
+subscriber node. This stream would carry compressed closed timestamp updates for
+the sender's set of inactive ranges, grouped by the ranges' closed ts policy. In
+effect, each node will broadcast information about all of its inactive ranges to
+every other node (thus every recipient will only be interested in some of the
+updates - namely the ones for which it has a replica). The sender doesn't try to
+customize messages for each recipient.
+
+The first message on each stream will contain full information about the groups:
+```json
+groups: [
+  {
+    group id: g1,
+    members: [{range id: 1, log index: 1}, {range id: 2, log index: 2}],
+    timestamp: 100,
+  },
+  {
+    group id: g2,
+    members: [{range id: 3, log index: 3}, {range id: 4, log index: 4}],
+    timestamp: 200,
+  },
+  ...
+]
+```
+Inactive ranges are grouped by their closed ts policy and the members of each
+group are specified. For each range, the last LAI (as of the time when the
+update is sent) is specified - that's the entry that the closed timestamp refers
+to. The receiver of this update will bump the closed timestamp associated with
+that LAI to the respective group's new closed timestamp.
+
+After the initial message on a stream, subsequent messages are deltas of the form:
+```json
+groups: [
+  {
+    group id: g1,
+    removals: [{range id: 1, log index: 1}],
+    additions: [{range id: 5, log index: 5}],
+    timestamp: 200,
+  },
+  ...
+]
+```
+
+The protocol takes advantage of the fact that the streaming transport doesn't
+lose messages.
+
+It may seem that this side-transport is pretty similar to the existing
+transport. The fact that it only applies to inactive ranges results in a big
+simplification for the receiver: the receiver doesn't need to be particularly
+concerned that it hasn't yet applied the log position referenced by the update -
+and thus doesn't need to worry that overwriting the `<LAI, closed timestamp>`
+pair that it is currently using for the range with this newer info will result
+in the follower not being able to serve follower-reads at any timestamp until
+replication catches up. The current implementation has to maintain a history of
+closed ts updates per range for this reason, and all that can go away.
+
+The recipient of an update will iterate through all the update's ranges and
+increment the respective replica's (if any) closed timestamp. A case to
+consider is when the follower hasn't yet applied the LAI referenced by the
+update; this is hopefully uncommon, since updates refer to inactive ranges so
+replication should generally be complete. When an update with a not-yet-applied
+LAI is received, the replica's closed timestamp cannot simply be bumped, so
+we'll just ignore such an update (beyond still processing the respective range's
+addition to an inactive group).
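+
+Continuing the toy types from the earlier sketches (the names here are
+illustrative, not the actual implementation), the receiver-side handling of a
+single range's entry in such an update could look roughly like this:
+
+```go
+// rangeUpdate is one (range, LAI, closed timestamp) triple extracted from a
+// side-transport message, for a range this node has a replica of.
+type rangeUpdate struct {
+	rangeID  int64
+	lai      uint64    // the LAI the closed timestamp refers to
+	closedTS timestamp // the group's newly closed timestamp
+}
+
+func (s *replicaState) handleSideTransportUpdate(u rangeUpdate) {
+	// If the referenced command hasn't been applied yet, the update isn't
+	// actionable; just drop it. Updates are for inactive ranges, so replication
+	// should normally have caught up, and a later update will succeed.
+	if u.lai > s.leaseAppliedIndex {
+		return
+	}
+	// Otherwise, bump the replica's closed timestamp.
+	if u.closedTS > s.closedTimestamp {
+		s.closedTimestamp = u.closedTS
+	}
+}
+```
+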
+An alternative that was considered (instead of simply dropping such updates) is
+to have the replica store such a prospective update in a dedicated structure,
+which could be consulted on command application; when the expected LAI finally
+applies, the command application could also bump the closed timestamp and clear
+the provisional update structure. We opted against this complication as it would
+only provide momentary improvements: without doing it, it takes an extra message
+from the side transport after the range's replication catches up to bump the
+closed timestamp.
+
+Note that this scheme switches the tradeoff from the current
+implementation: currently, closed timestamp updates on the follower side are
+lazy; replicas are not directly updated in any way, instead letting reads
+consult a separate structure for figuring out what's closed. The current
+proposal makes updates more expensive but reads cheaper.
+
+## Closing of timestamps
+
+We've talked about how closed timestamps are communicated by a leaseholder to
+followers. Now we'll describe how a timestamp becomes closed. The scheme is
+pretty similar to the existing one, but simpler because LAIs don't need to be
+tracked anymore, and because all the tracking of requests can be done per-range
+(instead of per-store). Another difference is that the existing scheme relies on
+a timer closing out timestamps, whereas this proposal relies just on the flow of
+requests to do it (for active ranges).
+
+### Theoretical considerations
+
+We *can* close a timestamp as soon as there's no in-flight evaluation of a
+request at a lower timestamp. As soon as we close a timestamp, any write at a
+lower timestamp needs to be bumped. So, we don't want to be too aggressive in
+closing timestamps; we want to always trail current time somewhat (leaving aside
+leading closed timestamps for non-blocking ranges) such that there's some
+slack for recent transactions to come and perform their writes without
+interference.
+
+The scheme that directly encodes what we just said would be to track in-flight
+evaluations in a min-heap, ordered by their evaluation timestamp. As soon as an
+evaluation is done, we could take it out of the heap and we could close the
+remaining minimum (if it is below the desired slack). Actually maintaining this
+heap seems pretty expensive, though. To make it cheaper, we could start
+compressing the requests into fewer heap entries: we could discretize the
+timestamps into windows of, say, 10ms. Each window could be represented in the
+heap by one element, with a ref count. If we maintained these buckets in the
+simplest way possible - a circular buffer - then their count would still be
+unbounded as it depends on the evaluation time of the slowest request. Going a
+step further would be placing a limit on the number of buckets and, once the
+limit is reached, just have new requests join the last bucket. What should this
+limit be? Well, one doesn't really work, and the next smallest number is two.
+So, at the limit, we could maintain just two buckets.
+
+The current implementation uses two buckets, and the narrative above is a
+post-rationalization of how we got it. The current proposal keeps a two-bucket
+scheme in place.
+
+#### Analysis
+
+Let's analyze how the two-bucket scheme behaves vs. the optimal approach of a
+min-heap. What we want to know is 1) the rate at which the tracker advances,
+and 2) the lag that it imposes. Let's say that each write takes time *L*
+to evaluate.
The worst case is when a new request joins
+*cur* immediately before it shifts. So then *prev* takes *L* to shift again, as it
+needs to wait for the entire duration of the newly added request's evaluation.
+So the rate of the tracker is *1/L*.
+Then there's the lag. We pick a timestamp that we want to close when
+initializing *cur*. We then need to wait for one shift for this to be added to log
+entries as the closed timestamp. This is then the active closed timestamp for *L*,
+so the worst-case lag is *2L*.
+
+If we compare that to the min-heap, we get a rate of ∞ (period of 0) -
+the closed timestamp can advance as fast as proposals happen. However, we do get
+a lag of *L*, because we need to wait for the earliest entry to finish evaluating
+before the closed timestamp can progress.
+
+So the two-bucket approach leads to twice the lag of the "optimal" min-heap
+approach.
+
+### How the tracker will work
+
+Each range will have a *tracker* which keeps track of what requests are
+currently evaluating - which indirectly means tracking what requests have to
+finish evaluating for various timestamps to be closed. The tracker maintains an
+ordered set of *buckets* of requests in the process of evaluating. Each bucket
+has a timestamp and a reference count associated with it; every request in the
+bucket evaluated (or is evaluating) at a higher timestamp than the bucket's
+timestamp. The timestamp of bucket *i* is larger than that of bucket *i-1*. The
+buckets group requests into "epochs", each epoch being defined by the smallest
+timestamp allowed for a request to evaluate at (a request's write timestamp
+needs to be strictly greater than its bucket's timestamp). At any point in time
+only the last two buckets can be non-empty (and so the implementation will just
+maintain two buckets and continuously swap them) - we'll call the last bucket
+*cur* and the previous one *prev*. We'll say that we "increment" *prev* and
+*cur* to mean that we move the *prev* pointer to the next bucket, and we
+similarly create a new bucket and move the *cur* pointer to it.
+
+The tracker will logically be part of the `ProposalBuffer` - it will be
+protected by the buffer's peculiar rwlock. Insertions into the buckets will need
+the read lock (similar to insertions into the buffer), and the flushing of the
+buffer (done under a write lock) removes the flushed requests from their
+buckets. Integrating the tracker with the `ProposalBuffer` helps to ensure that
+the closed timestamps carried by successive commands are monotonic. Requests
+need to fix the closed timestamp that their proposal will carry either before,
+or otherwise atomically with, exiting their bucket. It'd be incorrect to allow a
+request to exit its bucket and fix its proposal's timestamp later, because we'd
+run the risk of assigning a closed ts that's too high (as it possibly wouldn't
+take into account other requests with higher timestamps that have also exited
+the bucket after it).
+
+As opposed to the current implementation, the buckets and their timestamps don't
+directly relate to the current closed timestamp. The range's closed timestamp
+(meaning, the highest timestamp that was attached to a proposal) will be
+maintained separately by the tracker. This allows the command corresponding to
+the last request that leaves the buckets (i.e. a command that leaves both buckets
+empty when it exits) to carry a timestamp relative to the flush time, not one
+relative to when the request started evaluating. Any new request exiting the
+buckets, though, will carry a closed ts that's higher than or equal to *prev*'s
+timestamp.
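+
+Before getting to the exact rules, here is a rough sketch of what such a
+two-bucket tracker could look like, reusing the toy `timestamp` type from the
+earlier sketches; the bucket bookkeeping at flush time is elided, the names are
+made up, and the real tracker lives inside the `ProposalBuffer` under its lock:
+
+```go
+type bucket struct {
+	ts  timestamp // zero means "not set yet"
+	cnt int       // requests still evaluating in this bucket
+}
+
+type tracker struct {
+	prev, cur bucket
+	closedTS  timestamp // highest closed ts attached to any proposal so far
+}
+
+// enter registers a request that starts evaluating and returns its bucket's
+// timestamp, above which the request must write.
+func (t *tracker) enter(now, target timestamp) timestamp {
+	if t.cur.ts == 0 {
+		t.cur.ts = now - target // per the range's policy
+	}
+	t.cur.cnt++
+	bucketTS := t.cur.ts
+	if t.prev.cnt == 0 {
+		// Shift: the request now finds itself in prev; cur becomes a new, empty bucket.
+		t.prev, t.cur = t.cur, bucket{}
+	}
+	return bucketTS
+}
+
+// closedTimestampForProposal runs at proposal-buffer flush time, atomically
+// with the exiting request leaving its bucket (that bookkeeping is omitted
+// here), and picks the closed timestamp the proposal will carry.
+func (t *tracker) closedTimestampForProposal(now, target timestamp) timestamp {
+	var closed timestamp
+	switch {
+	case t.prev.cnt == 0 && t.cur.cnt == 0:
+		closed = now - target
+	case t.prev.cnt == 0:
+		closed = t.cur.ts
+	default:
+		closed = t.prev.ts
+	}
+	if closed > t.closedTS {
+		t.closedTS = closed
+	}
+	return t.closedTS
+}
+```
+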
+### Rules for maintaining the buckets
+
+When a request arrives, it enters *cur*. If *cur*'s timestamp is not set, it is
+set now to `now() - kv.closed_timestamp.target_duration` (or according to
+whatever the range's policy is). If *prev* is found to be empty, both *prev* and
+*cur* are incremented - after the increment, the request finds itself in *prev*,
+and *cur* is a new, empty bucket.
+
+When creating a new bucket (by incrementing *cur*), that bucket's timestamp is
+not set immediately. Instead, it is set by the first request to enter it. Notice
+that a bucket's timestamp does not depend on the request creating the bucket, so
+timestamps for buckets are monotonically increasing.
+
+When a request exits its bucket (i.e. at buffer flush time), the logic depends
+on what bucket it's exiting. A request that's not the last one in its bucket
+doesn't do anything special. When the request is the last one:
+
+1. If exiting *prev*, then *prev* and *cur* are incremented. This way, a new bucket is created
+and the next arriving request will get to set its timestamp.
+
+2. If exiting *cur*, then *cur*'s timestamp is reset to the uninitialized state.
+This lets the next request set a new, higher one. Note that we can't necessarily
+increment *cur* and *prev* because *prev* might not be empty.
+
+
+### Rules for assigning a proposal's closed ts
+
+Again, the closed ts that a proposal carries is determined at flush time - after all the
+requests sequenced in the log before it have exited the tracker, and before any request sequenced
+after it has exited.
+
+1. If both buckets are empty, the timestamp is `now() -
+kv.closed_timestamp.target_duration`. Notice how the timestamps of the buckets
+don't matter, and the request's write timestamp (i.e. its evaluation
+timestamp) also doesn't matter. Indeed, the closed timestamp carried by a
+proposal can be higher than its write timestamp (such a case simply means that a
+proposal is writing at, say, 10, and is also promising that future proposals
+will only write above, say, 20).
+
+2. If *prev* is empty, then *cur*'s ts is used. *cur*'s timestamp must be set, otherwise
+we would have been in case 1.
+
+3. Otherwise (i.e. if *prev* is not empty), *prev*'s timestamp is used.
+
+Lease requests are special as they don't carry a closed timestamp. As explained
+before, the lease start time acts as the proposal's closed ts. Such requests are
+not tracked by the tracker.
+
+#### Some diagrams:
+
+In the beginning, there are two empty buckets:
+
+```
+| | |
+|prev;ts= |cur;ts= |
+---------------------------------
+```
+
+The first request (`r1`) comes and enters *cur*. Since *cur*'s ts is not set,
+it gets set now to `now()-5s = 10` (say that the
+range's target is a trail of 5s). Once the bucket timestamp is set, the request bumps
+its own timestamp to its bucket's timestamp. The request then notices that *prev*
+is empty, so it shifts *cur* over (thus now finding itself in *prev*). Requests
+from now on will carry a closed ts >= 10.
+
+```
+|refcnt: 1 (r1) | |
+|prev;ts=10 |cur;ts= |
+---------------------------------
+```
+
+While `r1` is evaluating, 3 other requests come. They all enter *cur*. The first
+one among them sets *cur*'s timestamp to, say, 15.
+ + +``` +|refcnt: 1 (r1) |refcnt:3 | +|prev;ts=10 |cur;ts=15 | +--------------------------------- +``` + +Let's say two of these requests finish evaluating while `r1`'s eval is still in +progress. They exit *cur* and they carry prev*'s `ts=10`). Then let's say `r1` +finishes evaluating and exits (also carrying `10`). Nothing more happens, even +though *prev* is now empty; if *cur* were empty, *prev* and *cur* would now be +incremented. + +``` +|refcnt: 0 |refcnt:1 (r2) | +|prev;ts=10 |cur;ts=15 | +--------------------------------- +``` + +Now say the last request out of *cur* (call it `r2`) exits the bucket. Since +both buckets are empty, it will carry `now - 5s`. We know that this is >= 15 +because 15 was `now - 5s` at some point in the past. Since *cur* is empty, its +timestamp gets reset. + +``` +|refcnt: 0 |refcnt:0 | +|prev;ts=10 |cur;ts= | +--------------------------------- +``` + +At this point the range is technically inactive. If enough time passes, the +background process in charge of publishing updates for inactive ranges using the +side transport will pick it up. But let's say another request comes in quickly. +When the next request comes in, though, the same thing will happen as when the +very first request came: it'll enter *cur*, find *prev* to be empty, shift over +*cur*->*prev* and create a new bucket. + +## Migration + +The existing closed timestamp tracker and transport will be left in place for a +release, working in parallel with the new mechanisms. Followers will only +activate closed timestamps communicated by the new mechanism after a cluster +version check. From e61de155017a1821d08ba0a414967927adabe0eb Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 8 Dec 2022 10:10:45 +0100 Subject: [PATCH 4/6] kvserver: decouple cmd checks in replicaAppBatch This refactors the command application pre-flight checks on replicaAppBatch such that they can move to appBatch once that struct evolves from a stub into an actual implementation of `apply.Batch`. Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/app_batch.go | 99 +++++++++++++++++++ pkg/kv/kvserver/replica_app_batch.go | 64 ++++++------ .../replica_application_state_machine.go | 48 ++++----- 4 files changed, 156 insertions(+), 56 deletions(-) create mode 100644 pkg/kv/kvserver/app_batch.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 7463f349ac22..c34ec8aadf5d 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -9,6 +9,7 @@ go_library( srcs = [ "addressing.go", "allocation_op.go", + "app_batch.go", "consistency_queue.go", "debug_print.go", "doc.go", diff --git a/pkg/kv/kvserver/app_batch.go b/pkg/kv/kvserver/app_batch.go new file mode 100644 index 000000000000..5bf64a8b7d83 --- /dev/null +++ b/pkg/kv/kvserver/app_batch.go @@ -0,0 +1,99 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvserver + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +// appBatch is the in-progress foundation for standalone log entry +// application[^1], i.e. the act of applying raft log entries to the state +// machine in a library-style fashion, without a running CockroachDB server. +// +// The intended usage is as follows. Starting with a ReplicatedCmd per Entry, +// +// 1. check it via assertAndCheckCommand followed by toCheckedCmd +// 2. run pre-add triggers (which may augment the WriteBatch) +// 3. stage the WriteBatch into a pebble Batch +// 4. run post-add triggers (metrics, etc) +// +// when all Entries have been added, the batch can be committed. In the course +// of time, appBatch will become an implementation of apply.Batch itself; at the +// time of writing it is only used by the replicaAppBatch implementation of +// apply.Batch, which goes through the above steps while interspersing: +// +// 1a. testing interceptors between assertAndCheckCommand and toCheckedCmd +// 2b. pre-add triggers specific to online command application (e.g. acquiring locks +// during replica-spanning operations), and +// 4b. post-add triggers specific to online command application (e.g. updates to +// Replica in-mem state) +// +// [^1]: https://github.com/cockroachdb/cockroach/issues/75729 +type appBatch struct { + // TODO(tbg): this will absorb the following fields from replicaAppBatch: + // + // - batch + // - state + // - changeRemovesReplica +} + +func (b *appBatch) assertAndCheckCommand( + ctx context.Context, cmd *raftlog.ReplicatedCmd, state *kvserverpb.ReplicaState, isLocal bool, +) (leaseIndex uint64, _ kvserverbase.ProposalRejectionType, forcedErr *roachpb.Error, _ error) { + if log.V(4) { + log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s", + cmd.ID, cmd.Index(), cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp) + } + + if cmd.Index() == 0 { + return 0, 0, nil, errors.AssertionFailedf("processRaftCommand requires a non-zero index") + } + if idx, applied := cmd.Index(), state.RaftAppliedIndex; idx != applied+1 { + // If we have an out-of-order index, there's corruption. No sense in + // trying to update anything or running the command. Simply return. + return 0, 0, nil, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx) + } + + // TODO(sep-raft-log): move the closedts checks from replicaAppBatch here as + // well. This just needs a bit more untangling as they reference *Replica, but + // for no super-convincing reason. + + leaseIndex, rej, forcedErr := kvserverbase.CheckForcedErr(ctx, cmd.ID, &cmd.Cmd, isLocal, state) + return leaseIndex, rej, forcedErr, nil +} + +func (b *appBatch) toCheckedCmd( + ctx context.Context, + cmd *raftlog.ReplicatedCmd, + leaseIndex uint64, + rej kvserverbase.ProposalRejectionType, + forcedErr *roachpb.Error, +) { + cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = leaseIndex, rej, forcedErr + if cmd.Rejected() { + log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedErr) + + // Apply an empty command. 
+ cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{} + cmd.Cmd.WriteBatch = nil + cmd.Cmd.LogicalOpLog = nil + cmd.Cmd.ClosedTimestamp = nil + } else { + log.Event(ctx, "applying command") + } +} diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 1fb808d0711f..55ae4e342b98 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -94,39 +95,32 @@ func (b *replicaAppBatch) Stage( ctx context.Context, cmdI apply.Command, ) (apply.CheckedCommand, error) { cmd := cmdI.(*replicatedCmd) - if cmd.Index() == 0 { - return nil, errors.AssertionFailedf("processRaftCommand requires a non-zero index") + + // We'll follow the steps outlined in appBatch's comment here, and will call + // into appBatch at appropriate times. + var ab appBatch + + leaseIndex, rej, forcedErr, err := ab.assertAndCheckCommand(ctx, &cmd.ReplicatedCmd, &b.state, cmd.IsLocal()) + if err != nil { + return nil, err } - if idx, applied := cmd.Index(), b.state.RaftAppliedIndex; idx != applied+1 { - // If we have an out of order index, there's corruption. No sense in - // trying to update anything or running the command. Simply return. - return nil, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx) + + // Then, maybe override the result with testing knobs. + if b.r.store.TestingKnobs() != nil { + rej, forcedErr = replicaApplyTestingFilters(ctx, b.r, cmd, rej, forcedErr) } - if log.V(4) { - log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s", - cmd.ID, cmd.Index(), cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp) - } - - // Determine whether the command should be applied to the replicated state - // machine or whether it should be rejected (and replaced by an empty command). - // This check is deterministic on all replicas, so if one replica decides to - // reject a command, all will. - if !b.r.shouldApplyCommand(ctx, cmd, &b.state) { - log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedErr) - - // Apply an empty command. - cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{} - cmd.Cmd.WriteBatch = nil - cmd.Cmd.LogicalOpLog = nil - cmd.Cmd.ClosedTimestamp = nil - } else { - if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil { - return nil, err - } - if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil { - return nil, err - } - log.Event(ctx, "applying command") + + // Now update cmd. We'll either put the lease index in it or zero out + // the cmd in case there's a forced error. + ab.toCheckedCmd(ctx, &cmd.ReplicatedCmd, leaseIndex, rej, forcedErr) + + // TODO(tbg): these assertions should be pushed into + // (*appBatch).assertAndCheckCommand. + if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil { + return nil, err + } + if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil { + return nil, err } // Acquire the split or merge lock, if necessary. 
If a split or merge @@ -791,8 +785,12 @@ func (mb *ephemeralReplicaAppBatch) Stage( ) (apply.CheckedCommand, error) { cmd := cmdI.(*replicatedCmd) - mb.r.shouldApplyCommand(ctx, cmd, &mb.state) - mb.state.LeaseAppliedIndex = cmd.LeaseIndex + leaseIndex, rejection, forcedErr := kvserverbase.CheckForcedErr( + ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), &mb.state, + ) + rejection, forcedErr = replicaApplyTestingFilters(ctx, mb.r, cmd, rejection, forcedErr) + cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = leaseIndex, rejection, forcedErr + return cmd, nil } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index b841b4af4f9e..0c0667913a50 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -79,43 +79,45 @@ func (r *Replica) getStateMachine() *replicaStateMachine { return sm } -// shouldApplyCommand determines whether or not a command should be applied to -// the replicated state machine after it has been committed to the Raft log. It -// then sets the provided command's LeaseIndex, Rejection, and ForcedErr -// fields and returns whether command should be applied or rejected. -func (r *Replica) shouldApplyCommand( - ctx context.Context, cmd *replicatedCmd, replicaState *kvserverpb.ReplicaState, -) bool { - cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = kvserverbase.CheckForcedErr( - ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), replicaState, - ) - // Consider testing-only filters. - if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; cmd.ForcedErr != nil || filter != nil { +// TODO(tbg): move this to replica_app_batch.go. +func replicaApplyTestingFilters( + ctx context.Context, + r *Replica, + cmd *replicatedCmd, + rejection kvserverbase.ProposalRejectionType, + forcedErr *roachpb.Error, +) (newRejection kvserverbase.ProposalRejectionType, newForcedErr *roachpb.Error) { + // By default, output is input. + newRejection = rejection + newForcedErr = forcedErr + + // Filters may change that. + if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; forcedErr != nil || filter != nil { args := kvserverbase.ApplyFilterArgs{ CmdID: cmd.ID, ReplicatedEvalResult: *cmd.ReplicatedResult(), StoreID: r.store.StoreID(), RangeID: r.RangeID, - ForcedError: cmd.ForcedErr, + ForcedError: forcedErr, } - if cmd.ForcedErr == nil { + if forcedErr == nil { if cmd.IsLocal() { args.Req = cmd.proposal.Request } - newPropRetry, newForcedErr := filter(args) - cmd.ForcedErr = newForcedErr - if cmd.Rejection == 0 { - cmd.Rejection = kvserverbase.ProposalRejectionType(newPropRetry) + var newRej int + newRej, newForcedErr = filter(args) + if rejection == 0 { + newRejection = kvserverbase.ProposalRejectionType(newRej) } } else if feFilter := r.store.cfg.TestingKnobs.TestingApplyForcedErrFilter; feFilter != nil { - newPropRetry, newForcedErr := filter(args) - cmd.ForcedErr = newForcedErr - if cmd.Rejection == 0 { - cmd.Rejection = kvserverbase.ProposalRejectionType(newPropRetry) + var newRej int + newRej, newForcedErr = filter(args) + if rejection == 0 { + newRejection = kvserverbase.ProposalRejectionType(newRej) } } } - return cmd.ForcedErr == nil + return newRejection, newForcedErr } // NewEphemeralBatch implements the apply.StateMachine interface. 
From 54e3708c8ccc850e5edd7a77c74bdb70aacde3a5 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 16 Dec 2022 11:41:50 +0100 Subject: [PATCH 5/6] kvserverbase: introduce struct for return val of CheckForcedErr Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/app_batch.go | 20 ++--- pkg/kv/kvserver/kvserverbase/forced_error.go | 81 ++++++++++++++----- pkg/kv/kvserver/raftlog/command.go | 13 ++- pkg/kv/kvserver/replica_app_batch.go | 12 +-- pkg/kv/kvserver/replica_application_result.go | 4 +- .../replica_application_state_machine.go | 31 +++---- pkg/kv/kvserver/replica_test.go | 3 +- 7 files changed, 94 insertions(+), 70 deletions(-) diff --git a/pkg/kv/kvserver/app_batch.go b/pkg/kv/kvserver/app_batch.go index 5bf64a8b7d83..6d58d84857f9 100644 --- a/pkg/kv/kvserver/app_batch.go +++ b/pkg/kv/kvserver/app_batch.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -54,39 +53,34 @@ type appBatch struct { func (b *appBatch) assertAndCheckCommand( ctx context.Context, cmd *raftlog.ReplicatedCmd, state *kvserverpb.ReplicaState, isLocal bool, -) (leaseIndex uint64, _ kvserverbase.ProposalRejectionType, forcedErr *roachpb.Error, _ error) { +) (kvserverbase.ForcedErrResult, error) { if log.V(4) { log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s", cmd.ID, cmd.Index(), cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp) } if cmd.Index() == 0 { - return 0, 0, nil, errors.AssertionFailedf("processRaftCommand requires a non-zero index") + return kvserverbase.ForcedErrResult{}, errors.AssertionFailedf("processRaftCommand requires a non-zero index") } if idx, applied := cmd.Index(), state.RaftAppliedIndex; idx != applied+1 { // If we have an out-of-order index, there's corruption. No sense in // trying to update anything or running the command. Simply return. - return 0, 0, nil, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx) + return kvserverbase.ForcedErrResult{}, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx) } // TODO(sep-raft-log): move the closedts checks from replicaAppBatch here as // well. This just needs a bit more untangling as they reference *Replica, but // for no super-convincing reason. - leaseIndex, rej, forcedErr := kvserverbase.CheckForcedErr(ctx, cmd.ID, &cmd.Cmd, isLocal, state) - return leaseIndex, rej, forcedErr, nil + return kvserverbase.CheckForcedErr(ctx, cmd.ID, &cmd.Cmd, isLocal, state), nil } func (b *appBatch) toCheckedCmd( - ctx context.Context, - cmd *raftlog.ReplicatedCmd, - leaseIndex uint64, - rej kvserverbase.ProposalRejectionType, - forcedErr *roachpb.Error, + ctx context.Context, cmd *raftlog.ReplicatedCmd, fr kvserverbase.ForcedErrResult, ) { - cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = leaseIndex, rej, forcedErr + cmd.ForcedErrResult = fr if cmd.Rejected() { - log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedErr) + log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedError) // Apply an empty command. 
cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{} diff --git a/pkg/kv/kvserver/kvserverbase/forced_error.go b/pkg/kv/kvserver/kvserverbase/forced_error.go index 55fc20b128be..7b6ae0e8b970 100644 --- a/pkg/kv/kvserver/kvserverbase/forced_error.go +++ b/pkg/kv/kvserver/kvserverbase/forced_error.go @@ -42,6 +42,13 @@ var noopOnEmptyRaftCommandErr = roachpb.NewErrorf("no-op on empty Raft entry") // corresponding to a ProbeRequest is handled. var NoopOnProbeCommandErr = roachpb.NewErrorf("no-op on ProbeRequest") +// ForcedErrResult is the output from CheckForcedErr. +type ForcedErrResult struct { + LeaseIndex uint64 + Rejection ProposalRejectionType + ForcedError *roachpb.Error +} + // CheckForcedErr determines whether or not a command should be applied to the // replicated state machine after it has been committed to the Raft log. This // decision is deterministic on all replicas, such that a command that is @@ -69,13 +76,16 @@ func CheckForcedErr( raftCmd *kvserverpb.RaftCommand, isLocal bool, replicaState *kvserverpb.ReplicaState, -) (uint64, ProposalRejectionType, *roachpb.Error) { +) ForcedErrResult { if raftCmd.ReplicatedEvalResult.IsProbe { // A Probe is handled by forcing an error during application (which // avoids a separate "success" code path for this type of request) // that we can special case as indicating success of the probe above // raft. - return 0, ProposalRejectionPermanent, NoopOnProbeCommandErr + return ForcedErrResult{ + Rejection: ProposalRejectionPermanent, + ForcedError: NoopOnProbeCommandErr, + } } leaseIndex := replicaState.LeaseAppliedIndex isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest @@ -89,7 +99,11 @@ func CheckForcedErr( // Nothing to do here except making sure that the corresponding batch // (which is bogus) doesn't get executed (for it is empty and so // properties like key range are undefined). - return leaseIndex, ProposalRejectionPermanent, noopOnEmptyRaftCommandErr + return ForcedErrResult{ + LeaseIndex: leaseIndex, + Rejection: ProposalRejectionPermanent, + ForcedError: noopOnEmptyRaftCommandErr, + } } // Verify the lease matches the proposer's expectation. We rely on @@ -162,11 +176,15 @@ func CheckForcedErr( // For lease requests we return a special error that // redirectOnOrAcquireLease() understands. Note that these // requests don't go through the DistSender. - return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *replicaState.Lease, - Requested: requestedLease, - Message: "proposed under invalid lease", - }) + return ForcedErrResult{ + LeaseIndex: leaseIndex, + Rejection: ProposalRejectionPermanent, + ForcedError: roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "proposed under invalid lease", + }), + } } // We return a NotLeaseHolderError so that the DistSender retries. 
// NB: we set proposerStoreID to 0 because we don't know who proposed the @@ -176,7 +194,11 @@ func CheckForcedErr( fmt.Sprintf( "stale proposal: command was proposed under lease #%d but is being applied "+ "under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease)) - return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(nlhe) + return ForcedErrResult{ + LeaseIndex: leaseIndex, + Rejection: ProposalRejectionPermanent, + ForcedError: roachpb.NewError(nlhe), + } } if isLeaseRequest { @@ -188,11 +210,15 @@ func CheckForcedErr( // However, leases get special vetting to make sure we don't give one to a replica that was // since removed (see #15385 and a comment in redirectOnOrAcquireLease). if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { - return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *replicaState.Lease, - Requested: requestedLease, - Message: "replica not part of range", - }) + return ForcedErrResult{ + LeaseIndex: leaseIndex, + Rejection: ProposalRejectionPermanent, + ForcedError: roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "replica not part of range", + }), + } } } else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { // The happy case: the command is applying at or ahead of the minimal @@ -219,9 +245,12 @@ func CheckForcedErr( ) retry = ProposalRejectionIllegalLeaseIndex } - return leaseIndex, retry, roachpb.NewErrorf( - "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, - ) + return ForcedErrResult{ + LeaseIndex: leaseIndex, + Rejection: retry, + ForcedError: roachpb.NewErrorf( + "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, + )} } // Verify that command is not trying to write below the GC threshold. This is @@ -235,10 +264,18 @@ func CheckForcedErr( // the GC threshold has advanced since then? wts := raftCmd.ReplicatedEvalResult.WriteTimestamp if !wts.IsEmpty() && wts.LessEq(*replicaState.GCThreshold) { - return leaseIndex, ProposalRejectionPermanent, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{ - Timestamp: wts, - Threshold: *replicaState.GCThreshold, - }) + return ForcedErrResult{ + LeaseIndex: leaseIndex, + Rejection: ProposalRejectionPermanent, + ForcedError: roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{ + Timestamp: wts, + Threshold: *replicaState.GCThreshold, + }), + } + } + return ForcedErrResult{ + LeaseIndex: leaseIndex, + Rejection: ProposalRejectionPermanent, + ForcedError: nil, } - return leaseIndex, ProposalRejectionPermanent, nil } diff --git a/pkg/kv/kvserver/raftlog/command.go b/pkg/kv/kvserver/raftlog/command.go index 4ae40ff8e910..152a38008c45 100644 --- a/pkg/kv/kvserver/raftlog/command.go +++ b/pkg/kv/kvserver/raftlog/command.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/errors" "go.etcd.io/etcd/raft/v3/raftpb" ) @@ -32,13 +31,11 @@ import ( // changes replica boundaries. type ReplicatedCmd struct { *Entry - // The following fields are set in shouldApplyCommand when we validate that - // a command applies given the current lease and GC threshold. 
The process - // of setting these fields is what transforms an apply.Command into an + // The following struct is populated in shouldApplyCommand when we validate + // that a command applies given the current lease and GC threshold. The + // process of populating it is what transforms an apply.Command into an // apply.CheckedCommand. - LeaseIndex uint64 - ForcedErr *roachpb.Error - Rejection kvserverbase.ProposalRejectionType + kvserverbase.ForcedErrResult } var _ apply.Command = (*ReplicatedCmd)(nil) @@ -106,7 +103,7 @@ func (c *ReplicatedCmd) AckOutcomeAndFinish(context.Context) error { return nil // A command is rejected if it has a ForcedErr, i.e. if the state machines // (deterministically) apply the associated entry as a no-op. See // kvserverbase.CheckForcedErr. -func (c *ReplicatedCmd) Rejected() bool { return c.ForcedErr != nil } +func (c *ReplicatedCmd) Rejected() bool { return c.ForcedError != nil } // CanAckBeforeApplication implements apply.CheckedCommand. // diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 55ae4e342b98..b5b83736ad01 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -100,19 +100,19 @@ func (b *replicaAppBatch) Stage( // into appBatch at appropriate times. var ab appBatch - leaseIndex, rej, forcedErr, err := ab.assertAndCheckCommand(ctx, &cmd.ReplicatedCmd, &b.state, cmd.IsLocal()) + fr, err := ab.assertAndCheckCommand(ctx, &cmd.ReplicatedCmd, &b.state, cmd.IsLocal()) if err != nil { return nil, err } // Then, maybe override the result with testing knobs. if b.r.store.TestingKnobs() != nil { - rej, forcedErr = replicaApplyTestingFilters(ctx, b.r, cmd, rej, forcedErr) + fr = replicaApplyTestingFilters(ctx, b.r, cmd, fr) } // Now update cmd. We'll either put the lease index in it or zero out // the cmd in case there's a forced error. - ab.toCheckedCmd(ctx, &cmd.ReplicatedCmd, leaseIndex, rej, forcedErr) + ab.toCheckedCmd(ctx, &cmd.ReplicatedCmd, fr) // TODO(tbg): these assertions should be pushed into // (*appBatch).assertAndCheckCommand. 
@@ -785,11 +785,11 @@ func (mb *ephemeralReplicaAppBatch) Stage( ) (apply.CheckedCommand, error) { cmd := cmdI.(*replicatedCmd) - leaseIndex, rejection, forcedErr := kvserverbase.CheckForcedErr( + fr := kvserverbase.CheckForcedErr( ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), &mb.state, ) - rejection, forcedErr = replicaApplyTestingFilters(ctx, mb.r, cmd, rejection, forcedErr) - cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = leaseIndex, rejection, forcedErr + fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr) + cmd.ForcedErrResult = fr return cmd, nil } diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 8b1b8f78327b..06164f0a9b86 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -83,14 +83,14 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { StoreID: r.store.StoreID(), RangeID: r.RangeID, Req: cmd.proposal.Request, - ForcedError: cmd.ForcedErr, + ForcedError: cmd.ForcedError, }) if cmd.Rejection == 0 { cmd.Rejection = kvserverbase.ProposalRejectionType(newPropRetry) } } if pErr == nil { - pErr = cmd.ForcedErr + pErr = cmd.ForcedError } if cmd.Rejection != kvserverbase.ProposalRejectionPermanent && pErr == nil { diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 0c0667913a50..536fd3599ab5 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -81,43 +81,38 @@ func (r *Replica) getStateMachine() *replicaStateMachine { // TODO(tbg): move this to replica_app_batch.go. func replicaApplyTestingFilters( - ctx context.Context, - r *Replica, - cmd *replicatedCmd, - rejection kvserverbase.ProposalRejectionType, - forcedErr *roachpb.Error, -) (newRejection kvserverbase.ProposalRejectionType, newForcedErr *roachpb.Error) { + ctx context.Context, r *Replica, cmd *replicatedCmd, fr kvserverbase.ForcedErrResult, +) kvserverbase.ForcedErrResult { // By default, output is input. - newRejection = rejection - newForcedErr = forcedErr + newFR := fr // Filters may change that. - if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; forcedErr != nil || filter != nil { + if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; fr.ForcedError != nil || filter != nil { args := kvserverbase.ApplyFilterArgs{ CmdID: cmd.ID, ReplicatedEvalResult: *cmd.ReplicatedResult(), StoreID: r.store.StoreID(), RangeID: r.RangeID, - ForcedError: forcedErr, + ForcedError: fr.ForcedError, } - if forcedErr == nil { + if fr.ForcedError == nil { if cmd.IsLocal() { args.Req = cmd.proposal.Request } var newRej int - newRej, newForcedErr = filter(args) - if rejection == 0 { - newRejection = kvserverbase.ProposalRejectionType(newRej) + newRej, newFR.ForcedError = filter(args) + if fr.Rejection == 0 { + newFR.Rejection = kvserverbase.ProposalRejectionType(newRej) } } else if feFilter := r.store.cfg.TestingKnobs.TestingApplyForcedErrFilter; feFilter != nil { var newRej int - newRej, newForcedErr = filter(args) - if rejection == 0 { - newRejection = kvserverbase.ProposalRejectionType(newRej) + newRej, newFR.ForcedError = filter(args) + if fr.Rejection == 0 { + newFR.Rejection = kvserverbase.ProposalRejectionType(newRej) } } } - return newRejection, newForcedErr + return newFR } // NewEphemeralBatch implements the apply.StateMachine interface. 
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 3d64fb8b74ef..e82264637828 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -825,10 +825,11 @@ func TestLeaseReplicaNotInDesc(t *testing.T) {
 		},
 	}
 	tc.repl.mu.Lock()
-	_, _, pErr := kvserverbase.CheckForcedErr(
+	fr := kvserverbase.CheckForcedErr(
 		ctx, makeIDKey(), &raftCmd, false, /* isLocal */ &tc.repl.mu.state,
 	)
+	pErr := fr.ForcedError
 	tc.repl.mu.Unlock()
 	if _, isErr := pErr.GetDetail().(*roachpb.LeaseRejectedError); !isErr {
 		t.Fatal(pErr)

From 711893e83e40e5d71afd179ff770db7c56943b07 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Mon, 19 Dec 2022 15:57:19 +0100
Subject: [PATCH 6/6] kvserver: fix a testing filter invocation

We were invoking the wrong filter. Luckily, this has only one user, and
for that user, both filters were the same.

Epic: none
Release note: None
---
 pkg/kv/kvserver/replica_application_state_machine.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index 536fd3599ab5..e58eae2677e8 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -106,7 +106,7 @@ func replicaApplyTestingFilters(
 			}
 		} else if feFilter := r.store.cfg.TestingKnobs.TestingApplyForcedErrFilter; feFilter != nil {
 			var newRej int
-			newRej, newFR.ForcedError = filter(args)
+			newRej, newFR.ForcedError = feFilter(args)
 			if fr.Rejection == 0 {
 				newFR.Rejection = kvserverbase.ProposalRejectionType(newRej)
 			}