diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 225425da3945..a82fa62cbad8 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -469,6 +469,10 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error // Acquire the split or merge lock, if necessary. If a split or merge // command was rejected with a below-Raft forced error then its replicated // result was just cleared and this will be a no-op. + // + // TODO(tbg): can't this happen in splitPreApply which is called from + // b.runPreApplyTriggersAfterStagingWriteBatch and similar for merges? That + // way, it would become less of a one-off. if splitMergeUnlock, err := b.r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil { var err error if cmd.raftCmd.ReplicatedEvalResult.Split != nil { @@ -1259,6 +1263,47 @@ func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *re return nil } return sm.r.withRaftGroup(true, func(rn *raft.RawNode) (bool, error) { + // NB: `etcd/raft` configuration changes diverge from the official Raft way + // in that a configuration change becomes active when the corresponding log + // entry is applied (rather than appended). This ultimately enables the way + // we do things where the state machine's view of the range descriptor always + // dictates the active replication config but it is much trickier to prove + // correct. See: + // + // https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 + // + // INVARIANT: a leader will not append a config change to its logs when it + // hasn't applied all previous config changes in its logs. + // + // INVARIANT: a node will not campaign until it has applied any + // configuration changes with indexes less than or equal to its committed + // index. + // + // INVARIANT: appending a config change to the log (at leader or follower) + // implies that any previous config changes are durably known to be + // committed. That is, a commit index is persisted (and synced) that + // encompasses any earlier config changes before a new config change is + // appended. + // + // Together, these invariants ensure that a follower that is behind by + // multiple configuration changes will be using one of the two most recent + // configuration changes "by the time it matters", which is what is + // required for correctness (configuration changes are sequenced so that + // neighboring configurations are mutually compatible, i.e. don't cause + // split brain). To see this, consider a follower that is behind by + // multiple configuration changes. This is fine unless this follower + // becomes the leader (as it would then make quorum determinations based + // on its active config). To become leader, it needs to campaign, and + // thanks to the second invariant, it will only do so once it has applied + // all the configuration changes in its committed log. If it is to win the + // election, it will also have all committed configuration changes in its + // log (though not necessarily knowing that they are all committed). But + // the third invariant implies that when the follower received the most + // recent configuration change into its log, the one preceding it was + // durably marked as committed on the follower. In summary, we now know + // that it will apply all the way up to and including the second most + // recent configuration change, which is compatible with the most recent + // one. rn.ApplyConfChange(cmd.confChange.ConfChangeI) return true, nil }) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 5d90512eb330..2564a9d1b9a5 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -413,8 +413,257 @@ func (rs *storeReplicaVisitor) EstimatedCount() int { return len(rs.repls) - rs.visited } -// A Store maintains a map of ranges by start key. A Store corresponds -// to one physical device. +/* +A Store maintains a set of Replicas whose data is stored on a storage.Engine +usually corresponding to a dedicated storage medium. It also houses a collection +of subsystems that, in broad terms, perform maintenance of each Replica on the +Store when required. In particular, this includes various queues such as the +split, merge, rebalance, GC, and replicaGC queues (to name just a few). + +INVARIANT: the set of all Ranges (as determined by, e.g. a transactionally +consistent scan of the meta index ranges) always exactly covers the addressable +keyspace roachpb.KeyMin (inclusive) to roachpb.KeyMax (exclusive). + +Ranges + +Each Replica is part of a Range, i.e. corresponds to what other systems would +call a shard. A Range is a consensus group backed by Raft, i.e. each Replica is +a state machine backed by a replicated log of commands. In CockroachDB, Ranges +own a contiguous chunk of addressable keyspace, where the word "addressable" is +a fine-print that interested readers can learn about in keys.Addr; in short the +Replica data is logically contiguous, but not contiguous when viewed as Engine +kv pairs. + +Considerable complexity is incurred by the features of dynamic re-sharding and +relocation, i.e. range splits, range merges, and range relocation (also called +replication changes, or, in the case of lateral movement, rebalancing). Each of +these interact heavily with the Range as a consensus group (of which each +Replica is a member). All of these intricacies are described at a high level in +this comment. + +RangeDescriptor + +A roachpb.RangeDescriptor is the configuration of a Range. It is an +MVCC-backed key-value pair (where the key is derived from the StartKey via +keys.RangeDescriptorKey, which in particular resides on the Range itself) +that is accessible to the transactional KV API much like any other, but is +for internal use only (and in particular plays no role at the SQL layer). + +Splits, Merges, and Rebalances are all carried out as distributed +transactions. They are all complex in their own right but they share the +basic approach of transactionally acting on the RangeDescriptor. Each of +these operations at some point will + +- start a transaction + +- update the RangeDescriptor (for example, to reflect a split, or a change +to the Replicas comprising the members of the Range) + +- update the meta ranges (which form a search index used for request routing) + +- commit with a roachpb.InternalCommitTrigger. + +In particular, note that the RangeDescriptor is the first write issued in the +transaction, and so the transaction record will be created on the affected +range. This allows us to establish a helpful invariant: + +INVARIANT: an intent on keys.RangeDescriptorKey is resolved atomically with +the (application of the) roachpb.EndTxnRequest committing the transaction. + +A Replica's active configuration is dictated by its visible version of the +RangeDescriptor, and the above invariant simplifies this. Without the invariant, +the new RangeDescriptor would come into effect when the intent is resolved, +which is a) later (so requests might get routed to this Replica without it being +ready to handle them appropriately) and b) requires special-casing around +ResolveIntent acting on the RangeDescriptor. + +INVARIANT: A Store never contains two Replicas from the same Range, nor do the +key ranges for any two of its Replicas overlap. (Note that there is no +requirement that these Replicas come from a consistent set of Ranges). + +To illustrate this last invariant, consider a split of a Replica [a-z) into two +Replicas, [a-c) and [c,z). The Store will never contain both; it has to swap +directly from [a-z) to [a,c)+[c,z). For a similar example, consider accepting a +new Replica [b,d) when a Replica [a,c) is already present, which is similarly +not allowed. To understand how such situations could arise, consider that a +series of changes to the RangeDescriptor is observed asynchronously by +followers. This means that a follower may have Replica [a-z) while any number of +splits and replication changes are already known to the leaseholder, and the new +Ranges created in the process may be attempting to add a Replica to the slow +follower. + +With the invariant, we effectively allow looking up a unique (if it exists) +Replica for any given key, and ensure that no two Replicas on a Store operate on +shared keyspace (as seen by the storage.Engine). Refer to the Replica Lifecycle +diagram below for details on how this invariant is upheld. + +Replica Lifecycle + +A Replica should be thought of primarily as a State Machine applying commands +from a replicated log (the log being replicated across the members of the +Range). The Store's RaftTransport receives Raft messages from Replicas residing +on other Stores and routes them to the appropriate Replicas via +Store.HandleRaftRequest (which is part of the RaftMessageHandler interface), +ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which +houses the integration with the etcd/raft library (raft.RawNode). This may +generate Raft messages to be sent to other Stores; these are handed to +Replica.sendRaftMessages which ultimately hands them to the Store's +RaftTransport.SendAsync method. Raft uses message passing (not +request-response), and outgoing messages will use a gRPC stream that differs +from that used for incoming messages (which makes asymmetric partitions more +likely in case of stream-specific problems). The steady state is relatively +straightforward but when Ranges are being reconfigured, an understanding the +Replica Lifecycle becomes important and upholding the Store's invariants becomes +more complex. + +A first phenomenon to understand is that of uninitialized Replicas. Such a +Replica corresponds to a State Machine that has yet to apply any entries, and in +particular has no notion of an active RangeDescriptor yet. This state exists for +various reasons: + +- A newly added voter already needs to be able to cast a vote even before it has +had time to be caught up by other nodes. In the extreme case, the very first +Raft message received at a Store leading to the creation of an (uninitialized) +Replica may be a request for a vote that is required for quorum. In practice we +mostly sidestep this voting requirement by adding new members through an +intermediate state (see roachpb.LEARNER) that allows them to catch up and become +initialized before making them voters, but at the protocol level this still +relies on the concept of a Replica without state that can act as a Raft peer. + +- For practical reasons, we don't preserve the entire committed replicated log +for eternity. It will periodically get truncated, i.e. a prefix discarded, and +this leads to followers occasionally being forced to catch up on the log via a +Raft Snapshot, i.e. a copy of the replicated data in the State Machine as of +some log position, from which the recipient Replica can then continue to receive +and apply log entries. + +- In CockroachDB, a newly created Range (say at cluster bootstrap, or during a +Range split) is never empty - it has to contain at least the RangeDescriptor. +For the split case, indeed it contains around half of the data originally housed +in the pre-split Range, and it is burdensome to distribute potentially hundreds +of megabytes through the Raft log. So what we do in CockroachDB is to never use +Raft entries zero through nine (see raftInitialLogIndex), which means that a +newly added Replica will need a first Snapshot to obtain access to the log. + +Externally, uninitialized Replicas should be viewed as an implementation +detail that is (or should be!) invisible to most access to a Store in the +context of processing requests coming from the KV API, as uninitialized +Replicas cannot serve such requests. + +The diagram is a lot to take in. The various transitions are discussed in +prose below, and the source .dot file is in store_doc_replica_lifecycle.dot. + + +---------------------+ + +------------------ | Absent | ---------------------------------------------------------------------------------------------------+ + | +---------------------+ | + | | Subsume Crash applySnapshot | + | | Store.Start +---------------+ +---------+ +---------------+ | + | v v | v | v | | + | +-----------------------------------------------------------------------------------------------------------------------+ | + +---------+------------------ | | | + | | | Initialized | | + | | | | | + | +----+------------------ | | -+----+ + | | | +-----------------------------------------------------------------------------------------------------------------------+ | | + | | | | ^ ^ | | | | | + | | | Raft msg | Crash | applySnapshot | post-split | | | | | + | | | v | | | | | | | + | | | +---------------------------------------------------------+ pre-split | | | | | + | | +-----------------> | | <---------------------+--------------+--------------------+----+ | + | | | | | | | | + | | | Uninitialized | Raft msg | | | | + | | | | -----------------+ | | | | + | | | | | | | | | + | | | | <----------------+ | | | | + | | +---------------------------------------------------------+ | | apply removal | | + | | | | | | | | + | | | ReplicaTooOldError | higher ReplicaID | Replica GC | | | + | | v v v | | | + | | Merged (snapshot) +---------------------------------------------------------------------------------------------+ | | | + | +----------------------> | | <+ | | + | | | | | + | apply Merge | | ReplicaTooOld | | + +---------------------------> | Removed | <---------------------+ | + | | | + | | higher ReplicaID | + | | <-------------------------------+ + +---------------------------------------------------------------------------------------------+ + + +When a Store starts, it iterates through all RangeDescriptors it can find on its +Engine. Finding a RangeDescriptor by definition implies that the Replica is +initialized. Raft state (a raftpb.HardState) for uninitialized Replicas may +exist, however it would be ignored until a message arrives addressing that +Replica, or a split trigger applies that instantiates and then initializes it. + +Uninitialized Replicas principally arise when a replication change occurs and a +new member is added to a Range. This new member will be contacted by the Raft +leader, creating an uninitialized Replica on the recipient which will then +negotiate a snapshot to become initialized and to receive the replicated log. A +split can be understood as a special case of that, except that all members of +the Range are created at around the same time (though in practice with arbitrary +delays due to the usual distributed systems reasons), and the uninitialized +Replica creation is triggered by the split trigger executing on the left-hand +side of the split, or a Replica of the right-hand side (already initialized by +the split trigger on another Store) reaching out, whichever occurs first. + +An uninitialized Replica requires a snapshot to become initialized. The case in +which the Replica is the right-hand side of a split can be understood as the +application of a snapshot as well (though this is not reflected in code at the +time of writing) where the left-hand side applies the split by (logically) +moving any data past the split point to the right-hand side Replica, thus +initializing it. In principle, since writes to the state machine do not need to +be made durable, it is conceivable that a split or snapshot could "unapply" due +to an ill-timed crash (though snapshots currently use SST ingestion, which the +storage engine performs durably). Similarly, entry application (which only +occurs on initialized Replicas) is not synced and so a suffix of applied entries +may need to be re-applied following a crash. + +If an uninitialized Replica receives a Raft message from a peer informing it +that it is no longer part of a more up-to-date Range configuration (via a +ReplicaTooOldError) or that it has been removed and re-added under a higher +ReplicaID, the uninitialized Replica is removed. There is currently no +general-purpose mechanism to determine whether an uninitialized Replica is +outdated; an uninitialized Replica could in principle leak "forever" if the +Range quickly changes its members such that the triggers mentioned here don't +apply A full scan of meta2 would be required as there is no RangeID-keyed index +on the RangeDescriptors (the meta ranges are keyed on the EndKey, which allows +routing requests based on the key ranges they touch). The fact that +uninitialized Replicas can be removed has to be taken into account by splits as +well; the split trigger may find that the right-hand side uninitialized Replica +has already been removed, in which case the right half of the split has to be +discarded (see acquireSplitLock and splitPostApply). + +Initialized Replicas represent the common case. They can apply snapshots +(required if they get cut off from the raft log via log truncation) which will +always move the applied log position forward, however this is rare. A Replica +typically spends most of its life applying log entries and/or serving reads. The +mechanisms that can lead to removal of uninitialized Replicas apply to +initialized Replicas in the exact same way, but there are additional ways for an +initialized Replica to be removed. The most general such mechanism is the +replica GC queue, which periodically checks the meta2 copy of the Range's +descriptor (using a consistent read, i.e. reading the latest version). If this +indicates that the Replica on the local Store should no longer exist, the +Replica is removed. Additionally, the Replica may directly witness a change that +indicates a need for a removal. For example, it may apply a change to the range +descriptor that removes it, or it may be destroyed by the application of a merge +on its left neighboring Replica, which may also occur through a snapshot. Merges +are the single most complex reconfiguration operation and can only be touched +upon here. At their core, they will at some point "freeze" the right-hand side +Replicas (via roachpb.SubsumeRequest) to prevent additional read or write +activity, and also ensure that the two sets of Ranges to be merged are +co-located on the same Stores as well as are all initialized. + +INVARIANT: An initialized Replica's RangeDescriptor always includes it as a member. + +INVARIANT: A Replica's ReplicaID is constant. + +Together, these invariants significantly reduce complexity since we do not need +to handle the excluded situations. Particularly a changing replicaID is +bug-inducing since at the Replication layer a change in replica ID is a complete +change of identity, and re-use of in-memory structures poses the threat of +erroneously re-using cached information. +*/ type Store struct { Ident *roachpb.StoreIdent // pointer to catch access before Start() is called cfg StoreConfig diff --git a/pkg/kv/kvserver/store_doc_replica_lifecycle.dot b/pkg/kv/kvserver/store_doc_replica_lifecycle.dot new file mode 100644 index 000000000000..27644dff6069 --- /dev/null +++ b/pkg/kv/kvserver/store_doc_replica_lifecycle.dot @@ -0,0 +1,40 @@ +digraph finite_state_machine { + +/* +This is the source file for the diagram on the `type Store` +comment. + +Generated via: + +docker run -i tsub/graph-easy --from=dot --as=ascii < \ + store_doc_replica_lifecycle.dot +*/ + +Absent -> Uninitialized [label = "Raft msg"]; +Absent -> Initialized [label = "Store.Start"]; + +Uninitialized -> Uninitialized [label = "Raft msg"]; + +Uninitialized -> Initialized [label = "applySnapshot"]; +Initialized -> Initialized [label = "applySnapshot"]; + +Absent -> Uninitialized [label = "pre-split "]; +Uninitialized -> Initialized [label = "post-split"]; + +Initialized -> Initialized [label = "Subsume"]; + +Initialized -> Initialized [label = "Crash"]; +Initialized -> Uninitialized [label = "Crash"]; + +Initialized -> Removed [label = "Replica GC"]; +Initialized -> Removed [label = "apply removal"]; +Initialized -> Removed [label = "ReplicaTooOld"]; +Initialized -> Removed [label = "Merged (snapshot)"]; +Initialized -> Removed [label = "apply Merge"]; +Initialized -> Removed [label = "higher ReplicaID"]; + +Uninitialized -> Removed [label = "ReplicaTooOldError"]; +Uninitialized -> Removed [label = "higher ReplicaID"]; + +} +