diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index fae38378cc50..4440d4d36110 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -2981,34 +2981,6 @@ func splitTrigger( return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to account for enginepb.MVCCStats's own stats impact") } - // Writing the initial state is subtle since this also seeds the Raft - // group. We are writing to the right hand side's Raft group state in this - // batch so we need to synchronize with anything else that could be - // touching that replica's Raft state. Specifically, we want to prohibit an - // uninitialized Replica from receiving a message for the right hand side - // range and performing raft processing. This is achieved by serializing - // execution of uninitialized Replicas in Store.processRaft and ensuring - // that no uninitialized Replica is being processed while an initialized - // one (like the one currently being split) is being processed. - // - // Note also that it is crucial that writeInitialState *absorbs* an - // existing HardState (which might contain a cast vote). We load the - // existing HardState from the underlying engine instead of the batch - // because batch reads are from a snapshot taken at the point in time when - // the first read was performed on the batch. This last requirement is not - // currently needed due to the uninitialized Replica synchronization - // mentioned above, but future work will relax that synchronization, moving - // it from before the point that batch was created to this method. We want - // to see any writes to the hard state that were performed between the - // creation of the batch and that synchronization point. The only drawback - // to not reading from the batch is that we won't see any writes to the - // right hand side's hard state that were previously made in the batch - // (which should be impossible). - oldHS, err := loadHardState(ctx, rec.Engine(), split.RightDesc.RangeID) - if err != nil { - return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to load hard state") - } - // Initialize the right-hand lease to be the same as the left-hand lease. // Various pieces of code rely on a replica's lease never being unitialized, // but it's more than that - it ensures that we properly initialize the // timestamp cache, which is only populated on the lease holder, from that @@ -3061,8 +3033,37 @@ func splitTrigger( log.VEventf(ctx, 1, "LHS's TxnSpanGCThreshold of split is not set") } - rightMS, err = writeInitialState( - ctx, batch, rightMS, split.RightDesc, oldHS, rightLease, gcThreshold, txnSpanGCThreshold, + // Writing the initial state is subtle since this also seeds the Raft + // group. It becomes more subtle due to proposer-evaluated Raft. + // + // We are writing to the right hand side's Raft group state in this + // batch so we need to synchronize with anything else that could be + // touching that replica's Raft state. Specifically, we want to prohibit + // an uninitialized Replica from receiving a message for the right hand + // side range and performing raft processing. This is achieved by + // serializing execution of uninitialized Replicas in Store.processRaft + // and ensuring that no uninitialized Replica is being processed while + // an initialized one (like the one currently being split) is being + // processed. + // + // Since the right hand side of the split's Raft group may already + // exist, we must be prepared to absorb an existing HardState. The Raft + // group may already exist because other nodes could already have + // processed the split and started talking to our node, prompting the + // creation of a Raft group that can vote and bump its term, but not + // much else: it can't receive snapshots because those intersect the + // pre-split range; it can't apply log commands because it needs a + // snapshot first. + // + // However, we can't absorb the right-hand side's HardState here because + // we only *evaluate* the proposal here, but by the time it is + // *applied*, the HardState could have changed. We do this downstream of + // Raft, in splitPostApply, where we write the last index and the + // HardState via a call to synthesizeRaftState. Here, we only call + // writeInitialReplicaState which essentially writes a ReplicaState + // only. + rightMS, err = writeInitialReplicaState( + ctx, batch, rightMS, split.RightDesc, rightLease, gcThreshold, txnSpanGCThreshold, ) if err != nil { return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to write initial state") diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 8a5d7a2a31ec..d65c1a285659 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -771,6 +771,10 @@ func (r *Replica) applySnapshot( // increases the committed index as a result of the snapshot, but who is to // say it isn't going to accept a snapshot which is identical to the current // state? + // + // Note that since this snapshot comes from Raft, we don't have to synthesize + // the HardState -- Raft wouldn't ask us to update the HardState in incorrect + // ways. if !raft.IsEmptyHardState(hs) { if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatch, hs); err != nil { return errors.Wrapf(err, "unable to persist HardState %+v", &hs) diff --git a/pkg/storage/replica_state.go b/pkg/storage/replica_state.go index c4f88a8abad7..850c065e528e 100644 --- a/pkg/storage/replica_state.go +++ b/pkg/storage/replica_state.go @@ -459,21 +459,50 @@ func (rsl replicaStateLoader) setHardState( rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st) } +// synthesizeRaftState creates a Raft state which synthesizes both a HardState +// and a lastIndex from pre-seeded data in the engine (typically created via +// writeInitialReplicaState and, on a split, perhaps the activity of an +// uninitialized Raft group) +func (rsl replicaStateLoader) synthesizeRaftState( + ctx context.Context, eng engine.ReadWriter, +) error { + hs, err := rsl.loadHardState(ctx, eng) + if err != nil { + return err + } + truncState, err := rsl.loadTruncatedState(ctx, eng) + if err != nil { + return err + } + raftAppliedIndex, _, err := rsl.loadAppliedIndex(ctx, eng) + if err != nil { + return err + } + if err := rsl.synthesizeHardState(ctx, eng, hs, truncState, raftAppliedIndex); err != nil { + return err + } + return rsl.setLastIndex(ctx, eng, truncState.Index) +} + // synthesizeHardState synthesizes a HardState from the given ReplicaState and -// any existing on-disk HardState in the context of a snapshot, while verifying +// any existing on-disk HardState in the context of a split, while verifying // that the application of the snapshot does not violate Raft invariants. It -// must be called after the supplied state and ReadWriter have been updated -// with the result of the snapshot. -// If there is an existing HardState, we must respect it and we must not apply -// a snapshot that would move the state backwards. +// must be called after the supplied state and ReadWriter have been updated with +// the result of the snapshot. If there is an existing HardState, we must +// respect it and we must not apply a snapshot that would move the state +// backwards. func (rsl replicaStateLoader) synthesizeHardState( - ctx context.Context, eng engine.ReadWriter, s storagebase.ReplicaState, oldHS raftpb.HardState, + ctx context.Context, + eng engine.ReadWriter, + oldHS raftpb.HardState, + truncState roachpb.RaftTruncatedState, + raftAppliedIndex uint64, ) error { newHS := raftpb.HardState{ - Term: s.TruncatedState.Term, + Term: truncState.Term, // Note that when applying a Raft snapshot, the applied index is // equal to the Commit index represented by the snapshot. - Commit: s.RaftAppliedIndex, + Commit: raftAppliedIndex, } if oldHS.Commit > newHS.Commit { @@ -495,19 +524,18 @@ func (rsl replicaStateLoader) synthesizeHardState( return errors.Wrapf(err, "writing HardState %+v", &newHS) } -// writeInitialState bootstraps a new Raft group (i.e. it is called when we -// bootstrap a Range, or when setting up the right hand side of a split). -// Its main task is to persist a consistent Raft (and associated Replica) state -// which does not start from zero but presupposes a few entries already having -// applied. -// The supplied MVCCStats are used for the Stats field after adjusting for -// persisting the state itself, and the updated stats are returned. -func writeInitialState( +// writeInitialReplicaState sets up a new Range, but without writing an +// associated Raft state (which must be written separately via +// synthesizeRaftState before instantiating a Replica). The main task is to +// persist a ReplicaState which does not start from zero but presupposes a few +// entries already having applied. The supplied MVCCStats are used for the Stats +// field after adjusting for persisting the state itself, and the updated stats +// are returned. +func writeInitialReplicaState( ctx context.Context, eng engine.ReadWriter, ms enginepb.MVCCStats, desc roachpb.RangeDescriptor, - oldHS raftpb.HardState, lease roachpb.Lease, gcThreshold hlc.Timestamp, txnSpanGCThreshold hlc.Timestamp, @@ -551,14 +579,29 @@ func writeInitialState( return enginepb.MVCCStats{}, err } - if err := rsl.synthesizeHardState(ctx, eng, s, oldHS); err != nil { + return newMS, nil +} + +// writeInitialState calls writeInitialReplicaState followed by +// synthesizeRaftState. It is typically called during bootstrap. The supplied +// MVCCStats are used for the Stats field after adjusting for persisting the +// state itself, and the updated stats are returned. +func writeInitialState( + ctx context.Context, + eng engine.ReadWriter, + ms enginepb.MVCCStats, + desc roachpb.RangeDescriptor, + lease roachpb.Lease, + gcThreshold hlc.Timestamp, + txnSpanGCThreshold hlc.Timestamp, +) (enginepb.MVCCStats, error) { + newMS, err := writeInitialReplicaState(ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold) + if err != nil { return enginepb.MVCCStats{}, err } - - if err := rsl.setLastIndex(ctx, eng, s.TruncatedState.Index); err != nil { + if err := makeReplicaStateLoader(desc.RangeID).synthesizeRaftState(ctx, eng); err != nil { return enginepb.MVCCStats{}, err } - return newMS, nil } diff --git a/pkg/storage/replica_state_test.go b/pkg/storage/replica_state_test.go index 0e1652c414aa..221d943b0a65 100644 --- a/pkg/storage/replica_state_test.go +++ b/pkg/storage/replica_state_test.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -60,12 +59,7 @@ func TestSynthesizeHardState(t *testing.T) { func() { batch := eng.NewBatch() defer batch.Close() - testState := storagebase.ReplicaState{ - Desc: testRangeDescriptor(), - TruncatedState: &roachpb.RaftTruncatedState{Term: test.TruncTerm}, - RaftAppliedIndex: test.RaftAppliedIndex, - } - rsl := makeReplicaStateLoader(testState.Desc.RangeID) + rsl := makeReplicaStateLoader(1) if test.OldHS != nil { if err := rsl.setHardState(context.Background(), batch, *test.OldHS); err != nil { @@ -78,7 +72,9 @@ func TestSynthesizeHardState(t *testing.T) { t.Fatal(err) } - err = rsl.synthesizeHardState(context.Background(), batch, testState, oldHS) + err = rsl.synthesizeHardState( + context.Background(), batch, oldHS, roachpb.RaftTruncatedState{Term: test.TruncTerm}, test.RaftAppliedIndex, + ) if !testutils.IsError(err, test.Err) { t.Fatalf("%d: expected %q got %v", i, test.Err, err) } else if err != nil { diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 4c6fa9ef0853..91c46edb3556 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -31,7 +31,6 @@ import ( "time" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/gogo/protobuf/proto" "github.com/kr/pretty" "github.com/pkg/errors" @@ -200,7 +199,6 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper, tc.store.Engine(), enginepb.MVCCStats{}, *testDesc, - raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 82eae67cff06..25bb63970c56 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1743,7 +1743,7 @@ func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error { return err } - updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}) + updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}) if err != nil { return err } @@ -1832,6 +1832,16 @@ func splitPostApply( } } + // Finish up the initialization of the RHS' RaftState now that we have + // committed the split Batch (which included the initialization of the + // ReplicaState). This will synthesize and persist the correct lastIndex and + // HardState. + if err := makeReplicaStateLoader(split.RightDesc.RangeID).synthesizeRaftState( + ctx, r.store.Engine(), + ); err != nil { + log.Fatal(ctx, err) + } + // Finish initialization of the RHS. r.mu.Lock() diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 92b034850b17..e8b7ef3b0e71 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1008,7 +1008,7 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep // Minimal amount of work to keep this deprecated machinery working: Write // some required Raft keys. if _, err := writeInitialState( - context.Background(), store.engine, enginepb.MVCCStats{}, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, + context.Background(), store.engine, enginepb.MVCCStats{}, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, ); err != nil { t.Fatal(err) } @@ -2263,7 +2263,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { } if _, err := writeInitialState( - ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, + ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, ); err != nil { t.Fatal(err) }