Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: interface for ReplicasStorage #72795

Merged
merged 1 commit into from
Dec 15, 2021
Merged

Conversation

sumeerbhola
Copy link
Collaborator

ReplicasStorage provides an interface to manage the persistent state that
includes the lifecycle of a range replica, its raft log, and the state
machine state. The implementation(s) are expected to be a stateless wrapper
around persistent state in the underlying engine(s) (any state they
maintain in-memory would be simply a performance optimization and always
be in-sync with the persistent state).

We consider the following distinct kinds of persistent state:

  • State machine state: It contains all replicated keys: replicated range-id
    local keys, range local keys, range lock keys, lock table keys, global
    keys. This includes the RangeAppliedState and the RangeDescriptor.

  • Raft and replica life-cycle state: This includes all the unreplicated
    range-ID local key names prefixed by Raft, and the RangeTombstoneKey.
    We will loosely refer to all of these as "raft state".

The interface requires that any mutation (batch or sst) only touch one of
these kinds of state. This discipline will allow us to eventually separate
the engines containing these two kinds of state. This interface is not
relevant for store local keys though they will be in the latter engine. The
interface does not allow the caller to specify whether to sync a mutation
to the raft log or state machine state -- that decision is left to the
implementation of ReplicasStorage. So the hope is that even when we don't
separate the state machine and raft engines, this abstraction will force us
to reason more carefully about effects of crashes, and when to sync, and
allow us to test more thoroughly (including "crash" testing using
strict-mem FS).

ReplicasStorage does not interpret most of the data in the state machine.
It expects mutations to that state to be provided as an opaque batch, or a
set of files to be ingested. There are a few exceptions where it can read
state machine state, mainly when recovering from a crash, so as to make
changes to get to a consistent state.

  • RangeAppliedStateKey: needs to read this in order to truncate the log,
    both as part of regular log truncation and on crash recovery.
  • RangeDescriptorKey: needs to read this to discover ranges whose state
    machine state needs to be discarded on crash recovery.

A corollary to this lack of interpretation is that reads of the state
machine are not handled by this interface, though it does expose some
metadata in case the reader want to be sure that the range it is trying to
read actually exists in storage. ReplicasStorage also does not offer an
interface to construct changes to the state machine state. It simply
applies changes, and requires the caller to obey some simple invariants to
not cause inconsistencies. It is aware of the keyspace occupied by a range
and the difference between rangeID keys and range keys -- it needs this
awareness to restore internal consistency when initializing (say after a
crash), by clearing the state machine state for replicas that should no
longer exist.

ReplicasStorage does interpret the raft state (all the unreplicated
range-ID local key names prefixed by Raft), and the RangeTombstoneKey. This
is necessary for it to be able to maintain invariants spanning the raft log
and the state machine (related to raft log truncation, replica lifetime
etc.), including reapplying raft log entries on restart to the state
machine. All accesses (read or write) to the raft log and RangeTombstoneKey
must happen via ReplicasStorage.

Since this abstraction is mutating the same underlying engine state that
was previously mutated via lower-level interfaces, and is not a
data-structure in the usual sense, we should be able to migrate callers
incrementally to use this interface. That is, callers that use this
interface, and those that use the lower-level engine interfaces could
co-exist correctly.

Informs #38322

Release note: None

@sumeerbhola sumeerbhola requested a review from a team as a code owner November 16, 2021 00:48
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@sumeerbhola sumeerbhola marked this pull request as draft November 16, 2021 00:48
Copy link
Collaborator Author

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are also a number of questions, marked as TODO* that I need answers or code pointers for.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten and @tbg)

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a partial review, but I did get to all of your TODO*s, hope this helps a bit.

I am still wrapping my head around this interface. I am unsure whether the initialized/uninitialized distinction should show up here. Without having thought it through, I think it shouldn't. I think the interface would be simpler if it "simply" assumed that there is always a state, but it just so happens that the state at AppliedIndex=0 is completely empty, and that this is then taken into account for the internal fixup procedures as needed. But I might be wrong here.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten and @sumeerbhola)


