Skip to content

Commit

Permalink
storage: don't clobber HardState on splits
Browse files Browse the repository at this point in the history
Since the move to proposer-evaluated KV, we were potentially clobbering the
HardState on splits as we accidentally moved HardState synthesis upstream of
Raft as well. This change moves it downstream again.

Though not strictly necessary, writing lastIndex was moved as well. This is
cosmetic, though it aids @irfansharif's PR #16809, which moves lastIndex to
the Raft engine. After this PR, neither HardState nor last index keys are
added to the WriteBatch, so that pre-#16993 `TruncateLog` is the only remaining
command that does so (and it, too, won't keep doing that for long).

Note that there is no migration concern.

Fixes #16749.
  • Loading branch information
tbg committed Jul 17, 2017
1 parent 7c9e210 commit 7db82c4
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 192 deletions.
128 changes: 0 additions & 128 deletions pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2212,131 +2212,3 @@ func TestPushTxnQueueDependencyCycleWithRangeSplit(t *testing.T) {
})
}
}

func TestMinimal(t *testing.T) {
defer leaktest.AfterTest(t)()
mtc := &multiTestContext{}
storeCfg := storage.TestStoreConfig(nil)
// An aggressive tick interval lets groups communicate more and thus
// triggers test failures much more reliably. We can't go too aggressive
// or race tests never make any progress.
storeCfg.RaftTickInterval = 50 * time.Millisecond
storeCfg.RaftElectionTimeoutTicks = 2
currentTrigger := make(chan *roachpb.SplitTrigger, 1)
var seen struct {
syncutil.Mutex
sids map[storagebase.CmdIDKey][2]bool
}
seen.sids = make(map[storagebase.CmdIDKey][2]bool)

storeCfg.TestingKnobs.TestingEvalFilter = func(args storagebase.FilterArgs) *roachpb.Error {
et, ok := args.Req.(*roachpb.EndTransactionRequest)
if !ok || et.InternalCommitTrigger == nil {
return nil
}
trigger := protoutil.Clone(et.InternalCommitTrigger.GetSplitTrigger()).(*roachpb.SplitTrigger)
// The first time the trigger arrives (on each of the two stores),
// return a transaction retry. This allows us to pass the trigger to
// the goroutine creating faux incoming messages for the yet
// nonexistent right-hand-side, giving it a head start. This code looks
// fairly complicated since it wants to ensure that the two replicas
// don't diverge.
if trigger != nil && len(trigger.RightDesc.Replicas) == 2 && args.Hdr.Txn.Epoch == 0 {
seen.Lock()
defer seen.Unlock()
sid, sl := int(args.Sid)-1, seen.sids[args.CmdID]
if !sl[sid] {
sl[sid] = true
seen.sids[args.CmdID] = sl
} else {
return nil
}
select {
case currentTrigger <- trigger:
default:
}
return roachpb.NewError(
roachpb.NewReadWithinUncertaintyIntervalError(
args.Hdr.Timestamp, args.Hdr.Timestamp,
))
}
return nil
}

mtc.storeConfig = &storeCfg
defer mtc.Stop()
mtc.Start(t, 2)

leftRange := mtc.stores[0].LookupReplica(roachpb.RKey("a"), nil)

// Replicate the left range onto the second node. We don't wait since we
// don't actually care what the second node does. All we want is that the
// first node isn't surprised by messages from that node.
mtc.replicateRange(leftRange.RangeID, 1)

errChan := make(chan *roachpb.Error)

// Closed when the split goroutine is done.
splitDone := make(chan struct{})

go func() {
defer close(splitDone)

// Split the data range. The split keys are chosen so that they move
// towards "a" (so that the range being split is always the first
// range).
splitKey := roachpb.Key(encoding.EncodeVarintDescending([]byte("a"), int64(0)))
splitArgs := adminSplitArgs(keys.SystemMax, splitKey)
_, pErr := client.SendWrapped(context.Background(), mtc.distSenders[0], splitArgs)
errChan <- pErr
}()
go func() {
defer func() { errChan <- nil }()

trigger := <-currentTrigger // our own copy
// Make sure the first node is first for convenience.
replicas := trigger.RightDesc.Replicas
if replicas[0].NodeID > replicas[1].NodeID {
tmp := replicas[1]
replicas[1] = replicas[0]
replicas[0] = tmp
}

// Send a few vote requests which look like they're from the other
// node's right hand side of the split. This triggers a race which
// is discussed in #7600 (briefly, the creation of the right hand
// side in the split trigger was racing with the uninitialized
// version for the same group, resulting in clobbered HardState).
for term := uint64(1); ; term++ {
if err := mtc.stores[0].HandleRaftRequest(context.Background(),
&storage.RaftMessageRequest{
RangeID: trigger.RightDesc.RangeID,
ToReplica: replicas[0],
FromReplica: replicas[1],
Message: raftpb.Message{
Type: raftpb.MsgVote,
To: uint64(replicas[0].ReplicaID),
From: uint64(replicas[1].ReplicaID),
Term: term,
},
}, nil); err != nil {
t.Error(err)
}
select {
case <-splitDone:
return
case <-time.After(time.Microsecond):
// If we busy-loop here, we monopolize processRaftMu and the
// split takes a long time to complete. Sleeping reduces the
// chance that we hit the race, but it still shows up under
// stress.
}
}
}()

for i := 0; i < 2; i++ {
if pErr := <-errChan; pErr != nil {
t.Fatal(pErr)
}
}
}
61 changes: 31 additions & 30 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2981,34 +2981,6 @@ func splitTrigger(
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to account for enginepb.MVCCStats's own stats impact")
}

