Skip to content

Commit

Permalink
storage: don't clobber HardState on splits
Browse files Browse the repository at this point in the history
Since the move to proposer-evaluated KV, we were potentially clobbering the
HardState on splits as we accidentally moved HardState synthesis upstream of
Raft as well. This change moves it downstream again.

Though not strictly necessary, writing lastIndex was moved as well. This is
cosmetic, though it aids @irfansharif's PR cockroachdb#16809, which moves lastIndex to
the Raft engine. After this PR, neither HardState nor last index keys are
added to the WriteBatch, so that pre-cockroachdb#16993 `TruncateLog` is the only remaining
command that does so (and it, too, won't keep doing that for long).

Migration concerns: a lease holder running the new version will propose splits
that don't propose the HardState to Raft. A follower running the old version
will not write the HardState downstream of Raft. In combination, the HardState
would never get written, and would thus be incompatible with the
TruncatedState. Thus, while 1.0 might be around, we're still sending the
potentially dangerous HardState.

Fixes cockroachdb#16749.
  • Loading branch information
tbg committed Jul 18, 2017
1 parent 6515f70 commit 1741cd3
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 70 deletions.
15 changes: 15 additions & 0 deletions pkg/storage/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"testing"

"github.com/coreos/etcd/raft/raftpb"
"github.com/gogo/protobuf/proto"

"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -73,6 +74,20 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
emptySum: 18064891702890239528,
populatedSum: 4287370248246326846,
},
reflect.TypeOf(&raftpb.HardState{}): {
populatedConstructor: func(r *rand.Rand) proto.Message {
n := r.Uint64()
// NB: this would rot if HardState ever picked up more (relevant)
// fields, but that's unlikely.
return &raftpb.HardState{
Term: n % 3,
Vote: n % 7,
Commit: n % 11,
}
},
emptySum: 13621293256077144893,
populatedSum: 13375098491754757572,
},
}

