Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: assert around snapshot sending/receiving #42011

Merged
merged 1 commit into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/storage/batcheval/cmd_truncate_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,15 @@ func TruncateLog(
ms.SysBytes = -ms.SysBytes // simulate the deletion

} else {
if _, _, _, err := engine.MVCCDeleteRange(ctx, batch, &ms, start.Key, end.Key, math.MaxInt64, /* max */
hlc.Timestamp{}, nil /* txn */, false /* returnKeys */); err != nil {
_, _, numDeleted, err := engine.MVCCDeleteRange(ctx, batch, &ms, start.Key, end.Key, math.MaxInt64, /* max */
hlc.Timestamp{}, nil /* txn */, false /* returnKeys */)
if err != nil {
return result.Result{}, err
}
if expNumDeleted := (args.Index - firstIndex); uint64(numDeleted) > expNumDeleted {
log.Fatalf(ctx, "expected to delete up to %d log entries [%d, %d), deleted %d entries",
expNumDeleted, firstIndex, args.Index, numDeleted)
}
}

tState := &roachpb.RaftTruncatedState{
Expand Down
22 changes: 19 additions & 3 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,29 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision.NewFirstIndex = decision.Input.FirstIndex
decision.ChosenVia = truncatableIndexChosenViaFirstIndex
}

// Invariants: NewFirstIndex >= FirstIndex
// NewFirstIndex <= LastIndex (if != 10)
irfansharif marked this conversation as resolved.
Show resolved Hide resolved
// NewFirstIndex <= QuorumIndex (if != 0)
//
// For uninit'ed replicas we can have input.FirstIndex > input.LastIndex, more
// specifically input.FirstIndex = input.LastIndex + 1. FirstIndex is set to
// TruncatedState.Index + 1, and for an unit'ed replica, LastIndex is simply
// 10. This is what informs the `input.LastIndex == 10` conditional below.
valid := (decision.NewFirstIndex >= input.FirstIndex) &&
(decision.NewFirstIndex <= input.LastIndex || input.LastIndex == 10) &&
(decision.NewFirstIndex <= decision.QuorumIndex || decision.QuorumIndex == 0)
if !valid {
err := fmt.Sprintf("invalid truncation decision; output = %d, input: [%d, %d], quorum idx = %d",
decision.NewFirstIndex, input.FirstIndex, input.LastIndex, decision.QuorumIndex)
panic(err)
}

return decision
}

// getQuorumIndex returns the index which a quorum of the nodes have
// committed. The snapshotLogTruncationConstraints indicates the index of a pending
// snapshot which is considered part of the Raft group even though it hasn't
// been added yet. Note that getQuorumIndex may return 0 if the progress map
// committed. Note that getQuorumIndex may return 0 if the progress map
// doesn't contain information for a sufficient number of followers (e.g. the
// local replica has only recently become the leader). In general, the value
// returned by getQuorumIndex may be smaller than raftStatus.Commit which is
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ type Replica struct {
// from the Raft log entry. Use the invalidLastTerm constant for this
// case.
lastIndex, lastTerm uint64
// A map of raft log index of pending preemptive snapshots to deadlines.
// A map of raft log index of pending snapshots to deadlines.
// Used to prohibit raft log truncations that would leave a gap between
// the snapshot and the new first index. The map entry has a zero
// deadline while the snapshot is being sent and turns nonzero when the
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,13 @@ func (r *Replica) sendSnapshot(
r.store.Engine().NewBatch,
sent,
); err != nil {
if errors.Cause(err) == malformedSnapshotError {
checkpointDir := r.store.checkpoint(ctx,
fmt.Sprintf("r%d_%s", r.RangeID, snap.SnapUUID.Short()))

log.Fatalf(ctx, "malformed snapshot generated, checkpoint created at: %s", checkpointDir)
}

return &snapshotError{err}
}
return nil
Expand Down
42 changes: 42 additions & 0 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,10 @@ type IncomingSnapshot struct {
snapType string
}

func (s *IncomingSnapshot) String() string {
return fmt.Sprintf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.State.RaftAppliedIndex)
}

// snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
// given range. Note that snapshot() is called without Replica.raftMu held.
func snapshot(
Expand Down Expand Up @@ -936,6 +940,21 @@ func (r *Replica) applySnapshot(
s.RaftAppliedIndex, snap.Metadata.Index)
}

if expLen := s.RaftAppliedIndex - s.TruncatedState.Index; expLen != uint64(len(logEntries)) {
entriesRange, err := extractRangeFromEntries(inSnap.LogEntries)
if err != nil {
return err
}

checkpointDir := r.store.checkpoint(ctx,
fmt.Sprintf("r%d_%s", r.RangeID, inSnap.SnapUUID.String()))

log.Fatalf(ctx, "missing log entries in snapshot (%s): got %d entries, expected %d "+
"(TruncatedState.Index=%d, HardState=%s, LogEntries=%s). checkpoint created at: %s",
inSnap.String(), len(logEntries), expLen, s.TruncatedState.Index,
hs.String(), entriesRange, checkpointDir)
}

