From 4d97585e0a3a953849a7439b88d7a4db7522ba76 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 12 Jul 2016 04:01:47 -0400 Subject: [PATCH 1/8] storage: update comment --- storage/replica.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/replica.go b/storage/replica.go index a415104e8b3d..85f0a7353522 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -622,8 +622,8 @@ func (r *Replica) Desc() *roachpb.RangeDescriptor { } // setDesc atomically sets the range's descriptor. This method calls -// processRangeDescriptorUpdate() to make the range manager handle the -// descriptor update. +// processRangeDescriptorUpdate() to make the Store handle the descriptor +// update. func (r *Replica) setDesc(desc *roachpb.RangeDescriptor) error { r.setDescWithoutProcessUpdate(desc) if r.store == nil { From 9acdc76c5101caffd1c2f345b8998c481521cec6 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 6 Jul 2016 11:58:49 -0400 Subject: [PATCH 2/8] storage: wrap error properly --- storage/replica_command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/replica_command.go b/storage/replica_command.go index 30c0abb451da..5a7f7dce256c 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -2476,7 +2476,7 @@ func (r *Replica) splitTrigger( // from it below. deltaMS, err = writeInitialState(batch, deltaMS, split.NewDesc) if err != nil { - return errors.Errorf("unable to write initial state: %s", err) + return errors.Wrap(err, "unable to write initial state") } rightMS := deltaMS From bf44eb93f6272d02d6840918c6314d6f283d3e21 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 8 Jul 2016 08:29:11 -0400 Subject: [PATCH 3/8] storage: always write a HardState As discovered in https://github.com/cockroachdb/cockroach/issues/6991#issuecomment-230054238, it's possible that we apply a Raft snapshot without writing a corresponding HardState since we write the snapshot in its own batch first and only then write a HardState. If that happens, the server is going to panic on restart: It will have a nontrivial first index, but a committed index of zero (from the empty HardState). This change prevents us from applying a snapshot when there is no HardState supplied along with it, except when applying a preemptive snapshot (in which case we synthesize a HardState). Ensure that the new HardState and Raft log does not break promises made by an existing one during preemptive snapshot application. Fixes #7619. storage: prevent loss of uncommitted log entries --- storage/replica.go | 2 +- storage/replica_command.go | 2 +- storage/replica_raftstorage.go | 110 +++++++++++++++++++++++---------- storage/replica_state.go | 62 +++++++++++-------- storage/replica_state_test.go | 94 ++++++++++++++++++++++++++++ storage/replica_test.go | 6 +- storage/store.go | 2 +- 7 files changed, 217 insertions(+), 61 deletions(-) create mode 100644 storage/replica_state_test.go diff --git a/storage/replica.go b/storage/replica.go index 85f0a7353522..0125edfcff9a 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -1459,7 +1459,7 @@ func (r *Replica) handleRaftReady() error { if !raft.IsEmptySnap(rd.Snapshot) { var err error - lastIndex, err = r.applySnapshot(rd.Snapshot, normalSnapshot) + lastIndex, err = r.applySnapshot(rd.Snapshot, rd.HardState) if err != nil { return err } diff --git a/storage/replica_command.go b/storage/replica_command.go index 5a7f7dce256c..060951bc5b42 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -556,7 +556,7 @@ func (r *Replica) EndTransaction( if err := r.runCommitTrigger(ctx, batch.(engine.Batch), ms, args, reply.Txn); err != nil { // TODO(tschottdorf): should an error here always amount to a // ReplicaCorruptionError? - log.Errorf("Range %d transaction commit trigger fail: %s", r.RangeID, err) + log.Error(errors.Wrapf(err, "range %d commit trigger", r.RangeID)) return reply, nil, err } } diff --git a/storage/replica_raftstorage.go b/storage/replica_raftstorage.go index 6cf10964f3ce..fa916d6bf1a0 100644 --- a/storage/replica_raftstorage.go +++ b/storage/replica_raftstorage.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "strconv" "time" "github.com/coreos/etcd/raft" @@ -517,16 +518,22 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { return nil } -type snapshotType int - -const ( - normalSnapshot snapshotType = iota - preemptiveSnapshot -) - -// applySnapshot updates the replica based on the given snapshot. -// Returns the new last index. -func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, error) { +// applySnapshot updates the replica based on the given snapshot and associated +// HardState. The supplied HardState must be empty if a preemptive snapshot is +// being applied (which is the case if and only if the ReplicaID is zero), in +// which case it will be synthesized from any existing on-disk HardState +// appropriately. For a regular snapshot, a HardState may or may not be +// supplied, though in the common case it is (since the commit index changes as +// a result of the snapshot application, so Raft will supply us with one). +// The HardState, if altered or supplied, is persisted along with the applied +// snapshot and the new last index is returned. +// +// During preemptive snapshots, we (must) run additional safety checks. For +// example, the HardState, Raft's view of term, vote and committed log entries, +// and other Raft state (like acknowledged log entries) must not move backwards. +func (r *Replica) applySnapshot( + snap raftpb.Snapshot, hs raftpb.HardState, +) (uint64, error) { // We use a separate batch to apply the snapshot since the Replica (and in // particular the last index) is updated after the batch commits. Using a // separate batch also allows for future optimization (such as using a @@ -542,21 +549,41 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, // Extract the updated range descriptor. desc := snapData.RangeDescriptor + // Fill the reservation if there was one for this range, regardless of + // whether the application succeeded. + defer r.store.bookie.Fill(desc.RangeID) r.mu.Lock() replicaID := r.mu.replicaID raftLogSize := r.mu.raftLogSize r.mu.Unlock() - log.Infof("replica %d received snapshot for range %d at index %d. "+ - "encoded size=%d, %d KV pairs, %d log entries", - replicaID, desc.RangeID, snap.Metadata.Index, + isPreemptive := replicaID == 0 + + replicaIDStr := "[?]" + snapType := "preemptive" + if !isPreemptive { + replicaIDStr = strconv.FormatInt(int64(replicaID), 10) + snapType = "Raft" + } + + log.Infof("replica %s applying %s snapshot for range %d at index %d "+ + "(encoded size=%d, %d KV pairs, %d log entries)", + replicaIDStr, snapType, desc.RangeID, snap.Metadata.Index, len(snap.Data), len(snapData.KV), len(snapData.LogEntries)) defer func(start time.Time) { - log.Infof("replica %d applied snapshot for range %d in %s", - replicaID, desc.RangeID, timeutil.Since(start)) + log.Infof("replica %s applied %s snapshot for range %d in %s", + replicaIDStr, snapType, desc.RangeID, timeutil.Since(start)) }(timeutil.Now()) + // Remember the old last index to verify that the snapshot doesn't wipe out + // log entries which have been acknowledged, which is possible with + // preemptive snapshots. We assert on it later in this call. + oldLastIndex, err := loadLastIndex(batch, desc.RangeID) + if err != nil { + return 0, errors.Wrap(err, "error loading last index") + } + // Delete everything in the range and recreate it from the snapshot. // We need to delete any old Raft log entries here because any log entries // that predate the snapshot will be orphaned and never truncated or GC'd. @@ -605,19 +632,47 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, } // As outlined above, last and applied index are the same after applying - // the snapshot. + // the snapshot (i.e. the snapshot has no uncommitted tail). if s.RaftAppliedIndex != snap.Metadata.Index { log.Fatalf("%d: snapshot resulted in appliedIndex=%d, metadataIndex=%d", s.Desc.RangeID, s.RaftAppliedIndex, snap.Metadata.Index) } - if replicaID == 0 { - // The replica is not part of the Raft group so we need to write the Raft - // hard state for the replica in order for the Raft state machine to start - // correctly. - if err := updateHardState(batch, s); err != nil { - return 0, err + if !raft.IsEmptyHardState(hs) { + if isPreemptive { + return 0, errors.Errorf("unexpected HardState %+v on preemptive snapshot", &hs) + } + if err := setHardState(batch, s.Desc.RangeID, hs); err != nil { + return 0, errors.Wrapf(err, "unable to persist HardState %+v", &hs) + } + } else if isPreemptive { + // Preemptive snapshots get special verifications (see #7619) of their + // last index and (necessarily synthesized) HardState. + if snap.Metadata.Index < oldLastIndex { + // We are not aware of a specific way in which this could happen + // (Raft itself should not emit such snapshots, and no Replica can + // ever apply two preemptive snapshots), but it doesn't hurt to + // check. + return 0, errors.Errorf("preemptive snapshot would erase acknowledged log entries") + } + oldHS, err := loadHardState(batch, s.Desc.RangeID) + if err != nil { + return 0, errors.Wrap(err, "unable to load HardState") + } + if snap.Metadata.Term < oldHS.Term { + return 0, errors.Errorf("cannot apply preemptive snapshot from past term %d at term %d", + snap.Metadata.Term, oldHS.Term) } + if err := synthesizeHardState(batch, s, oldHS); err != nil { + return 0, errors.Wrap(err, "unable to write synthesized HardState") + } + } else { + // Note that we don't require that Raft supply us with a nonempty + // HardState on a snapshot. We don't want to make that assumption + // because it's not guaranteed by the contract. Raft *must* send us + // a HardState when it increases the committed index as a result of the + // snapshot, but who is to say it isn't going to accept a snapshot + // which is identical to the current state? } if err := batch.Commit(); err != nil { @@ -650,23 +705,16 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, if err := r.updateRangeInfo(&desc); err != nil { panic(err) } - - // Fill the reservation if there was any one for this range. - r.store.bookie.Fill(desc.RangeID) - // Update the range descriptor. This is done last as this is the step that // makes the Replica visible in the Store. if err := r.setDesc(&desc); err != nil { panic(err) } - switch typ { - case normalSnapshot: + if !isPreemptive { r.store.metrics.rangeSnapshotsNormalApplied.Inc(1) - case preemptiveSnapshot: + } else { r.store.metrics.rangeSnapshotsPreemptiveApplied.Inc(1) - default: - panic("not reached") } return lastIndex, nil } diff --git a/storage/replica_state.go b/storage/replica_state.go index 82262d57ab29..cb98e9eb5c5b 100644 --- a/storage/replica_state.go +++ b/storage/replica_state.go @@ -24,8 +24,8 @@ import ( "github.com/cockroachdb/cockroach/storage/storagebase" "github.com/cockroachdb/cockroach/util/hlc" "github.com/cockroachdb/cockroach/util/protoutil" - "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -321,33 +321,39 @@ func setHardState( hlc.ZeroTimestamp, nil, &st) } -func updateHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error { - // Load a potentially existing HardState as we may need to preserve - // information about cast votes. For example, during a Split for which - // another node's new right-hand side has contacted us before our left-hand - // side called in here to create the group. - rangeID := s.Desc.RangeID - oldHS, err := loadHardState(eng, rangeID) - if err != nil { - return err - } - +// synthesizeHardState synthesizes a HardState from the given ReplicaState and +// any existing on-disk HardState in the context of a snapshot, while verifying +// that the application of the snapshot does not violate Raft invariants. It +// must be called after the supplied state and ReadWriter have been updated +// with the result of the snapshot. +// If there is an existing HardState, we must respect it and we must not apply +// a snapshot that would move the state backwards. +func synthesizeHardState( + eng engine.ReadWriter, s storagebase.ReplicaState, oldHS raftpb.HardState, +) error { newHS := raftpb.HardState{ - Term: s.TruncatedState.Term, + Term: s.TruncatedState.Term, + // Note that when applying a Raft snapshot, the applied index is + // equal to the Commit index represented by the snapshot. Commit: s.RaftAppliedIndex, } - if !raft.IsEmptyHardState(oldHS) { - if oldHS.Commit > newHS.Commit { - newHS.Commit = oldHS.Commit - } - if oldHS.Term > newHS.Term { - newHS.Term = oldHS.Term - } + if oldHS.Commit > newHS.Commit { + return errors.Errorf("can't decrease HardState.Commit from %d to %d", + oldHS.Commit, newHS.Commit) + } + if oldHS.Term > newHS.Term { + // The existing HardState is allowed to be ahead of us, which is + // relevant in practice for the split trigger. We already checked above + // that we're not rewinding the acknowledged index, and we haven't + // updated votes yet. + newHS.Term = oldHS.Term + } + // If the existing HardState voted in this term, remember that. + if oldHS.Term == newHS.Term { newHS.Vote = oldHS.Vote } - - return setHardState(eng, rangeID, newHS) + return errors.Wrapf(setHardState(eng, s.Desc.RangeID, newHS), "writing HardState %+v", &newHS) } // writeInitialState bootstraps a new Raft group (i.e. it is called when we @@ -360,7 +366,6 @@ func updateHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error { func writeInitialState( eng engine.ReadWriter, ms enginepb.MVCCStats, desc roachpb.RangeDescriptor, ) (enginepb.MVCCStats, error) { - rangeID := desc.RangeID var s storagebase.ReplicaState s.TruncatedState = &roachpb.RaftTruncatedState{ @@ -369,7 +374,7 @@ func writeInitialState( } s.RaftAppliedIndex = s.TruncatedState.Index s.Desc = &roachpb.RangeDescriptor{ - RangeID: rangeID, + RangeID: desc.RangeID, } s.Stats = ms @@ -378,11 +383,16 @@ func writeInitialState( return enginepb.MVCCStats{}, err } - if err := updateHardState(eng, s); err != nil { + oldHS, err := loadHardState(eng, desc.RangeID) + if err != nil { + return enginepb.MVCCStats{}, err + } + + if err := synthesizeHardState(eng, s, oldHS); err != nil { return enginepb.MVCCStats{}, err } - if err := setLastIndex(eng, rangeID, s.TruncatedState.Index); err != nil { + if err := setLastIndex(eng, desc.RangeID, s.TruncatedState.Index); err != nil { return enginepb.MVCCStats{}, err } diff --git a/storage/replica_state_test.go b/storage/replica_state_test.go new file mode 100644 index 000000000000..5e769737175a --- /dev/null +++ b/storage/replica_state_test.go @@ -0,0 +1,94 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com) + +package storage + +import ( + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/storage/engine" + "github.com/cockroachdb/cockroach/storage/storagebase" + "github.com/cockroachdb/cockroach/testutils" + "github.com/cockroachdb/cockroach/util/leaktest" + "github.com/cockroachdb/cockroach/util/stop" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestSynthesizeHardState(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop() + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20, stopper) + + tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4} + + testCases := []struct { + TruncTerm, RaftAppliedIndex uint64 + OldHS *raftpb.HardState + NewHS raftpb.HardState + Err string + }{ + {OldHS: nil, TruncTerm: 42, RaftAppliedIndex: 24, NewHS: raftpb.HardState{Term: 42, Vote: 0, Commit: 24}}, + // Can't wind back the committed index of the new HardState. + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit - 1, Err: "can't decrease HardState.Commit"}, + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit, NewHS: tHS}, + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit + 1, NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1}}, + // Higher Term is picked up, but vote isn't carried over when the term + // changes. + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit, TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit}}, + } + + for i, test := range testCases { + func() { + batch := eng.NewBatch() + defer batch.Close() + testState := storagebase.ReplicaState{ + Desc: testRangeDescriptor(), + TruncatedState: &roachpb.RaftTruncatedState{Term: test.TruncTerm}, + RaftAppliedIndex: test.RaftAppliedIndex, + } + + if test.OldHS != nil { + if err := setHardState(batch, testState.Desc.RangeID, *test.OldHS); err != nil { + t.Fatal(err) + } + } + + oldHS, err := loadHardState(batch, testState.Desc.RangeID) + if err != nil { + t.Fatal(err) + } + + if err := synthesizeHardState(batch, testState, oldHS); ((err == nil) != (test.Err == "")) || + (err != nil && !testutils.IsError(err, test.Err)) { + t.Fatalf("%d: %v (expected '%s')", i, err, test.Err) + } else if err != nil { + // No further checking if we expected an error and got it. + return + } + + hs, err := loadHardState(batch, testState.Desc.RangeID) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(hs, test.NewHS) { + t.Fatalf("%d: expected %+v, got %+v", i, &test.NewHS, &hs) + } + }() + } +} diff --git a/storage/replica_test.go b/storage/replica_test.go index 895b162291a1..7d6cd9f1734c 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -33,6 +33,7 @@ import ( "golang.org/x/net/context" "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" @@ -6168,7 +6169,10 @@ func TestReserveAndApplySnapshot(t *testing.T) { t.Fatalf("Can't reserve the replica") } checkReservations(t, 1) - if _, err := firstRng.applySnapshot(snap, normalSnapshot); err != nil { + + // Apply a snapshot and check the reservation was filled. Note that this + // out-of-band application could be a root cause if this test ever crashes. + if _, err := firstRng.applySnapshot(snap, raftpb.HardState{}); err != nil { t.Fatal(err) } checkReservations(t, 0) diff --git a/storage/store.go b/storage/store.go index 83f34d4c7858..1e8d54f68c39 100644 --- a/storage/store.go +++ b/storage/store.go @@ -2157,7 +2157,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error { // the raft group (i.e. replicas with an ID of 0). This is the only // operation that can be performed on a replica before it is part of the // raft group. - _, err := r.applySnapshot(req.Message.Snapshot, preemptiveSnapshot) + _, err := r.applySnapshot(req.Message.Snapshot, raftpb.HardState{}) return err } // We disallow non-snapshot messages to replica ID 0. Note that From 739e231ba9f5f33a16410fbcf18f26925446f676 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 6 Jul 2016 16:36:08 -0400 Subject: [PATCH 4/8] storage: add a migration for missing HardState See #6991. It's possible that the HardState is missing after a snapshot was applied (so there is a TruncatedState). In this case, synthesize a HardState (simply setting everything that was in the snapshot to committed). Having lost the original HardState can theoretically mean that the replica was further ahead or had voted, and so there's no guarantee that this will be correct. But it will be correct in the majority of cases, and some state *has* to be recovered. To illustrate this in the scenario in #6991: There, we (presumably) have applied an empty snapshot (no real data, but a Raft log which starts and ends at index ten as designated by its TruncatedState). We don't have a HardState, so Raft will crash because its Commit index zero isn't in line with the fact that our Raft log starts only at index ten. The migration sees that there is a TruncatedState, but no HardState. It will synthesize a HardState with Commit:10 (and the corresponding Term from the TruncatedState, which is five). --- storage/store.go | 85 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/storage/store.go b/storage/store.go index 1e8d54f68c39..d230322d832d 100644 --- a/storage/store.go +++ b/storage/store.go @@ -873,38 +873,73 @@ func IterateRangeDescriptors( return err } -// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica -// always has a complete Raft state on disk. Prior versions may not -// have that, which causes issues due to the fact that we used to -// synthesize a TruncatedState and do so no more. To make up for -// that, write a missing TruncatedState here. That key is in the -// replicated state, but since during a cluster upgrade, all nodes -// do it, it's fine (and we never CPut on that key, so anything in -// the Raft pipeline will simply overwrite it). +// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica always has +// a complete Raft state on disk. Prior versions may not have that, which +// causes issues due to the fact that we used to synthesize a TruncatedState +// and do so no more. To make up for that, write a missing TruncatedState here. +// That key is in the replicated state, but since during a cluster upgrade, all +// nodes do it, it's fine (and we never CPut on that key, so anything in the +// Raft pipeline will simply overwrite it). // -// TODO(tschottdorf): test this method. -func (s *Store) migrate7310(desc roachpb.RangeDescriptor) { - if !desc.IsInitialized() { - log.Fatalf("found uninitialized descriptor on range: %+v", desc) - } +// Migration(tschottdorf): See #6991. It's possible that the HardState is +// missing after a snapshot was applied (so there is a TruncatedState). In this +// case, synthesize a HardState (simply setting everything that was in the +// snapshot to committed). Having lost the original HardState can theoretically +// mean that the replica was further ahead or had voted, and so there's no +// guarantee that this will be correct. But it will be correct in the majority +// of cases, and some state *has* to be recovered. +func (s *Store) migrate7310And6991(desc roachpb.RangeDescriptor) { batch := s.engine.NewBatch() + if err := migrate7310And6991(batch, desc); err != nil { + log.Fatal(errors.Wrap(err, "during migration")) + } + if err := batch.Commit(); err != nil { + log.Fatal(errors.Wrap(err, "could not migrate Raft state")) + } +} + +// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica always has +// a complete Raft state on disk. Prior versions may not have that, which +// causes issues due to the fact that we used to synthesize a TruncatedState +// and do so no more. To make up for that, write a missing TruncatedState here. +// That key is in the replicated state, but since during a cluster upgrade, all +// nodes do it, it's fine (and we never CPut on that key, so anything in the +// Raft pipeline will simply overwrite it). +// +// Migration(tschottdorf): See #6991. It's possible that the HardState is +// missing after a snapshot was applied (so there is a TruncatedState). In this +// case, synthesize a HardState (simply setting everything that was in the +// snapshot to committed). Having lost the original HardState can theoretically +// mean that the replica was further ahead or had voted, and so there's no +// guarantee that this will be correct. But it will be correct in the majority +// of cases, and some state *has* to be recovered. +func migrate7310And6991(batch engine.ReadWriter, desc roachpb.RangeDescriptor) error { state, err := loadState(batch, &desc) if err != nil { - log.Fatalf("could not migrate truncated state: %s", err) + return errors.Wrap(err, "could not migrate TruncatedState: %s") } - if (*state.TruncatedState != roachpb.RaftTruncatedState{}) { - return + if (*state.TruncatedState == roachpb.RaftTruncatedState{}) { + state.TruncatedState.Term = raftInitialLogTerm + state.TruncatedState.Index = raftInitialLogIndex + if _, err := saveState(batch, state); err != nil { + return errors.Wrapf(err, "could not migrate TruncatedState to %+v", &state.TruncatedState) + } + log.Warningf("migration: synthesized TruncatedState for %+v", desc) } - state.TruncatedState.Term = raftInitialLogTerm - state.TruncatedState.Index = raftInitialLogIndex - if _, err := saveState(batch, state); err != nil { - log.Fatalf("could not migrate truncated state: %s", err) - } - if err := batch.Commit(); err != nil { - log.Fatalf("could not migrate truncated state: %s", err) + hs, err := loadHardState(batch, desc.RangeID) + if err != nil { + return errors.Wrap(err, "unable to load HardState") + } + // Only update the HardState when there is a nontrivial Commit field. We + // don't have a snapshot here, so we could wind up lowering the commit + // index (which would error out and fatal us). + if hs.Commit == 0 { + if err := synthesizeHardState(batch, state); err != nil { + return errors.Wrap(err, "could not migrate HardState") + } } - log.Warningf("migration: synthesized truncated state for %+v", desc) + return nil } // Start the engine, set the GC and read the StoreIdent. @@ -994,7 +1029,7 @@ func (s *Store) Start(stopper *stop.Stopper) error { return false, s.destroyReplicaData(&desc) } - s.migrate7310(desc) + s.migrate7310And6991(desc) rng, err := NewReplica(&desc, s, 0) if err != nil { From 5b99900a9937c37400e40db634c641449fe69285 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 8 Jul 2016 15:51:00 -0400 Subject: [PATCH 5/8] storage: unit test migrations --- storage/migration_test.go | 72 +++++++++++++++++++++++++++++++++++++++ storage/store.go | 26 ++++---------- 2 files changed, 79 insertions(+), 19 deletions(-) create mode 100644 storage/migration_test.go diff --git a/storage/migration_test.go b/storage/migration_test.go new file mode 100644 index 000000000000..288285f8aad3 --- /dev/null +++ b/storage/migration_test.go @@ -0,0 +1,72 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com) + +package storage + +import ( + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/storage/engine" + "github.com/cockroachdb/cockroach/util/leaktest" + "github.com/cockroachdb/cockroach/util/stop" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestMigrate7310And6991(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop() + eng := engine.NewInMem(roachpb.Attributes{}, 1<<10, stopper) + + desc := *testRangeDescriptor() + + if err := migrate7310And6991(eng, desc); err != nil { + t.Fatal(err) + } + + ts, err := loadTruncatedState(eng, desc.RangeID) + if err != nil { + t.Fatal(err) + } + + hs, err := loadHardState(eng, desc.RangeID) + if err != nil { + t.Fatal(err) + } + + rApplied, lApplied, err := loadAppliedIndex(eng, desc.RangeID) + if err != nil { + t.Fatal(err) + } + + expTS := roachpb.RaftTruncatedState{Term: raftInitialLogTerm, Index: raftInitialLogIndex} + if expTS != ts { + t.Errorf("expected %+v, got %+v", &expTS, &ts) + } + + expHS := raftpb.HardState{Term: raftInitialLogTerm, Commit: raftInitialLogIndex} + if !reflect.DeepEqual(expHS, hs) { + t.Errorf("expected %+v, got %+v", &expHS, &hs) + } + + expRApplied, expLApplied := uint64(raftInitialLogIndex), uint64(0) + if expRApplied != rApplied || expLApplied != lApplied { + t.Errorf("expected (raftApplied,leaseApplied)=(%d,%d), got (%d,%d)", + expRApplied, expLApplied, rApplied, lApplied) + } +} diff --git a/storage/store.go b/storage/store.go index d230322d832d..8155abed4e91 100644 --- a/storage/store.go +++ b/storage/store.go @@ -873,22 +873,7 @@ func IterateRangeDescriptors( return err } -// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica always has -// a complete Raft state on disk. Prior versions may not have that, which -// causes issues due to the fact that we used to synthesize a TruncatedState -// and do so no more. To make up for that, write a missing TruncatedState here. -// That key is in the replicated state, but since during a cluster upgrade, all -// nodes do it, it's fine (and we never CPut on that key, so anything in the -// Raft pipeline will simply overwrite it). -// -// Migration(tschottdorf): See #6991. It's possible that the HardState is -// missing after a snapshot was applied (so there is a TruncatedState). In this -// case, synthesize a HardState (simply setting everything that was in the -// snapshot to committed). Having lost the original HardState can theoretically -// mean that the replica was further ahead or had voted, and so there's no -// guarantee that this will be correct. But it will be correct in the majority -// of cases, and some state *has* to be recovered. -func (s *Store) migrate7310And6991(desc roachpb.RangeDescriptor) { +func (s *Store) migrate(desc roachpb.RangeDescriptor) { batch := s.engine.NewBatch() if err := migrate7310And6991(batch, desc); err != nil { log.Fatal(errors.Wrap(err, "during migration")) @@ -921,6 +906,7 @@ func migrate7310And6991(batch engine.ReadWriter, desc roachpb.RangeDescriptor) e if (*state.TruncatedState == roachpb.RaftTruncatedState{}) { state.TruncatedState.Term = raftInitialLogTerm state.TruncatedState.Index = raftInitialLogIndex + state.RaftAppliedIndex = raftInitialLogIndex if _, err := saveState(batch, state); err != nil { return errors.Wrapf(err, "could not migrate TruncatedState to %+v", &state.TruncatedState) } @@ -935,7 +921,7 @@ func migrate7310And6991(batch engine.ReadWriter, desc roachpb.RangeDescriptor) e // don't have a snapshot here, so we could wind up lowering the commit // index (which would error out and fatal us). if hs.Commit == 0 { - if err := synthesizeHardState(batch, state); err != nil { + if err := synthesizeHardState(batch, state, hs); err != nil { return errors.Wrap(err, "could not migrate HardState") } } @@ -1028,8 +1014,10 @@ func (s *Store) Start(stopper *stop.Stopper) error { // (which is necessary to have a non-nil raft group) return false, s.destroyReplicaData(&desc) } - - s.migrate7310And6991(desc) + if !desc.IsInitialized() { + return false, errors.Errorf("found uninitialized RangeDescriptor: %+v", desc) + } + s.migrate(desc) rng, err := NewReplica(&desc, s, 0) if err != nil { From 3c1d880d8f8edff737a887903f27ed7e63274209 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 8 Jul 2016 15:52:46 -0400 Subject: [PATCH 6/8] storage: move migration to own file --- storage/migration.go | 69 ++++++++++++++++++++++++++++++++++++++++++++ storage/store.go | 45 ----------------------------- 2 files changed, 69 insertions(+), 45 deletions(-) create mode 100644 storage/migration.go diff --git a/storage/migration.go b/storage/migration.go new file mode 100644 index 000000000000..c8507dea26b0 --- /dev/null +++ b/storage/migration.go @@ -0,0 +1,69 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com) + +package storage + +import ( + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/storage/engine" + "github.com/cockroachdb/cockroach/util/log" + "github.com/pkg/errors" +) + +// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica always has +// a complete Raft state on disk. Prior versions may not have that, which +// causes issues due to the fact that we used to synthesize a TruncatedState +// and do so no more. To make up for that, write a missing TruncatedState here. +// That key is in the replicated state, but since during a cluster upgrade, all +// nodes do it, it's fine (and we never CPut on that key, so anything in the +// Raft pipeline will simply overwrite it). +// +// Migration(tschottdorf): See #6991. It's possible that the HardState is +// missing after a snapshot was applied (so there is a TruncatedState). In this +// case, synthesize a HardState (simply setting everything that was in the +// snapshot to committed). Having lost the original HardState can theoretically +// mean that the replica was further ahead or had voted, and so there's no +// guarantee that this will be correct. But it will be correct in the majority +// of cases, and some state *has* to be recovered. +func migrate7310And6991(batch engine.ReadWriter, desc roachpb.RangeDescriptor) error { + state, err := loadState(batch, &desc) + if err != nil { + return errors.Wrap(err, "could not migrate TruncatedState: %s") + } + if (*state.TruncatedState == roachpb.RaftTruncatedState{}) { + state.TruncatedState.Term = raftInitialLogTerm + state.TruncatedState.Index = raftInitialLogIndex + state.RaftAppliedIndex = raftInitialLogIndex + if _, err := saveState(batch, state); err != nil { + return errors.Wrapf(err, "could not migrate TruncatedState to %+v", &state.TruncatedState) + } + log.Warningf("migration: synthesized TruncatedState for %+v", desc) + } + + hs, err := loadHardState(batch, desc.RangeID) + if err != nil { + return errors.Wrap(err, "unable to load HardState") + } + // Only update the HardState when there is a nontrivial Commit field. We + // don't have a snapshot here, so we could wind up lowering the commit + // index (which would error out and fatal us). + if hs.Commit == 0 { + if err := synthesizeHardState(batch, state, hs); err != nil { + return errors.Wrap(err, "could not migrate HardState") + } + } + return nil +} diff --git a/storage/store.go b/storage/store.go index 8155abed4e91..241938954475 100644 --- a/storage/store.go +++ b/storage/store.go @@ -883,51 +883,6 @@ func (s *Store) migrate(desc roachpb.RangeDescriptor) { } } -// MIGRATION(tschottdorf): As of #7310, we make sure that a Replica always has -// a complete Raft state on disk. Prior versions may not have that, which -// causes issues due to the fact that we used to synthesize a TruncatedState -// and do so no more. To make up for that, write a missing TruncatedState here. -// That key is in the replicated state, but since during a cluster upgrade, all -// nodes do it, it's fine (and we never CPut on that key, so anything in the -// Raft pipeline will simply overwrite it). -// -// Migration(tschottdorf): See #6991. It's possible that the HardState is -// missing after a snapshot was applied (so there is a TruncatedState). In this -// case, synthesize a HardState (simply setting everything that was in the -// snapshot to committed). Having lost the original HardState can theoretically -// mean that the replica was further ahead or had voted, and so there's no -// guarantee that this will be correct. But it will be correct in the majority -// of cases, and some state *has* to be recovered. -func migrate7310And6991(batch engine.ReadWriter, desc roachpb.RangeDescriptor) error { - state, err := loadState(batch, &desc) - if err != nil { - return errors.Wrap(err, "could not migrate TruncatedState: %s") - } - if (*state.TruncatedState == roachpb.RaftTruncatedState{}) { - state.TruncatedState.Term = raftInitialLogTerm - state.TruncatedState.Index = raftInitialLogIndex - state.RaftAppliedIndex = raftInitialLogIndex - if _, err := saveState(batch, state); err != nil { - return errors.Wrapf(err, "could not migrate TruncatedState to %+v", &state.TruncatedState) - } - log.Warningf("migration: synthesized TruncatedState for %+v", desc) - } - - hs, err := loadHardState(batch, desc.RangeID) - if err != nil { - return errors.Wrap(err, "unable to load HardState") - } - // Only update the HardState when there is a nontrivial Commit field. We - // don't have a snapshot here, so we could wind up lowering the commit - // index (which would error out and fatal us). - if hs.Commit == 0 { - if err := synthesizeHardState(batch, state, hs); err != nil { - return errors.Wrap(err, "could not migrate HardState") - } - } - return nil -} - // Start the engine, set the GC and read the StoreIdent. func (s *Store) Start(stopper *stop.Stopper) error { s.stopper = stopper From 09da3fb0ca800e3db422d70a7207dc290e7cc7af Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 12 Jul 2016 04:20:29 -0400 Subject: [PATCH 7/8] storage: move snapshot benchmark --- storage/{replica_raftstorage_test.go => client_bench_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename storage/{replica_raftstorage_test.go => client_bench_test.go} (100%) diff --git a/storage/replica_raftstorage_test.go b/storage/client_bench_test.go similarity index 100% rename from storage/replica_raftstorage_test.go rename to storage/client_bench_test.go From 7f2218c828a32b5ad731620819c147b1fa8b4d61 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 12 Jul 2016 04:46:11 -0400 Subject: [PATCH 8/8] storage: test preemptive snapshot refusal --- storage/replica_raftstorage.go | 15 +++--- storage/replica_raftstorage_test.go | 83 +++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 storage/replica_raftstorage_test.go diff --git a/storage/replica_raftstorage.go b/storage/replica_raftstorage.go index fa916d6bf1a0..c6318df529bc 100644 --- a/storage/replica_raftstorage.go +++ b/storage/replica_raftstorage.go @@ -583,6 +583,11 @@ func (r *Replica) applySnapshot( if err != nil { return 0, errors.Wrap(err, "error loading last index") } + // Similar strategy for the HardState. + oldHardState, err := loadHardState(batch, desc.RangeID) + if err != nil { + return 0, errors.Wrap(err, "unable to load HardState") + } // Delete everything in the range and recreate it from the snapshot. // We need to delete any old Raft log entries here because any log entries @@ -655,15 +660,11 @@ func (r *Replica) applySnapshot( // check. return 0, errors.Errorf("preemptive snapshot would erase acknowledged log entries") } - oldHS, err := loadHardState(batch, s.Desc.RangeID) - if err != nil { - return 0, errors.Wrap(err, "unable to load HardState") - } - if snap.Metadata.Term < oldHS.Term { + if snap.Metadata.Term < oldHardState.Term { return 0, errors.Errorf("cannot apply preemptive snapshot from past term %d at term %d", - snap.Metadata.Term, oldHS.Term) + snap.Metadata.Term, oldHardState.Term) } - if err := synthesizeHardState(batch, s, oldHS); err != nil { + if err := synthesizeHardState(batch, s, oldHardState); err != nil { return 0, errors.Wrap(err, "unable to write synthesized HardState") } } else { diff --git a/storage/replica_raftstorage_test.go b/storage/replica_raftstorage_test.go new file mode 100644 index 000000000000..daff0d0f4ed9 --- /dev/null +++ b/storage/replica_raftstorage_test.go @@ -0,0 +1,83 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com) + +package storage + +import ( + "testing" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/testutils" + "github.com/cockroachdb/cockroach/util/leaktest" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestApplySnapshotDenyPreemptive(t *testing.T) { + defer leaktest.AfterTest(t)() + + var tc testContext + tc.Start(t) + defer tc.Stop() + + key := roachpb.RKey("a") + realRng := tc.store.LookupReplica(key, nil) + + // Use Raft to get a nontrivial term for our snapshot. + if pErr := realRng.redirectOnOrAcquireLease(context.Background()); pErr != nil { + t.Fatal(pErr) + } + + snap, err := realRng.GetSnapshot() + if err != nil { + t.Fatal(err) + } + + // Make sure that the Term is behind our first range term (raftInitialLogTerm) + snap.Metadata.Term-- + + // Create an uninitialized version of the first range. This is only ok + // because in the case we test, there's an error (and so we don't clobber + // our actual first range in the Store). If we want snapshots to apply + // successfully during tests, we need to adapt the snapshots to a new + // RangeID first and generally do a lot more work. + rng, err := NewReplica(&roachpb.RangeDescriptor{RangeID: 1}, tc.store, 0) + if err != nil { + t.Fatal(err) + } + + if _, err := rng.applySnapshot(snap, raftpb.HardState{}); !testutils.IsError( + err, "cannot apply preemptive snapshot from past term", + ) { + t.Fatal(err) + } + + // Do something that extends the Raft log past what we have in the + // snapshot. + put := putArgs(roachpb.Key("a"), []byte("foo")) + if _, pErr := tc.SendWrapped(&put); pErr != nil { + t.Fatal(pErr) + } + snap.Metadata.Term++ // restore the "real" term of the snapshot + + if _, err := rng.applySnapshot(snap, raftpb.HardState{}); !testutils.IsError( + err, "would erase acknowledged log entries", + ) { + t.Fatal(err) + } + +}