From 25182d3e4af1e5850de088eaae2ba4645bf277a1 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 15 Nov 2021 12:33:57 +0100 Subject: [PATCH] kvserver: document replica lifecycle Part of the research for #72374, which in turn was inspired by #38322. Release note: None --- .../replica_application_state_machine.go | 4 + pkg/kv/kvserver/store.go | 273 +++++++++++++++++- .../kvserver/store_doc_replica_lifecycle.dot | 40 +++ 3 files changed, 315 insertions(+), 2 deletions(-) create mode 100644 pkg/kv/kvserver/store_doc_replica_lifecycle.dot diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 225425da3945..6e5b5bfbd31c 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -469,6 +469,10 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error // Acquire the split or merge lock, if necessary. If a split or merge // command was rejected with a below-Raft forced error then its replicated // result was just cleared and this will be a no-op. + // + // TODO(tbg): can't this happen in splitPreApply which is called from + // b.runPreApplyTriggersAfterStagingWriteBatch and similar for merges? That + // way, it would become less of a one-off. if splitMergeUnlock, err := b.r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil { var err error if cmd.raftCmd.ReplicatedEvalResult.Split != nil { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e1a7dd64b968..f0e3c2b457ea 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -411,8 +411,277 @@ func (rs *storeReplicaVisitor) EstimatedCount() int { return len(rs.repls) - rs.visited } -// A Store maintains a map of ranges by start key. A Store corresponds -// to one physical device. +/* +A Store maintains a set of Replicas whose data is stored on a storage.Engine +usually corresponding to a physical device. It also houses a collection of +subsystems that, in broad terms, perform maintenance of each Replica on the +Store when required. In particular, this includes various queues such as the +split, merge, rebalance, GC, and replicaGC queues (to name just a few). + +INVARIANT: the set of Ranges (as determined by, e.g. a transactionally +consistent scan of the meta index ranges) always exactly covers the addressable keyspace +roachpb.KeyMin (inclusive) to roachpb.KeyMax (exclusive). + +Ranges + +Each Replica is part of a Range, i.e. corresponds to what other systems would +call a shard. A Range is a consensus group backed by Raft, i.e. each Replica +is a state machine backed by a replicated log of commands. In CockroachDB, +Ranges own a contiguous chunk of addressable keyspace, where the word +"addressable" is a fine-print that interested readers can learn about in +keys.Addr; in short the Replica data is logically contiguous, but not +contiguous when viewed as Engine kv pairs. + +Considerable complexity is incurred by the features of dynamic re-sharding +and relocation, i.e. range splits, range merges, and range relocation (also +called replication changes, or, in the case of lateral movement, +rebalancing). Each of these interact heavily with the Range a consensus group +(of which each Replica is a member). All of these intricacies are described +at a high level in this comment. + +RangeDescriptor + +A roachpb.RangeDescriptor is the configuration of a Range. It is an +MVCC-backed key-value pair (where the key is derived from the StartKey via +keys.RangeDescriptorKey, which in particular resides on the Range itself) +that is accessible to the transactional KV API much like any other, but is +for internal use only (and in particular plays no role at the SQL layer). + +Splits, Merges, and Rebalances are all carried out as distributed +transactions. They are all complex in their own right but they share the +basic approach of transactionally acting on the RangeDescriptor. Each of +these operations at some point will + +- start a transaction + +- update the RangeDescriptor (for example, to reflect a split, or a change +to the Replicas comprising the members of the Range) + +- update the meta ranges (which form a search index used for request routing) + +- commit with a roachpb.InternalCommitTrigger. + +In particular, note that the RangeDescriptor is the first write issued in the +transaction, and so the transaction record will be created on the affected +range. This allows us to establish a helpful invariant: + +INVARIANT: an intent on keys.RangeDescriptorKey is resolved atomically with +the (application of the) roachpb.EndTxnRequest committing the transaction. + +To understand this invariant, it's best to try to make do without it and fail. +Assume that a descriptor change committed, but that the intent will only get +resolved asynchronously. + +- since the transaction committed, its update to the meta ranges becomes +visible and the cluster will start routing requests according to the new +descriptor. However, the Replicas may yet have to learn that the new +descriptor is committed. This is not a correctness problem, but it is +undesirable. The invariant avoids this, since the first intent to be resolved +will be the RangeDescriptor on the leaseholder (during EndTxn application), +and this will causally precede resolution of auxiliary intents since those +will only be resolved after the leaseholder has applied the EndTxn. + +- each Replica (and its surrounding Store) needs to run a considerable amount +of logic to update its state when switching to the new descriptor, and now +this logic has to be triggered by the intent resolution. This is a lot less +convenient than using instead the roachpb.EndTxnRequest as a direct trigger +due to the need to somehow pass the data through the intent resolution +process. + +Thanks to the invariant, a Replica can switch to the new RangeDescriptor when +it applies the corresponding EndTxnRequest. + +TODO(tbg): is that it? I thought there was more. Seems that even the raft +config change stuff doesn't "really" get more involved if we didn't have +the invariant. + +TODO: write something somewhere and link raft way of doing things and this: - +https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 Also make +sure the comment clearly has the invariants we need for apply-time activation. + +INVARIANT: A Store never contains two Replicas from the same Range, nor do +the key ranges for any two of its Replicas overlap. (Note that there +is no requirement that these Replicas come from a consistent set of Ranges). + +To illustrate this last invariant, consider a split of a Replica [a-z) into two +Replicas, [a-c) and [c,z). The Store will never contain both; it has to swap +directly from [a-z) to [a,c)+[c,z). For a similar example, consider accepting +a new Replica [b,d) when a Replica [a,c) is already present, which is similarly +not allowed. To understand how such situations could arise, consider that +a series of changes to the RangeDescriptor is observed asynchronously by +followers. This means that a follower may have Replica [a-z) while any number +of splits and replication changes are already known to the leaseholder, and +the new Ranges created in the process may be attempting to add a Replica to +the slow follower. + +With the invariant, we effectively allow looking up a unique (if it exists) +Replica for any given key, and ensure that no two Replicas on a Store operate +on shared keyspace (as seen by the storage.Engine). Refer to the Replica +Lifecycle diagram below for details on how this invariant is upheld. + +Replica Lifecycle + +A Replica should be thought of primarily as a State Machine applying commands +from a replicated log (the log being replicated across the members of the +Range). The Store's RaftTransport receives Raft messages from Replicas +residing on other Stores and routes them to the appropriate Replicas via +Store.HandleRaftRequest (which is part of the RaftMessageHandler interface), +ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which +houses the integration with the etcd/raft library (raft.RawNode). This may +generate Raft messages to be sent to other Stores; these are handed to +Replica.sendRaftMessages which ultimately hands them to the Store's +RaftTransport.SendAsync method. This is relatively straightforward unless +when Ranges are being reconfigured, which is where understanding the Replica +Lifecycle becomes important and upholding the Store's invariants becomes more +complex. + +A first phenomenon to understand is that of uninitialized Replicas. Such a +Replica corresponds to a State Machine that yet has to apply any entries, +and in particular has no notion of an active RangeDescriptor yet. This state +exists for various reasons: + +- A newly added voter already needs to be able to cast a vote even before it has +had time to be caught up by other nodes. In the extreme case, the very first +Raft message received at a Store leading to the creation of an (uninitialized) +Replica may be a request for a vote that is required for quorum. In practice we +mostly sidestep this voting requirement by adding new members through an +intermediate state (see roachpb.LEARNER) that allows them to catch up and become +initialized before making them voters, but at the protocol level this still +relies on the concept of a Replica without state that can act as a Raft peer. + +- For practical reasons, we don't preserve the entire committed replicated log +for eternity. It will periodically get truncated, i.e. a prefix discarded, and +this leads to followers occasionally being forced to catch up on the log via a +Raft Snapshot, i.e. a copy of the replicated data in the State Machine as of +some log position, from which the recipient Replica can then continue to receive +and apply log entries. + +- In CockroachDB, a newly created Range (say at cluster bootstrap, or during a +Range split) is never empty - it has to contain at least the RangeDescriptor. +For the split case, indeed it contains around half of the data originally housed +in the pre-split Range, and it is burdensome to distribute potentially hundreds +of megabytes through the Raft log. So what we do in CockroachDB is to never use +Raft entries zero through nine (see raftInitialLogIndex), which means that a +newly added Replica will need a first Snapshot to obtain access to the log. + +Externally, uninitialized Replicas should be viewed as an implementation +detail that is (or should be!) invisible to most access to a Store in the +context of processing requests coming from the KV API, as uninitialized +Replicas cannot serve such requests. + +The diagram is a lot to take in. The various transitions are discussed in +prose below, and the source .dot file is in store_doc_replica_lifecycle.dot. + + +---------------------+ + +------------------ | Absent | ---------------------------------------------------------------------------------------------------+ + | +---------------------+ | + | | Subsume Crash applySnapshot | + | | Store.Start +---------------+ +---------+ +---------------+ | + | v v | v | v | | + | +-----------------------------------------------------------------------------------------------------------------------+ | + +---------+------------------ | | | + | | | Initialized | | + | | | | | + | +----+------------------ | | -+----+ + | | | +-----------------------------------------------------------------------------------------------------------------------+ | | + | | | | ^ ^ | | | | | + | | | Raft msg | Crash | applySnapshot | post-split | | | | | + | | | v | | | | | | | + | | | +---------------------------------------------------------+ pre-split | | | | | + | | +-----------------> | | <---------------------+--------------+--------------------+----+ | + | | | | | | | | + | | | Uninitialized | Raft msg | | | | + | | | | -----------------+ | | | | + | | | | | | | | | + | | | | <----------------+ | | | | + | | +---------------------------------------------------------+ | | apply removal | | + | | | | | | | | + | | | ReplicaTooOldError | higher ReplicaID | Replica GC | | | + | | v v v | | | + | | Merged (snapshot) +---------------------------------------------------------------------------------------------+ | | | + | +----------------------> | | <+ | | + | | | | | + | apply Merge | | ReplicaTooOld | | + +---------------------------> | Removed | <---------------------+ | + | | | + | | higher ReplicaID | + | | <-------------------------------+ + +---------------------------------------------------------------------------------------------+ + + +When a Store starts, it iterates through all RangeDescriptors it can find on its +Engine. Finding a RangeDescriptor by definition implies that the Replica is +initialized. Raft state (a raftpb.HardState) for uninitialized Replicas may +exist, however it would be ignored until a message arrives addressing that +Replica, or a split trigger applies that instantiates and then initializes it. + +Uninitialized Replicas principally arise when a replication change occurs and a +new member is added to a Range. This new member will be contacted by the Raft +leader, creating an uninitialized Replica on the recipient which will then +negotiate a snapshot to become initialized and to receive the replicated log. A +split can be understood as a special case of that, except that all members of +the Range are created at around the same time (though in practice with arbitrary +delays due to the usual distributed systems reasons), and the uninitialized +Replica creation is triggered by the split trigger executing on the left-hand +side of the split, or a Replica of the right-hand side (already initialized by +the split trigger on another Store) reaching out, whichever occurs first. + +An uninitialized Replica requires a snapshot to become initialized. The case in +which the Replica is the right-hand side of a split can be understood as the +application of a snapshot as well (though this is not reflected in code at the +time of writing) where the left-hand side applies the split by (logically) +moving any data past the split point to the right-hand side Replica, thus +initializing it. In principle, since writes to the state machine do not need to +be made durable, it is conceivable that a split or snapshot could "unapply" due +to an ill-timed crash (though snapshots currently use SST ingestion, which the +storage engine performs durably). Similarly, entry application (which only +occurs on initialized Replicas) is not synced and so a suffix of applied entries +may need to be re-applied following a crash. + +If an uninitialized Replica receives a Raft message from a peer informing it +that it is no longer part of a more up-to-date Range configuration (via a +ReplicaTooOldError) or that it has been removed and re-added under a higher +ReplicaID, the uninitialized Replica is removed. There is currently no +general-purpose mechanism to determine whether an uninitialized Replica is +outdated; an uninitialized Replica could in principle leak "forever" if the +Range quickly changes its members such that the triggers mentioned here don't +apply A full scan of meta2 would be required as there is no RangeID-keyed index +on the RangeDescriptors. The fact that uninitialized Replicas can be removed has +to be taken into account by splits as well; the split trigger may find that the +right-hand side uninitialized Replica has already been removed, in which case +the right half of the split has to be discarded (see acquireSplitLock and +splitPostApply). + +Initialized Replicas represent the common case. They can apply snapshots +(required if they get cut off from the raft log via log truncation) which will +always move the applied log position forward, however this is rare. A Replica +typically spends most of its life applying log entries and/or serving reads. The +mechanisms that can lead to removal of uninitialized Replicas apply to +initialized Replicas in the exact same way, but there are additional ways for an +initialized Replica to be removed. The most general such mechanism is the +replica GC queue, which periodically checks the meta2 copy of the Range's +descriptor (using a consistent read, i.e. reading the latest version). If this +indicates that the Replica on the local Store should no longer exist, the +Replica is removed. Additionally, the Replica may directly witness a change that +indicates a need for a removal. For example, it may apply a change to the range +descriptor that removes it, or it may be destroyed by the application of a merge +on its left neighboring Replica, which may also occur through a snapshot. Merges +are the single most complex reconfiguration operation and can only be touched +upon here. At their core, they will at some point "freeze" the right-hand side +Replicas (via roachpb.SubsumeRequest) to prevent additional read or write +activity, and also ensure that the two sets of Ranges to be merged are +co-located on the same Stores as well as are all initialized. + +INVARIANT: An initialized Replica's RangeDescriptor always includes it as a member. + +INVARIANT: A Replica's ReplicaID is constant. + +Together, these invariants significantly reduce complexity since we do not need +to handle the excluded situations. Particularly a changing replicaID is +bug-inducing since at the Replication layer a change in replica ID is a complete +change of identity, and re-use of in-memory structures poses the threat of +erroneously re-using cached information. +*/ type Store struct { Ident *roachpb.StoreIdent // pointer to catch access before Start() is called cfg StoreConfig diff --git a/pkg/kv/kvserver/store_doc_replica_lifecycle.dot b/pkg/kv/kvserver/store_doc_replica_lifecycle.dot new file mode 100644 index 000000000000..27644dff6069 --- /dev/null +++ b/pkg/kv/kvserver/store_doc_replica_lifecycle.dot @@ -0,0 +1,40 @@ +digraph finite_state_machine { + +/* +This is the source file for the diagram on the `type Store` +comment. + +Generated via: + +docker run -i tsub/graph-easy --from=dot --as=ascii < \ + store_doc_replica_lifecycle.dot +*/ + +Absent -> Uninitialized [label = "Raft msg"]; +Absent -> Initialized [label = "Store.Start"]; + +Uninitialized -> Uninitialized [label = "Raft msg"]; + +Uninitialized -> Initialized [label = "applySnapshot"]; +Initialized -> Initialized [label = "applySnapshot"]; + +Absent -> Uninitialized [label = "pre-split "]; +Uninitialized -> Initialized [label = "post-split"]; + +Initialized -> Initialized [label = "Subsume"]; + +Initialized -> Initialized [label = "Crash"]; +Initialized -> Uninitialized [label = "Crash"]; + +Initialized -> Removed [label = "Replica GC"]; +Initialized -> Removed [label = "apply removal"]; +Initialized -> Removed [label = "ReplicaTooOld"]; +Initialized -> Removed [label = "Merged (snapshot)"]; +Initialized -> Removed [label = "apply Merge"]; +Initialized -> Removed [label = "higher ReplicaID"]; + +Uninitialized -> Removed [label = "ReplicaTooOldError"]; +Uninitialized -> Removed [label = "higher ReplicaID"]; + +} +