// We've written Raft log entries, so we need to sync the WAL.
if err := batch.Commit(syncRaftLog.Get(&r.store.cfg.Settings.SV)); err != nil {
return err
Expand Down Expand Up @@ -1016,6 +1035,29 @@ func (r *Replica) applySnapshot(
return nil
}

// extractRangeFromEntries returns a string representation of the range of
// marshaled list of raft log entries in the form of [first-index, last-index].
// If the list is empty, "[n/a, n/a]" is returned instead.
func extractRangeFromEntries(logEntries [][]byte) (string, error) {
var firstIndex, lastIndex string
if len(logEntries) == 0 {
firstIndex = "n/a"
lastIndex = "n/a"
} else {
firstAndLastLogEntries := make([]raftpb.Entry, 2)
if err := protoutil.Unmarshal(logEntries[0], &firstAndLastLogEntries[0]); err != nil {
return "", err
}
if err := protoutil.Unmarshal(logEntries[len(logEntries)-1], &firstAndLastLogEntries[1]); err != nil {
return "", err
}

firstIndex = string(firstAndLastLogEntries[0].Index)
lastIndex = string(firstAndLastLogEntries[1].Index)
}
return fmt.Sprintf("[%s, %s]", firstIndex, lastIndex), nil
}

type raftCommandEncodingVersion byte

// Raft commands are encoded with a 1-byte version (currently 0 or 1), an 8-byte
Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
Expand Down Expand Up @@ -4458,6 +4460,20 @@ func (s *Store) updateCommandQueueGauges() error {
return nil
}

func (s *Store) checkpoint(ctx context.Context, tag string) string {
// We create RocksDB checkpoints whenever we run into missing raft log
// entries in the incoming snapshot.
checkpointBase := filepath.Join(s.engine.GetAuxiliaryDir(), "checkpoints")
_ = os.MkdirAll(checkpointBase, 0700)

checkpointDir := filepath.Join(checkpointBase, tag)
if err := s.engine.CreateCheckpoint(checkpointDir); err != nil {
log.Warningf(ctx, "unable to create checkpoint %s: %+v", checkpointDir, err)
}

return checkpointDir
}

// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
Expand Down
35 changes: 34 additions & 1 deletion pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type kvBatchSnapshotStrategy struct {
newBatch func() engine.Batch
}

// Send implements the snapshotStrategy interface.
// Receive implements the snapshotStrategy interface.
func (kvSS *kvBatchSnapshotStrategy) Receive(
ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header,
) (IncomingSnapshot, error) {
Expand Down Expand Up @@ -147,6 +147,17 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
State: &header.State,
snapType: snapTypeRaft,
}

expLen := inSnap.State.RaftAppliedIndex - inSnap.State.TruncatedState.Index
if expLen != uint64(len(logEntries)) {
// We've received a botched snapshot. We could fatal right here but opt
// to warn loudly instead, and fatal when applying the snapshot
// (in Replica.applySnapshot) in order to capture replica hard state.
log.Warningf(ctx,
"missing log entries in snapshot (%s): got %d entries, expected %d",
inSnap.String(), len(logEntries), expLen)
}

if header.RaftMessageRequest.ToReplica.ReplicaID == 0 {
inSnap.snapType = snapTypePreemptive
}
Expand All @@ -156,6 +167,10 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
}
}

// A MalformedSnapshotError indicates that the snapshot in question is
// malformed, for e.g. missing raft log entries.
var malformedSnapshotError = errors.New("malformed snapshot generated")

// Send implements the snapshotStrategy interface.
func (kvSS *kvBatchSnapshotStrategy) Send(
ctx context.Context,
Expand Down Expand Up @@ -266,6 +281,24 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
return err
}

// The difference between the snapshot index (applied index at the time of
// snapshot) and the truncated index should equal the number of log entries
// shipped over.
expLen := endIndex - firstIndex
if expLen != uint64(len(logEntries)) {
// We've generated a botched snapshot. We could fatal right here but opt
// to warn loudly instead, and fatal at the caller to capture a checkpoint
// of the underlying storage engine.
entriesRange, err := extractRangeFromEntries(logEntries)
if err != nil {
return err
}
log.Warningf(ctx, "missing log entries in snapshot (%s): "+
"got %d entries, expected %d (TruncatedState.Index=%d, LogEntries=%s)",
snap.String(), len(logEntries), expLen, snap.State.TruncatedState.Index, entriesRange)
return malformedSnapshotError
}

// Inline the payloads for all sideloaded proposals.
//
// TODO(tschottdorf): could also send slim proposals and attach sideloaded
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/store_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -60,6 +61,10 @@ func TestSnapshotRaftLogLimit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
firstIndex, err := (*replicaRaftStorage)(repl).FirstIndex()
if err != nil {
t.Fatal(err)
}
eng := store.Engine()
snap := eng.NewSnapshot()
defer snap.Close()
Expand All @@ -74,7 +79,12 @@ func TestSnapshotRaftLogLimit(t *testing.T) {
outSnap := &OutgoingSnapshot{
Iter: iter,
EngineSnap: snap,
snapType: snapType,
State: storagebase.ReplicaState{
TruncatedState: &roachpb.RaftTruncatedState{
Index: firstIndex - 1,
},
},
snapType: snapType,
RaftSnap: raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: lastIndex,
Expand Down