From bf44eb93f6272d02d6840918c6314d6f283d3e21 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 8 Jul 2016 08:29:11 -0400 Subject: [PATCH] storage: always write a HardState As discovered in https://github.com/cockroachdb/cockroach/issues/6991#issuecomment-230054238, it's possible that we apply a Raft snapshot without writing a corresponding HardState since we write the snapshot in its own batch first and only then write a HardState. If that happens, the server is going to panic on restart: It will have a nontrivial first index, but a committed index of zero (from the empty HardState). This change prevents us from applying a snapshot when there is no HardState supplied along with it, except when applying a preemptive snapshot (in which case we synthesize a HardState). Ensure that the new HardState and Raft log do not break promises made by an existing one during preemptive snapshot application. Fixes #7619. storage: prevent loss of uncommitted log entries --- storage/replica.go | 2 +- storage/replica_command.go | 2 +- storage/replica_raftstorage.go | 110 +++++++++++++++++++++++---------- storage/replica_state.go | 62 +++++++++++-------- storage/replica_state_test.go | 94 ++++++++++++++++++++++++++++ storage/replica_test.go | 6 +- storage/store.go | 2 +- 7 files changed, 217 insertions(+), 61 deletions(-) create mode 100644 storage/replica_state_test.go diff --git a/storage/replica.go b/storage/replica.go index 85f0a7353522..0125edfcff9a 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -1459,7 +1459,7 @@ func (r *Replica) handleRaftReady() error { if !raft.IsEmptySnap(rd.Snapshot) { var err error - lastIndex, err = r.applySnapshot(rd.Snapshot, normalSnapshot) + lastIndex, err = r.applySnapshot(rd.Snapshot, rd.HardState) if err != nil { return err } diff --git a/storage/replica_command.go b/storage/replica_command.go index 5a7f7dce256c..060951bc5b42 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -556,7 +556,7 @@ func 
(r *Replica) EndTransaction( if err := r.runCommitTrigger(ctx, batch.(engine.Batch), ms, args, reply.Txn); err != nil { // TODO(tschottdorf): should an error here always amount to a // ReplicaCorruptionError? - log.Errorf("Range %d transaction commit trigger fail: %s", r.RangeID, err) + log.Error(errors.Wrapf(err, "range %d commit trigger", r.RangeID)) return reply, nil, err } } diff --git a/storage/replica_raftstorage.go b/storage/replica_raftstorage.go index 6cf10964f3ce..fa916d6bf1a0 100644 --- a/storage/replica_raftstorage.go +++ b/storage/replica_raftstorage.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "strconv" "time" "github.com/coreos/etcd/raft" @@ -517,16 +518,22 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { return nil } -type snapshotType int - -const ( - normalSnapshot snapshotType = iota - preemptiveSnapshot -) - -// applySnapshot updates the replica based on the given snapshot. -// Returns the new last index. -func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, error) { +// applySnapshot updates the replica based on the given snapshot and associated +// HardState. The supplied HardState must be empty if a preemptive snapshot is +// being applied (which is the case if and only if the ReplicaID is zero), in +// which case it will be synthesized from any existing on-disk HardState +// appropriately. For a regular snapshot, a HardState may or may not be +// supplied, though in the common case it is (since the commit index changes as +// a result of the snapshot application, so Raft will supply us with one). +// The HardState, if altered or supplied, is persisted along with the applied +// snapshot and the new last index is returned. +// +// During preemptive snapshots, we (must) run additional safety checks. For +// example, the HardState, Raft's view of term, vote and committed log entries, +// and other Raft state (like acknowledged log entries) must not move backwards. 
+func (r *Replica) applySnapshot( + snap raftpb.Snapshot, hs raftpb.HardState, +) (uint64, error) { // We use a separate batch to apply the snapshot since the Replica (and in // particular the last index) is updated after the batch commits. Using a // separate batch also allows for future optimization (such as using a @@ -542,21 +549,41 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, // Extract the updated range descriptor. desc := snapData.RangeDescriptor + // Fill the reservation if there was one for this range, regardless of + // whether the application succeeded. + defer r.store.bookie.Fill(desc.RangeID) r.mu.Lock() replicaID := r.mu.replicaID raftLogSize := r.mu.raftLogSize r.mu.Unlock() - log.Infof("replica %d received snapshot for range %d at index %d. "+ - "encoded size=%d, %d KV pairs, %d log entries", - replicaID, desc.RangeID, snap.Metadata.Index, + isPreemptive := replicaID == 0 + + replicaIDStr := "[?]" + snapType := "preemptive" + if !isPreemptive { + replicaIDStr = strconv.FormatInt(int64(replicaID), 10) + snapType = "Raft" + } + + log.Infof("replica %s applying %s snapshot for range %d at index %d "+ + "(encoded size=%d, %d KV pairs, %d log entries)", + replicaIDStr, snapType, desc.RangeID, snap.Metadata.Index, len(snap.Data), len(snapData.KV), len(snapData.LogEntries)) defer func(start time.Time) { - log.Infof("replica %d applied snapshot for range %d in %s", - replicaID, desc.RangeID, timeutil.Since(start)) + log.Infof("replica %s applied %s snapshot for range %d in %s", + replicaIDStr, snapType, desc.RangeID, timeutil.Since(start)) }(timeutil.Now()) + // Remember the old last index to verify that the snapshot doesn't wipe out + // log entries which have been acknowledged, which is possible with + // preemptive snapshots. We assert on it later in this call. 
+ oldLastIndex, err := loadLastIndex(batch, desc.RangeID) + if err != nil { + return 0, errors.Wrap(err, "error loading last index") + } + // Delete everything in the range and recreate it from the snapshot. // We need to delete any old Raft log entries here because any log entries // that predate the snapshot will be orphaned and never truncated or GC'd. @@ -605,19 +632,47 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, } // As outlined above, last and applied index are the same after applying - // the snapshot. + // the snapshot (i.e. the snapshot has no uncommitted tail). if s.RaftAppliedIndex != snap.Metadata.Index { log.Fatalf("%d: snapshot resulted in appliedIndex=%d, metadataIndex=%d", s.Desc.RangeID, s.RaftAppliedIndex, snap.Metadata.Index) } - if replicaID == 0 { - // The replica is not part of the Raft group so we need to write the Raft - // hard state for the replica in order for the Raft state machine to start - // correctly. - if err := updateHardState(batch, s); err != nil { - return 0, err + if !raft.IsEmptyHardState(hs) { + if isPreemptive { + return 0, errors.Errorf("unexpected HardState %+v on preemptive snapshot", &hs) + } + if err := setHardState(batch, s.Desc.RangeID, hs); err != nil { + return 0, errors.Wrapf(err, "unable to persist HardState %+v", &hs) + } + } else if isPreemptive { + // Preemptive snapshots get special verifications (see #7619) of their + // last index and (necessarily synthesized) HardState. + if snap.Metadata.Index < oldLastIndex { + // We are not aware of a specific way in which this could happen + // (Raft itself should not emit such snapshots, and no Replica can + // ever apply two preemptive snapshots), but it doesn't hurt to + // check. 
+ return 0, errors.Errorf("preemptive snapshot would erase acknowledged log entries") + } + oldHS, err := loadHardState(batch, s.Desc.RangeID) + if err != nil { + return 0, errors.Wrap(err, "unable to load HardState") + } + if snap.Metadata.Term < oldHS.Term { + return 0, errors.Errorf("cannot apply preemptive snapshot from past term %d at term %d", + snap.Metadata.Term, oldHS.Term) } + if err := synthesizeHardState(batch, s, oldHS); err != nil { + return 0, errors.Wrap(err, "unable to write synthesized HardState") + } + } else { + // Note that we don't require that Raft supply us with a nonempty + // HardState on a snapshot. We don't want to make that assumption + // because it's not guaranteed by the contract. Raft *must* send us + // a HardState when it increases the committed index as a result of the + // snapshot, but who is to say it isn't going to accept a snapshot + // which is identical to the current state? } if err := batch.Commit(); err != nil { @@ -650,23 +705,16 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, if err := r.updateRangeInfo(&desc); err != nil { panic(err) } - - // Fill the reservation if there was any one for this range. - r.store.bookie.Fill(desc.RangeID) - // Update the range descriptor. This is done last as this is the step that // makes the Replica visible in the Store. 
if err := r.setDesc(&desc); err != nil { panic(err) } - switch typ { - case normalSnapshot: + if !isPreemptive { r.store.metrics.rangeSnapshotsNormalApplied.Inc(1) - case preemptiveSnapshot: + } else { r.store.metrics.rangeSnapshotsPreemptiveApplied.Inc(1) - default: - panic("not reached") } return lastIndex, nil } diff --git a/storage/replica_state.go b/storage/replica_state.go index 82262d57ab29..cb98e9eb5c5b 100644 --- a/storage/replica_state.go +++ b/storage/replica_state.go @@ -24,8 +24,8 @@ import ( "github.com/cockroachdb/cockroach/storage/storagebase" "github.com/cockroachdb/cockroach/util/hlc" "github.com/cockroachdb/cockroach/util/protoutil" - "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -321,33 +321,39 @@ func setHardState( hlc.ZeroTimestamp, nil, &st) } -func updateHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error { - // Load a potentially existing HardState as we may need to preserve - // information about cast votes. For example, during a Split for which - // another node's new right-hand side has contacted us before our left-hand - // side called in here to create the group. - rangeID := s.Desc.RangeID - oldHS, err := loadHardState(eng, rangeID) - if err != nil { - return err - } - +// synthesizeHardState synthesizes a HardState from the given ReplicaState and +// any existing on-disk HardState in the context of a snapshot, while verifying +// that the application of the snapshot does not violate Raft invariants. It +// must be called after the supplied state and ReadWriter have been updated +// with the result of the snapshot. +// If there is an existing HardState, we must respect it and we must not apply +// a snapshot that would move the state backwards. 
+func synthesizeHardState( + eng engine.ReadWriter, s storagebase.ReplicaState, oldHS raftpb.HardState, +) error { newHS := raftpb.HardState{ - Term: s.TruncatedState.Term, + Term: s.TruncatedState.Term, + // Note that when applying a Raft snapshot, the applied index is + // equal to the Commit index represented by the snapshot. Commit: s.RaftAppliedIndex, } - if !raft.IsEmptyHardState(oldHS) { - if oldHS.Commit > newHS.Commit { - newHS.Commit = oldHS.Commit - } - if oldHS.Term > newHS.Term { - newHS.Term = oldHS.Term - } + if oldHS.Commit > newHS.Commit { + return errors.Errorf("can't decrease HardState.Commit from %d to %d", + oldHS.Commit, newHS.Commit) + } + if oldHS.Term > newHS.Term { + // The existing HardState is allowed to be ahead of us, which is + // relevant in practice for the split trigger. We already checked above + // that we're not rewinding the acknowledged index, and we haven't + // updated votes yet. + newHS.Term = oldHS.Term + } + // If the existing HardState voted in this term, remember that. + if oldHS.Term == newHS.Term { newHS.Vote = oldHS.Vote } - - return setHardState(eng, rangeID, newHS) + return errors.Wrapf(setHardState(eng, s.Desc.RangeID, newHS), "writing HardState %+v", &newHS) } // writeInitialState bootstraps a new Raft group (i.e. 
it is called when we @@ -360,7 +366,6 @@ func updateHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error { func writeInitialState( eng engine.ReadWriter, ms enginepb.MVCCStats, desc roachpb.RangeDescriptor, ) (enginepb.MVCCStats, error) { - rangeID := desc.RangeID var s storagebase.ReplicaState s.TruncatedState = &roachpb.RaftTruncatedState{ @@ -369,7 +374,7 @@ func writeInitialState( } s.RaftAppliedIndex = s.TruncatedState.Index s.Desc = &roachpb.RangeDescriptor{ - RangeID: rangeID, + RangeID: desc.RangeID, } s.Stats = ms @@ -378,11 +383,16 @@ func writeInitialState( return enginepb.MVCCStats{}, err } - if err := updateHardState(eng, s); err != nil { + oldHS, err := loadHardState(eng, desc.RangeID) + if err != nil { + return enginepb.MVCCStats{}, err + } + + if err := synthesizeHardState(eng, s, oldHS); err != nil { return enginepb.MVCCStats{}, err } - if err := setLastIndex(eng, rangeID, s.TruncatedState.Index); err != nil { + if err := setLastIndex(eng, desc.RangeID, s.TruncatedState.Index); err != nil { return enginepb.MVCCStats{}, err } diff --git a/storage/replica_state_test.go b/storage/replica_state_test.go new file mode 100644 index 000000000000..5e769737175a --- /dev/null +++ b/storage/replica_state_test.go @@ -0,0 +1,94 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+// +// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com) + +package storage + +import ( + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/storage/engine" + "github.com/cockroachdb/cockroach/storage/storagebase" + "github.com/cockroachdb/cockroach/testutils" + "github.com/cockroachdb/cockroach/util/leaktest" + "github.com/cockroachdb/cockroach/util/stop" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestSynthesizeHardState(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop() + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20, stopper) + + tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4} + + testCases := []struct { + TruncTerm, RaftAppliedIndex uint64 + OldHS *raftpb.HardState + NewHS raftpb.HardState + Err string + }{ + {OldHS: nil, TruncTerm: 42, RaftAppliedIndex: 24, NewHS: raftpb.HardState{Term: 42, Vote: 0, Commit: 24}}, + // Can't wind back the committed index of the new HardState. + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit - 1, Err: "can't decrease HardState.Commit"}, + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit, NewHS: tHS}, + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit + 1, NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1}}, + // Higher Term is picked up, but vote isn't carried over when the term + // changes. 
+ {OldHS: &tHS, RaftAppliedIndex: tHS.Commit, TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit}}, + } + + for i, test := range testCases { + func() { + batch := eng.NewBatch() + defer batch.Close() + testState := storagebase.ReplicaState{ + Desc: testRangeDescriptor(), + TruncatedState: &roachpb.RaftTruncatedState{Term: test.TruncTerm}, + RaftAppliedIndex: test.RaftAppliedIndex, + } + + if test.OldHS != nil { + if err := setHardState(batch, testState.Desc.RangeID, *test.OldHS); err != nil { + t.Fatal(err) + } + } + + oldHS, err := loadHardState(batch, testState.Desc.RangeID) + if err != nil { + t.Fatal(err) + } + + if err := synthesizeHardState(batch, testState, oldHS); ((err == nil) != (test.Err == "")) || + (err != nil && !testutils.IsError(err, test.Err)) { + t.Fatalf("%d: %v (expected '%s')", i, err, test.Err) + } else if err != nil { + // No further checking if we expected an error and got it. + return + } + + hs, err := loadHardState(batch, testState.Desc.RangeID) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(hs, test.NewHS) { + t.Fatalf("%d: expected %+v, got %+v", i, &test.NewHS, &hs) + } + }() + } +} diff --git a/storage/replica_test.go b/storage/replica_test.go index 895b162291a1..7d6cd9f1734c 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -33,6 +33,7 @@ import ( "golang.org/x/net/context" "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" @@ -6168,7 +6169,10 @@ func TestReserveAndApplySnapshot(t *testing.T) { t.Fatalf("Can't reserve the replica") } checkReservations(t, 1) - if _, err := firstRng.applySnapshot(snap, normalSnapshot); err != nil { + + // Apply a snapshot and check the reservation was filled. Note that this + // out-of-band application could be a root cause if this test ever crashes. 
+ if _, err := firstRng.applySnapshot(snap, raftpb.HardState{}); err != nil { t.Fatal(err) } checkReservations(t, 0) diff --git a/storage/store.go b/storage/store.go index 83f34d4c7858..1e8d54f68c39 100644 --- a/storage/store.go +++ b/storage/store.go @@ -2157,7 +2157,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error { // the raft group (i.e. replicas with an ID of 0). This is the only // operation that can be performed on a replica before it is part of the // raft group. - _, err := r.applySnapshot(req.Message.Snapshot, preemptiveSnapshot) + _, err := r.applySnapshot(req.Message.Snapshot, raftpb.HardState{}) return err } // We disallow non-snapshot messages to replica ID 0. Note that