From 1741cd3d9cf80c2c95f5db1526f8519de762c5d3 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 18 Jul 2017 10:24:49 -0400 Subject: [PATCH] storage: don't clobber HardState on splits Since the move to proposer-evaluated KV, we were potentially clobbering the HardState on splits as we accidentally moved HardState synthesis upstream of Raft as well. This change moves it downstream again. Though not strictly necessary, writing lastIndex was moved as well. This is cosmetic, though it aids @irfansharif's PR #16809, which moves lastIndex to the Raft engine. After this PR, neither HardState nor last index keys are added to the WriteBatch, so that pre-#16993 `TruncateLog` is the only remaining command that does so (and it, too, won't keep doing that for long). Migration concerns: a lease holder running the new version will propose splits that don't propose the HardState to Raft. A follower running the old version will not write the HardState downstream of Raft. In combination, the HardState would never get written, and would thus be incompatible with the TruncatedState. Thus, while 1.0 might be around, we're still sending the potentially dangerous HardState. Fixes #16749. --- pkg/storage/below_raft_protos_test.go | 15 +++++ pkg/storage/replica.go | 6 +- pkg/storage/replica_command.go | 74 +++++++++++++---------- pkg/storage/replica_raftstorage.go | 4 ++ pkg/storage/replica_state.go | 84 +++++++++++++++++++-------- pkg/storage/replica_state_test.go | 12 ++-- pkg/storage/replica_test.go | 2 - pkg/storage/store.go | 12 +++- pkg/storage/store_test.go | 4 +- 9 files changed, 143 insertions(+), 70 deletions(-) diff --git a/pkg/storage/below_raft_protos_test.go b/pkg/storage/below_raft_protos_test.go index 4fbdc2351fea..19c12a09da9f 100644 --- a/pkg/storage/below_raft_protos_test.go +++ b/pkg/storage/below_raft_protos_test.go @@ -23,6 +23,7 @@ import ( "reflect" "testing" + "github.com/coreos/etcd/raft/raftpb" "github.com/gogo/protobuf/proto" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" @@ -73,6 +74,20 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ emptySum: 18064891702890239528, populatedSum: 4287370248246326846, }, + reflect.TypeOf(&raftpb.HardState{}): { + populatedConstructor: func(r *rand.Rand) proto.Message { + n := r.Uint64() + // NB: this would rot if HardState ever picked up more (relevant) + // fields, but that's unlikely. + return &raftpb.HardState{ + Term: n % 3, + Vote: n % 7, + Commit: n % 11, + } + }, + emptySum: 13621293256077144893, + populatedSum: 13375098491754757572, + }, } func TestBelowRaftProtos(t *testing.T) { diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index b0c21daf66fb..5892cc82058b 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -4394,7 +4394,7 @@ func (r *Replica) applyRaftCommand( start := timeutil.Now() var assertHS *raftpb.HardState - if util.RaceEnabled && rResult.Split != nil { + if util.RaceEnabled && rResult.Split != nil && util.IsMigrated() { oldHS, err := loadHardState(ctx, r.store.Engine(), rResult.Split.RightDesc.RangeID) if err != nil { log.Fatalf(ctx, "unable to load HardState: %s", err) @@ -4410,11 +4410,11 @@ func (r *Replica) applyRaftCommand( // Load the HardState that was just committed (if any). newHS, err := loadHardState(ctx, r.store.Engine(), rResult.Split.RightDesc.RangeID) if err != nil { - panic(err) + log.Fatalf(ctx, "unable to load HardState: %s", err) } // Assert that nothing moved "backwards". if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) { - log.Fatalf(ctx, "clobbered hard state: %s\n\npreviously: %s\noverwritten with: %s", + log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s", pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS)) } } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index d824af3d4d2a..93c1b20bc50e 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -3004,34 +3004,6 @@ func splitTrigger( return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to account for enginepb.MVCCStats's own stats impact") } - // Writing the initial state is subtle since this also seeds the Raft - // group. We are writing to the right hand side's Raft group state in this - // batch so we need to synchronize with anything else that could be - // touching that replica's Raft state. Specifically, we want to prohibit an - // uninitialized Replica from receiving a message for the right hand side - // range and performing raft processing. This is achieved by serializing - // execution of uninitialized Replicas in Store.processRaft and ensuring - // that no uninitialized Replica is being processed while an initialized - // one (like the one currently being split) is being processed. - // - // Note also that it is crucial that writeInitialState *absorbs* an - // existing HardState (which might contain a cast vote). We load the - // existing HardState from the underlying engine instead of the batch - // because batch reads are from a snapshot taken at the point in time when - // the first read was performed on the batch. This last requirement is not - // currently needed due to the uninitialized Replica synchronization - // mentioned above, but future work will relax that synchronization, moving - // it from before the point that batch was created to this method. We want - // to see any writes to the hard state that were performed between the - // creation of the batch and that synchronization point. The only drawback - // to not reading from the batch is that we won't see any writes to the - // right hand side's hard state that were previously made in the batch - // (which should be impossible). - oldHS, err := loadHardState(ctx, rec.Engine(), split.RightDesc.RangeID) - if err != nil { - return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to load hard state") - } - // Initialize the right-hand lease to be the same as the left-hand lease. // Various pieces of code rely on a replica's lease never being unitialized, // but it's more than that - it ensures that we properly initialize the // timestamp cache, which is only populated on the lease holder, from that @@ -3084,12 +3056,52 @@ func splitTrigger( log.VEventf(ctx, 1, "LHS's TxnSpanGCThreshold of split is not set") } - rightMS, err = writeInitialState( - ctx, batch, rightMS, split.RightDesc, oldHS, rightLease, gcThreshold, txnSpanGCThreshold, + // Writing the initial state is subtle since this also seeds the Raft + // group. It becomes more subtle due to proposer-evaluated Raft. + // + // We are writing to the right hand side's Raft group state in this + // batch so we need to synchronize with anything else that could be + // touching that replica's Raft state. Specifically, we want to prohibit + // an uninitialized Replica from receiving a message for the right hand + // side range and performing raft processing. This is achieved by + // serializing execution of uninitialized Replicas in Store.processRaft + // and ensuring that no uninitialized Replica is being processed while + // an initialized one (like the one currently being split) is being + // processed. + // + // Since the right hand side of the split's Raft group may already + // exist, we must be prepared to absorb an existing HardState. The Raft + // group may already exist because other nodes could already have + // processed the split and started talking to our node, prompting the + // creation of a Raft group that can vote and bump its term, but not + // much else: it can't receive snapshots because those intersect the + // pre-split range; it can't apply log commands because it needs a + // snapshot first. + // + // However, we can't absorb the right-hand side's HardState here because + // we only *evaluate* the proposal here, but by the time it is + // *applied*, the HardState could have changed. We do this downstream of + // Raft, in splitPostApply, where we write the last index and the + // HardState via a call to synthesizeRaftState. Here, we only call + // writeInitialReplicaState which essentially writes a ReplicaState + // only. + rightMS, err = writeInitialReplicaState( + ctx, batch, rightMS, split.RightDesc, rightLease, gcThreshold, txnSpanGCThreshold, ) if err != nil { - return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to write initial state") + return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to write initial Replica state") } + + if !util.IsMigrated() { + // Write an initial state upstream of Raft even though it might + // clobber downstream simply because that's what 1.0 does and if we + // don't write it here, then a 1.0 version applying it as a follower + // won't write a HardState at all and is guaranteed to crash. + if err := makeReplicaStateLoader(split.RightDesc.RangeID).synthesizeRaftState(ctx, batch); err != nil { + return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to synthesize initial Raft state") + } + } + bothDeltaMS.Subtract(preRightMS) bothDeltaMS.Add(rightMS) } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 8a5d7a2a31ec..d65c1a285659 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -771,6 +771,10 @@ func (r *Replica) applySnapshot( // increases the committed index as a result of the snapshot, but who is to // say it isn't going to accept a snapshot which is identical to the current // state? + // + // Note that since this snapshot comes from Raft, we don't have to synthesize + // the HardState -- Raft wouldn't ask us to update the HardState in incorrect + // ways. if !raft.IsEmptyHardState(hs) { if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatch, hs); err != nil { return errors.Wrapf(err, "unable to persist HardState %+v", &hs) diff --git a/pkg/storage/replica_state.go b/pkg/storage/replica_state.go index c4f88a8abad7..151cb7c65d96 100644 --- a/pkg/storage/replica_state.go +++ b/pkg/storage/replica_state.go @@ -459,21 +459,45 @@ func (rsl replicaStateLoader) setHardState( rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st) } -// synthesizeHardState synthesizes a HardState from the given ReplicaState and -// any existing on-disk HardState in the context of a snapshot, while verifying -// that the application of the snapshot does not violate Raft invariants. It -// must be called after the supplied state and ReadWriter have been updated -// with the result of the snapshot. -// If there is an existing HardState, we must respect it and we must not apply -// a snapshot that would move the state backwards. +// synthesizeRaftState creates a Raft state which synthesizes both a HardState +// and a lastIndex from pre-seeded data in the engine (typically created via +// writeInitialReplicaState and, on a split, perhaps the activity of an +// uninitialized Raft group) +func (rsl replicaStateLoader) synthesizeRaftState( + ctx context.Context, eng engine.ReadWriter, +) error { + hs, err := rsl.loadHardState(ctx, eng) + if err != nil { + return err + } + truncState, err := rsl.loadTruncatedState(ctx, eng) + if err != nil { + return err + } + raftAppliedIndex, _, err := rsl.loadAppliedIndex(ctx, eng) + if err != nil { + return err + } + if err := rsl.synthesizeHardState(ctx, eng, hs, truncState, raftAppliedIndex); err != nil { + return err + } + return rsl.setLastIndex(ctx, eng, truncState.Index) +} + +// synthesizeHardState synthesizes an on-disk HardState from the given input, +// taking care that a HardState compatible with the existing data is written. func (rsl replicaStateLoader) synthesizeHardState( - ctx context.Context, eng engine.ReadWriter, s storagebase.ReplicaState, oldHS raftpb.HardState, + ctx context.Context, + eng engine.ReadWriter, + oldHS raftpb.HardState, + truncState roachpb.RaftTruncatedState, + raftAppliedIndex uint64, ) error { newHS := raftpb.HardState{ - Term: s.TruncatedState.Term, + Term: truncState.Term, // Note that when applying a Raft snapshot, the applied index is // equal to the Commit index represented by the snapshot. - Commit: s.RaftAppliedIndex, + Commit: raftAppliedIndex, } if oldHS.Commit > newHS.Commit { @@ -495,19 +519,18 @@ func (rsl replicaStateLoader) synthesizeHardState( return errors.Wrapf(err, "writing HardState %+v", &newHS) } -// writeInitialState bootstraps a new Raft group (i.e. it is called when we -// bootstrap a Range, or when setting up the right hand side of a split). -// Its main task is to persist a consistent Raft (and associated Replica) state -// which does not start from zero but presupposes a few entries already having -// applied. -// The supplied MVCCStats are used for the Stats field after adjusting for -// persisting the state itself, and the updated stats are returned. -func writeInitialState( +// writeInitialReplicaState sets up a new Range, but without writing an +// associated Raft state (which must be written separately via +// synthesizeRaftState before instantiating a Replica). The main task is to +// persist a ReplicaState which does not start from zero but presupposes a few +// entries already having applied. The supplied MVCCStats are used for the Stats +// field after adjusting for persisting the state itself, and the updated stats +// are returned. +func writeInitialReplicaState( ctx context.Context, eng engine.ReadWriter, ms enginepb.MVCCStats, desc roachpb.RangeDescriptor, - oldHS raftpb.HardState, lease roachpb.Lease, gcThreshold hlc.Timestamp, txnSpanGCThreshold hlc.Timestamp, @@ -551,14 +574,29 @@ func writeInitialState( return enginepb.MVCCStats{}, err } - if err := rsl.synthesizeHardState(ctx, eng, s, oldHS); err != nil { + return newMS, nil +} + +// writeInitialState calls writeInitialReplicaState followed by +// synthesizeRaftState. It is typically called during bootstrap. The supplied +// MVCCStats are used for the Stats field after adjusting for persisting the +// state itself, and the updated stats are returned. +func writeInitialState( + ctx context.Context, + eng engine.ReadWriter, + ms enginepb.MVCCStats, + desc roachpb.RangeDescriptor, + lease roachpb.Lease, + gcThreshold hlc.Timestamp, + txnSpanGCThreshold hlc.Timestamp, +) (enginepb.MVCCStats, error) { + newMS, err := writeInitialReplicaState(ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold) + if err != nil { return enginepb.MVCCStats{}, err } - - if err := rsl.setLastIndex(ctx, eng, s.TruncatedState.Index); err != nil { + if err := makeReplicaStateLoader(desc.RangeID).synthesizeRaftState(ctx, eng); err != nil { return enginepb.MVCCStats{}, err } - return newMS, nil } diff --git a/pkg/storage/replica_state_test.go b/pkg/storage/replica_state_test.go index 0e1652c414aa..221d943b0a65 100644 --- a/pkg/storage/replica_state_test.go +++ b/pkg/storage/replica_state_test.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -60,12 +59,7 @@ func TestSynthesizeHardState(t *testing.T) { func() { batch := eng.NewBatch() defer batch.Close() - testState := storagebase.ReplicaState{ - Desc: testRangeDescriptor(), - TruncatedState: &roachpb.RaftTruncatedState{Term: test.TruncTerm}, - RaftAppliedIndex: test.RaftAppliedIndex, - } - rsl := makeReplicaStateLoader(testState.Desc.RangeID) + rsl := makeReplicaStateLoader(1) if test.OldHS != nil { if err := rsl.setHardState(context.Background(), batch, *test.OldHS); err != nil { @@ -78,7 +72,9 @@ func TestSynthesizeHardState(t *testing.T) { t.Fatal(err) } - err = rsl.synthesizeHardState(context.Background(), batch, testState, oldHS) + err = rsl.synthesizeHardState( + context.Background(), batch, oldHS, roachpb.RaftTruncatedState{Term: test.TruncTerm}, test.RaftAppliedIndex, + ) if !testutils.IsError(err, test.Err) { t.Fatalf("%d: expected %q got %v", i, test.Err, err) } else if err != nil { diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 0817d86b76b5..cd012378cc25 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -31,7 +31,6 @@ import ( "time" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/gogo/protobuf/proto" "github.com/kr/pretty" "github.com/pkg/errors" @@ -200,7 +199,6 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper, tc.store.Engine(), enginepb.MVCCStats{}, *testDesc, - raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 82eae67cff06..25bb63970c56 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1743,7 +1743,7 @@ func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error { return err } - updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}) + updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}) if err != nil { return err } @@ -1832,6 +1832,16 @@ func splitPostApply( } } + // Finish up the initialization of the RHS' RaftState now that we have + // committed the split Batch (which included the initialization of the + // ReplicaState). This will synthesize and persist the correct lastIndex and + // HardState. + if err := makeReplicaStateLoader(split.RightDesc.RangeID).synthesizeRaftState( + ctx, r.store.Engine(), + ); err != nil { + log.Fatal(ctx, err) + } + // Finish initialization of the RHS. r.mu.Lock() diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 92b034850b17..e8b7ef3b0e71 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1008,7 +1008,7 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep // Minimal amount of work to keep this deprecated machinery working: Write // some required Raft keys. if _, err := writeInitialState( - context.Background(), store.engine, enginepb.MVCCStats{}, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, + context.Background(), store.engine, enginepb.MVCCStats{}, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, ); err != nil { t.Fatal(err) } @@ -2263,7 +2263,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { } if _, err := writeInitialState( - ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, + ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, ); err != nil { t.Fatal(err) }