// Writing the initial state is subtle since this also seeds the Raft
// group. We are writing to the right hand side's Raft group state in this
// batch so we need to synchronize with anything else that could be
// touching that replica's Raft state. Specifically, we want to prohibit an
// uninitialized Replica from receiving a message for the right hand side
// range and performing raft processing. This is achieved by serializing
// execution of uninitialized Replicas in Store.processRaft and ensuring
// that no uninitialized Replica is being processed while an initialized
// one (like the one currently being split) is being processed.
//
// Note also that it is crucial that writeInitialState *absorbs* an
// existing HardState (which might contain a cast vote). We load the
// existing HardState from the underlying engine instead of the batch
// because batch reads are from a snapshot taken at the point in time when
// the first read was performed on the batch. This last requirement is not
// currently needed due to the uninitialized Replica synchronization
// mentioned above, but future work will relax that synchronization, moving
// it from before the point that batch was created to this method. We want
// to see any writes to the hard state that were performed between the
// creation of the batch and that synchronization point. The only drawback
// to not reading from the batch is that we won't see any writes to the
// right hand side's hard state that were previously made in the batch
// (which should be impossible).
oldHS, err := loadHardState(ctx, rec.Engine(), split.RightDesc.RangeID)
if err != nil {
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to load hard state")
}
// Initialize the right-hand lease to be the same as the left-hand lease.
// Various pieces of code rely on a replica's lease never being unitialized,
// but it's more than that - it ensures that we properly initialize the
// timestamp cache, which is only populated on the lease holder, from that
Expand Down Expand Up @@ -3061,8 +3033,37 @@ func splitTrigger(
log.VEventf(ctx, 1, "LHS's TxnSpanGCThreshold of split is not set")
}

