From ef8a2a4206a2cb74c3a50b99e5d2feb46354a62b Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Thu, 14 Mar 2019 14:21:10 +0100 Subject: [PATCH] storage: don't send historical Raft log with snapshots Assume a leaseholder wants to send a (Raft or preemptive) snapshot to a follower. Say the leaseholder's log ranges from 100 to 200, and we assume that the size (in bytes) of this log is 200mb. All of the log is successfully committed and applied, and is thus reflected in the snapshot data. Prior to this change, we would still send the 200mb of log entries along with the snapshot, even though the snapshot itself already reflected them. After this change, we won't send any log entries along with the snapshot, as sanity would suggest we would. We were unable to make this change because up until recently, the Raft truncated state (which dictates the first log index) was replicated and consistency checked; this was changed in #34660. The migration introduced there makes it straightforward to omit a prefix of the log in snapshots, as done in this commit. Somewhere down the road (19.2?) we should localize all the log truncation decisions and simplify all this further. I suspect that in doing so we can avoid tracking the size of the Raft log in the first place; all we really need for this is some mechanism that makes sure that an "idle" replica truncates its logs. With unreplicated truncation, this becomes cheap enough to "just do". Release note (bug fix): Remove historical log entries from Raft snapshots. These log entries could lead to failed snapshots with a message such as: snapshot failed: aborting snapshot because raft log is too large (25911051 bytes after processing 7 of 37 entries) --- pkg/storage/client_raft_test.go | 48 ++++++++++++++---------------- pkg/storage/helpers_test.go | 8 ++--- pkg/storage/raft_log_queue_test.go | 37 ++++++++++++++++------- pkg/storage/replica_command.go | 22 ++++++++++---- pkg/storage/replica_raft.go | 43 +++++++++++++++++--------- pkg/storage/replica_raftstorage.go | 3 ++ 6 files changed, 101 insertions(+), 60 deletions(-) diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 688bab6d029b..c15156e3231a 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -630,10 +630,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { startWithSingleRange: true, } defer mtc.Stop() - mtc.Start(t, 3) + mtc.Start(t, 1) const rangeID = 1 - mtc.replicateRange(rangeID, 1, 2) repl, err := mtc.stores[0].GetReplica(rangeID) if err != nil { @@ -647,8 +646,6 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { t.Fatal(err) } - mtc.waitForValues(key, []int64{5, 5, 5}) - index, err := repl.GetLastIndex() if err != nil { t.Fatal(err) @@ -656,28 +653,24 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { // Verifies the recomputed log size against what we track in `r.mu.raftLogSize`. assertCorrectRaftLogSize := func() error { - for _, s := range mtc.stores { - repl, err := s.GetReplica(rangeID) - if err != nil { - t.Fatal(err) - } - - // Recompute under raft lock so that the log doesn't change while we - // compute its size. - repl.RaftLock() - realSize, err := storage.ComputeRaftLogSize( - context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(), - ) - size := repl.GetRaftLogSize() - repl.RaftUnlock() + // Recompute under raft lock so that the log doesn't change while we + // compute its size. + repl.RaftLock() + realSize, err := storage.ComputeRaftLogSize( + context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(), + ) + size, _ := repl.GetRaftLogSize() + repl.RaftUnlock() - if err != nil { - t.Fatal(err) - } + if err != nil { + t.Fatal(err) + } - if size != realSize { - return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize) - } + // If the size isn't trusted, it won't have to match (and in fact + // likely won't). In this test, this is because the upreplication + // elides old Raft log entries in the snapshot it uses. + if size != realSize { + return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize) } return nil } @@ -690,6 +683,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { t.Fatal(err) } + // Note that if there were multiple nodes, the Raft log sizes would not + // be correct for the followers as they would have received a shorter + // Raft log than the leader. assert.NoError(t, assertCorrectRaftLogSize()) } @@ -1304,7 +1300,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { } // Determine the current raft log size. - initLogSize := leaderRepl.GetRaftLogSize() + initLogSize, _ := leaderRepl.GetRaftLogSize() // While a majority nodes are down, write some data. putRes := make(chan *roachpb.Error) @@ -1335,7 +1331,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { // etc.). The important thing here is that the log doesn't grow // forever. logSizeLimit := int64(2 * sc.RaftMaxUncommittedEntriesSize) - curlogSize := leaderRepl.GetRaftLogSize() + curlogSize, _ := leaderRepl.GetRaftLogSize() logSize := curlogSize - initLogSize logSizeStr := humanizeutil.IBytes(logSize) // Note that logSize could be negative if something got truncated. diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index cd57b763c6a6..6eaf779025c4 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -322,12 +322,12 @@ func (r *Replica) ShouldBackpressureWrites() bool { return r.shouldBackpressureWrites() } -// GetRaftLogSize returns the approximate raft log size. See r.mu.raftLogSize -// for details. -func (r *Replica) GetRaftLogSize() int64 { +// GetRaftLogSize returns the approximate raft log size and whether it is +// trustworthy.. See r.mu.raftLogSize for details. +func (r *Replica) GetRaftLogSize() (int64, bool) { r.mu.RLock() defer r.mu.RUnlock() - return r.mu.raftLogSize + return r.mu.raftLogSize, r.mu.raftLogSizeTrusted } // GetCachedLastTerm returns the cached last term value. May return diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index ec2a6fe1bff4..d756d58f9470 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -887,20 +887,32 @@ func TestTruncateLogRecompute(t *testing.T) { key := roachpb.Key("a") repl := tc.store.LookupReplica(keys.MustAddr(key)) - var v roachpb.Value - v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5)) - put := roachpb.NewPut(key, v) - var ba roachpb.BatchRequest - ba.Add(put) - ba.RangeID = repl.RangeID - - if _, pErr := tc.store.Send(ctx, ba); pErr != nil { - t.Fatal(pErr) + trusted := func() bool { + repl.mu.Lock() + defer repl.mu.Unlock() + return repl.mu.raftLogSizeTrusted } + put := func() { + var v roachpb.Value + v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5)) + put := roachpb.NewPut(key, v) + var ba roachpb.BatchRequest + ba.Add(put) + ba.RangeID = repl.RangeID + + if _, pErr := tc.store.Send(ctx, ba); pErr != nil { + t.Fatal(pErr) + } + } + + put() + decision, err := newTruncateDecision(ctx, repl) assert.NoError(t, err) assert.True(t, decision.ShouldTruncate()) + // Should never trust initially, until recomputed at least once. + assert.False(t, trusted()) repl.mu.Lock() repl.mu.raftLogSizeTrusted = false @@ -913,5 +925,10 @@ func TestTruncateLogRecompute(t *testing.T) { // grown over threshold again; we compute instead that its size is correct). tc.store.SetRaftLogQueueActive(true) tc.store.MustForceRaftLogScanAndProcess() - verifyLogSizeInSync(t, repl) + + for i := 0; i < 2; i++ { + verifyLogSizeInSync(t, repl) + assert.True(t, trusted()) + put() // make sure we remain trusted and in sync + } } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index ebee6802ae1e..497e216c08ce 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -888,12 +888,6 @@ func (r *Replica) sendSnapshot( return &benignError{errors.New("raft status not initialized")} } - // TODO(tbg): send snapshots without the past raft log. This means replacing - // the state's truncated state with one whose index and term equal that of - // the RaftAppliedIndex of the snapshot. It looks like the code sending out - // the actual entries will do the right thing from then on (see anchor - // below). - _ = (*kvBatchSnapshotStrategy)(nil).Send usesReplicatedTruncatedState, err := engine.MVCCGetProto( ctx, snap.EngineSnap, keys.RaftTruncatedStateLegacyKey(r.RangeID), hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, ) @@ -901,6 +895,22 @@ func (r *Replica) sendSnapshot( return errors.Wrap(err, "loading legacy truncated state") } + if !usesReplicatedTruncatedState && snap.State.TruncatedState.Index < snap.State.RaftAppliedIndex { + // If we're not using a legacy (replicated) truncated state, we avoid + // sending the (past) Raft log in the snapshot in the first place and + // send only those entries that are actually useful to the follower. + // This is done by changing the truncated state, which we're allowed + // to do since it is not a replicated key (and thus not subject to + // matching across replicas). The actual sending happens here: + _ = (*kvBatchSnapshotStrategy)(nil).Send + // and results in no log entries being sent at all. Note that + // Metadata.Index is really the applied index of the replica. + snap.State.TruncatedState = &roachpb.RaftTruncatedState{ + Index: snap.RaftSnap.Metadata.Index, + Term: snap.RaftSnap.Metadata.Term, + } + } + req := SnapshotRequest_Header{ State: snap.State, // Tell the recipient whether it needs to synthesize the new diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 2f1577eb6f8c..21fc60ed6f0f 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2333,21 +2333,20 @@ func (r *Replica) applyRaftCommand( return storagepb.ReplicatedEvalResult{}, err } if !apply { - // TODO(tbg): As written, there is low confidence that nil'ing out - // the truncated state has the desired effect as our caller actually - // applies the side effects. It may have taken a copy and won't - // observe what we did. - // - // It's very difficult to test this functionality because of how - // difficult it is to test (*Replica).processRaftCommand (and this - // method). Instead of adding yet another terrible that that bends - // reality to its will in some clunky way, assert that we're never - // hitting this branch, which is supposed to be true until we stop - // sending the raft log in snapshots (#34287). - // Morally we would want to drop the command in checkForcedErrLocked, - // but that may be difficult to achieve. - log.Fatal(ctx, log.Safe(fmt.Sprintf("TruncatedState regressed:\nold: %+v\nnew: %+v", oldTruncatedState, rResult.State.TruncatedState))) + // The truncated state was discarded, so make sure we don't apply + // it to our in-memory state. rResult.State.TruncatedState = nil + rResult.RaftLogDelta = 0 + // We received a truncation that doesn't apply to us, so we know that + // there's a leaseholder out there with a log that has earlier entries + // than ours. That leader also guided our log size computations by + // giving us RaftLogDeltas for past truncations, and this was likely + // off. Mark our Raft log size is not trustworthy so that, assuming + // we step up as leader at some point in the future, we recompute + // our numbers. + r.mu.Lock() + r.mu.raftLogSizeTrusted = false + r.mu.Unlock() } } @@ -2392,6 +2391,22 @@ func (r *Replica) applyRaftCommand( return rResult, nil } +// handleTruncatedStateBelowRaft is called when a Raft command updates the truncated +// state. This isn't 100% trivial for two reasons: +// - in 19.1 we're making the TruncatedState key unreplicated, so there's a migration +// - we're making use of the above by not sending the Raft log in snapshots (the truncated +// state effectively determines the first index of the log, which requires it to be unreplicated). +// Updates to the HardState are sent out by a leaseholder truncating the log based on its local +// knowledge. For example, the leader might have a log 10..100 and truncates to 50, and will send +// out a TruncatedState with Index 50 to that effect. However, some replicas may not even have log +// entries that old, and must make sure to ignore this update to the truncated state, as it would +// otherwise clobber their "newer" truncated state. +// +// The returned boolean tells the caller whether to apply the truncated state's +// side effects, which means replacing the in-memory TruncatedState and applying +// the associated RaftLogDelta. It is usually expected to be true, but may not +// be for the first truncation after on a replica that recently received a +// snapshot. func handleTruncatedStateBelowRaft( ctx context.Context, oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState, diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 0791a4201f97..e4ea93acff52 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -999,6 +999,9 @@ func (r *Replica) applySnapshot( // by r.leasePostApply, but we called those above, so now it's safe to // wholesale replace r.mu.state. r.mu.state = s + // Snapshots typically have less log entries than the leaseholder. The next + // time we hold the lease, recompute the log size before making decisions. + r.mu.raftLogSizeTrusted = false r.assertStateLocked(ctx, r.store.Engine()) r.mu.Unlock()