From dfc176892bc2b756af4ffe377183d61a8a195ef1 Mon Sep 17 00:00:00 2001
From: Tobias Schottdorf
Date: Mon, 17 Jul 2017 14:15:56 -0400
Subject: [PATCH] storage: don't clobber HardState on splits

Since the move to proposer-evaluated KV, we were potentially clobbering
the HardState on splits because HardState synthesis had accidentally
moved upstream of Raft as well. This change moves it downstream again.

Though not strictly necessary, the write of lastIndex was moved
downstream too. This is cosmetic, but it aids @irfansharif's PR #16809,
which moves lastIndex to the Raft engine.

After this PR, neither HardState nor last index keys are added to the
WriteBatch, leaving pre-#16993 `TruncateLog` as the only remaining
command that does so (and it, too, won't keep doing that for long).

Note that there is no migration concern.

Fixes #16749.
---
 pkg/storage/replica_command.go     | 61 ++++++++++-----------
 pkg/storage/replica_raftstorage.go |  4 ++
 pkg/storage/replica_state.go       | 85 ++++++++++++++++++++++--------
 pkg/storage/replica_state_test.go  | 12 ++---
 pkg/storage/replica_test.go        |  2 -
 pkg/storage/store.go               | 12 ++++-
 pkg/storage/store_test.go          |  4 +-
 7 files changed, 116 insertions(+), 64 deletions(-)

diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index fae38378cc50..4440d4d36110 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -2981,34 +2981,6 @@ func splitTrigger(
 		return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to account for enginepb.MVCCStats's own stats impact")
 	}
-	// Writing the initial state is subtle since this also seeds the Raft
-	// group. We are writing to the right hand side's Raft group state in this
-	// batch so we need to synchronize with anything else that could be
-	// touching that replica's Raft state. Specifically, we want to prohibit an
-	// uninitialized Replica from receiving a message for the right hand side
-	// range and performing raft processing. This is achieved by serializing
-	// execution of uninitialized Replicas in Store.processRaft and ensuring
-	// that no uninitialized Replica is being processed while an initialized
-	// one (like the one currently being split) is being processed.
-	//
-	// Note also that it is crucial that writeInitialState *absorbs* an
-	// existing HardState (which might contain a cast vote). We load the
-	// existing HardState from the underlying engine instead of the batch
-	// because batch reads are from a snapshot taken at the point in time when
-	// the first read was performed on the batch. This last requirement is not
-	// currently needed due to the uninitialized Replica synchronization
-	// mentioned above, but future work will relax that synchronization, moving
-	// it from before the point that batch was created to this method. We want
-	// to see any writes to the hard state that were performed between the
-	// creation of the batch and that synchronization point. The only drawback
-	// to not reading from the batch is that we won't see any writes to the
-	// right hand side's hard state that were previously made in the batch
-	// (which should be impossible).
-	oldHS, err := loadHardState(ctx, rec.Engine(), split.RightDesc.RangeID)
-	if err != nil {
-		return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to load hard state")
-	}
-
 	// Initialize the right-hand lease to be the same as the left-hand lease.
 	// Various pieces of code rely on a replica's lease never being uninitialized,
 	// but it's more than that - it ensures that we properly initialize the
 	// timestamp cache, which is only populated on the lease holder, from that
@@ -3061,8 +3033,37 @@ func splitTrigger(
 		log.VEventf(ctx, 1, "LHS's TxnSpanGCThreshold of split is not set")
 	}

-	rightMS, err = writeInitialState(
-		ctx, batch, rightMS, split.RightDesc, oldHS, rightLease, gcThreshold, txnSpanGCThreshold,
+	// Writing the initial state is subtle since this also seeds the Raft
+	// group. It becomes more subtle due to proposer-evaluated Raft.
+	//
+	// We are writing to the right hand side's Raft group state in this
+	// batch so we need to synchronize with anything else that could be
+	// touching that replica's Raft state. Specifically, we want to prohibit
+	// an uninitialized Replica from receiving a message for the right hand
+	// side range and performing raft processing. This is achieved by
+	// serializing execution of uninitialized Replicas in Store.processRaft
+	// and ensuring that no uninitialized Replica is being processed while
+	// an initialized one (like the one currently being split) is being
+	// processed.
+	//
+	// Since the right hand side of the split's Raft group may already
+	// exist, we must be prepared to absorb an existing HardState. The Raft
+	// group may already exist because other nodes could already have
+	// processed the split and started talking to our node, prompting the
+	// creation of a Raft group that can vote and bump its term, but not
+	// much else: it can't receive snapshots because those intersect the
+	// pre-split range; it can't apply log commands because it needs a
+	// snapshot first.
+	//
+	// However, we can't absorb the right-hand side's HardState here: we
+	// only *evaluate* the proposal at this point, and by the time it is
+	// *applied*, the HardState could have changed. Instead, the absorption
+	// happens downstream of Raft, in splitPostApply, where we write the
+	// last index and the HardState via a call to synthesizeRaftState.
+	// Here, we only call writeInitialReplicaState, which writes only a
+	// ReplicaState.
+	rightMS, err = writeInitialReplicaState(
+		ctx, batch, rightMS, split.RightDesc, rightLease, gcThreshold, txnSpanGCThreshold,
 	)
 	if err != nil {
 		return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to write initial state")
diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go
index 8a5d7a2a31ec..d65c1a285659 100644
--- a/pkg/storage/replica_raftstorage.go
+++ b/pkg/storage/replica_raftstorage.go
@@ -771,6 +771,10 @@ func (r *Replica) applySnapshot(
 	// increases the committed index as a result of the snapshot, but who is to
 	// say it isn't going to accept a snapshot which is identical to the current
 	// state?
+	//
+	// Note that since this snapshot comes from Raft, we don't have to synthesize
+	// the HardState -- Raft wouldn't ask us to update the HardState in incorrect
+	// ways.
 	if !raft.IsEmptyHardState(hs) {
 		if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatch, hs); err != nil {
 			return errors.Wrapf(err, "unable to persist HardState %+v", &hs)
diff --git a/pkg/storage/replica_state.go b/pkg/storage/replica_state.go
index c4f88a8abad7..850c065e528e 100644
--- a/pkg/storage/replica_state.go
+++ b/pkg/storage/replica_state.go
@@ -459,21 +459,50 @@ func (rsl replicaStateLoader) setHardState(
 		rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st)
 }

+// synthesizeRaftState creates a Raft state which synthesizes both a HardState
+// and a lastIndex from pre-seeded data in the engine (typically created via
+// writeInitialReplicaState and, on a split, perhaps the activity of an
+// uninitialized Raft group).
+func (rsl replicaStateLoader) synthesizeRaftState(
+	ctx context.Context, eng engine.ReadWriter,
+) error {
+	hs, err := rsl.loadHardState(ctx, eng)
+	if err != nil {
+		return err
+	}
+	truncState, err := rsl.loadTruncatedState(ctx, eng)
+	if err != nil {
+		return err
+	}
+	raftAppliedIndex, _, err := rsl.loadAppliedIndex(ctx, eng)
+	if err != nil {
+		return err
+	}
+	if err := rsl.synthesizeHardState(ctx, eng, hs, truncState, raftAppliedIndex); err != nil {
+		return err
+	}
+	return rsl.setLastIndex(ctx, eng, truncState.Index)
+}
+
 // synthesizeHardState synthesizes a HardState from the given ReplicaState and
-// any existing on-disk HardState in the context of a snapshot, while verifying
+// any existing on-disk HardState in the context of a split, while verifying
 // that the application of the snapshot does not violate Raft invariants. It
-// must be called after the supplied state and ReadWriter have been updated
-// with the result of the snapshot.
-// If there is an existing HardState, we must respect it and we must not apply
-// a snapshot that would move the state backwards.
+// must be called after the supplied state and ReadWriter have been updated with
+// the result of the snapshot. If there is an existing HardState, we must
+// respect it and we must not apply a snapshot that would move the state
+// backwards.
 func (rsl replicaStateLoader) synthesizeHardState(
-	ctx context.Context, eng engine.ReadWriter, s storagebase.ReplicaState, oldHS raftpb.HardState,
+	ctx context.Context,
+	eng engine.ReadWriter,
+	oldHS raftpb.HardState,
+	truncState roachpb.RaftTruncatedState,
+	raftAppliedIndex uint64,
 ) error {
 	newHS := raftpb.HardState{
-		Term: s.TruncatedState.Term,
+		Term: truncState.Term,
 		// Note that when applying a Raft snapshot, the applied index is
 		// equal to the Commit index represented by the snapshot.
-		Commit: s.RaftAppliedIndex,
+		Commit: raftAppliedIndex,
 	}

 	if oldHS.Commit > newHS.Commit {
@@ -495,19 +524,18 @@ func (rsl replicaStateLoader) synthesizeHardState(
 	return errors.Wrapf(err, "writing HardState %+v", &newHS)
 }

-// writeInitialState bootstraps a new Raft group (i.e. it is called when we
-// bootstrap a Range, or when setting up the right hand side of a split).
-// Its main task is to persist a consistent Raft (and associated Replica) state
-// which does not start from zero but presupposes a few entries already having
-// applied.
-// The supplied MVCCStats are used for the Stats field after adjusting for
-// persisting the state itself, and the updated stats are returned.
-func writeInitialState(
+// writeInitialReplicaState sets up a new Range, but without writing an
+// associated Raft state (which must be written separately via
+// synthesizeRaftState before instantiating a Replica). The main task is to
+// persist a ReplicaState which does not start from zero but presupposes a few
+// entries already having applied. The supplied MVCCStats are used for the Stats
+// field after adjusting for persisting the state itself, and the updated stats
+// are returned.
+func writeInitialReplicaState(
 	ctx context.Context,
 	eng engine.ReadWriter,
 	ms enginepb.MVCCStats,
 	desc roachpb.RangeDescriptor,
-	oldHS raftpb.HardState,
 	lease roachpb.Lease,
 	gcThreshold hlc.Timestamp,
 	txnSpanGCThreshold hlc.Timestamp,
@@ -551,14 +579,29 @@ func writeInitialState(
 		return enginepb.MVCCStats{}, err
 	}

-	if err := rsl.synthesizeHardState(ctx, eng, s, oldHS); err != nil {
+	return newMS, nil
+}
+
+// writeInitialState calls writeInitialReplicaState followed by
+// synthesizeRaftState. It is typically called during bootstrap. The supplied
+// MVCCStats are used for the Stats field after adjusting for persisting the
+// state itself, and the updated stats are returned.
+func writeInitialState(
+	ctx context.Context,
+	eng engine.ReadWriter,
+	ms enginepb.MVCCStats,
+	desc roachpb.RangeDescriptor,
+	lease roachpb.Lease,
+	gcThreshold hlc.Timestamp,
+	txnSpanGCThreshold hlc.Timestamp,
+) (enginepb.MVCCStats, error) {
+	newMS, err := writeInitialReplicaState(ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold)
+	if err != nil {
 		return enginepb.MVCCStats{}, err
 	}
-
-	if err := rsl.setLastIndex(ctx, eng, s.TruncatedState.Index); err != nil {
+	if err := makeReplicaStateLoader(desc.RangeID).synthesizeRaftState(ctx, eng); err != nil {
 		return enginepb.MVCCStats{}, err
 	}
-
 	return newMS, nil
 }
diff --git a/pkg/storage/replica_state_test.go b/pkg/storage/replica_state_test.go
index 0e1652c414aa..221d943b0a65 100644
--- a/pkg/storage/replica_state_test.go
+++ b/pkg/storage/replica_state_test.go
@@ -25,7 +25,6 @@ import (

 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
-	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -60,12 +59,7 @@ func TestSynthesizeHardState(t *testing.T) {
 		func() {
 			batch := eng.NewBatch()
 			defer batch.Close()
-			testState := storagebase.ReplicaState{
-				Desc:             testRangeDescriptor(),
-				TruncatedState:   &roachpb.RaftTruncatedState{Term: test.TruncTerm},
-				RaftAppliedIndex: test.RaftAppliedIndex,
-			}
-			rsl := makeReplicaStateLoader(testState.Desc.RangeID)
+			rsl := makeReplicaStateLoader(1)

 			if test.OldHS != nil {
 				if err := rsl.setHardState(context.Background(), batch, *test.OldHS); err != nil {
@@ -78,7 +72,9 @@ func TestSynthesizeHardState(t *testing.T) {
 				t.Fatal(err)
 			}

-			err = rsl.synthesizeHardState(context.Background(), batch, testState, oldHS)
+			err = rsl.synthesizeHardState(
+				context.Background(), batch, oldHS, roachpb.RaftTruncatedState{Term: test.TruncTerm}, test.RaftAppliedIndex,
+			)
 			if !testutils.IsError(err, test.Err) {
 				t.Fatalf("%d: expected %q got %v", i, test.Err, err)
 			} else if err != nil {
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 4c6fa9ef0853..91c46edb3556 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -31,7 +31,6 @@ import (
 	"time"

 	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/gogo/protobuf/proto"
 	"github.com/kr/pretty"
 	"github.com/pkg/errors"
@@ -200,7 +199,6 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper,
 			tc.store.Engine(),
 			enginepb.MVCCStats{},
 			*testDesc,
-			raftpb.HardState{},
 			roachpb.Lease{},
 			hlc.Timestamp{},
 			hlc.Timestamp{},
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 82eae67cff06..25bb63970c56 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -1743,7 +1743,7 @@ func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error {
 		return err
 	}

-	updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{})
+	updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{})
 	if err != nil {
 		return err
 	}
@@ -1832,6 +1832,16 @@ func splitPostApply(
 		}
 	}

+	// Finish up the initialization of the RHS' RaftState now that we have
+	// committed the split Batch (which included the initialization of the
+	// ReplicaState). This will synthesize and persist the correct lastIndex and
+	// HardState.
+	if err := makeReplicaStateLoader(split.RightDesc.RangeID).synthesizeRaftState(
+		ctx, r.store.Engine(),
+	); err != nil {
+		log.Fatal(ctx, err)
+	}
+
 	// Finish initialization of the RHS.

 	r.mu.Lock()
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 92b034850b17..e8b7ef3b0e71 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -1008,7 +1008,7 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep
 	// Minimal amount of work to keep this deprecated machinery working: Write
 	// some required Raft keys.
 	if _, err := writeInitialState(
-		context.Background(), store.engine, enginepb.MVCCStats{}, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
+		context.Background(), store.engine, enginepb.MVCCStats{}, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -2263,7 +2263,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
 	}

 	if _, err := writeInitialState(
-		ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
+		ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
 	); err != nil {
 		t.Fatal(err)
 	}
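
For intuition, the following is a minimal, self-contained Go sketch of the
HardState absorption rule that synthesizeHardState implements. The hardState
struct is a hypothetical stand-in for raftpb.HardState, and the term/vote
carry-over shown is an assumption based on the comments above (an uninitialized
right-hand side group "can vote and bump its term"), not the verbatim cockroach
code:

package main

import "fmt"

// hardState is a hypothetical stand-in for raftpb.HardState.
type hardState struct {
	Term   uint64 // highest term the replica has seen
	Vote   uint64 // node voted for in Term (0 = no vote cast)
	Commit uint64 // highest committed log index
}

// synthesizeHardState sketches the rule applied downstream of Raft on a split:
// build a HardState from the truncated state's term and the applied index, but
// absorb anything an already-active (uninitialized) right-hand side Raft group
// may have persisted in the meantime.
func synthesizeHardState(oldHS hardState, truncTerm, appliedIndex uint64) (hardState, error) {
	newHS := hardState{Term: truncTerm, Commit: appliedIndex}
	if oldHS.Commit > newHS.Commit {
		// Moving the commit index backwards would violate Raft invariants.
		return hardState{}, fmt.Errorf("can't decrease HardState.Commit from %d to %d",
			oldHS.Commit, newHS.Commit)
	}
	// Assumption: respect a higher term the uninitialized group already saw,
	// and preserve a vote cast in the resulting term.
	if oldHS.Term > newHS.Term {
		newHS.Term = oldHS.Term
	}
	if oldHS.Term == newHS.Term {
		newHS.Vote = oldHS.Vote
	}
	return newHS, nil
}

func main() {
	// The RHS group voted at term 7 before the split applied locally; the
	// synthesized state must keep that term and vote.
	hs, err := synthesizeHardState(hardState{Term: 7, Vote: 3}, 5, 10)
	fmt.Printf("%+v %v\n", hs, err) // {Term:7 Vote:3 Commit:10} <nil>
}

This rule cannot run at evaluation time: the right-hand side's group may cast a
vote between evaluation and application of the split, which is exactly why the
patch defers the call to splitPostApply, downstream of Raft.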