diff --git a/storage/replica.go b/storage/replica.go index 328c365c1167..7010ddc357d7 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -1440,7 +1440,7 @@ func (r *Replica) handleRaftReady() error { if !raft.IsEmptySnap(rd.Snapshot) { var err error - lastIndex, err = r.applySnapshot(rd.Snapshot, normalSnapshot) + lastIndex, err = r.applySnapshot(rd.Snapshot, rd.HardState) if err != nil { return err } diff --git a/storage/replica_raftstorage.go b/storage/replica_raftstorage.go index 6cf10964f3ce..56488c84a5df 100644 --- a/storage/replica_raftstorage.go +++ b/storage/replica_raftstorage.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "strconv" "time" "github.com/coreos/etcd/raft" @@ -517,16 +518,21 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { return nil } -type snapshotType int - -const ( - normalSnapshot snapshotType = iota - preemptiveSnapshot -) - -// applySnapshot updates the replica based on the given snapshot. -// Returns the new last index. -func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, error) { +// applySnapshot updates the replica based on the given snapshot and associated +// HardState. The supplied HardState must be empty if a preemptive snapshot is +// being applied (which is the case if and only if the ReplicaID is zero), in +// which case it will be synthesized from any existing on-disk HardState +// appropriately. For a regular snapshot, a HardState may or may not be +// supplied, though in the common case it is (since the commit index changes as +// a result of the snapshot application, so Raft will supply us with one). +// The HardState, if altered or supplied, is persisted along with the applied +// snapshot and the new last index is returned. +// +// During preemptive snapshots, we (must) run additional safety checks. 
For +// example, the HardState, Raft's view of term, vote and committed log entries, +// and other Raft state (like acknowledged log entries) must not move backwards, +// so we must be very careful touching it manually and refuse such snapshots. +func (r *Replica) applySnapshot(snap raftpb.Snapshot, hs raftpb.HardState) (uint64, error) { // We use a separate batch to apply the snapshot since the Replica (and in // particular the last index) is updated after the batch commits. Using a // separate batch also allows for future optimization (such as using a @@ -542,21 +548,41 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, // Extract the updated range descriptor. desc := snapData.RangeDescriptor + // Fill the reservation if there was one for this range, regardless of + // whether the application succeeded. + defer r.store.bookie.Fill(desc.RangeID) r.mu.Lock() replicaID := r.mu.replicaID raftLogSize := r.mu.raftLogSize r.mu.Unlock() - log.Infof("replica %d received snapshot for range %d at index %d. 
"+ - "encoded size=%d, %d KV pairs, %d log entries", - replicaID, desc.RangeID, snap.Metadata.Index, + isPreemptive := replicaID == 0 + + replicaIDStr := "[?]" + snapType := "preemptive" + if !isPreemptive { + replicaIDStr = strconv.FormatInt(int64(replicaID), 10) + snapType = "Raft" + } + + log.Infof("replica %s applying %s snapshot for range %d at index %d "+ + "(encoded size=%d, %d KV pairs, %d log entries)", + replicaIDStr, snapType, desc.RangeID, snap.Metadata.Index, len(snap.Data), len(snapData.KV), len(snapData.LogEntries)) defer func(start time.Time) { - log.Infof("replica %d applied snapshot for range %d in %s", - replicaID, desc.RangeID, timeutil.Since(start)) + log.Infof("replica %s applied %s snapshot for range %d in %s", + replicaIDStr, snapType, desc.RangeID, timeutil.Since(start)) }(timeutil.Now()) + // Remember the old last index to verify that the snapshot doesn't wipe out + // log entries which have been acknowledged, which is possible with + // preemptive snapshots. We assert on it later in this call. + oldLastIndex, err := loadLastIndex(batch, desc.RangeID) + if err != nil { + return 0, errors.Wrap(err, "loading last index") + } + // Delete everything in the range and recreate it from the snapshot. // We need to delete any old Raft log entries here because any log entries // that predate the snapshot will be orphaned and never truncated or GC'd. @@ -605,19 +631,41 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, } // As outlined above, last and applied index are the same after applying - // the snapshot. + // the snapshot (i.e. the snapshot has no uncommitted tail). 
if s.RaftAppliedIndex != snap.Metadata.Index { log.Fatalf("%d: snapshot resulted in appliedIndex=%d, metadataIndex=%d", s.Desc.RangeID, s.RaftAppliedIndex, snap.Metadata.Index) } - if replicaID == 0 { - // The replica is not part of the Raft group so we need to write the Raft - // hard state for the replica in order for the Raft state machine to start - // correctly. - if err := updateHardState(batch, s); err != nil { - return 0, err + if !raft.IsEmptyHardState(hs) { + if isPreemptive { + return 0, errors.Errorf("unexpected HardState %+v on preemptive snapshot", &hs) + } + if err := setHardState(batch, s.Desc.RangeID, hs); err != nil { + return 0, errors.Wrapf(err, "unable to persist HardState %+v", &hs) + } + } else if isPreemptive { + // Preemptive snapshots get special verifications (see #7619) of their + // last index and (necessarily synthesized) HardState. + if snap.Metadata.Index < oldLastIndex { + // TODO(tschottdorf): how exactly would this happen? Perhaps: + // - ReplicaID=0 receives snapshot, applies + // - leaks info to leader (perhaps by rejecting a past append) + // - ReplicaID=0 receives older snapshot, truncates state + // Should simulate that in a test. + return 0, errors.New( + "preemptive snapshot would erase acknowledged log entries") + } + if err := synthesizeHardState(batch, s); err != nil { + return 0, errors.Wrap(err, "unable to write synthesized HardState") } + } else { + // Note that we don't require that Raft supply us with a nonempty + // HardState on a snapshot. We don't want to make that assumption + // because it's not guaranteed by the contract. Raft *must* send us + // a HardState when it increases the committed index as a result of the + // snapshot, but who is to say it isn't going to accept a snapshot + // which is identical to the current state? 
} if err := batch.Commit(); err != nil { @@ -650,23 +698,16 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, if err := r.updateRangeInfo(&desc); err != nil { panic(err) } - - // Fill the reservation if there was any one for this range. - r.store.bookie.Fill(desc.RangeID) - // Update the range descriptor. This is done last as this is the step that // makes the Replica visible in the Store. if err := r.setDesc(&desc); err != nil { panic(err) } - switch typ { - case normalSnapshot: + if !isPreemptive { r.store.metrics.rangeSnapshotsNormalApplied.Inc(1) - case preemptiveSnapshot: + } else { r.store.metrics.rangeSnapshotsPreemptiveApplied.Inc(1) - default: - panic("not reached") } return lastIndex, nil } diff --git a/storage/replica_state.go b/storage/replica_state.go index fbe66ed541ed..e3c05d4d8e81 100644 --- a/storage/replica_state.go +++ b/storage/replica_state.go @@ -24,8 +24,8 @@ import ( "github.com/cockroachdb/cockroach/storage/storagebase" "github.com/cockroachdb/cockroach/util/hlc" "github.com/cockroachdb/cockroach/util/protoutil" - "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -321,33 +321,41 @@ func setHardState( hlc.ZeroTimestamp, nil, &st) } -func updateHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error { - // Load a potentially existing HardState as we may need to preserve - // information about cast votes. For example, during a Split for which - // another node's new right-hand side has contacted us before our left-hand - // side called in here to create the group. - rangeID := s.Desc.RangeID - oldHS, err := loadHardState(eng, rangeID) +// synthesizeHardState synthesizes a HardState from the given ReplicaState and +// any existing on-disk HardState in the context of a snapshot, while verifying +// that the application of the snapshot does not violate Raft invariants. 
It +// must be called after the supplied state and ReadWriter have been updated +// with the result of the snapshot. +// If there is an existing HardState, we must respect it and we must not apply +// a snapshot that would move the state backwards. +func synthesizeHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error { + oldHS, err := loadHardState(eng, s.Desc.RangeID) if err != nil { return err } newHS := raftpb.HardState{ - Term: s.TruncatedState.Term, + Term: s.TruncatedState.Term, + // Note that when applying a Raft snapshot, the applied index is + // equal to the Commit index represented by the snapshot. Commit: s.RaftAppliedIndex, } - if !raft.IsEmptyHardState(oldHS) { - if oldHS.Commit > newHS.Commit { - newHS.Commit = oldHS.Commit - } - if oldHS.Term > newHS.Term { - newHS.Term = oldHS.Term - } + if oldHS.Commit > newHS.Commit { + return errors.Errorf("can't decrease HardState.Commit from %d to %d", + oldHS.Commit, newHS.Commit) + } + if oldHS.Term > newHS.Term { + // The existing HardState is allowed to be ahead of us (and in fact we + // need this). We already checked above that we're not rewinding the + // acknowledged index, and we haven't updated votes yet. + newHS.Term = oldHS.Term + } + // If the existing HardState voted in this term, remember that. + if oldHS.Term == newHS.Term { newHS.Vote = oldHS.Vote } - - return setHardState(eng, rangeID, newHS) + return errors.Wrapf(setHardState(eng, s.Desc.RangeID, newHS), "writing HardState %+v", &newHS) } // writeInitialState bootstraps a new Raft group (i.e. 
it is called when we @@ -378,7 +386,7 @@ func writeInitialState( return enginepb.MVCCStats{}, err } - if err := updateHardState(eng, s); err != nil { + if err := synthesizeHardState(eng, s); err != nil { return enginepb.MVCCStats{}, err } diff --git a/storage/replica_state_test.go b/storage/replica_state_test.go new file mode 100644 index 000000000000..7cd1ffebe103 --- /dev/null +++ b/storage/replica_state_test.go @@ -0,0 +1,89 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+// +// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com) + +package storage + +import ( + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/storage/engine" + "github.com/cockroachdb/cockroach/storage/storagebase" + "github.com/cockroachdb/cockroach/testutils" + "github.com/cockroachdb/cockroach/util/leaktest" + "github.com/cockroachdb/cockroach/util/stop" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestSynthesizeHardState(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop() + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20, stopper) + + tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4} + + testCases := []struct { + TruncTerm, RaftAppliedIndex uint64 + OldHS *raftpb.HardState + NewHS raftpb.HardState + Err string + }{ + {OldHS: nil, TruncTerm: 42, RaftAppliedIndex: 24, NewHS: raftpb.HardState{Term: 42, Vote: 0, Commit: 24}}, + // Can't wind back the committed index of the new HardState. + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit - 1, Err: "can't decrease HardState.Commit"}, + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit, NewHS: tHS}, + {OldHS: &tHS, RaftAppliedIndex: tHS.Commit + 1, NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1}}, + // Higher Term is picked up, but vote isn't carried over when the term + // changes. 
+ {OldHS: &tHS, RaftAppliedIndex: tHS.Commit, TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit}}, + } + + for i, test := range testCases { + func() { + batch := eng.NewBatch() + defer batch.Close() + testState := storagebase.ReplicaState{ + Desc: testRangeDescriptor(), + TruncatedState: &roachpb.RaftTruncatedState{Term: test.TruncTerm}, + RaftAppliedIndex: test.RaftAppliedIndex, + } + + if test.OldHS != nil { + if err := setHardState(batch, testState.Desc.RangeID, *test.OldHS); err != nil { + t.Fatal(err) + } + } + + if err := synthesizeHardState(batch, testState); ((err == nil) != (test.Err == "")) || + (err != nil && !testutils.IsError(err, test.Err)) { + t.Fatalf("%d: %v (expected '%s')", i, err, test.Err) + } else if err != nil { + // No further checking if we expected an error and got it. + return + } + + hs, err := loadHardState(batch, testState.Desc.RangeID) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(hs, test.NewHS) { + t.Fatalf("%d: expected %+v, got %+v", i, &test.NewHS, &hs) + } + }() + } +} diff --git a/storage/replica_test.go b/storage/replica_test.go index eb984d600180..551a134d21f7 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -33,6 +33,7 @@ import ( "golang.org/x/net/context" "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" @@ -6168,7 +6169,10 @@ func TestReserveAndApplySnapshot(t *testing.T) { t.Fatalf("Can't reserve the replica") } checkReservations(t, 1) - if _, err := firstRng.applySnapshot(snap, normalSnapshot); err != nil { + + // Apply a snapshot and check the reservation was filled. Note that this + // out-of-band application could be a root cause if this test ever crashes. 
+ if _, err := firstRng.applySnapshot(snap, raftpb.HardState{}); err != nil { t.Fatal(err) } checkReservations(t, 0) diff --git a/storage/store.go b/storage/store.go index 3fe6de99f101..2c90ab4f0a59 100644 --- a/storage/store.go +++ b/storage/store.go @@ -2152,7 +2152,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error { // the raft group (i.e. replicas with an ID of 0). This is the only // operation that can be performed on a replica before it is part of the // raft group. - _, err := r.applySnapshot(req.Message.Snapshot, preemptiveSnapshot) + _, err := r.applySnapshot(req.Message.Snapshot, raftpb.HardState{}) return err } // We disallow non-snapshot messages to replica ID 0. Note that