-- commits, line 18 at r1:
there's a third category, unreplicated keys that are neither raft nor tombstone, namely RangeLastReplicaGCTimestamp. It makes sense for this key to be unreplicated but we have to decide which engine it should live on.

  • state engine: this is where the durability guarantees are right (this write doesn't need syncing) but I think so far we have the property that the state engine has "exactly" the replicated data which is nice.
  • raft engine (which is by definition the "unreplicated engine"): they don't hurt here and you already say that it will house the store local keys, which are similar.

So raft engine it is.


-- commits, line 30 at r1:
👍 that will be fun.


-- commits, line 48 at r1:
by a replica?


pkg/storage/replicas_storage.go, line 47 at r1 (raw file):

// - alter this interface to include a more pragmatic once we have settled on
//   the ideal interface.
// - ensure that the minimal integration steo includes ReplicasStorage.Init,

step


pkg/storage/replicas_storage.go, line 59 at r1 (raw file):

//   range-ID local key names prefixed by Raft, and the RangeTombstoneKey.
//   We will loosely refer to all of these as "raft state".
//   RangeLastReplicaGCTimestamp changes are ignored below, since it is

Ah here we go, 👍


pkg/storage/replicas_storage.go, line 122 at r1 (raw file):

// - [C1*] creation of RaftHardStateKey in raft state with {Term:0, Vote:0,
//   Commit:0}.
// - [C2*] creation of state machine state (via snapshot or some synthesized

We discussed not ever syncing the state machine, are you intentionally leaving this out of scope for now? I agree this is tricky without the sync. If a replica gets initialized via a non-durable snapshot and then we write the truncated state & accept some log entries followed by a crash, we may come up with an applied index of zero (i.e. uninitialized), a truncated index of 10, and a log (say) 11, 12, 13. I believe this is is actually in violation of raft invariants. A snapshot at applied index N embodies the log entries <= N. So going from [Snapshot at N, N+1, N+2] to [N+1, N+2] or even just [Snapshot at N-1, N+1, N+2] means that the node has lost the ability to catch up followers. So there is something wrong with the angle that I was looking at this from, the snapshot is really more like a log than we would have liked for the purposes of not having to sync the state machine.
There is maybe a silver lining here which is that our snapshots use SST ingestion, so this is actually durable. We'll have to think hard about splits where this is not the case, but perhaps we can ingest splits as SSTs on the state machine too? That still seems better than maintaining a WAL on the state machine just for use by splits, and I'd rather stay out of the game of mixing wal- and non-wal writes.


pkg/storage/replicas_storage.go, line 141 at r1 (raw file):

// raft log entries. So a precondition for applying such a snapshot is:
// - The raft log does not have entries beyond the snapshot's
//   RangeAppliedState.RaftAppliedIndex. If it did, there would be no benefit

You could make the argument that a snapshot could be a faster way to catch up over a very long log. But I agree that we should refuse snapshots that are not necessary and thus have this invariant locked in. (It's morally true today; it better be, or we're violating Raft's invariants)


pkg/storage/replicas_storage.go, line 165 at r1 (raw file):

//   provisional one or not (though I believe a provisional RangeDescriptor
//   only exists if there is also a committed one). This step needs to sync
//   because of step D3 after it.

Why exactly? If we don't sync this, we may restart after a crash and find a replica state, no tombstone key, and no hard state. Not finding a hard state means we can delete everything, no?


pkg/storage/replicas_storage.go, line 186 at r1 (raw file):

// - Consistency at creation: we need to maintain the following invariants at
//   all times.
//   - HardState.Commit >= RangeAppliedState.RaftAppliedIndex

This isn't strictly necessary, right? We could just forward to the RaftAppliedIndex if necessary. But there are some invariants about syncing committed indexes imposed on us by configuration changes, see https://github.com/cockroachdb/cockroach/pull/72745/files#diff-50e458584d176deae52b20a7c04461b3e4110795c8c9a307cf7ee6696ba6bc60


pkg/storage/replicas_storage.go, line 216 at r1 (raw file):

//   snapshot).
//   TODO*: can we say something stronger than "awkward" here. I would be
//   surprised if this didn't land us in trouble in some manner.

It violates the raft invariant of putting entries onto durable storage. If we have [1 2 3] and apply a snapshot <= N, where before we could catch up a follower to entry 3 we can now catch them up all the way to N (by sending the snapshot). If we had [1 2 3] and after the crash have nothing, we are in effect like an unavailable replica until someone catches us up again. Not good.


pkg/storage/replicas_storage.go, line 252 at r1 (raw file):

//     spans for the live ranges will be subtracted from the dead key spans,
//     and the resulting key spans will be removed.
//     TODO*: the key spans are in the RangeDescriptor, but RangeDescriptors

The active replica configuration (including key bounds) is always the most recent version (excluding an intent) of the range descriptor known to the replica.


pkg/storage/replicas_storage.go, line 373 at r1 (raw file):

	//   DoRaftMutation with the HardState returned by RawNode.Ready potentially
	//   regress the HardState.Commit, or is it guaranteed to be consistent with
	//   what we've done when applying the snapshot?

When a snapshot is stepped into raft, it does this (sorry, Goland doesn't manage to easily produce Github links from vendored files):

func (u *unstable) restore(s pb.Snapshot) {
	u.offset = s.Metadata.Index + 1
	u.entries = nil
	u.snapshot = &s
}

Before it gets there, it updates committed:

	// Now go ahead and actually restore.

	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

	r.raftLog.restore(s)

So the Ready that asks us to apply the snapshot should also hand us a HardState whose Committed field matches s.Metadata.Index.


pkg/storage/replicas_storage.go, line 390 at r1 (raw file):

	// ApplyCommittedBatch applies committed changes to the state machine state.
	// Does not sync.
	// REQUIRES: range is in state InitializedStateMachine.

This isn't "really" a requirement, but it is always true in our use of raft (due to raftInitialLogIndex). If it were ok to catch up from index zero (i.e. the empty log), uninitialized replicas could in principle do that. I find it a bit surprising that whether a replica is initialized or not is part of the interface here, but maybe it has to be or you are really just reinventing a replicaRaftStorage backed by an interface:

// replicaRaftStorage implements the raft.Storage interface.
type replicaRaftStorage Replica
var _ raft.Storage = (*replicaRaftStorage)(nil)


pkg/storage/replicas_storage.go, line 493 at r1 (raw file):

	//
	// TODO*: can we receive a post-merge snapshot for the LHS, and apply it
	// instead of applying the merge via raft (which is what calls MergeRange)?

No this can totally happen, see

// The on-disk state is now committed, but the corresponding in-memory state
// has not yet been updated. Any errors past this point must therefore be
// treated as fatal.
if err := r.clearSubsumedReplicaInMemoryData(ctx, subsumedRepls, mergedTombstoneReplicaID); err != nil {
log.Fatalf(ctx, "failed to clear in-memory data of subsumed replicas while applying snapshot: %+v", err)
}

There are complicated invariants that ensure that a replica can only be replicaGC'ed if we're sure it isn't waiting to catch up on a merge, which also relies on the fact that we won't commit the merge unless both ranges are colocated with all replicas initialized.


pkg/storage/replicas_storage.go, line 505 at r1 (raw file):

	//
	// Called below Raft -- this is being called when the merge transaction commits.
	MergeRange(lhsRS RangeStorage, rhsRS RangeStorage, smBatch MutationBatch) error

Like below, MergeReplicas seems more apt. The range has merged, but here we are merging two Replicas.


pkg/storage/replicas_storage.go, line 509 at r1 (raw file):

	// DiscardRange that has been rebalanced away. The range is not necessarily
	// initialized.
	DiscardRange(r RangeStorage) error

We're discarding a replica, so the word Range isn't right.

Copy link
Collaborator Author

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can cleanly handle the state transitions that span splits and merges, or snapshot application, which is where the subtleties are, without talking about uninitialized state.

I've made revisions here based on your comments. And added a few more questions. The main changes are:

  • MutationBatch now exposes the underlying Batch so that ReplicasStorage can add more mutations. This is used in the implementation of SplitRange to classify what the RHS state should be and decide whether to clear it completely. I think it is cleaner to do this inside the ReplicasStorage implementation since it anyway needs to handle this case in Init.
  • Init can apply entries that do splits and merges.
  • The merge via snapshot case is now handled.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


-- commits, line 18 at r1:

Previously, tbg (Tobias Grieger) wrote…

there's a third category, unreplicated keys that are neither raft nor tombstone, namely RangeLastReplicaGCTimestamp. It makes sense for this key to be unreplicated but we have to decide which engine it should live on.

  • state engine: this is where the durability guarantees are right (this write doesn't need syncing) but I think so far we have the property that the state engine has "exactly" the replicated data which is nice.
  • raft engine (which is by definition the "unreplicated engine"): they don't hurt here and you already say that it will house the store local keys, which are similar.

So raft engine it is.

Agreed. The code comment says something about this already, and I've now added to it.


-- commits, line 30 at r1:

Previously, tbg (Tobias Grieger) wrote…

👍 that will be fun.

It has been very useful in the Pebble metamorphic test.
Our invariants seem most shaky if we can lose unsynced parts of the WAL, based on discussions in [1], [2], [3] which I suspect almost never happens during a typical "node failure", so I suspect we would find bugs if we could run such tests. But without an interface like ReplicasStorage it seems harder to construct and run such tests.

[1] #72745 (review)
[2] https://cockroachlabs.slack.com/archives/C02KHQMF2US/p1635859782038200?thread_ts=1635799562.036400&cid=C02KHQMF2US
[3] new comment with Init() below about merge and RHS regression despite waitForApplication.


pkg/storage/replicas_storage.go, line 47 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

step

Done


pkg/storage/replicas_storage.go, line 122 at r1 (raw file):

We discussed not ever syncing the state machine, are you intentionally leaving this out of scope for now?

Yes, this is intentional. I think we should first strengthen the invariants, and thoroughly test, then transition to the new interface, and then later consider optimizing for no WAL on the state machine. So everything here assumes a WAL for the state machine state. I wouldn't be surprised (both based on @nvanbenschoten's old experiments, and back-of-the-envelope calculations of write amplification) if we find that separating the engines gives us most of the performance benefit, and end up stopping there.

We'll have to think hard about splits where this is not the case, but perhaps we can ingest splits as SSTs on the state machine too?

Yes, we can definitely try that.

That still seems better than maintaining a WAL on the state machine just for use by splits, and I'd rather stay out of the game of mixing wal- and non-wal writes.

I agree that we shouldn't mix wal and non-wal in the same engine. I have a hard enough time reasoning about mixing of regular writes and ingests, where the latter can "race ahead" wrt durability (which is a similar issue), but those at least satisfy a key non-overlap invariant. The mix of wal and non-wal writes would not even have that invariant.


pkg/storage/replicas_storage.go, line 141 at r1 (raw file):

It's morally true today; it better be, or we're violating Raft's invariants

Just to make sure I understand -- is there something in our code or etcd/raft that currently prevents such snapshot application from happening?


pkg/storage/replicas_storage.go, line 165 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Why exactly? If we don't sync this, we may restart after a crash and find a replica state, no tombstone key, and no hard state. Not finding a hard state means we can delete everything, no?

You raise a very good point.

  • I am now wondering whether RangeTombstoneKey has become redundant in this scheme, since Init doesn't look at it (described below) when deciding whether an initialized range is live.
  • The real reason to sync here is not due to D3, but because we could later execute C1 when adding the range back to this node, and then crash. On crash recovery we'd find the raft HardState and old state machine state and incorrectly think this is an initialized replica.

pkg/storage/replicas_storage.go, line 186 at r1 (raw file):

This isn't strictly necessary, right?

I was interpreting what you had said on the thread https://cockroachlabs.slack.com/archives/C02KHQMF2US/p1636710202051500?thread_ts=1636021850.043600&cid=C02KHQMF2US

But there are some invariants about syncing committed indexes imposed on us

Are there cases where we don't sync?


pkg/storage/replicas_storage.go, line 216 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

It violates the raft invariant of putting entries onto durable storage. If we have [1 2 3] and apply a snapshot <= N, where before we could catch up a follower to entry 3 we can now catch them up all the way to N (by sending the snapshot). If we had [1 2 3] and after the crash have nothing, we are in effect like an unavailable replica until someone catches us up again. Not good.

Ah yes, that makes sense. I've updated the text.


pkg/storage/replicas_storage.go, line 252 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

The active replica configuration (including key bounds) is always the most recent version (excluding an intent) of the range descriptor known to the replica.

Ack.

  • I am currently assuming that the span parameter in IngestRangeSnapshot is this most recent version. And since ReplicasStorage is explicitly informed of SplitRange (with the rhsSpan) and MergeRange, the implementation of ReplicasStorage (if it chooses to) can stay up-to-date about the latest span for a range without having to reread it from the RangeDescriptor in the engine.
  • The bullet starting with "Merge of R and R2 ..." has some more unanswered questions for this TODO*

pkg/storage/replicas_storage.go, line 336 at r1 (raw file):

	// committed with sync=true before returning.
	// REQUIRES: if rBatch.Lo < rBatch.Hi, the range is in state
	// InitializedStateMachine.

I've added a few questions here.

	// TODO*: I'd overlooked raft.Ready.MustSync. Is it false only when the
	// there is no change to HardState and log entries are being appended at the
	// leader, since it is allowed to send out new log entries before locally
	// syncing? We would need to capture that with a parameter here. Do we
	// additionally add a method to be able to sync the raft state, or can we
	// rely on the fact that when the leader has to mark these entries
	// committed, it will need to update HardState and that will set
	// MustSync=true.

pkg/storage/replicas_storage.go, line 373 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

When a snapshot is stepped into raft, it does this (sorry, Goland doesn't manage to easily produce Github links from vendored files):

func (u *unstable) restore(s pb.Snapshot) {
	u.offset = s.Metadata.Index + 1
	u.entries = nil
	u.snapshot = &s
}

Before it gets there, it updates committed:

	// Now go ahead and actually restore.

	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

	r.raftLog.restore(s)

So the Ready that asks us to apply the snapshot should also hand us a HardState whose Committed field matches s.Metadata.Index.

Thanks, that's good to know.


pkg/storage/replicas_storage.go, line 390 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This isn't "really" a requirement, but it is always true in our use of raft (due to raftInitialLogIndex). If it were ok to catch up from index zero (i.e. the empty log), uninitialized replicas could in principle do that. I find it a bit surprising that whether a replica is initialized or not is part of the interface here, but maybe it has to be or you are really just reinventing a replicaRaftStorage backed by an interface:

// replicaRaftStorage implements the raft.Storage interface.
type replicaRaftStorage Replica
var _ raft.Storage = (*replicaRaftStorage)(nil)

Yes, the goal here was to capture more of the CockroachDB context that is not in the raft.Storage interface. I've adjusted the REQUIRES comment to include raftInitialLogIndex.


pkg/storage/replicas_storage.go, line 404 at r1 (raw file):

	//   - Applying committed entries from raft logs to the state machines when
	//     the state machine is behind (and possibly regressed from before the
	//     crash) because of not syncing.

I added some questions here

	//   - Applying committed entries from raft logs to the state machines when
	//     the state machine is behind (and possibly regressed from before the
	//     crash) because of not syncing. There is a potential subtlety here for
	//     merges of R1 and R2: it cannot apply the merge in R1's raft log until
	//     R2's state machine is up-to-date, so this merge application in R1
	//     would need to pause and find R2 and bring it up to date, and only
	//     then continue (if possible). Since AdminMerge ensures *all* R2
	//     replicas are up-to-date before it creates the log entry in R1 that
	//     will do the merge (by calling waitForApplication), one can be sure
	//     that if this store can apply the merge, the R2 replica here has all
	//     the log entries in order to become up-to-date. So after it has
	//     applied R2 up to the committed index it can resume applying R1.
	//     TODO*:
	//     - confirm my understanding of this merge invariant.
	//     - waitForApplication cannot prevent regression of R2's state machine
	//       in a node that crashes and recovers. Since we currently don't have
	//       an Init() functionality, what if the raft group for R1 gets
	//       reactivated and starts applying the committed entries and applies
	//       the merge with regressed R2 state? Is changeRemovesReplica=true
	//       (which forces a sync on state machine application) for the first
	//       batch in a merge transaction?


pkg/storage/replicas_storage.go, line 493 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

No this can totally happen, see

// The on-disk state is now committed, but the corresponding in-memory state
// has not yet been updated. Any errors past this point must therefore be
// treated as fatal.
if err := r.clearSubsumedReplicaInMemoryData(ctx, subsumedRepls, mergedTombstoneReplicaID); err != nil {
log.Fatalf(ctx, "failed to clear in-memory data of subsumed replicas while applying snapshot: %+v", err)
}

There are complicated invariants that ensure that a replica can only be replicaGC'ed if we're sure it isn't waiting to catch up on a merge, which also relies on the fact that we won't commit the merge unless both ranges are colocated with all replicas initialized.

Thanks. I've fixed this. And added another TODO*

	// TODO*: What happens to the transactional delete of the RangeDescriptor of
	// the RHS in a merge. It can't be getting applied here, since that is a different raft
	// group. I am guessing the following (confirm with tbg):
	// - The merge txns transaction record is on the LHS, so when it commits,
	//   we also delete the state of the RHS that is not relevant to the merged
	//   range (and do this deletion below raft). This deletion does not include
	//   the RangeDescriptor for the RHS since it is range local state that is
	//   expected to be relevant to the merged range (though the RHS
	//   RangeDescriptor actually isn't relevant).
	// - After the previous step, the LHR RangeDescriptor is resolved synchronously,
	//   but the RHS is not. Someone could observe it and do resolution, which will
	//   delete it.
	// - QueueLastProcessedKey is another range local key that is anchored to the
	//   start of a range an no longer relevant when the RHS is deleted. I don't
	//   see it being deleted in AdminMerge. How is that cleaned up?

pkg/storage/replicas_storage.go, line 505 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Like below, MergeReplicas seems more apt. The range has merged, but here we are merging two Replicas.

Done


pkg/storage/replicas_storage.go, line 509 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

We're discarding a replica, so the word Range isn't right.

Done

Copy link
Collaborator Author

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


pkg/storage/replicas_storage.go, line 336 at r1 (raw file):
comment by @nvanbenschoten on #72745 says

We don't sync the HardState when the only thing that changed is the commit index

So if we had two separate engines, it is possible that due to a crash we have HardState.Commit < RangeAppliedState.RaftAppliedIndex?
This by itself is easy to fix up, in ReplicasStorage.Init assuming the log entries corresponding to RaftAppliedIndex are guaranteed to be durable.

  • I'd still like to know the full list of cases where we don't sync due to MustSync=false.
  • It is possible that both HardState.Commit and RaftAppliedIndex regress, which means if we did some application using side-loaded files (that has become durable), ReplicasStorage.Init does not have enough information to restore the state machine to a consistent state. I was hoping we would be able to fix this as part of introducing ReplicasStorage. Is it ok if we sync HardState when we realize there are side-loaded files in the committed entries?

pkg/storage/replicas_storage.go, line 178 at r2 (raw file):
Based on the following comment by @nvanbenschoten on the other PR, maybe I am wrong about deleting this. I suppose we are ok with leaking these?

For instance, thanks to the range tombstone key, I believe we have an invariant that replica IDs for the same range but across replicas are monotonically increasing on a given store.

Copy link
Member

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 1 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


pkg/storage/replicas_storage.go, line 122 at r1 (raw file):

but perhaps we can ingest splits as SSTs on the state machine too? That still seems better than maintaining a WAL on the state machine just for use by splits, and I'd rather stay out of the game of mixing wal- and non-wal writes.

If the goal of using SST ingestion is durability in a world where we have no WAL, isn't this equivalent performance-wise, if not even less performant, than just writing and immediately flushing the memtable? An SST ingestion will flush the memtable(s) in almost all cases that I think we care about this for. And at least in this scheme, we don't then need to ingest and compact away another small SST after the memtable flush.

Sumeer, do you have a sense of the relative cost of a memtable flush call (vs. a write, a write + fsync, an sst ingestion)? I know in the past you've been concerned about flushing on the hot path of Raft application (e.g. during splits). Is this because the average case is too costly, or because we're a lot more susceptible to blocking in the worst-cast in the event of an LSM write stall?


pkg/storage/replicas_storage.go, line 141 at r1 (raw file):

Previously, sumeerbhola wrote…

It's morally true today; it better be, or we're violating Raft's invariants

Just to make sure I understand -- is there something in our code or etcd/raft that currently prevents such snapshot application from happening?

By "beyond", do we mean at a lower log index or at a higher log index? I don't think it would be a problem for a recipient of a snapshot to retain uncommitted log entries at indexes above the snapshot index, but I think it would also be useless because they will necessarily be replaced.


pkg/storage/replicas_storage.go, line 336 at r1 (raw file):

I'd still like to know the full list of cases where we don't sync due to MustSync=false.

The logic that sets MustSync is here: https://github.com/etcd-io/etcd/blob/573e055cd3bab9ddfa1572c5a05755073cbe5ab5/raft/node.go#L592

So the only case where HardState changes but we don't require a sync is the case where only the commit index changes.

It is possible that both HardState.Commit and RaftAppliedIndex regress, which means if we did some application using side-loaded files (that has become durable)

If we did some application using side-loaded files, wouldn't the RaftAppliedIndex also become durable?

Are you concerned about the case where we don't flush the memtable on sst ingestion? If so, what did you make of my comment about always flushing memtables on sst ingestion in #72745?


pkg/storage/replicas_storage.go, line 493 at r1 (raw file):

Previously, sumeerbhola wrote…

Thanks. I've fixed this. And added another TODO*

	// TODO*: What happens to the transactional delete of the RangeDescriptor of
	// the RHS in a merge. It can't be getting applied here, since that is a different raft
	// group. I am guessing the following (confirm with tbg):
	// - The merge txns transaction record is on the LHS, so when it commits,
	//   we also delete the state of the RHS that is not relevant to the merged
	//   range (and do this deletion below raft). This deletion does not include
	//   the RangeDescriptor for the RHS since it is range local state that is
	//   expected to be relevant to the merged range (though the RHS
	//   RangeDescriptor actually isn't relevant).
	// - After the previous step, the LHR RangeDescriptor is resolved synchronously,
	//   but the RHS is not. Someone could observe it and do resolution, which will
	//   delete it.
	// - QueueLastProcessedKey is another range local key that is anchored to the
	//   start of a range an no longer relevant when the RHS is deleted. I don't
	//   see it being deleted in AdminMerge. How is that cleaned up?

I believe that this is all correct, but @tbg should confirm.


pkg/storage/replicas_storage.go, line 49 at r2 (raw file):

// both raft engine state or state machine engine state. Which means transient
// inconsistencies can develop. We will either
// - alter this interface to include a more pragmatic once we have settled on

A more pragmatic what?


pkg/storage/replicas_storage.go, line 70 at r2 (raw file):

//   replicated state machine state.
//
// The interface requires that any mutation (batch or sst) only touch one of

Would it be worth asserting dynamically that SSTs or MutationBatches that are passed through this interface only touch one engine or the other? Maybe in a race-build-only implementation of this interface.


pkg/storage/replicas_storage.go, line 300 at r2 (raw file):

//   that the caller is unaware of what is durable in the state machine. Hence
//   the advise provided by the caller serves as an upper bound of what can be
//   truncated. Log truncation does not need to be synced.

Somewhat off-topic, but the comment above got me thinking about how this separation of the local and replicated state for a given replica would relate to the concepts of witness replicas. Has this crossed your mind? I'm wondering whether it's accurate to say that a witness replica will only contain Raft state and no state machine state. In other words, its interactions will be entirely confined to the Raft engine.

I think this would be a nice validation of the separation of concerns here.


pkg/storage/replicas_storage.go, line 424 at r2 (raw file):

	// In handleRaftReadyRaftMuLocked, if there is a snapshot, it will first
	// call IngestRangeSnapshot, and then DoRaftMutation to change the
	// HardState.{Term,Vote,Commit}. We are doing not doing 2 syncs here, since

"doing not doing"


pkg/storage/replicas_storage.go, line 477 at r2 (raw file):

	//     TODO*:
	//     - confirm my understanding of this merge invariant.
	//     - waitForApplication cannot prevent regression of R2's state machine

I think it would be reasonable to change waitForApplication to waitForDurableApplication if that was needed to simplify some of the states that we can get in with merges and with migrations. The implementation of waitForApplication is already polling the range's LeaseAppliedIndex, so it would be reasonable to wait for the durable LeaseApplied index. In fact, we already deal with durability directly in waitForApplication.

@tbg tbg requested a review from nvanbenschoten November 24, 2021 12:09
Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


pkg/storage/replicas_storage.go, line 141 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

By "beyond", do we mean at a lower log index or at a higher log index? I don't think it would be a problem for a recipient of a snapshot to retain uncommitted log entries at indexes above the snapshot index, but I think it would also be useless because they will necessarily be replaced.

I thought Raft would accept a snapshot if it moves the AppliedIndex forward and that's it. So it seemed possible for a misguided snapshot to arrive at a replica and to lead to the deletion of acked log entries that aren't covered by the snapshot (i.e. higher indexes than the snapshot's applied index). But I just checked the code and there is this:

	// Now go ahead and actually restore.

	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

I.e. if a replica has entries [100,200] and applied=150 and a snapshot arrives at applied=170, then instead of actually applying the snapshot, the replica will just make sure it considers 170 committed. In particular it won't discard [170,200].


pkg/storage/replicas_storage.go, line 186 at r1 (raw file):

I was interpreting what you had said on the thread https://cockroachlabs.slack.com/archives/C02KHQMF2US/p1636710202051500?thread_ts=1636021850.043600&cid=C02KHQMF2US

We don't need to jump through hoops to keep this invariant true on disk. But we have to make sure that we don't "run" etcd/raft in a state where commit < applied, by forwarding it when necessary.

Are there cases where we don't sync?

We don't generally sync unless either entries are appended, or a vote and/or term change has to be recorded:

// MustSync returns true if the hard state and count of Raft entries indicate
// that a synchronous write to persistent storage is required.
func MustSync(st, prevst pb.HardState, entsnum int) bool {
	// Persistent state on all servers:
	// (Updated on stable storage before responding to RPCs)
	// currentTerm
	// votedFor
	// log entries[]
	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

In particular, a change in the commit index alone would never be synced.


pkg/storage/replicas_storage.go, line 493 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I believe that this is all correct, but @tbg should confirm.

I think the RHS intent gets resolved synchronously with the LHS intent when applying the merge trigger, see:

if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
// which locks are local (note that for a split, we want to use the
// pre-split one instead because it's larger).
desc = &mergeTrigger.LeftDesc
}

When the merge trigger applies, this means the merge committed, which means the RHS was frozen (not serving requests) for some time, so this is the time to clean everything up as nobody will observe anything about the RHS after.

It doesn't seem like QueueLastProcessedKey will ever be cleaned up. So we'll have it sit on the (now wider) LHS anchored at a key "in the middle" (the start key of the former LHS). Not clean, but benign. If we wanted to clear it, we'd have to do it as part of merge trigger evaluation. Honestly, I'd rather get rid of this key, it does't seem very useful.

cockroach/pkg/keys/keys.go

Lines 308 to 310 in 4df8ac2

func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeLastReplicaGCTimestampKey()
}
has a similar problem, I don't think we'll remove this key on the RHS if it exists there. This is an unreplicated key though, so we don't even have to be careful about cleaning it up through the replication layer, we can nuke it whenever we feel like it. This key too I think should not actually be used by CRDB.


pkg/storage/replicas_storage.go, line 103 at r2 (raw file):

// interface to construct changes to the state machine state. It simply
// applies changes, and requires the caller to obey some simple invariants to
// not cause inconsistencies. It is aware of the keyspace occupied by a range

replica


pkg/storage/replicas_storage.go, line 286 at r2 (raw file):

//       would delete R2's data. I suspect we're not changing raft membership
//       in the middle of a merge transaction, since the merge requires both
//       ranges to be on the same nodes, but need to confirm this.

The right-hand side of a merge is immovable once in the critical phase (i.e. past the SubsumeRequest is handled), until the merge txn aborts (if it ever does). On the leaseholder handling Subsume, this is done by the Subsume. But we also prevent all future leaseholders from doing anything that would violate the critical phase by observing the deletion intent on the range descriptor. For the latter mechanism see:

if leaseChangingHands && iAmTheLeaseHolder {
// When taking over the lease, we need to check whether a merge is in
// progress, as only the old leaseholder would have been explicitly notified
// of the merge. If there is a merge in progress, maybeWatchForMerge will
// arrange to block all traffic to this replica unless the merge aborts.
if _, err := r.maybeWatchForMergeLocked(ctx); err != nil {

The bottom line is, if a merge commits, regardless of which replicas know about this yet, the LHS and RHS will be fully colocated.


pkg/storage/replicas_storage.go, line 300 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Somewhat off-topic, but the comment above got me thinking about how this separation of the local and replicated state for a given replica would relate to the concepts of witness replicas. Has this crossed your mind? I'm wondering whether it's accurate to say that a witness replica will only contain Raft state and no state machine state. In other words, its interactions will be entirely confined to the Raft engine.

I think this would be a nice validation of the separation of concerns here.

I'm not 100% sure about witness replicas because I'm not 100% sure of their definition. If witness replicas are just the raft log without the state machine, then the question becomes whether they need to be able to catch up other replicas. If so - they need to maintain an infinite log. I don't think anyone wants to do that in practice though, so that's probably not what a witness replica actually is. If we say it's a log-only replica that does occasionally truncate the log, then yes, this seems to match up. We should be able to implement a witness replica via this interface by ignoring everything related to state handling.


pkg/storage/replicas_storage.go, line 477 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I think it would be reasonable to change waitForApplication to waitForDurableApplication if that was needed to simplify some of the states that we can get in with merges and with migrations. The implementation of waitForApplication is already polling the range's LeaseAppliedIndex, so it would be reasonable to wait for the durable LeaseApplied index. In fact, we already deal with durability directly in waitForApplication.

Writes wouldn't be expected to get durable within any reasonable amount of time though, right? We're waiting for a memtable flush, which may not happen anytime soon on an otherwise idle system. Not a show-stopper, but we'd need to schedule a sync at some point.

@tbg tbg requested review from tbg November 24, 2021 12:09
Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


pkg/storage/replicas_storage.go, line 145 at r1 (raw file):

// - Corollary: since HardState.Commit cannot refer to log entries beyond the
//   locally persisted ones, the existing HardState.Commit <=
//   RangeAppliedState.RaftAppliedIndex, so step C3 will only need to increase

It will also throw away historical raft log entries and update truncated state. I think you don't want to say that it doesn't, but the text sounds a bit like it. You'll have log [5,6,7] with a truncatedstate pointing at 5, and after the snapshot at 10 there will be no log, and a truncated state pointing at 10.


pkg/storage/replicas_storage.go, line 165 at r1 (raw file):

Previously, sumeerbhola wrote…

You raise a very good point.

  • I am now wondering whether RangeTombstoneKey has become redundant in this scheme, since Init doesn't look at it (described below) when deciding whether an initialized range is live.
  • The real reason to sync here is not due to D3, but because we could later execute C1 when adding the range back to this node, and then crash. On crash recovery we'd find the raft HardState and old state machine state and incorrectly think this is an initialized replica.

The D2->C1->crash scenario makes sense. Could it be better to delete using D2*->D1 (Note no sync)? That way we are going from initialized replica -> uninitialized replica -> no replica (could revert back to uninitialized), and it's never in an undefined state as it would be after D1*->D2->crash.
It's not like the extra sync matters (since we're talking about removal, which is rare enough) but my heuristic is that minimizing the number of required syncs leads to the "right" sequence of operations. Probably there are some reasons around merges that make us want to add the D1* sync back in for now, but even then this order seems better in that it seems to avoid some of the cleanup scenarios you describe below.


pkg/storage/replicas_storage.go, line 178 at r2 (raw file):

Previously, sumeerbhola wrote…

Based on the following comment by @nvanbenschoten on the other PR, maybe I am wrong about deleting this. I suppose we are ok with leaking these?

For instance, thanks to the range tombstone key, I believe we have an invariant that replica IDs for the same range but across replicas are monotonically increasing on a given store.

Yeah, that seems fine. I do wonder if we rely on that invariant at all though.


pkg/storage/replicas_storage.go, line 228 at r2 (raw file):

//   make a change in what we store in RangeAppliedState: RangeAppliedState
//   additionally contains the Term of the index corresponding to
//   RangeAppliedState.RaftAppliedIndex. We will see below that we need this

That makes a lot of sense. The state machine is "part of the log", it represents the compaction of the log from index zero all the way up to the applied index, and has to retain both the index and term corresponding to that log position. When we implement this it probably makes sense to use a different key since the term is stable.


pkg/storage/replicas_storage.go, line 269 at r2 (raw file):

//   - The dead ranges will have all their rangeID key spans in the state
//     machine removed.
//   - The union of the range (local, global, lock table) key spans of the

I don't understand (any more) what's going on here. It seems that there is some interaction between different replicas here (which may overlap after an ill-timed crash?)
Could you remind me how this can happen? Since all range descriptors are on the same engine, and they are only ever mutated in tandem so that there is no overlap, I'd say it can't. So crash recovery after deletion should be straightforward, pairing up descriptors for which there's no hard state, and nuking those? Might be missing something. I remember we talked about overlap before, but I'm not sure in which context any more.


pkg/storage/replicas_storage.go, line 290 at r2 (raw file):

// Normal operation of an initialized replica is straightforward:
// - ReplicasStorage will be used to append/replace log entries and update HardState.
//   Currently these always sync to disk.

as noted elsewhere, there's MustSync which basically says don't sync unless one of these actually changed. (In particular, don't sync changes to only commit index).


pkg/storage/replicas_storage.go, line 315 at r2 (raw file):

}

type RangeInfo struct {

ReplicaInfo?


pkg/storage/replicas_storage.go, line 336 at r2 (raw file):

type RaftMutationBatch struct {
	MutationBatch
	// [Lo, Hi) represents the raft log entries, if any in the MutationBatch.

Clarify whether this is appending the entries to the log (it is) or applying them to the state machine (it is not).


pkg/storage/replicas_storage.go, line 392 at r2 (raw file):

	// initialize a range except for the RHS of a split.
	//
	// Snapshot ingestion will not be accepted if:

This sounds as though the method will check all these, but it won't, right? These are effectively requirements the caller has to check before calling the method.


pkg/storage/replicas_storage.go, line 404 at r2 (raw file):

	// For reference, this will do:
	// - Step D1 for all subsumed ranges. This syncs, but subsumed ranges will
	//   usually be empty.

What do you mean by that? Merging empty ranges away is one prominent use of range merges, but I don't think we can say it's the "usual" case.


pkg/storage/replicas_storage.go, line 420 at r2 (raw file):

	// for those merges (in any of the raft groups that survived after each of
	// those merges). It did possibly contribute to the quorum for the raft
	// groups of the RHS of those merges, but those groups no longer exist.

log=[10,11,12] applied=10 committed=12
index 12 is the merge, subsuming a neighboring range r2
snapshot at 13 comes in, we crash in a way that cleans up r2 at init
at restart, we delete r2, but now r1 is applying the merge and r2 is absent, so we explode

What am I missing?

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


pkg/storage/replicas_storage.go, line 438 at r2 (raw file):

	// ApplyCommittedUsingIngest applies committed changes to the state machine
	// state by ingesting sstPaths. The ssts may not contain an update to
	// RangeAppliedState, in which case this call should be immediately followed

Is this called only during sideloading? What is the caller that has an AppliedIndex change in here?

@tbg tbg self-requested a review November 24, 2021 12:57
Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


pkg/storage/replicas_storage.go, line 467 at r2 (raw file):

	//     crash) because of not syncing. There is a potential subtlety here for
	//     merges of R1 and R2: it cannot apply the merge in R1's raft log until
	//     R2's state machine is up-to-date, so this merge application in R1

ah, this is the interdependency stuff that I remember us talking about. So it wasn't about deleting replicas directly.

@tbg tbg self-requested a review November 24, 2021 12:59
Copy link
Collaborator Author

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've read through all the answers and comments and responded. There are no significant questions left at this point. Thanks for being patient through this!

I need to go through and fix all the existing and new TODOs that I've added. I'll ping when that is done.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


pkg/storage/replicas_storage.go, line 122 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

but perhaps we can ingest splits as SSTs on the state machine too? That still seems better than maintaining a WAL on the state machine just for use by splits, and I'd rather stay out of the game of mixing wal- and non-wal writes.

If the goal of using SST ingestion is durability in a world where we have no WAL, isn't this equivalent performance-wise, if not even less performant, than just writing and immediately flushing the memtable? An SST ingestion will flush the memtable(s) in almost all cases that I think we care about this for. And at least in this scheme, we don't then need to ingest and compact away another small SST after the memtable flush.

Sumeer, do you have a sense of the relative cost of a memtable flush call (vs. a write, a write + fsync, an sst ingestion)? I know in the past you've been concerned about flushing on the hot path of Raft application (e.g. during splits). Is this because the average case is too costly, or because we're a lot more susceptible to blocking in the worst-cast in the event of an LSM write stall?

We use 64MB memtables, so mean of 32MB when we force a flush -- it will take a few 100ms (at least) to flush. I don't have actual numbers. And yes it is likely that 1 flush + 1 ingest is worse than writing to memtable and then flushing.

We have a Pebble issue to reduce hiccups for ingests cockroachdb/pebble#25 but that work assumes that there is a WAL.


pkg/storage/replicas_storage.go, line 141 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I thought Raft would accept a snapshot if it moves the AppliedIndex forward and that's it. So it seemed possible for a misguided snapshot to arrive at a replica and to lead to the deletion of acked log entries that aren't covered by the snapshot (i.e. higher indexes than the snapshot's applied index). But I just checked the code and there is this:

	// Now go ahead and actually restore.

	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

I.e. if a replica has entries [100,200] and applied=150 and a snapshot arrives at applied=170, then instead of actually applying the snapshot, the replica will just make sure it considers 170 committed. In particular it won't discard [170,200].

Thanks for the pointer that confirms this behavior. I've added it as a comment here.


pkg/storage/replicas_storage.go, line 145 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

It will also throw away historical raft log entries and update truncated state. I think you don't want to say that it doesn't, but the text sounds a bit like it. You'll have log [5,6,7] with a truncatedstate pointing at 5, and after the snapshot at 10 there will be no log, and a truncated state pointing at 10.

I updated this text to only talk about the HardState manipulation done in C3 (and not the other things that happen in C3).


pkg/storage/replicas_storage.go, line 165 at r1 (raw file):

Could it be better to delete using D2*->D1 (Note no sync)? That way we are going from initialized replica -> uninitialized replica -> no replica (could revert back to uninitialized), and it's never in an undefined state as it would be after D1*->D2->crash.
It's not like the extra sync matters (since we're talking about removal, which is rare enough) but my heuristic is that minimizing the number of required syncs leads to the "right" sequence of operations.

This is a good point. I think we need this change for other reasons too (later comment).
One question: say we remove the RangeDescriptor in D2, and then crash. On crash recovery we have an uninitialized replica. Will the replicaGCQueue eventually remove it?


pkg/storage/replicas_storage.go, line 186 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I was interpreting what you had said on the thread https://cockroachlabs.slack.com/archives/C02KHQMF2US/p1636710202051500?thread_ts=1636021850.043600&cid=C02KHQMF2US

We don't need to jump through hoops to keep this invariant true on disk. But we have to make sure that we don't "run" etcd/raft in a state where commit < applied, by forwarding it when necessary.

Are there cases where we don't sync?

We don't generally sync unless either entries are appended, or a vote and/or term change has to be recorded:

// MustSync returns true if the hard state and count of Raft entries indicate
// that a synchronous write to persistent storage is required.
func MustSync(st, prevst pb.HardState, entsnum int) bool {
	// Persistent state on all servers:
	// (Updated on stable storage before responding to RPCs)
	// currentTerm
	// votedFor
	// log entries[]
	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

In particular, a change in the commit index alone would never be synced.

This is very useful -- I'll adjust this.


pkg/storage/replicas_storage.go, line 336 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I'd still like to know the full list of cases where we don't sync due to MustSync=false.

The logic that sets MustSync is here: https://github.com/etcd-io/etcd/blob/573e055cd3bab9ddfa1572c5a05755073cbe5ab5/raft/node.go#L592

So the only case where HardState changes but we don't require a sync is the case where only the commit index changes.

It is possible that both HardState.Commit and RaftAppliedIndex regress, which means if we did some application using side-loaded files (that has become durable)

If we did some application using side-loaded files, wouldn't the RaftAppliedIndex also become durable?

Are you concerned about the case where we don't flush the memtable on sst ingestion? If so, what did you make of my comment about always flushing memtables on sst ingestion in #72745?

The problem here is that we first sideload via ingestion in runPreApplyTriggersAfterStagingWriteBatch and then apply the batch that updates RaftAppliedIndex so we could crash after the first step.

@tbg and I discussed this and there are currently two options:

  • change to also ingesting the batch and do both in the same atomic call.
  • keep track of the durableCommitted value and ensure that durableCommitted is at or beyond the raft entry with sideloaded file that is being applied.

I'm leaning towards the latter since:

  • we are not adding another small file
  • this should not increase the total sync rate, given that we are already syncing when appending raft entries and running many raft groups, and syncs get batched inside Pebble
  • sync latency of ~2ms should be tiny compared to the cost of flushing the memtable before ingesting, so shouldn't significantly increase the application latency.

pkg/storage/replicas_storage.go, line 493 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I think the RHS intent gets resolved synchronously with the LHS intent when applying the merge trigger, see:

if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
// which locks are local (note that for a split, we want to use the
// pre-split one instead because it's larger).
desc = &mergeTrigger.LeftDesc
}

When the merge trigger applies, this means the merge committed, which means the RHS was frozen (not serving requests) for some time, so this is the time to clean everything up as nobody will observe anything about the RHS after.

It doesn't seem like QueueLastProcessedKey will ever be cleaned up. So we'll have it sit on the (now wider) LHS anchored at a key "in the middle" (the start key of the former LHS). Not clean, but benign. If we wanted to clear it, we'd have to do it as part of merge trigger evaluation. Honestly, I'd rather get rid of this key, it does't seem very useful.

cockroach/pkg/keys/keys.go

Lines 308 to 310 in 4df8ac2

func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeLastReplicaGCTimestampKey()
}
has a similar problem, I don't think we'll remove this key on the RHS if it exists there. This is an unreplicated key though, so we don't even have to be careful about cleaning it up through the replication layer, we can nuke it whenever we feel like it. This key too I think should not actually be used by CRDB.

Synchronous resolution for the RHS is going to be helpful.


pkg/storage/replicas_storage.go, line 49 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

A more pragmatic what?

More of an intermediate step towards the final interface. I've rewritten the comment.
Hopefully, we won't have to do this and small changes in existing code + ReplicasStorage.Init will be sufficient.


pkg/storage/replicas_storage.go, line 70 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Would it be worth asserting dynamically that SSTs or MutationBatches that are passed through this interface only touch one engine or the other? Maybe in a race-build-only implementation of this interface.

Good idea. I added this to the TODO list at the top.


pkg/storage/replicas_storage.go, line 103 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

replica

Done


pkg/storage/replicas_storage.go, line 178 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Yeah, that seems fine. I do wonder if we rely on that invariant at all though.

I've added a TODO for myself to fix D3. I'd prefer to keep this invariant since I am worried about changing too much. I'll also add a comment to examine the necessity of this invariant in the future.


pkg/storage/replicas_storage.go, line 228 at r2 (raw file):

When we implement this it probably makes sense to use a different key since the term is stable.

The Term is for the log entry corresponding to RaftAppliedIndex, so it seems preferable to keep it in RangeAppliedState, just to be sure that we don't accidentally introduce code that updates RaftAppliedIndex and not the Term or vice versa. Are you worried that we're bloating RangeAppliedState that is being written to on each application, and since the Term will not change often we could optimize?


pkg/storage/replicas_storage.go, line 269 at r2 (raw file):

It seems that there is some interaction between different replicas here (which may overlap after an ill-timed crash?)
Could you remind me how this can happen?

The overlap is only in the merge case where the state machine batch has resolved the provisional LHS RangeDescriptor, so now the LHS has become the merged range, but we've not yet removed the RHS RangeDescriptor. The current text is written to fix this up in Init. Though I think there is a different flaw there, that could be addressed by ordering D2 before D1 (as you suggested above for a different reason).

Since all range descriptors are on the same engine, and they are only ever mutated in tandem so that there is no overlap, I'd say it can't

The text here doesn't do the "in tandem" part, but it can be made to.

I remember we talked about overlap before, but I'm not sure in which context any more.

We also talked about overlap in the context of master, where my understanding is that the RHS RangeDescriptor stays provisional until async intent resolution. So there we do have overlapping committed RangeDescriptors, but we tolerate that because higher level code prevents the RHS RangeDescriptor from being used.


pkg/storage/replicas_storage.go, line 286 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

The right-hand side of a merge is immovable once in the critical phase (i.e. past the SubsumeRequest is handled), until the merge txn aborts (if it ever does). On the leaseholder handling Subsume, this is done by the Subsume. But we also prevent all future leaseholders from doing anything that would violate the critical phase by observing the deletion intent on the range descriptor. For the latter mechanism see:

if leaseChangingHands && iAmTheLeaseHolder {
// When taking over the lease, we need to check whether a merge is in
// progress, as only the old leaseholder would have been explicitly notified
// of the merge. If there is a merge in progress, maybeWatchForMerge will
// arrange to block all traffic to this replica unless the merge aborts.
if _, err := r.maybeWatchForMergeLocked(ctx); err != nil {

The bottom line is, if a merge commits, regardless of which replicas know about this yet, the LHS and RHS will be fully colocated.

I've recorded part of your description in the comment for MergeReplicas.


pkg/storage/replicas_storage.go, line 290 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

as noted elsewhere, there's MustSync which basically says don't sync unless one of these actually changed. (In particular, don't sync changes to only commit index).

Ack


pkg/storage/replicas_storage.go, line 300 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I'm not 100% sure about witness replicas because I'm not 100% sure of their definition. If witness replicas are just the raft log without the state machine, then the question becomes whether they need to be able to catch up other replicas. If so - they need to maintain an infinite log. I don't think anyone wants to do that in practice though, so that's probably not what a witness replica actually is. If we say it's a log-only replica that does occasionally truncate the log, then yes, this seems to match up. We should be able to implement a witness replica via this interface by ignoring everything related to state handling.

It seems from the comments here that we could extend this to handle witness replicas in the future.
Since this interface currently documents invariants relating to consistency of state machine + raft log, I'll ignore witness replicas for now.


pkg/storage/replicas_storage.go, line 315 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

ReplicaInfo?

Done


pkg/storage/replicas_storage.go, line 336 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Clarify whether this is appending the entries to the log (it is) or applying them to the state machine (it is not).

Done


pkg/storage/replicas_storage.go, line 392 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This sounds as though the method will check all these, but it won't, right? These are effectively requirements the caller has to check before calling the method.

I was planning to maintain a small amount of in-memory state per replica

  • committed span
  • log entries interval
    so as to check these. And yes, the caller should also be ensuring these are true.

pkg/storage/replicas_storage.go, line 404 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

What do you mean by that? Merging empty ranges away is one prominent use of range merges, but I don't think we can say it's the "usual" case.

Poor phrasing. I meant the list of subsumed ranges will usually be empty for this call. Fixed.


pkg/storage/replicas_storage.go, line 420 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

log=[10,11,12] applied=10 committed=12
index 12 is the merge, subsuming a neighboring range r2
snapshot at 13 comes in, we crash in a way that cleans up r2 at init
at restart, we delete r2, but now r1 is applying the merge and r2 is absent, so we explode

What am I missing?

You are right.
I think we need to do ingestion as the first step. That will remove the RangeDescriptors for the subsumed ranges. Their raft state still exists, so they are an uninitialized range.


pkg/storage/replicas_storage.go, line 424 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

"doing not doing"

Done


pkg/storage/replicas_storage.go, line 438 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Is this called only during sideloading? What is the caller that has an AppliedIndex change in here?

Yes, only sideloading. It is ok if the RaftAppliedIndex change is also in the sst (but IIUC that is not our current way of sideloading).


pkg/storage/replicas_storage.go, line 477 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Writes wouldn't be expected to get durable within any reasonable amount of time though, right? We're waiting for a memtable flush, which may not happen anytime soon on an otherwise idle system. Not a show-stopper, but we'd need to schedule a sync at some point.

This clarifies things. Even with ReplicasStorage.Init we can have a regression for R2's HardState.Commit so for now we'll need to continue to do this WriteSyncNoop.

@sumeerbhola sumeerbhola force-pushed the raft_sm branch 3 times, most recently from 25c5c4f to 015f0b9 Compare December 3, 2021 04:50
@sumeerbhola sumeerbhola marked this pull request as ready for review December 3, 2021 04:52
Copy link
Collaborator Author

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made a complete pass and extensively changed the description, and removed the TODOs.

The only surprising change may be that the RangeTombstone has moved to the engine that contains the state machine (even though the RangeTombstone is unreplicated). I spent a lot of time agonizing over this and trying alternatives but they were all complicated.

The reason for this change is that both replica creation and deletion now make a durable atomic change to the state machine as their first step, and they can roll forward from there in case of a crash before the rest of the steps are executed. The RangeTombstone is placed in this first step when it is a delete (due to merge or rebalance). For a merge, we could probably roll forward even if the RangeTombstone was not written in this first step, since it writes a constant value of mergeTombstoneReplicaID. But we'd need to know that this replica was removed due to a merge and not due to a rebalance. And for a removal due to rebalance, we don't always have the state to compute the nextReplicaID (if the removal was due to ReplicaGCQueue or some means other than applying a raft entry). Even if the removal was due to applying a raft entry with ChangeReplicas that removed this store, once we've removed the state machine (and crashed before writing the RangeTombstone), we no longer know which raft log entry had the ChangeReplicas.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)

@sumeerbhola
Copy link
Collaborator Author

Also updated the text to clear out the span not covered by the latest RangeDescriptor of a snapshot of an already initialized range, to handle the case when the LHS is moving past a split using a snapshot (as discussed in-person following the discussion on https://cockroachlabs.slack.com/archives/C02KHQMF2US/p1638412757069600

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always more to do with a doc like this, but I think this looks good. We can sort out the remaining questions in the impl phase.

Reviewed 1 of 1 files at r5, all commit messages.
Dismissed @nvanbenschoten and @sumeerbhola from 2 discussions.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten, @sumeerbhola, and @tbg)


-- commits, line 17 at r5:
The tombstone key has moved.


pkg/storage/replicas_storage.go, line 165 at r1 (raw file):

One question: say we remove the RangeDescriptor in D2, and then crash. On crash recovery we have an uninitialized replica. Will the replicaGCQueue eventually remove it?

No, see #73424, but this is a general problem so we'll address it in general and until we do it won't be an issue if an ill-timed crash occasionally leaks an uninited replica.


pkg/storage/replicas_storage.go, line 404 at r1 (raw file):

Previously, sumeerbhola wrote…

I added some questions here

	//   - Applying committed entries from raft logs to the state machines when
	//     the state machine is behind (and possibly regressed from before the
	//     crash) because of not syncing. There is a potential subtlety here for
	//     merges of R1 and R2: it cannot apply the merge in R1's raft log until
	//     R2's state machine is up-to-date, so this merge application in R1
	//     would need to pause and find R2 and bring it up to date, and only
	//     then continue (if possible). Since AdminMerge ensures *all* R2
	//     replicas are up-to-date before it creates the log entry in R1 that
	//     will do the merge (by calling waitForApplication), one can be sure
	//     that if this store can apply the merge, the R2 replica here has all
	//     the log entries in order to become up-to-date. So after it has
	//     applied R2 up to the committed index it can resume applying R1.
	//     TODO*:
	//     - confirm my understanding of this merge invariant.
	//     - waitForApplication cannot prevent regression of R2's state machine
	//       in a node that crashes and recovers. Since we currently don't have
	//       an Init() functionality, what if the raft group for R1 gets
	//       reactivated and starts applying the committed entries and applies
	//       the merge with regressed R2 state? Is changeRemovesReplica=true
	//       (which forces a sync on state machine application) for the first
	//       batch in a merge transaction?

This is probably outdated, right? Since we have resolved to not needing to apply splits/merges in Init. I think the invariant above holds though. Definitely fairly subtle, would be nice to avoid relying on it now.


pkg/storage/replicas_storage.go, line 228 at r2 (raw file):

Previously, sumeerbhola wrote…

When we implement this it probably makes sense to use a different key since the term is stable.

The Term is for the log entry corresponding to RaftAppliedIndex, so it seems preferable to keep it in RangeAppliedState, just to be sure that we don't accidentally introduce code that updates RaftAppliedIndex and not the Term or vice versa. Are you worried that we're bloating RangeAppliedState that is being written to on each application, and since the Term will not change often we could optimize?

Yeah, that was my thinking. I don't think there's much of a danger that we'd skew the writes to both keys, since there is only one way to update the RangeAppliedState (using the stateloader) and so if

func (rsl StateLoader) SetRangeAppliedState(
ctx context.Context,
readWriter storage.ReadWriter,
appliedIndex, leaseAppliedIndex uint64,
newMS *enginepb.MVCCStats,
raftClosedTimestamp *hlc.Timestamp,
) error {
was put in charge of peeling the state into two writes it would not be likely to cause any bugs.

That said, can discuss when the PR is out. I expect Nathan to have a good opinion on whether it's worth it.


pkg/storage/replicas_storage.go, line 269 at r2 (raw file):

The overlap is only in the merge case where the state machine batch has resolved the provisional LHS RangeDescriptor, so now the LHS has become the merged range, but we've not yet removed the RHS RangeDescriptor. The current text is written to fix this up in Init. Though I think there is a different flaw there, that could be addressed by ordering D2 before D1 (as you suggested above for a different reason).

This is outdated, right? The batch committing the merge will atomically delete the RHS descriptor and widen the LHS descriptor (plus write tombstone). I think we'll never see overlap and this should become one of the invariants.


pkg/storage/replicas_storage.go, line 438 at r2 (raw file):

Previously, sumeerbhola wrote…

Yes, only sideloading. It is ok if the RaftAppliedIndex change is also in the sst (but IIUC that is not our current way of sideloading).

Consider tightening this to the actual use case then, if we're not going to call it in that way let's write that down to not leave readers guessing how we might actually be using it.


pkg/storage/replicas_storage.go, line 87 at r5 (raw file):

// replica lifecycle. Since it is unreplicated, it is not part of the state
// machine. However, placing it in the category of "raft state" with the other
// unreplicated keys turns out to be complicated:

Just to make sure I understand correctly, is the main complication that arises when keeping the tombstone in the raft engine that we would be worried about cases in which the state removal becomes durable but the tombstone doesn't, and for some reason we don't want to sync the tombstone? Because naively you could

  • write durable tombstone (local engine)
  • perform deletion (non-durable on state engine)

On crash recovery, delete any replica state for which a tombstone is present in Init. This would work for b). I can see how a) would still be problematic because in crash recovery, we'll have the unmerged LHS and RHS present, and we cannot remove the RHS data yet. So when we see the tombstone for the RHS we need to realize there is a pending merge on the RHS and not do recovery. This convinces me that it's better to have the tombstone on the state engine.

Whatever the answer is, could you explain it in the text?


pkg/storage/replicas_storage.go, line 110 at r5 (raw file):

// changes to get to a consistent state.
// - RangeAppliedStateKey: needs to read this in order to truncate the log,
//   both as part of regular log truncation and on crash recovery.

Can you give more details about these situations or give the reader a pointer to the section that discusses this.


pkg/storage/replicas_storage.go, line 112 at r5 (raw file):

//   both as part of regular log truncation and on crash recovery.
// - RangeDescriptorKey: needs to read this to discover the spans of
//   initialized replicas.

Give an idea of why it needs those or a forward-reference deeper into the doc.


pkg/storage/replicas_storage.go, line 122 at r5 (raw file):

due to rebalancing

More generally,

when replicas are moved or merged away.


pkg/storage/replicas_storage.go, line 130 at r5 (raw file):

must happen

Not really, right? Certainly writes would have to. But reads?


pkg/storage/replicas_storage.go, line 133 at r5 (raw file):
Maybe with less potential for confusion around what "normal" operation means:

ReplicasStorage does not participate in applying committed entries in the running system (other than allowing a caller to load entries from the log), but it may occasionally need to apply "simple" entries directly to the state machine during Init to uphold atomicity of SST ingestion operations, see TODO.

(was there any other reason? I don't think we need to do anything for conf changes)


pkg/storage/replicas_storage.go, line 137 at r5 (raw file):

// ============================================================================
// Invariants:
// Replicas are in one of 4 states:

The descriptions are a little fuzzy. I think we want to either some of the invariants to subsystems (for example, saying "there is an internally consistent state for etcd/raft" instead of spelling out all of the conditions that this entails) or be exhaustive, right now we're neither. I prefer the former to keep it brief while at the same time making clear what we need. Here's an attempt:


INVARIANT (RaftConsistency): when there is any data on the state machine associated to a given RangeID with the exception of the RangeTombstone key, there is a corresponding internally consistent (according to etcd/raft) RaftState. After Init() has been called, this Raft state will additionally be consistent with the AppliedState on the state machine (i.e. the latter references a valid log position).
COROLLARY (ReplicaExistence): we can (and do) define that a Replica exists if and only if a Raft HardState exists.
Definition (DeletedReplica): it can be convenient to reference Replicas that once existed but no longer do, as evidenced by the presence of a RangeTombstone for a RangeID for which (RaftConsistency) does not hold, i.e. the RangeTombstone is the only reference to the RangeID on both the raft and state machine engines.
INVARIANT (StateConsistency): after Init() has been called, if there is any data associated to a given RangeID on the state engine with the exception of the RangeTombstone key, it will reflect the replicated state at the corresponding AppliedIndex (i.e. materializes the replicated log at this index).
INVARIANT (AppliedStateNontriviality): the AppliedIndex stored for any Replica on the state machine engine is never zero.
DEFINITION: we say that a Replica has AppliedIndex zero if the Replica exists (i.e. HardState exists) but no corresponding AppliedState is stored on the state machine engine.

INVARIANT (RangeDescriptor): a nonzero AppliedState always refers to a log position that materializes a non-provisional RangeDescriptor.
A Range is first created with a RangeDescriptor present (external invariant) and neither the callers nor ReplicasStorage will never selectively delete it.
NOTE: subsumption as part of a range merge does delete the RangeDescriptor, but the Replica atomically ceases to exist in the process.

DEFINITION (UninitializedReplica): a Replica at AppliedIndex zero (or equivalently: no AppliedIndex), i.e. there is Raft state but at most a RangeTombstone on the state machine engine. In particular, there is no RangeDescriptor and so an UninitializedReplica has no key span associated to it yet.

INVARIANT: An UninitializedReplica has a zero HardState.Commit and no log entries.
This is upheld externally by a combination of mostly external invariants:

  • A new Range is initialized with all Replicas at TruncatedIndex 10 (so they are not uninitialized), and any future Replicas will be initialized via a snapshot reflecting a nonzero AppliedIndex. In particular, prior to receiving the snapshot, no log entries can be sent to the Replica.
  • etcd/raft only communicates Commit entries for which the recipient has the log entry.
    NOTE: we can make further statements such as there is no explicit TruncatedState, etc, but we omit them as they play no role for ReplicasStorage.

DEFINITION (InitializedReplica): a Replica at AppliedIndex larger than zero (or due to (AppliedStateNontriviality), equivalently: AppliedIndex exists).
Due to (StateConsistency) and (RangeDescriptor), this Replica has a key span associated with it. Post Init(), (RaftConsistency) and (StateConsistency) hold; in particular AppliedIndex is a valid raft log position.

In addition to the above states in which all invariants hold, there are also internal states which may violate certain invariants. These are not exposed to the user of the interface as they are rectified in Init(). They are:

DEFINITION (RecoveryDeletingReplica): a Replica whose Raft state requires a nonzero AppliedIndex, but for which the only state machine key is a RangeTombstone. This occurs when crash recovering from a replica removal operation. Init() turns such a Replica into a (DeletedReplica).

DEFINITION (RecoveryIngestingReplica): an (InitializedReplica) which violates (StateConsistency) by having made visible an SSTable ingestion that is not reflected in the AppliedIndex. Init() moves such a Replica back to (InitializedReplica) by re-applying log entries to re-establish (StateConsistency).

TODO: there might be more internal states

TODO: section for internal invariants (keep them separate from "external" invariants as well)


pkg/storage/replicas_storage.go, line 140 at r5 (raw file):
This is not correct, an uninitialized replica can vote and so it can have a term and vote. The fact that Commit is zero is maintained externally as well, it is a consequence of the fact that we force a snapshot to move away from log index zero, and that raft won't communicate a commit index that the follower does not have in the log. It would be helpful to document this here as well.

Also, unrelated suggestion for clarity (as is, we're vague about what replica state could exist):

There is no data (and in particular, no RangeDescriptor) for the Replica on the state machine with the possible exception of a RangeTombstoneKey.


pkg/storage/replicas_storage.go, line 183 at r5 (raw file):
"applying" or the statement is ambiguous (is the descriptor "created" when the intent is laid down, or when the txn commits, or when the range learns about this, or when the update becomes visible? It's the latter)

, i.e. must sync application of split trigger


pkg/storage/replicas_storage.go, line 185 at r5 (raw file):
ditto, and:

, i.e. must sync application of merge trigger


pkg/storage/replicas_storage.go, line 192 at r5 (raw file):

//   https://github.com/cockroachdb/cockroach/pull/72745#pullrequestreview-807989047
//
// Some of the above invariants may be violated when non-durable state is

Can you list this specifically? I made an attempt at that in my reformulation of the definitions/invariants above but I'm sure it's not complete.


pkg/storage/replicas_storage.go, line 234 at r5 (raw file):

//
// Every step above needs to be atomic. Note that we are doing 2 syncs, in
// steps C1 and C2, for the split case, where we currently do 1 sync -- splits

it would be helpful to name C1-C3 and similarly for D*. It will be a bit more to type but it's much easier for the readers to remember what's what if we pick good names.


pkg/storage/replicas_storage.go, line 275 at r5 (raw file):

// unavailable replica, since it no longer has [21, 25).
//
// Rolling forward if crash after C2 and before C3:

This looks like it should become one of the (RecoveryX) states in my version of the definitions. Probably RecoveryCreating


pkg/storage/replicas_storage.go, line 307 at r5 (raw file):

//   is the RHS of a split where the split has not yet happened, but we've
//   created an uninitialized RHS. So we don't want to delete the state
//   machine state for the RHS since it doesn't own that state yet. By

The way to think about this is that the state for the RHS doesn't exist. The state exists once there's a committed range descriptor indicating the span. There isn't yet. So I would not talk about it in that way. The tombstone is however necessary, as you point out, to prevent the split from recreating the replica later. But this is no different from how the tombstone is used in general with incoming snapshots or raft messages; if there's a tombstone we can't recreate the replica (at that replicaID). So there is nothing special about the splits here, except of course there is in that they need to then throw away data in a more complicated way than just refusing a snapshot does it.


pkg/storage/replicas_storage.go, line 311 at r5 (raw file):

//   - This step will only delete the RangeID local keys, when this replica
//     deletion is due to a merge (the range itself is being deleted).
// - [D2] deletion of RaftHardStateKey, RaftTruncatedStateKey, log entries,

It would be helpful to say something like "deletion of all raft state for this RangeID" to make clear that nothing will be left over on the raft state engine.


pkg/storage/replicas_storage.go, line 327 at r5 (raw file):

// A crash after D1 will result in a replica either in state
// DeletedStateMachine or UninitializedStateMachine. For the latter, some code
// above ReplicasStorage will eventually ask for the replica to be cleaned up.

Can reference #73424 here.


pkg/storage/replicas_storage.go, line 350 at r5 (raw file):

//   state machine using ApplyCommittedUsingIngest and ApplyCommittedBatch.
//   This application excludes entries that are performing splits or merges --
//   those happen via SplitReplica and MergeReplicas. The callee decides when

Can you say more here? This reads like you're saying that splits and merges will write directly to the engines (bypassing ReplicasStorage) but I don't think that's right.


pkg/storage/replicas_storage.go, line 350 at r5 (raw file):

//   state machine using ApplyCommittedUsingIngest and ApplyCommittedBatch.
//   This application excludes entries that are performing splits or merges --
//   those happen via SplitReplica and MergeReplicas. The callee decides when

the callee being ReplicasStorage? Might be better to refer to it by that name.


pkg/storage/replicas_storage.go, line 360 at r5 (raw file):

//   usually does not update the RaftAppliedIndex and then ApplyCommittedBatch
//   which updates the RaftAppliedIndex. A crash after the first and before
//   the second leaves the state machine in an inconsistent state which needs

reference this state by name once we've named it.


pkg/storage/replicas_storage.go, line 363 at r5 (raw file):

//   to be fixed by ReplicasStorage.Init. For this reason, ReplicasStorage
//   keeps track of the highest HardState.Commit known to be durable, and
//   requires ApplyCommittedUsingIngest to provide the highestRaftIndex of the

I think we need more (or need to write it up better). We can't offline-apply arbitrary log entries, so if we need to reapply log entries after a crash none of them can be, say, splits or merges. So an ingestion at index 100 must a) make sure applied index 99 is durable b) ensure committed index 100 is durable c) do the ingestion.
And in general, in Init, we need to make sure we don't crash when we're asked to apply a command we're not ready to handle. If we see this, if all invariants hold, it means there isn't an SST further ahead in the log that we need to catch up on, so we can just stop applying right there. So the start-up algorithm would be "apply as many log entries as you can and stop when you see one that you can't handle". Is that what you were thinking?


pkg/storage/replicas_storage.go, line 377 at r5 (raw file):

// - Range merges impose an additional requirement: the merge protocol (at a
//   higher layer) needs the RHS replica of a merge to have applied all raft
//   entries and that this application is durable. To ensure the durability we

up to a specified index


pkg/storage/replicas_storage.go, line 397 at r5 (raw file):

//     we can additionally iterate over all the RangeAppliedStateKeys, which are
//     RangeID local keys -- this iteration can be accomplished by seeking using
//     current RangeID+1. If we find RangeAppliedStateKeys whole RangeID is not

Good idea! You might also like cockroach debug check-store which does some of this stuff. It hasn't ~ever been used much so it probably isn't a good authority but it may give more assertion code ideas we can lift once you start coding.


pkg/storage/replicas_storage.go, line 403 at r5 (raw file):

// - Remove DeletedStateMachine replicas by transitioning them to DeletedReplica
//   by executing D2.
// - The set R_s - R_r should be empty.

This should be true before the cleamnup step already, right? Better to move it up then since the cleanup step changes the sets. (Which I would also point out, transitioning to DeletedReplica effectively removes DeletedStateMachine replicas from R_r)


pkg/storage/replicas_storage.go, line 416 at r5 (raw file):

//   operation.
// - InitializedStateMachine replicas:
//   - using the latest non-provisional RangeDescriptors, check that the spans

FWIW Store.Start does this I think, so it's not something that we've seen violated in practice (except on some customer deployments when they had pretty bad replica inconsistencies). Not saying we shouldn't check it here, just that it's one of the invariants that holds.


pkg/storage/replicas_storage.go, line 420 at r5 (raw file):

//   - for ranges whose RangeAppliedState.RaftAppliedIndex < HardState.Commit,
//     apply log entries, including those that remove this replica, until one
//     encounters a log entry that is performing a split or merge.

👍 looks like my understanding in a comment higher up was correct then.


pkg/storage/replicas_storage.go, line 429 at r5 (raw file):

	UninitializedStateMachine ReplicaState = 0
	InitializedStateMachine
	DeletedReplica

In the final impl, is the plan that this code is in a subpackage and unexported so that the world will never see it? If so, 👍 if not let's discuss. I think a clear line between external states/invariants and internal ones is super helpful.


pkg/storage/replicas_storage.go, line 432 at r5 (raw file):

)

type RangeAndReplica struct {

Might just be me but this name confuses me a bit. How about FullReplicaID because that is what this is - it's a fully qualified replicaID which uniquely identifies a replica.


pkg/storage/replicas_storage.go, line 437 at r5 (raw file):

}

type ReplicaInfo struct {

when this is in a subpkg, could also consider calling this Replica.


pkg/storage/replicas_storage.go, line 459 at r5 (raw file):

	MutationBatch
	// [Lo, Hi) represents the raft log entries, if any in the MutationBatch.
	// This is appending/overwriting entries in the raft log.

Do you want to point out the behavior that if the log is [a, b, c, d] and one appends e at index 2, the result will be [a, e]? I.e. there are semantics that are upheld by the caller here. Probably best to say that the caller is expected to follow Raft semantics here and that ReplicasStorage won't try to check or even understand the details more than required by it.


pkg/storage/replicas_storage.go, line 498 at r5 (raw file):

	// TODO(sumeer):
	// - add raft log read methods.
	// - what raft log stats do we need to maintain and expose (raftLogSize?)?

Yeah, we need to maintain the raft log size (as nobody is really planning on overhauling this in time). This will primarily affect the truncations (the appends can be tracked outside). Perhaps for now the storage "just" gets a callback that it invokes when it actually truncates entries, I think that would be the main change from the status quo.


pkg/storage/replicas_storage.go, line 505 at r5 (raw file):

	// - The latest non-provisional RangeDescriptor in the snapshot describes
	//   the range as equal to span.
	// - The snapshot corresponds to (raftAppliedIndex,raftAppliedIndexTerm).

what do you mean here and what are these variables? The snapshot contains an index and this references the applied index it represents, that's it. The index and term in the HardState should remain unchanged (if they don't today, I think that's ill-advised). The snapshot also contains the log index and log term for AppliedIndex which we need to write the truncated state.


pkg/storage/replicas_storage.go, line 510 at r5 (raw file):

	//   outside span (after accounting for range local keys) and RangeID keys.
	// - sstPaths include a RANGEDEL that will clear all the existing state
	//   machine state in the store "before" adding the snapshot state.

Might want to point out explicitly that if the existing range is wider than the original, we'll clear the wider span. That is, we'll always replace all of the state machine state.


pkg/storage/replicas_storage.go, line 516 at r5 (raw file):

	//
	// Snapshot ingestion will not be accepted if:
	// - span overlaps with the latest non-provisional span of another range,

It would be easier to define "span of a replica" as "span of most recent visible version of range descriptor at the current applied state" to avoid having to define this ad-hoc multiple times.


pkg/storage/replicas_storage.go, line 522 at r5 (raw file):

	//   MergeRange, so ReplicasStorage can keep track of all spans without
	//   resorting to repeated reading from the engine.
	// - the raft log already has entries beyond the snapshot.

This isn't valid, as it should be precluded by invariants. Make clear which of these cases are expected refusals vs invariant violations. I think both here are invariant violations.


pkg/storage/replicas_storage.go, line 557 at r5 (raw file):

	// changes are included in the sstPaths. It is possible that the ssts may
	// not contain an update to RangeAppliedState, in which case this call
	// should be immediately followed by a call to ApplyCommittedBatch that does

must?

@tbg tbg self-requested a review December 7, 2021 11:58
Copy link
Collaborator Author

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the comments! I've made a full pass.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten and @tbg)


-- commits, line 48 at r1:

Previously, tbg (Tobias Grieger) wrote…

by a replica?

Done


-- commits, line 17 at r5:

Previously, tbg (Tobias Grieger) wrote…

The tombstone key has moved.

Done


pkg/storage/replicas_storage.go, line 59 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Ah here we go, 👍

Ack


pkg/storage/replicas_storage.go, line 165 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

One question: say we remove the RangeDescriptor in D2, and then crash. On crash recovery we have an uninitialized replica. Will the replicaGCQueue eventually remove it?

No, see #73424, but this is a general problem so we'll address it in general and until we do it won't be an issue if an ill-timed crash occasionally leaks an uninited replica.

Ack


pkg/storage/replicas_storage.go, line 404 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This is probably outdated, right? Since we have resolved to not needing to apply splits/merges in Init. I think the invariant above holds though. Definitely fairly subtle, would be nice to avoid relying on it now.

Yes, outdated.
Some of this was mistaken because the caller will be syncing the state machine to ensure R2 does not regress.
And yes we don't apply merges in Init.


pkg/storage/replicas_storage.go, line 228 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Yeah, that was my thinking. I don't think there's much of a danger that we'd skew the writes to both keys, since there is only one way to update the RangeAppliedState (using the stateloader) and so if

func (rsl StateLoader) SetRangeAppliedState(
ctx context.Context,
readWriter storage.ReadWriter,
appliedIndex, leaseAppliedIndex uint64,
newMS *enginepb.MVCCStats,
raftClosedTimestamp *hlc.Timestamp,
) error {
was put in charge of peeling the state into two writes it would not be likely to cause any bugs.

That said, can discuss when the PR is out. I expect Nathan to have a good opinion on whether it's worth it.

Ack


pkg/storage/replicas_storage.go, line 269 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

The overlap is only in the merge case where the state machine batch has resolved the provisional LHS RangeDescriptor, so now the LHS has become the merged range, but we've not yet removed the RHS RangeDescriptor. The current text is written to fix this up in Init. Though I think there is a different flaw there, that could be addressed by ordering D2 before D1 (as you suggested above for a different reason).

This is outdated, right? The batch committing the merge will atomically delete the RHS descriptor and widen the LHS descriptor (plus write tombstone). I think we'll never see overlap and this should become one of the invariants.

Yes, this discussion is outdated. We will never see overlap, even in the various Recovery* states. The invariant is named InterReplicaStateConsistency and we don't list it among the invariants that are violated by the Recovery* states.


pkg/storage/replicas_storage.go, line 438 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Consider tightening this to the actual use case then, if we're not going to call it in that way let's write that down to not leave readers guessing how we might actually be using it.

Done


pkg/storage/replicas_storage.go, line 467 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

ah, this is the interdependency stuff that I remember us talking about. So it wasn't about deleting replicas directly.

Resolved


pkg/storage/replicas_storage.go, line 87 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Just to make sure I understand correctly, is the main complication that arises when keeping the tombstone in the raft engine that we would be worried about cases in which the state removal becomes durable but the tombstone doesn't, and for some reason we don't want to sync the tombstone? Because naively you could

  • write durable tombstone (local engine)
  • perform deletion (non-durable on state engine)

On crash recovery, delete any replica state for which a tombstone is present in Init. This would work for b). I can see how a) would still be problematic because in crash recovery, we'll have the unmerged LHS and RHS present, and we cannot remove the RHS data yet. So when we see the tombstone for the RHS we need to realize there is a pending merge on the RHS and not do recovery. This convinces me that it's better to have the tombstone on the state engine.

Whatever the answer is, could you explain it in the text?

Yes we'd need to do 3 writes, local/raft engine, then state machine engine, and then raft engine and both of the first two writes would need to sync.
It would work for (b) with the extra sync.
You are spot on regarding why this gets annoying for merge. And if the merge was via the state machine, maybe we can apply it in Init. But if it was a snapshot we don't have the snapshot anymore so can't roll forward, but we've put the tombstone for the RHS so need to roll it back. We have the replicaID of the RHS so we can just use replicaID-1. So doable but annoying, and the recovery code path for the merge looks different from the other one, so we need to know whether we are in case (a) or (b).


pkg/storage/replicas_storage.go, line 110 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Can you give more details about these situations or give the reader a pointer to the section that discusses this.

added a pointer


pkg/storage/replicas_storage.go, line 112 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Give an idea of why it needs those or a forward-reference deeper into the doc.

added pointers to sections


pkg/storage/replicas_storage.go, line 122 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

due to rebalancing

More generally,

when replicas are moved or merged away.

Done


pkg/storage/replicas_storage.go, line 130 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

must happen

Not really, right? Certainly writes would have to. But reads?

I am inclined towards reads too, since ReplicasStorage has knowledge of what it has truncated. The interface seems narrow enough.


pkg/storage/replicas_storage.go, line 133 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Maybe with less potential for confusion around what "normal" operation means:

ReplicasStorage does not participate in applying committed entries in the running system (other than allowing a caller to load entries from the log), but it may occasionally need to apply "simple" entries directly to the state machine during Init to uphold atomicity of SST ingestion operations, see TODO.

(was there any other reason? I don't think we need to do anything for conf changes)

Updated.

We need to apply conf changes too, because they may precede one of these sst ingests.

i am rereading https://cockroachlabs.slack.com/archives/C02KHQMF2US/p1635523384013200 where we discussed configuration changes, and rereading etcd-io/etcd#7625 (comment). The argument I had made in the slack thread was that Init would roll forward to HardState.Commit so there was no chance that the state machine would regress. But at that time I did not realize that HardState.Commit updates are not always synced, so HardState.Commit itself could regress. You made the following point on the issue:

(Append) is the interesting one. On the leader, it is straightforward since the
commit index is available and is guaranteed above that of the previous config
change due to (Apply); we just need to make sure that when we append the new
entry, we also persist the known commit index durably.

ReplicasStorage is not changing anything regarding the Commit index corresponding to B being durable when C is appended (it's the caller's responsibility). So if we do roll forward all the way to the durable Committed (except for splits and merges which stall us, which is fine since we know we could not have applied them prior to the crash), then the regression is bounded.
What do you think about shoring up that correctness argument by making the committed index durable for a conf change before applying it? We could introduce a ApplyConfChange(MutationBatch, highestRaftIndex uint64) method, like we have for ingestion, and first sync the Commit state if needed. That way we will not have any conf change regression.


pkg/storage/replicas_storage.go, line 137 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

The descriptions are a little fuzzy. I think we want to either some of the invariants to subsystems (for example, saying "there is an internally consistent state for etcd/raft" instead of spelling out all of the conditions that this entails) or be exhaustive, right now we're neither. I prefer the former to keep it brief while at the same time making clear what we need. Here's an attempt:


INVARIANT (RaftConsistency): when there is any data on the state machine associated to a given RangeID with the exception of the RangeTombstone key, there is a corresponding internally consistent (according to etcd/raft) RaftState. After Init() has been called, this Raft state will additionally be consistent with the AppliedState on the state machine (i.e. the latter references a valid log position).
COROLLARY (ReplicaExistence): we can (and do) define that a Replica exists if and only if a Raft HardState exists.
Definition (DeletedReplica): it can be convenient to reference Replicas that once existed but no longer do, as evidenced by the presence of a RangeTombstone for a RangeID for which (RaftConsistency) does not hold, i.e. the RangeTombstone is the only reference to the RangeID on both the raft and state machine engines.
INVARIANT (StateConsistency): after Init() has been called, if there is any data associated to a given RangeID on the state engine with the exception of the RangeTombstone key, it will reflect the replicated state at the corresponding AppliedIndex (i.e. materializes the replicated log at this index).
INVARIANT (AppliedStateNontriviality): the AppliedIndex stored for any Replica on the state machine engine is never zero.
DEFINITION: we say that a Replica has AppliedIndex zero if the Replica exists (i.e. HardState exists) but no corresponding AppliedState is stored on the state machine engine.

INVARIANT (RangeDescriptor): a nonzero AppliedState always refers to a log position that materializes a non-provisional RangeDescriptor.
A Range is first created with a RangeDescriptor present (external invariant) and neither the callers nor ReplicasStorage will never selectively delete it.
NOTE: subsumption as part of a range merge does delete the RangeDescriptor, but the Replica atomically ceases to exist in the process.

DEFINITION (UninitializedReplica): a Replica at AppliedIndex zero (or equivalently: no AppliedIndex), i.e. there is Raft state but at most a RangeTombstone on the state machine engine. In particular, there is no RangeDescriptor and so an UninitializedReplica has no key span associated to it yet.

INVARIANT: An UninitializedReplica has a zero HardState.Commit and no log entries.
This is upheld externally by a combination of mostly external invariants:

  • A new Range is initialized with all Replicas at TruncatedIndex 10 (so they are not uninitialized), and any future Replicas will be initialized via a snapshot reflecting a nonzero AppliedIndex. In particular, prior to receiving the snapshot, no log entries can be sent to the Replica.
  • etcd/raft only communicates Commit entries for which the recipient has the log entry.
    NOTE: we can make further statements such as there is no explicit TruncatedState, etc, but we omit them as they play no role for ReplicasStorage.

DEFINITION (InitializedReplica): a Replica at AppliedIndex larger than zero (or due to (AppliedStateNontriviality), equivalently: AppliedIndex exists).
Due to (StateConsistency) and (RangeDescriptor), this Replica has a key span associated with it. Post Init(), (RaftConsistency) and (StateConsistency) hold; in particular AppliedIndex is a valid raft log position.

In addition to the above states in which all invariants hold, there are also internal states which may violate certain invariants. These are not exposed to the user of the interface as they are rectified in Init(). They are:

DEFINITION (RecoveryDeletingReplica): a Replica whose Raft state requires a nonzero AppliedIndex, but for which the only state machine key is a RangeTombstone. This occurs when crash recovering from a replica removal operation. Init() turns such a Replica into a (DeletedReplica).

DEFINITION (RecoveryIngestingReplica): an (InitializedReplica) which violates (StateConsistency) by having made visible an SSTable ingestion that is not reflected in the AppliedIndex. Init() moves such a Replica back to (InitializedReplica) by re-applying log entries to re-establish (StateConsistency).

TODO: there might be more internal states

TODO: section for internal invariants (keep them separate from "external" invariants as well)

Thanks for this suggestion. I have borrowed some of the organization, and names from your comment and the text, and merged in the specifics of the invariants that I had previously. I want to preserve the specifics of the invariant, i.e., an attempt to be exhaustive, even though it may miss something (we can fix that iteratively), since I find it very hard to remember these without them being explicitly written down.


pkg/storage/replicas_storage.go, line 140 at r5 (raw file):

This is not correct, an uninitialized replica can vote and so it can have a term and vote.

Ah yes, that was silly. I've fixed the text and added comments that you suggested.
Technically, RangeTombstoneKey is not state machine state or raft state, so I haven't used the "possible exception" phrasing.


pkg/storage/replicas_storage.go, line 183 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

"applying" or the statement is ambiguous (is the descriptor "created" when the intent is laid down, or when the txn commits, or when the range learns about this, or when the update becomes visible? It's the latter)

, i.e. must sync application of split trigger

Done


pkg/storage/replicas_storage.go, line 185 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

ditto, and:

, i.e. must sync application of merge trigger

Done


pkg/storage/replicas_storage.go, line 192 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Can you list this specifically? I made an attempt at that in my reformulation of the definitions/invariants above but I'm sure it's not complete.

Done (this statement now comes before the list of transient internal states, where the list of invariants that are violated are listed).


pkg/storage/replicas_storage.go, line 234 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

it would be helpful to name C1-C3 and similarly for D*. It will be a bit more to type but it's much easier for the readers to remember what's what if we pick good names.

Good point. I've added a TODO at the top, and will do it in a later pass after this PR is merged.


pkg/storage/replicas_storage.go, line 275 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This looks like it should become one of the (RecoveryX) states in my version of the definitions. Probably RecoveryCreating

I've rolled this into RecoveryInconsistentReplica, since something like this can also happen because HardState.Commit regressed and RaftAppliedIndex did not, so it isn't necessarily "creation" (the "Crash Recovery" section talks about how we do similar things for both). One could separate the RecoveryRaftAndStateInconsistent and RecoveryStateInconsistent states. I've added a TODO at the top to consider that, in case it helps with sketching proofs.


pkg/storage/replicas_storage.go, line 307 at r5 (raw file):
Tweaked the text.

So there is nothing special about the splits here, except of course there is in that they need to then throw away data in a more complicated way

This part is unfortunately not very cleanly specified. The split A11 and A12 cases are doing state machine deletion for replicas that are currently in state DeletedStateMachine or UninitializedStateMachine, while the text here says we only do it for replica in InitializedStateMachine state. I'll see if the description can be more unified in the future.


pkg/storage/replicas_storage.go, line 311 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

It would be helpful to say something like "deletion of all raft state for this RangeID" to make clear that nothing will be left over on the raft state engine.

Done


pkg/storage/replicas_storage.go, line 327 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Can reference #73424 here.

Done


pkg/storage/replicas_storage.go, line 350 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Can you say more here? This reads like you're saying that splits and merges will write directly to the engines (bypassing ReplicasStorage) but I don't think that's right.

Done


pkg/storage/replicas_storage.go, line 350 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

the callee being ReplicasStorage? Might be better to refer to it by that name.

Done


pkg/storage/replicas_storage.go, line 360 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

reference this state by name once we've named it.

Done


pkg/storage/replicas_storage.go, line 363 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I think we need more (or need to write it up better). We can't offline-apply arbitrary log entries, so if we need to reapply log entries after a crash none of them can be, say, splits or merges. So an ingestion at index 100 must a) make sure applied index 99 is durable b) ensure committed index 100 is durable c) do the ingestion.
And in general, in Init, we need to make sure we don't crash when we're asked to apply a command we're not ready to handle. If we see this, if all invariants hold, it means there isn't an SST further ahead in the log that we need to catch up on, so we can just stop applying right there. So the start-up algorithm would be "apply as many log entries as you can and stop when you see one that you can't handle". Is that what you were thinking?

Yes, this is consistent with what I was thinking. I've added some text here to highlight that splits/merges are durable and those are the only log entries that ReplicasStorage.Init does not apply.


pkg/storage/replicas_storage.go, line 377 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

up to a specified index

Done


pkg/storage/replicas_storage.go, line 397 at r5 (raw file):

cockroach debug check-store

Added a TODO at the top to look at it.


pkg/storage/replicas_storage.go, line 403 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This should be true before the cleamnup step already, right? Better to move it up then since the cleanup step changes the sets. (Which I would also point out, transitioning to DeletedReplica effectively removes DeletedStateMachine replicas from R_r)

Yep. Moved it up.


pkg/storage/replicas_storage.go, line 416 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

FWIW Store.Start does this I think, so it's not something that we've seen violated in practice (except on some customer deployments when they had pretty bad replica inconsistencies). Not saying we shouldn't check it here, just that it's one of the invariants that holds.

Good to know


pkg/storage/replicas_storage.go, line 420 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

👍 looks like my understanding in a comment higher up was correct then.

Yep, correct.


pkg/storage/replicas_storage.go, line 429 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

In the final impl, is the plan that this code is in a subpackage and unexported so that the world will never see it? If so, 👍 if not let's discuss. I think a clear line between external states/invariants and internal ones is super helpful.

Yes, no one should see it, unless we decide that the Replica data-structure in kvserver wants to confirm that what it thinks about the replica's state agrees with what ReplicasStorage thinks.


pkg/storage/replicas_storage.go, line 432 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Might just be me but this name confuses me a bit. How about FullReplicaID because that is what this is - it's a fully qualified replicaID which uniquely identifies a replica.

Done


pkg/storage/replicas_storage.go, line 437 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

when this is in a subpkg, could also consider calling this Replica.

Ack


pkg/storage/replicas_storage.go, line 459 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Do you want to point out the behavior that if the log is [a, b, c, d] and one appends e at index 2, the result will be [a, e]? I.e. there are semantics that are upheld by the caller here. Probably best to say that the caller is expected to follow Raft semantics here and that ReplicasStorage won't try to check or even understand the details more than required by it.

Done


pkg/storage/replicas_storage.go, line 498 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Yeah, we need to maintain the raft log size (as nobody is really planning on overhauling this in time). This will primarily affect the truncations (the appends can be tracked outside). Perhaps for now the storage "just" gets a callback that it invokes when it actually truncates entries, I think that would be the main change from the status quo.

Added this suggestion.


pkg/storage/replicas_storage.go, line 505 at r5 (raw file):

what do you mean here and what are these variables?

I've updated the text to say: The snapshot corresponds to application of the log up to raftAppliedIndex. raftAppliedIndexTerm is the term of that log entry.

The snapshot contains an index and this references the applied index it represents, that's it. The index and term in the HardState should remain unchanged (if they don't today, I think that's ill-advised). The snapshot also contains the log index and log term for AppliedIndex which we need to write the truncated state.

Yes, this is for RaftTruncatedState and not for the HardState. Optionally, ReplicasStorage could read the engine after applying the snapshot to get the raftAppliedIndex and raftAppliedIndexTerm. IIRC, we have the former handy in the calling code, which is why it became a parameter. Additionally, it has the advantage that ReplicasStorage can do the error checking mentioned in the "Snapshot ingestion will not be accepted if" comment below.


pkg/storage/replicas_storage.go, line 510 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Might want to point out explicitly that if the existing range is wider than the original, we'll clear the wider span. That is, we'll always replace all of the state machine state.

I've clarified here that everything in span plus local keys will be cleared. If the existing range is wider than the original, that is handled in the implementation of IngestRangeSnapshot as indicated below (there is now a forward reference here). It is somewhat debatable whether the caller should already know, and setup the wider span. For now, I've made the caller oblivious to that change since it seems reasonable that ReplicasStorage, which is maintaining some of the invariants, handle it. We can change our mind later if needed.


pkg/storage/replicas_storage.go, line 516 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

It would be easier to define "span of a replica" as "span of most recent visible version of range descriptor at the current applied state" to avoid having to define this ad-hoc multiple times.

I've now defined "replica-descriptor" to be latest non-provisional RangeDescriptor and used it in the other places.


pkg/storage/replicas_storage.go, line 522 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This isn't valid, as it should be precluded by invariants. Make clear which of these cases are expected refusals vs invariant violations. I think both here are invariant violations.

I find it relatively easier to write comments that refer to Raft invariants, since they can be considered more universal (even if they are specific to etcd/raft), versus invariants in code in kvserver. My understanding is that the second invariant here is due to Raft -- I've called that out. The first one is only prevented if some kvserver code notices that an existing range overlaps. I have a mild preference to not refer to this as an invariant that the caller must maintain -- it is of course free to do so.


pkg/storage/replicas_storage.go, line 557 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

must?

Done

Copy link
Collaborator Author

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten and @tbg)


pkg/storage/replicas_storage.go, line 133 at r5 (raw file):

Previously, sumeerbhola wrote…

Updated.

We need to apply conf changes too, because they may precede one of these sst ingests.

i am rereading https://cockroachlabs.slack.com/archives/C02KHQMF2US/p1635523384013200 where we discussed configuration changes, and rereading etcd-io/etcd#7625 (comment). The argument I had made in the slack thread was that Init would roll forward to HardState.Commit so there was no chance that the state machine would regress. But at that time I did not realize that HardState.Commit updates are not always synced, so HardState.Commit itself could regress. You made the following point on the issue:

(Append) is the interesting one. On the leader, it is straightforward since the
commit index is available and is guaranteed above that of the previous config
change due to (Apply); we just need to make sure that when we append the new
entry, we also persist the known commit index durably.

ReplicasStorage is not changing anything regarding the Commit index corresponding to B being durable when C is appended (it's the caller's responsibility). So if we do roll forward all the way to the durable Committed (except for splits and merges which stall us, which is fine since we know we could not have applied them prior to the crash), then the regression is bounded.
What do you think about shoring up that correctness argument by making the committed index durable for a conf change before applying it? We could introduce a ApplyConfChange(MutationBatch, highestRaftIndex uint64) method, like we have for ingestion, and first sync the Commit state if needed. That way we will not have any conf change regression.

I've added a TODO to resolve this conf change question.

ReplicasStorage provides an interface to manage the persistent state that
includes the lifecycle of a range replica, its raft log, and the state
machine state. The implementation(s) are expected to be a stateless wrapper
around persistent state in the underlying engine(s) (any state they
maintain in-memory would be simply a performance optimization and always
be in-sync with the persistent state).

We consider the following distinct kinds of persistent state:
- State machine state: It contains all replicated keys: replicated range-id
  local keys, range local keys, range lock keys, lock table keys, global
  keys. This includes the RangeAppliedState and the RangeDescriptor.

- Raft and replica life-cycle state: This includes all the unreplicated
  range-ID local key names prefixed by Raft. We refer to this as
  "raft state".

The interface requires that any mutation (batch or sst) only touch one of
these kinds of state. This discipline will allow us to eventually separate
the engines containing these two kinds of state. This interface is not
relevant for store local keys though they will be in the latter engine. The
interface does not allow the caller to specify whether to sync a mutation
to the raft log or state machine state -- that decision is left to the
implementation of ReplicasStorage. So the hope is that even when we don't
separate the state machine and raft engines, this abstraction will force us
to reason more carefully about effects of crashes, and when to sync, and
allow us to test more thoroughly (including "crash" testing using
strict-mem FS). The RangeTombstoneKey, even though it is unreplicated,
is placed in the engine with the state machine -- this is necessary to
preserve consistency in case of a crash.

ReplicasStorage does not interpret most of the data in the state machine.
It expects mutations to that state to be provided as an opaque batch, or a
set of files to be ingested. There are a few exceptions where it can read
state machine state, mainly when recovering from a crash, so as to make
changes to get to a consistent state.
- RangeAppliedStateKey: needs to read this in order to truncate the log,
  both as part of regular log truncation and on crash recovery.
- RangeDescriptorKey: needs to read this to discover spans of initialized
  replicas.

A corollary to this lack of interpretation is that reads of the state
machine are not handled by this interface, though it does expose some
metadata in case the reader want to be sure that the replica it is trying to
read actually exists in storage. ReplicasStorage also does not offer an
interface to construct changes to the state machine state. It simply
applies changes, and requires the caller to obey some simple invariants to
not cause inconsistencies. It is aware of the keyspace occupied by a replica
and the difference between rangeID keys and range keys -- it needs this
awareness to restore internal consistency when initializing (say after a
crash), by clearing the state machine state for replicas that should no
longer exist.

ReplicasStorage does interpret the raft state (all the unreplicated
range-ID local key names prefixed by Raft), and the RangeTombstoneKey. This
is necessary for it to be able to maintain invariants spanning the raft log
and the state machine (related to raft log truncation, replica lifetime
etc.), including reapplying raft log entries on restart to the state
machine. All accesses (read or write) to the raft log and RangeTombstoneKey
must happen via ReplicasStorage.

Since this abstraction is mutating the same underlying engine state that
was previously mutated via lower-level interfaces, and is not a
data-structure in the usual sense, we should be able to migrate callers
incrementally to use this interface. That is, callers that use this
interface, and those that use the lower-level engine interfaces could
co-exist correctly.

Informs cockroachdb#38322

Release note: None
@sumeerbhola
Copy link
Collaborator Author

I'll merge this and resolve new comments in the next PR.

@sumeerbhola
Copy link
Collaborator Author

bors r=tbg

@craig
Copy link
Contributor

craig bot commented Dec 15, 2021

Build succeeded:

@craig craig bot merged commit ef789b1 into cockroachdb:master Dec 15, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants