Skip to content

Commit

Permalink
storage: always write a HardState
Browse files Browse the repository at this point in the history
As discovered in
cockroachdb#6991 (comment),
it's possible that we apply a Raft snapshot without writing a corresponding
HardState since we write the snapshot in its own batch first and only then
write a HardState.

If that happens, the server is going to panic on restart: It will have a
nontrivial first index, but a committed index of zero (from the empty
HardState).

This change prevents us from applying a snapshot when there is no HardState
supplied along with it, except when applying a preemptive snapshot (in which
case we synthesize a HardState).

Ensure that the new HardState and Raft log do not break promises made by
existing ones during preemptive snapshot application.

Fixes cockroachdb#7619.

storage: prevent loss of uncommitted log entries
  • Loading branch information
tbg committed Jul 8, 2016
1 parent 70fc07e commit 6af073e
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 53 deletions.
2 changes: 1 addition & 1 deletion storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ func (r *Replica) handleRaftReady() error {

if !raft.IsEmptySnap(rd.Snapshot) {
var err error
lastIndex, err = r.applySnapshot(rd.Snapshot, normalSnapshot)
lastIndex, err = r.applySnapshot(rd.Snapshot, rd.HardState)
if err != nil {
return err
}
Expand Down
103 changes: 72 additions & 31 deletions storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package storage

import (
"bytes"
"strconv"
"time"

"github.com/coreos/etcd/raft"
Expand Down Expand Up @@ -517,16 +518,21 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error {
return nil
}

type snapshotType int

const (
normalSnapshot snapshotType = iota
preemptiveSnapshot
)

// applySnapshot updates the replica based on the given snapshot.
// Returns the new last index.
func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64, error) {
// applySnapshot updates the replica based on the given snapshot and associated
// HardState. The supplied HardState must be empty if a preemptive snapshot is
// being applied (which is the case if and only if the ReplicaID is zero), in
// which case it will be synthesized from any existing on-disk HardState
// appropriately. For a regular snapshot, a HardState may or may not be
// supplied, though in the common case it is (since the commit index changes as
// a result of the snapshot application, so Raft will supply us with one).
// The HardState, if altered or supplied, is persisted along with the applied
// snapshot and the new last index is returned.
//
// During preemptive snapshots, we (must) run additional safety checks. For
// example, the HardState (Raft's view of term, vote and committed log entries)
// and other Raft state (like acknowledged log entries) must not move backwards,
// so we must be very careful when touching it manually and must refuse any
// snapshot that would move it backwards.
func (r *Replica) applySnapshot(snap raftpb.Snapshot, hs raftpb.HardState) (uint64, error) {
// We use a separate batch to apply the snapshot since the Replica (and in
// particular the last index) is updated after the batch commits. Using a
// separate batch also allows for future optimization (such as using a
Expand All @@ -542,21 +548,41 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64,

// Extract the updated range descriptor.
desc := snapData.RangeDescriptor
// Fill the reservation if there was one for this range, regardless of
// whether the application succeeded.
defer r.store.bookie.Fill(desc.RangeID)

r.mu.Lock()
replicaID := r.mu.replicaID
raftLogSize := r.mu.raftLogSize
r.mu.Unlock()

log.Infof("replica %d received snapshot for range %d at index %d. "+
"encoded size=%d, %d KV pairs, %d log entries",
replicaID, desc.RangeID, snap.Metadata.Index,
isPreemptive := replicaID == 0

replicaIDStr := "[?]"
snapType := "preemptive"
if !isPreemptive {
replicaIDStr = strconv.FormatInt(int64(replicaID), 10)
snapType = "Raft"
}

log.Infof("replica %s applying %s snapshot for range %d at index %d "+
"(encoded size=%d, %d KV pairs, %d log entries)",
replicaIDStr, snapType, desc.RangeID, snap.Metadata.Index,
len(snap.Data), len(snapData.KV), len(snapData.LogEntries))
defer func(start time.Time) {
log.Infof("replica %d applied snapshot for range %d in %s",
replicaID, desc.RangeID, timeutil.Since(start))
log.Infof("replica %s applied %s snapshot for range %d in %s",
replicaIDStr, snapType, desc.RangeID, timeutil.Since(start))
}(timeutil.Now())

// Remember the old last index to verify that the snapshot doesn't wipe out
// log entries which have been acknowledged, which is possible with
// preemptive snapshots. We assert on it later in this call.
oldLastIndex, err := loadLastIndex(batch, desc.RangeID)
if err != nil {
return 0, errors.Wrap(err, "loading last index")
}

// Delete everything in the range and recreate it from the snapshot.
// We need to delete any old Raft log entries here because any log entries
// that predate the snapshot will be orphaned and never truncated or GC'd.
Expand Down Expand Up @@ -605,19 +631,41 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64,
}

// As outlined above, last and applied index are the same after applying
// the snapshot.
// the snapshot (i.e. the snapshot has no uncommitted tail).
if s.RaftAppliedIndex != snap.Metadata.Index {
log.Fatalf("%d: snapshot resulted in appliedIndex=%d, metadataIndex=%d",
s.Desc.RangeID, s.RaftAppliedIndex, snap.Metadata.Index)
}

if replicaID == 0 {
// The replica is not part of the Raft group so we need to write the Raft
// hard state for the replica in order for the Raft state machine to start
// correctly.
if err := updateHardState(batch, s); err != nil {
return 0, err
if !raft.IsEmptyHardState(hs) {
if isPreemptive {
return 0, errors.Errorf("unexpected HardState %+v on preemptive snapshot", &hs)
}
if err := setHardState(batch, s.Desc.RangeID, hs); err != nil {
return 0, errors.Wrapf(err, "unable to persist HardState %+v", &hs)
}
} else if isPreemptive {
// Preemptive snapshots get special verifications (see #7619) of their
// last index and (necessarily synthesized) HardState.
if snap.Metadata.Index < oldLastIndex {
	// TODO(tschottdorf): how exactly would this happen? Perhaps:
	// - ReplicaID=0 receives snapshot, applies
	// - leaks info to leader (perhaps by rejecting a past append)
	// - ReplicaID=0 receives older snapshot, truncates state
	// Should simulate that in a test.
	//
	// Refuse the snapshot with a freshly created error. There is no
	// underlying error to wrap at this point: the err returned by
	// loadLastIndex was already checked to be nil, and errors.Wrap
	// returns nil when given a nil error, which would report this
	// refusal to the caller as a success.
	return 0, errors.Errorf(
		"preemptive snapshot would erase acknowledged log entries "+
			"(snapshot index %d < last index %d)",
		snap.Metadata.Index, oldLastIndex)
}
if err := synthesizeHardState(batch, s); err != nil {
return 0, errors.Wrap(err, "unable to write synthesized HardState")
}
} else {
// Note that we don't require that Raft supply us with a nonempty
// HardState on a snapshot. We don't want to make that assumption
// because it's not guaranteed by the contract. Raft *must* send us
// a HardState when it increases the committed index as a result of the
// snapshot, but who is to say it isn't going to accept a snapshot
// which is identical to the current state?
}

if err := batch.Commit(); err != nil {
Expand Down Expand Up @@ -650,23 +698,16 @@ func (r *Replica) applySnapshot(snap raftpb.Snapshot, typ snapshotType) (uint64,
if err := r.updateRangeInfo(&desc); err != nil {
panic(err)
}

// Fill the reservation if there was any one for this range.
r.store.bookie.Fill(desc.RangeID)

// Update the range descriptor. This is done last as this is the step that
// makes the Replica visible in the Store.
if err := r.setDesc(&desc); err != nil {
panic(err)
}

switch typ {
case normalSnapshot:
if !isPreemptive {
r.store.metrics.rangeSnapshotsNormalApplied.Inc(1)
case preemptiveSnapshot:
} else {
r.store.metrics.rangeSnapshotsPreemptiveApplied.Inc(1)
default:
panic("not reached")
}
return lastIndex, nil
}
Expand Down
46 changes: 27 additions & 19 deletions storage/replica_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/cockroachdb/cockroach/storage/storagebase"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/protoutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/pkg/errors"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -321,33 +321,41 @@ func setHardState(
hlc.ZeroTimestamp, nil, &st)
}

func updateHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error {
// Load a potentially existing HardState as we may need to preserve
// information about cast votes. For example, during a Split for which
// another node's new right-hand side has contacted us before our left-hand
// side called in here to create the group.
rangeID := s.Desc.RangeID
oldHS, err := loadHardState(eng, rangeID)
// synthesizeHardState synthesizes a HardState from the given ReplicaState and
// any existing on-disk HardState in the context of a snapshot, while verifying
// that the application of the snapshot does not violate Raft invariants. It
// must be called after the supplied state and ReadWriter have been updated
// with the result of the snapshot.
// If there is an existing HardState, we must respect it and we must not apply
// a snapshot that would move the state backwards.
func synthesizeHardState(eng engine.ReadWriter, s storagebase.ReplicaState) error {
oldHS, err := loadHardState(eng, s.Desc.RangeID)
if err != nil {
return err
}

newHS := raftpb.HardState{
Term: s.TruncatedState.Term,
Term: s.TruncatedState.Term,
// Note that when applying a Raft snapshot, the applied index is
// equal to the Commit index represented by the snapshot.
Commit: s.RaftAppliedIndex,
}

if !raft.IsEmptyHardState(oldHS) {
if oldHS.Commit > newHS.Commit {
newHS.Commit = oldHS.Commit
}
if oldHS.Term > newHS.Term {
newHS.Term = oldHS.Term
}
if oldHS.Commit > newHS.Commit {
return errors.Errorf("can't decrease HardState.Commit from %d to %d",
oldHS.Commit, newHS.Commit)
}
if oldHS.Term > newHS.Term {
// The existing HardState is allowed to be ahead of us (and in fact we
// need this). We already checked above that we're not rewinding the
// acknowledged index, and we haven't updated votes yet.
newHS.Term = oldHS.Term
}
// If the existing HardState voted in this term, remember that.
if oldHS.Term == newHS.Term {
newHS.Vote = oldHS.Vote
}

return setHardState(eng, rangeID, newHS)
return errors.Wrapf(setHardState(eng, s.Desc.RangeID, newHS), "writing HardState %+v", &newHS)
}

// writeInitialState bootstraps a new Raft group (i.e. it is called when we
Expand Down Expand Up @@ -378,7 +386,7 @@ func writeInitialState(
return enginepb.MVCCStats{}, err
}

if err := updateHardState(eng, s); err != nil {
if err := synthesizeHardState(eng, s); err != nil {
return enginepb.MVCCStats{}, err
}

Expand Down
89 changes: 89 additions & 0 deletions storage/replica_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tobias Schottdorf ([email protected])

package storage

import (
"reflect"
"testing"

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/storage/storagebase"
"github.com/cockroachdb/cockroach/testutils"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/coreos/etcd/raft/raftpb"
)

// TestSynthesizeHardState drives synthesizeHardState through a table of
// cases. Each case optionally seeds an existing HardState, synthesizes a new
// one from a ReplicaState carrying the case's truncated-state term and applied
// index, and then checks either the expected error or the HardState read back
// from the same batch.
func TestSynthesizeHardState(t *testing.T) {
	defer leaktest.AfterTest(t)()
	stopper := stop.NewStopper()
	defer stopper.Stop()
	eng := engine.NewInMem(roachpb.Attributes{}, 1<<20, stopper)

	// baseHS is the pre-existing HardState used by all cases that have one.
	baseHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4}

	type hardStateCase struct {
		TruncTerm, RaftAppliedIndex uint64
		OldHS                       *raftpb.HardState
		NewHS                       raftpb.HardState
		Err                         string
	}

	cases := []hardStateCase{
		{OldHS: nil, TruncTerm: 42, RaftAppliedIndex: 24, NewHS: raftpb.HardState{Term: 42, Vote: 0, Commit: 24}},
		// Can't wind back the committed index of the new HardState.
		{OldHS: &baseHS, RaftAppliedIndex: baseHS.Commit - 1, Err: "can't decrease HardState.Commit"},
		{OldHS: &baseHS, RaftAppliedIndex: baseHS.Commit, NewHS: baseHS},
		{OldHS: &baseHS, RaftAppliedIndex: baseHS.Commit + 1, NewHS: raftpb.HardState{Term: baseHS.Term, Vote: 3, Commit: baseHS.Commit + 1}},
		// Higher Term is picked up, but vote isn't carried over when the term
		// changes.
		{OldHS: &baseHS, RaftAppliedIndex: baseHS.Commit, TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: baseHS.Commit}},
	}

	// runCase executes a single case against its own batch; the deferred
	// Close fires when the case finishes, so cases do not share a batch.
	runCase := func(idx int, tc hardStateCase) {
		batch := eng.NewBatch()
		defer batch.Close()

		state := storagebase.ReplicaState{
			Desc:             testRangeDescriptor(),
			TruncatedState:   &roachpb.RaftTruncatedState{Term: tc.TruncTerm},
			RaftAppliedIndex: tc.RaftAppliedIndex,
		}
		rangeID := state.Desc.RangeID

		// Seed the existing HardState, if the case has one.
		if tc.OldHS != nil {
			if err := setHardState(batch, rangeID, *tc.OldHS); err != nil {
				t.Fatal(err)
			}
		}

		err := synthesizeHardState(batch, state)
		wantErr := tc.Err != ""
		// Fail if an error was (un)expectedly absent, or if the error we got
		// does not match the expected pattern.
		if gotErr := err != nil; gotErr != wantErr || (gotErr && !testutils.IsError(err, tc.Err)) {
			t.Fatalf("%d: %v (expected '%s')", idx, err, tc.Err)
		}
		if err != nil {
			// No further checking if we expected an error and got it.
			return
		}

		got, err := loadHardState(batch, rangeID)
		if err != nil {
			t.Fatal(err)
		}
		if !reflect.DeepEqual(got, tc.NewHS) {
			t.Fatalf("%d: expected %+v, got %+v", idx, &tc.NewHS, &got)
		}
	}

	for idx, tc := range cases {
		runCase(idx, tc)
	}
}
6 changes: 5 additions & 1 deletion storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"golang.org/x/net/context"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"

Expand Down Expand Up @@ -6168,7 +6169,10 @@ func TestReserveAndApplySnapshot(t *testing.T) {
t.Fatalf("Can't reserve the replica")
}
checkReservations(t, 1)
if _, err := firstRng.applySnapshot(snap, normalSnapshot); err != nil {

// Apply a snapshot and check the reservation was filled. Note that this
// out-of-band application could be a root cause if this test ever crashes.
if _, err := firstRng.applySnapshot(snap, raftpb.HardState{}); err != nil {
t.Fatal(err)
}
checkReservations(t, 0)
Expand Down
2 changes: 1 addition & 1 deletion storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
// the raft group (i.e. replicas with an ID of 0). This is the only
// operation that can be performed on a replica before it is part of the
// raft group.
_, err := r.applySnapshot(req.Message.Snapshot, preemptiveSnapshot)
_, err := r.applySnapshot(req.Message.Snapshot, raftpb.HardState{})
return err
}
// We disallow non-snapshot messages to replica ID 0. Note that
Expand Down

0 comments on commit 6af073e

Please sign in to comment.