func TestBelowRaftProtos(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -4394,7 +4394,7 @@ func (r *Replica) applyRaftCommand(
start := timeutil.Now()

var assertHS *raftpb.HardState
if util.RaceEnabled && rResult.Split != nil {
if util.RaceEnabled && rResult.Split != nil && util.IsMigrated() {
oldHS, err := loadHardState(ctx, r.store.Engine(), rResult.Split.RightDesc.RangeID)
if err != nil {
log.Fatalf(ctx, "unable to load HardState: %s", err)
Expand All @@ -4410,11 +4410,11 @@ func (r *Replica) applyRaftCommand(
// Load the HardState that was just committed (if any).
newHS, err := loadHardState(ctx, r.store.Engine(), rResult.Split.RightDesc.RangeID)
if err != nil {
panic(err)
log.Fatalf(ctx, "unable to load HardState: %s", err)
}
// Assert that nothing moved "backwards".
if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) {
log.Fatalf(ctx, "clobbered hard state: %s\n\npreviously: %s\noverwritten with: %s",
log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s",
pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS))
}
}
Expand Down
74 changes: 43 additions & 31 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3004,34 +3004,6 @@ func splitTrigger(
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to account for enginepb.MVCCStats's own stats impact")
}

// Writing the initial state is subtle since this also seeds the Raft
// group. We are writing to the right hand side's Raft group state in this
// batch so we need to synchronize with anything else that could be
// touching that replica's Raft state. Specifically, we want to prohibit an
// uninitialized Replica from receiving a message for the right hand side
// range and performing raft processing. This is achieved by serializing
// execution of uninitialized Replicas in Store.processRaft and ensuring
// that no uninitialized Replica is being processed while an initialized
// one (like the one currently being split) is being processed.
//
// Note also that it is crucial that writeInitialState *absorbs* an
// existing HardState (which might contain a cast vote). We load the
// existing HardState from the underlying engine instead of the batch
// because batch reads are from a snapshot taken at the point in time when
// the first read was performed on the batch. This last requirement is not
// currently needed due to the uninitialized Replica synchronization
// mentioned above, but future work will relax that synchronization, moving
// it from before the point that batch was created to this method. We want
// to see any writes to the hard state that were performed between the
// creation of the batch and that synchronization point. The only drawback
// to not reading from the batch is that we won't see any writes to the
// right hand side's hard state that were previously made in the batch
// (which should be impossible).
oldHS, err := loadHardState(ctx, rec.Engine(), split.RightDesc.RangeID)
if err != nil {
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to load hard state")
}
// Initialize the right-hand lease to be the same as the left-hand lease.
// Various pieces of code rely on a replica's lease never being unitialized,
// but it's more than that - it ensures that we properly initialize the
// timestamp cache, which is only populated on the lease holder, from that
Expand Down Expand Up @@ -3084,12 +3056,52 @@ func splitTrigger(
log.VEventf(ctx, 1, "LHS's TxnSpanGCThreshold of split is not set")
}

rightMS, err = writeInitialState(
ctx, batch, rightMS, split.RightDesc, oldHS, rightLease, gcThreshold, txnSpanGCThreshold,
// Writing the initial state is subtle since this also seeds the Raft
// group. It becomes more subtle due to proposer-evaluated Raft.
//
// We are writing to the right hand side's Raft group state in this
// batch so we need to synchronize with anything else that could be
// touching that replica's Raft state. Specifically, we want to prohibit
// an uninitialized Replica from receiving a message for the right hand
// side range and performing raft processing. This is achieved by
// serializing execution of uninitialized Replicas in Store.processRaft
// and ensuring that no uninitialized Replica is being processed while
// an initialized one (like the one currently being split) is being
// processed.
//
// Since the right hand side of the split's Raft group may already
// exist, we must be prepared to absorb an existing HardState. The Raft
// group may already exist because other nodes could already have
// processed the split and started talking to our node, prompting the
// creation of a Raft group that can vote and bump its term, but not
// much else: it can't receive snapshots because those intersect the
// pre-split range; it can't apply log commands because it needs a
// snapshot first.
//
// However, we can't absorb the right-hand side's HardState here because
// we only *evaluate* the proposal here, but by the time it is
// *applied*, the HardState could have changed. We do this downstream of
// Raft, in splitPostApply, where we write the last index and the
// HardState via a call to synthesizeRaftState. Here, we only call
// writeInitialReplicaState which essentially writes a ReplicaState
// only.
rightMS, err = writeInitialReplicaState(
ctx, batch, rightMS, split.RightDesc, rightLease, gcThreshold, txnSpanGCThreshold,
)
if err != nil {
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to write initial state")
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to write initial Replica state")
}

if !util.IsMigrated() {
// Write an initial state upstream of Raft even though it might
// clobber downstream simply because that's what 1.0 does and if we
// don't write it here, then a 1.0 version applying it as a follower
// won't write a HardState at all and is guaranteed to crash.
if err := makeReplicaStateLoader(split.RightDesc.RangeID).synthesizeRaftState(ctx, batch); err != nil {
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to synthesize initial Raft state")
}
}

bothDeltaMS.Subtract(preRightMS)
bothDeltaMS.Add(rightMS)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,10 @@ func (r *Replica) applySnapshot(
// increases the committed index as a result of the snapshot, but who is to
// say it isn't going to accept a snapshot which is identical to the current
// state?
//
// Note that since this snapshot comes from Raft, we don't have to synthesize
// the HardState -- Raft wouldn't ask us to update the HardState in incorrect
// ways.
if !raft.IsEmptyHardState(hs) {
if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatch, hs); err != nil {
return errors.Wrapf(err, "unable to persist HardState %+v", &hs)
Expand Down
84 changes: 61 additions & 23 deletions pkg/storage/replica_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,21 +459,45 @@ func (rsl replicaStateLoader) setHardState(
rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st)
}

// synthesizeHardState synthesizes a HardState from the given ReplicaState and
// any existing on-disk HardState in the context of a snapshot, while verifying
// that the application of the snapshot does not violate Raft invariants. It
// must be called after the supplied state and ReadWriter have been updated
// with the result of the snapshot.
// If there is an existing HardState, we must respect it and we must not apply
// a snapshot that would move the state backwards.
// synthesizeRaftState creates a Raft state which synthesizes both a HardState
// and a lastIndex from pre-seeded data in the engine (typically created via
// writeInitialReplicaState and, on a split, perhaps the activity of an
// uninitialized Raft group)
func (rsl replicaStateLoader) synthesizeRaftState(
ctx context.Context, eng engine.ReadWriter,
) error {
hs, err := rsl.loadHardState(ctx, eng)
if err != nil {
return err
}
truncState, err := rsl.loadTruncatedState(ctx, eng)
if err != nil {
return err
}
raftAppliedIndex, _, err := rsl.loadAppliedIndex(ctx, eng)
if err != nil {
return err
}
if err := rsl.synthesizeHardState(ctx, eng, hs, truncState, raftAppliedIndex); err != nil {
return err
}
return rsl.setLastIndex(ctx, eng, truncState.Index)
}

// synthesizeHardState synthesizes an on-disk HardState from the given input,
// taking care that a HardState compatible with the existing data is written.
func (rsl replicaStateLoader) synthesizeHardState(
ctx context.Context, eng engine.ReadWriter, s storagebase.ReplicaState, oldHS raftpb.HardState,
ctx context.Context,
eng engine.ReadWriter,
oldHS raftpb.HardState,
truncState roachpb.RaftTruncatedState,
raftAppliedIndex uint64,
) error {
newHS := raftpb.HardState{
Term: s.TruncatedState.Term,
Term: truncState.Term,
// Note that when applying a Raft snapshot, the applied index is
// equal to the Commit index represented by the snapshot.
Commit: s.RaftAppliedIndex,
Commit: raftAppliedIndex,
}

if oldHS.Commit > newHS.Commit {
Expand All @@ -495,19 +519,18 @@ func (rsl replicaStateLoader) synthesizeHardState(
return errors.Wrapf(err, "writing HardState %+v", &newHS)
}

// writeInitialState bootstraps a new Raft group (i.e. it is called when we
// bootstrap a Range, or when setting up the right hand side of a split).
// Its main task is to persist a consistent Raft (and associated Replica) state
// which does not start from zero but presupposes a few entries already having
// applied.
// The supplied MVCCStats are used for the Stats field after adjusting for
// persisting the state itself, and the updated stats are returned.
func writeInitialState(
// writeInitialReplicaState sets up a new Range, but without writing an
// associated Raft state (which must be written separately via
// synthesizeRaftState before instantiating a Replica). The main task is to
// persist a ReplicaState which does not start from zero but presupposes a few
// entries already having applied. The supplied MVCCStats are used for the Stats
// field after adjusting for persisting the state itself, and the updated stats
// are returned.
func writeInitialReplicaState(
ctx context.Context,
eng engine.ReadWriter,
ms enginepb.MVCCStats,
desc roachpb.RangeDescriptor,
oldHS raftpb.HardState,
lease roachpb.Lease,
gcThreshold hlc.Timestamp,
txnSpanGCThreshold hlc.Timestamp,
Expand Down Expand Up @@ -551,14 +574,29 @@ func writeInitialState(
return enginepb.MVCCStats{}, err
}

if err := rsl.synthesizeHardState(ctx, eng, s, oldHS); err != nil {
return newMS, nil
}

// writeInitialState calls writeInitialReplicaState followed by
// synthesizeRaftState. It is typically called during bootstrap. The supplied
// MVCCStats are used for the Stats field after adjusting for persisting the
// state itself, and the updated stats are returned.
func writeInitialState(
ctx context.Context,
eng engine.ReadWriter,
ms enginepb.MVCCStats,
desc roachpb.RangeDescriptor,
lease roachpb.Lease,
gcThreshold hlc.Timestamp,
txnSpanGCThreshold hlc.Timestamp,
) (enginepb.MVCCStats, error) {
newMS, err := writeInitialReplicaState(ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold)
if err != nil {
return enginepb.MVCCStats{}, err
}

if err := rsl.setLastIndex(ctx, eng, s.TruncatedState.Index); err != nil {
if err := makeReplicaStateLoader(desc.RangeID).synthesizeRaftState(ctx, eng); err != nil {
return enginepb.MVCCStats{}, err
}

return newMS, nil
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/storage/replica_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -60,12 +59,7 @@ func TestSynthesizeHardState(t *testing.T) {
func() {
batch := eng.NewBatch()
defer batch.Close()
testState := storagebase.ReplicaState{
Desc: testRangeDescriptor(),
TruncatedState: &roachpb.RaftTruncatedState{Term: test.TruncTerm},
RaftAppliedIndex: test.RaftAppliedIndex,
}
rsl := makeReplicaStateLoader(testState.Desc.RangeID)
rsl := makeReplicaStateLoader(1)

if test.OldHS != nil {
if err := rsl.setHardState(context.Background(), batch, *test.OldHS); err != nil {
Expand All @@ -78,7 +72,9 @@ func TestSynthesizeHardState(t *testing.T) {
t.Fatal(err)
}

err = rsl.synthesizeHardState(context.Background(), batch, testState, oldHS)
err = rsl.synthesizeHardState(
context.Background(), batch, oldHS, roachpb.RaftTruncatedState{Term: test.TruncTerm}, test.RaftAppliedIndex,
)
if !testutils.IsError(err, test.Err) {
t.Fatalf("%d: expected %q got %v", i, test.Err, err)
} else if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"time"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/gogo/protobuf/proto"
"github.com/kr/pretty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -200,7 +199,6 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper,
tc.store.Engine(),
enginepb.MVCCStats{},
*testDesc,
raftpb.HardState{},
roachpb.Lease{},
hlc.Timestamp{},
hlc.Timestamp{},
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,7 @@ func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error {
return err
}

updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{})
updatedMS, err := writeInitialState(ctx, batch, *ms, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{})
if err != nil {
return err
}
Expand Down Expand Up @@ -1832,6 +1832,16 @@ func splitPostApply(
}
}

// Finish up the initialization of the RHS' RaftState now that we have
// committed the split Batch (which included the initialization of the
// ReplicaState). This will synthesize and persist the correct lastIndex and
// HardState.
if err := makeReplicaStateLoader(split.RightDesc.RangeID).synthesizeRaftState(
ctx, r.store.Engine(),
); err != nil {
log.Fatal(ctx, err)
}

// Finish initialization of the RHS.

r.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep
// Minimal amount of work to keep this deprecated machinery working: Write
// some required Raft keys.
if _, err := writeInitialState(
context.Background(), store.engine, enginepb.MVCCStats{}, *desc, raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
context.Background(), store.engine, enginepb.MVCCStats{}, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -2263,7 +2263,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
}

if _, err := writeInitialState(
ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), raftpb.HardState{}, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{},
); err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 1741cd3

Please sign in to comment.