rightMS, err = writeInitialState(
ctx, batch, rightMS, split.RightDesc, oldHS, rightLease, gcThreshold, txnSpanGCThreshold,
// Writing the initial state is subtle since this also seeds the Raft
// group. It becomes more subtle due to proposer-evaluated Raft.
//
// We are writing to the right hand side's Raft group state in this
// batch so we need to synchronize with anything else that could be
// touching that replica's Raft state. Specifically, we want to prohibit
// an uninitialized Replica from receiving a message for the right hand
// side range and performing raft processing. This is achieved by
// serializing execution of uninitialized Replicas in Store.processRaft
// and ensuring that no uninitialized Replica is being processed while
// an initialized one (like the one currently being split) is being
// processed.
//
// Since the right hand side of the split's Raft group may already
// exist, we must be prepared to absorb an existing HardState. The Raft
// group may already exist because other nodes could already have
// processed the split and started talking to our node, prompting the
// creation of a Raft group that can vote and bump its term, but not
// much else: it can't receive snapshots because those intersect the
// pre-split range; it can't apply log commands because it needs a
// snapshot first.
//
// However, we can't absorb the right-hand side's HardState here because
// we only *evaluate* the proposal here, but by the time it is
// *applied*, the HardState could have changed. We do this downstream of
// Raft, in splitPostApply, where we write the last index and the
// HardState via a call to synthesizeRaftState. Here, we only call
// writeInitialReplicaState which essentially writes a ReplicaState
// only.
rightMS, err = writeInitialReplicaState(
ctx, batch, rightMS, split.RightDesc, rightLease, gcThreshold, txnSpanGCThreshold,
)
if err != nil {
return enginepb.MVCCStats{}, EvalResult{}, errors.Wrap(err, "unable to write initial state")
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,10 @@ func (r *Replica) applySnapshot(
// increases the committed index as a result of the snapshot, but who is to
// say it isn't going to accept a snapshot which is identical to the current
// state?
//
// Note that since this snapshot comes from Raft, we don't have to synthesize
// the HardState -- Raft wouldn't ask us to update the HardState in incorrect
// ways.
if !raft.IsEmptyHardState(hs) {
if err := r.raftMu.stateLoader.setHardState(ctx, distinctBatch, hs); err != nil {
return errors.Wrapf(err, "unable to persist HardState %+v", &hs)
Expand Down
85 changes: 64 additions & 21 deletions pkg/storage/replica_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,21 +459,50 @@ func (rsl replicaStateLoader) setHardState(
rsl.RaftHardStateKey(), hlc.Timestamp{}, nil, &st)
}

// synthesizeRaftState creates a Raft state which synthesizes both a HardState
// and a lastIndex from pre-seeded data in the engine (typically created via
// writeInitialReplicaState and, on a split, perhaps the activity of an
// uninitialized Raft group)
func (rsl replicaStateLoader) synthesizeRaftState(
ctx context.Context, eng engine.ReadWriter,
) error {
hs, err := rsl.loadHardState(ctx, eng)
if err != nil {
return err
}
truncState, err := rsl.loadTruncatedState(ctx, eng)
if err != nil {
return err
}
raftAppliedIndex, _, err := rsl.loadAppliedIndex(ctx, eng)
if err != nil {
return err
}
if err := rsl.synthesizeHardState(ctx, eng, hs, truncState, raftAppliedIndex); err != nil {
return err
}
return rsl.setLastIndex(ctx, eng, truncState.Index)
}

// synthesizeHardState synthesizes a HardState from the given ReplicaState and
// any existing on-disk HardState in the context of a snapshot, while verifying
// any existing on-disk HardState in the context of a split, while verifying
// that the application of the snapshot does not violate Raft invariants. It
// must be called after the supplied state and ReadWriter have been updated
// with the result of the snapshot.
// If there is an existing HardState, we must respect it and we must not apply
// a snapshot that would move the state backwards.
// must be called after the supplied state and ReadWriter have been updated with
// the result of the snapshot. If there is an existing HardState, we must
// respect it and we must not apply a snapshot that would move the state
// backwards.
func (rsl replicaStateLoader) synthesizeHardState(
ctx context.Context, eng engine.ReadWriter, s storagebase.ReplicaState, oldHS raftpb.HardState,
ctx context.Context,
eng engine.ReadWriter,
oldHS raftpb.HardState,
truncState roachpb.RaftTruncatedState,
raftAppliedIndex uint64,
) error {
newHS := raftpb.HardState{
Term: s.TruncatedState.Term,
Term: truncState.Term,
// Note that when applying a Raft snapshot, the applied index is
// equal to the Commit index represented by the snapshot.
Commit: s.RaftAppliedIndex,
Commit: raftAppliedIndex,
}

if oldHS.Commit > newHS.Commit {
Expand All @@ -495,19 +524,18 @@ func (rsl replicaStateLoader) synthesizeHardState(
return errors.Wrapf(err, "writing HardState %+v", &newHS)
}

// writeInitialState bootstraps a new Raft group (i.e. it is called when we
// bootstrap a Range, or when setting up the right hand side of a split).
// Its main task is to persist a consistent Raft (and associated Replica) state
// which does not start from zero but presupposes a few entries already having
// applied.
// The supplied MVCCStats are used for the Stats field after adjusting for
// persisting the state itself, and the updated stats are returned.
func writeInitialState(
// writeInitialReplicaState sets up a new Range, but without writing an
// associated Raft state (which must be written separately via
// synthesizeRaftState before instantiating a Replica). The main task is to
// persist a ReplicaState which does not start from zero but presupposes a few
// entries already having applied. The supplied MVCCStats are used for the Stats
// field after adjusting for persisting the state itself, and the updated stats
// are returned.
func writeInitialReplicaState(
ctx context.Context,
eng engine.ReadWriter,
ms enginepb.MVCCStats,
desc roachpb.RangeDescriptor,
oldHS raftpb.HardState,
lease roachpb.Lease,
gcThreshold hlc.Timestamp,
txnSpanGCThreshold hlc.Timestamp,
Expand Down Expand Up @@ -551,14 +579,29 @@ func writeInitialState(
return enginepb.MVCCStats{}, err
}

if err := rsl.synthesizeHardState(ctx, eng, s, oldHS); err != nil {
return newMS, nil
}

// writeInitialState calls writeInitialReplicaState followed by
// synthesizeRaftState. It is typically called during bootstrap. The supplied
// MVCCStats are used for the Stats field after adjusting for persisting the
// state itself, and the updated stats are returned.
func writeInitialState(
ctx context.Context,
eng engine.ReadWriter,
ms enginepb.MVCCStats,
desc roachpb.RangeDescriptor,
lease roachpb.Lease,
gcThreshold hlc.Timestamp,
txnSpanGCThreshold hlc.Timestamp,
) (enginepb.MVCCStats, error) {
newMS, err := writeInitialReplicaState(ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold)
if err != nil {
return enginepb.MVCCStats{}, err
}

if err := rsl.setLastIndex(ctx, eng, s.TruncatedState.Index); err != nil {
if err := makeReplicaStateLoader(desc.RangeID).synthesizeRaftState(ctx, eng); err != nil {
return enginepb.MVCCStats{}, err
}

return newMS, nil
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/storage/replica_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -60,12 +59,7 @@ func TestSynthesizeHardState(t *testing.T) {
func() {
batch := eng.NewBatch()
defer batch.Close()
testState := storagebase.ReplicaState{
Desc: testRangeDescriptor(),
TruncatedState: &roachpb.RaftTruncatedState{Term: test.TruncTerm},
RaftAppliedIndex: test.RaftAppliedIndex,
}
rsl := makeReplicaStateLoader(testState.Desc.RangeID)
rsl := makeReplicaStateLoader(1)

if test.OldHS != nil {
if err := rsl.setHardState(context.Background(), batch, *test.OldHS); err != nil {
Expand All @@ -78,7 +72,9 @@ func TestSynthesizeHardState(t *testing.T) {
t.Fatal(err)
}

err = rsl.synthesizeHardState(context.Background(), batch, testState, oldHS)
err = rsl.synthesizeHardState(
context.Background(), batch, oldHS, roachpb.RaftTruncatedState{Term: test.TruncTerm}, test.RaftAppliedIndex,
)
if !testutils.IsError(err, test.Err) {
t.Fatalf("%d: expected %q got %v", i, test.Err, err)
} else if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"time"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/gogo/protobuf/proto"
"github.com/kr/pretty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -200,7 +199,6 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper,
tc.store.Engine(),
enginepb.MVCCStats{},
*testDesc,
raftpb.HardState{},
roachpb.Lease{},
hlc.Timestamp{},
hlc.Timestamp{},
Expand Down
Loading

0 comments on commit 7db82c4

Please sign in to comment.