From d3aae3ef88eb1b08021d60fb63c0267bd7297201 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 29 Jan 2019 12:16:38 +0100 Subject: [PATCH 1/5] storage: rename RaftTruncatedState -> LegacyRaftTruncatedState No functional changes, just preparing to introduce the shiny new unreplicated raft truncated state. Release note: None --- pkg/cli/debug.go | 2 +- pkg/cli/debug/print.go | 2 +- pkg/keys/constants.go | 4 ++-- pkg/keys/keys.go | 12 ++++++------ pkg/keys/keys_test.go | 2 +- pkg/keys/printer.go | 4 ++-- pkg/keys/printer_test.go | 2 +- pkg/storage/batcheval/cmd_truncate_log.go | 4 ++-- pkg/storage/rditer/replica_data_iter_test.go | 2 +- pkg/storage/replica_raftstorage.go | 6 +++--- pkg/storage/stateloader/stateloader.go | 20 ++++++++++---------- 11 files changed, 30 insertions(+), 30 deletions(-) diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index eefa7c54fb10..bc4f047e8895 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -665,7 +665,7 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error { return false, err } getReplicaInfo(rangeID).committedIndex = hs.Commit - case bytes.Equal(suffix, keys.LocalRaftTruncatedStateSuffix): + case bytes.Equal(suffix, keys.LocalRaftTruncatedStateLegacySuffix): var trunc roachpb.RaftTruncatedState if err := kv.Value.GetProto(&trunc); err != nil { return false, err diff --git a/pkg/cli/debug/print.go b/pkg/cli/debug/print.go index 18f02934b3e4..e5bcdc6b91ac 100644 --- a/pkg/cli/debug/print.go +++ b/pkg/cli/debug/print.go @@ -186,7 +186,7 @@ func tryRangeIDKey(kv engine.MVCCKeyValue) (string, error) { case bytes.Equal(suffix, keys.LocalRaftTombstoneSuffix): msg = &roachpb.RaftTombstone{} - case bytes.Equal(suffix, keys.LocalRaftTruncatedStateSuffix): + case bytes.Equal(suffix, keys.LocalRaftTruncatedStateLegacySuffix): msg = &roachpb.RaftTruncatedState{} case bytes.Equal(suffix, keys.LocalRangeLeaseSuffix): diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 
b128db4fd434..ebee2fe9ea8a 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -121,8 +121,8 @@ var ( LocalRaftAppliedIndexLegacySuffix = []byte("rfta") // LocalRaftTombstoneSuffix is the suffix for the raft tombstone. LocalRaftTombstoneSuffix = []byte("rftb") - // LocalRaftTruncatedStateSuffix is the suffix for the RaftTruncatedState. - LocalRaftTruncatedStateSuffix = []byte("rftt") + // LocalRaftTruncatedStateLegacySuffix is the suffix for the RaftTruncatedState. + LocalRaftTruncatedStateLegacySuffix = []byte("rftt") // LocalRangeLeaseSuffix is the suffix for a range lease. LocalRangeLeaseSuffix = []byte("rll-") // LocalLeaseAppliedIndexLegacySuffix is the suffix for the applied lease index. diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index 1e05eb3761f5..b9af3cb223fe 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -252,9 +252,9 @@ func LeaseAppliedIndexLegacyKey(rangeID roachpb.RangeID) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).LeaseAppliedIndexLegacyKey() } -// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState. -func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key { - return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateKey() +// RaftTruncatedStateLegacyKey returns a system-local key for a RaftTruncatedState. +func RaftTruncatedStateLegacyKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateLegacyKey() } // RangeFrozenStatusKey returns a system-local key for the frozen status. @@ -897,9 +897,9 @@ func (b RangeIDPrefixBuf) LeaseAppliedIndexLegacyKey() roachpb.Key { return append(b.replicatedPrefix(), LocalLeaseAppliedIndexLegacySuffix...) } -// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState. -func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key { - return append(b.replicatedPrefix(), LocalRaftTruncatedStateSuffix...) +// RaftTruncatedStateLegacyKey returns a system-local key for a RaftTruncatedState. 
+func (b RangeIDPrefixBuf) RaftTruncatedStateLegacyKey() roachpb.Key { + return append(b.replicatedPrefix(), LocalRaftTruncatedStateLegacySuffix...) } // RangeFrozenStatusKey returns a system-local key for the frozen status. diff --git a/pkg/keys/keys_test.go b/pkg/keys/keys_test.go index 8d282847f78f..090640d1cd56 100644 --- a/pkg/keys/keys_test.go +++ b/pkg/keys/keys_test.go @@ -146,7 +146,7 @@ func TestKeyAddressError(t *testing.T) { AbortSpanKey(0, uuid.MakeV4()), RaftTombstoneKey(0), RaftAppliedIndexLegacyKey(0), - RaftTruncatedStateKey(0), + RaftTruncatedStateLegacyKey(0), RangeLeaseKey(0), RangeStatsLegacyKey(0), RaftHardStateKey(0), diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index ebbbe36e05fe..af40f14d7c76 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -150,7 +150,7 @@ var ( ppFunc: raftLogKeyPrint, psFunc: raftLogKeyParse, }, - {name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateSuffix}, + {name: "LegacyRaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix}, {name: "RaftLastIndex", suffix: LocalRaftLastIndexSuffix}, {name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix}, {name: "RangeLastVerificationTimestamp", suffix: LocalRangeLastVerificationTimestampSuffixDeprecated}, @@ -575,7 +575,7 @@ func prettyPrintInternal(valDirs []encoding.Direction, key roachpb.Key, quoteRaw // /[rangeid]/RaftHardState "\x01s"+[rangeid]+"rfth" // /[rangeid]/RaftAppliedIndex "\x01s"+[rangeid]+"rfta" // /[rangeid]/RaftLog/logIndex:[logIndex] "\x01s"+[rangeid]+"rftl"+[logIndex] -// /[rangeid]/RaftTruncatedState "\x01s"+[rangeid]+"rftt" +// /[rangeid]/LegacyRaftTruncatedState "\x01s"+[rangeid]+"rftt" // /[rangeid]/RaftLastIndex "\x01s"+[rangeid]+"rfti" // /[rangeid]/RangeLastReplicaGCTimestamp "\x01s"+[rangeid]+"rlrt" // /[rangeid]/RangeLastVerificationTimestamp "\x01s"+[rangeid]+"rlvt" diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index b9cc118568bb..d9b449f39460 100644 --- 
a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -58,7 +58,7 @@ func TestPrettyPrint(t *testing.T) { {RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState"}, {RaftAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftAppliedIndex"}, {LeaseAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LeaseAppliedIndex"}, - {RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"}, + {RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LegacyRaftTruncatedState"}, {RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease"}, {RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats"}, {RangeTxnSpanGCThresholdKey(roachpb.RangeID(1000001)), `/Local/RangeID/1000001/r/RangeTxnSpanGCThreshold`}, diff --git a/pkg/storage/batcheval/cmd_truncate_log.go b/pkg/storage/batcheval/cmd_truncate_log.go index b27c17dbf30f..26ea6d5092df 100644 --- a/pkg/storage/batcheval/cmd_truncate_log.go +++ b/pkg/storage/batcheval/cmd_truncate_log.go @@ -34,7 +34,7 @@ func init() { func declareKeysTruncateLog( _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateKey(header.RangeID)}) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)}) prefix := keys.RaftLogPrefix(header.RangeID) spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}) } @@ -111,5 +111,5 @@ func TruncateLog( } pd.Replicated.RaftLogDelta = ms.SysBytes - return pd, MakeStateLoader(cArgs.EvalCtx).SetTruncatedState(ctx, batch, cArgs.Stats, tState) + return pd, MakeStateLoader(cArgs.EvalCtx).SetLegacyRaftTruncatedState(ctx, batch, cArgs.Stats, tState) } diff --git a/pkg/storage/rditer/replica_data_iter_test.go 
b/pkg/storage/rditer/replica_data_iter_test.go index 4cda60b8857a..b1f53f44cc21 100644 --- a/pkg/storage/rditer/replica_data_iter_test.go +++ b/pkg/storage/rditer/replica_data_iter_test.go @@ -85,7 +85,7 @@ func createRangeData( {keys.RangeLastGCKey(desc.RangeID), ts0}, {keys.RangeAppliedStateKey(desc.RangeID), ts0}, {keys.RaftAppliedIndexLegacyKey(desc.RangeID), ts0}, - {keys.RaftTruncatedStateKey(desc.RangeID), ts0}, + {keys.RaftTruncatedStateLegacyKey(desc.RangeID), ts0}, {keys.RangeLeaseKey(desc.RangeID), ts0}, {keys.LeaseAppliedIndexLegacyKey(desc.RangeID), ts0}, {keys.RangeStatsLegacyKey(desc.RangeID), ts0}, diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 74996fdae44b..4e1a6b88eb7c 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -212,7 +212,7 @@ func entries( } // No results, was it due to unavailability or truncation? - ts, err := rsl.LoadTruncatedState(ctx, e) + ts, err := rsl.LoadLegacyRaftTruncatedState(ctx, e) if err != nil { return nil, err } @@ -281,7 +281,7 @@ func term( // sideloaded entries. We only need the term, so this is what we do. 
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */) if err == raft.ErrCompacted { - ts, err := rsl.LoadTruncatedState(ctx, eng) + ts, err := rsl.LoadLegacyRaftTruncatedState(ctx, eng) if err != nil { return 0, err } @@ -318,7 +318,7 @@ func (r *Replica) raftTruncatedStateLocked( if r.mu.state.TruncatedState != nil { return *r.mu.state.TruncatedState, nil } - ts, err := r.mu.stateLoader.LoadTruncatedState(ctx, r.store.Engine()) + ts, err := r.mu.stateLoader.LoadLegacyRaftTruncatedState(ctx, r.store.Engine()) if err != nil { return ts, err } diff --git a/pkg/storage/stateloader/stateloader.go b/pkg/storage/stateloader/stateloader.go index 5b5fb360c360..995b19311136 100644 --- a/pkg/storage/stateloader/stateloader.go +++ b/pkg/storage/stateloader/stateloader.go @@ -105,7 +105,7 @@ func (rsl StateLoader) Load( // The truncated state should not be optional (i.e. the pointer is // pointless), but it is and the migration is not worth it. - truncState, err := rsl.LoadTruncatedState(ctx, reader) + truncState, err := rsl.LoadLegacyRaftTruncatedState(ctx, reader) if err != nil { return storagepb.ReplicaState{}, err } @@ -138,7 +138,7 @@ func (rsl StateLoader) Save( if err := rsl.SetTxnSpanGCThreshold(ctx, eng, ms, state.TxnSpanGCThreshold); err != nil { return enginepb.MVCCStats{}, err } - if err := rsl.SetTruncatedState(ctx, eng, ms, state.TruncatedState); err != nil { + if err := rsl.SetLegacyRaftTruncatedState(ctx, eng, ms, state.TruncatedState); err != nil { return enginepb.MVCCStats{}, err } if state.UsingAppliedStateKey { @@ -416,21 +416,21 @@ func (rsl StateLoader) SetMVCCStats( return rsl.writeLegacyMVCCStatsInternal(ctx, eng, newMS) } -// LoadTruncatedState loads the truncated state. -func (rsl StateLoader) LoadTruncatedState( +// LoadLegacyRaftTruncatedState loads the truncated state. 
+func (rsl StateLoader) LoadLegacyRaftTruncatedState( ctx context.Context, reader engine.Reader, ) (roachpb.RaftTruncatedState, error) { var truncState roachpb.RaftTruncatedState if _, err := engine.MVCCGetProto( - ctx, reader, rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, + ctx, reader, rsl.RaftTruncatedStateLegacyKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, ); err != nil { return roachpb.RaftTruncatedState{}, err } return truncState, nil } -// SetTruncatedState overwrites the truncated state. -func (rsl StateLoader) SetTruncatedState( +// SetLegacyRaftTruncatedState overwrites the truncated state. +func (rsl StateLoader) SetLegacyRaftTruncatedState( ctx context.Context, eng engine.ReadWriter, ms *enginepb.MVCCStats, @@ -440,7 +440,7 @@ func (rsl StateLoader) SetTruncatedState( return errors.New("cannot persist empty RaftTruncatedState") } return engine.MVCCPutProto(ctx, eng, ms, - rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, nil, truncState) + rsl.RaftTruncatedStateLegacyKey(), hlc.Timestamp{}, nil, truncState) } // LoadGCThreshold loads the GC threshold. @@ -508,7 +508,7 @@ func (rsl StateLoader) LoadLastIndex(ctx context.Context, reader engine.Reader) if lastIndex == 0 { // The log is empty, which means we are either starting from scratch // or the entire log has been truncated away. 
- lastEnt, err := rsl.LoadTruncatedState(ctx, reader) + lastEnt, err := rsl.LoadLegacyRaftTruncatedState(ctx, reader) if err != nil { return 0, err } @@ -575,7 +575,7 @@ func (rsl StateLoader) SynthesizeRaftState(ctx context.Context, eng engine.ReadW if err != nil { return err } - truncState, err := rsl.LoadTruncatedState(ctx, eng) + truncState, err := rsl.LoadLegacyRaftTruncatedState(ctx, eng) if err != nil { return err } From 4ed9e5b00db715ad207cdd4186d36c4bf8a726e9 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 29 Jan 2019 12:36:46 +0100 Subject: [PATCH 2/5] batcheval: add reminder when implementing divergent truncated states When replicas can have divergent truncated states, we want to carry out truncations even if they seem pointless to the leaseholder, since the leaseholder might have a shorter log than other replicas. Release note: None --- pkg/storage/batcheval/cmd_truncate_log.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/batcheval/cmd_truncate_log.go b/pkg/storage/batcheval/cmd_truncate_log.go index 26ea6d5092df..ae61e3d53eb4 100644 --- a/pkg/storage/batcheval/cmd_truncate_log.go +++ b/pkg/storage/batcheval/cmd_truncate_log.go @@ -58,6 +58,7 @@ func TruncateLog( } // Have we already truncated this log? If so, just return without an error. + // TODO(tbg): remove this once followers can have divergent truncated states. firstIndex, err := cArgs.EvalCtx.GetFirstIndex() if err != nil { return result.Result{}, err From 2990b968c3c7aaae99a541fe0a6a8221602934ce Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 29 Jan 2019 12:59:14 +0100 Subject: [PATCH 3/5] keys: remove misleading overview over encoded keys The encoded range-local keys were mostly incorrect in that they were missing the replicated/unreplicated infix. Rather than trying to keep this comment up to date, readers should be directed to TestPrettyPrint which now conveniently logs all types of keys and their encoding. 
Release note: None --- pkg/keys/printer.go | 37 +++---------------------------------- pkg/keys/printer_test.go | 5 +++++ 2 files changed, 8 insertions(+), 34 deletions(-) diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index af40f14d7c76..33210ead7897 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -563,40 +563,9 @@ func prettyPrintInternal(valDirs []encoding.Direction, key roachpb.Key, quoteRaw return str } -// PrettyPrint prints the key in a human readable format: -// -// Key format Key value -// /Local/... "\x01"+... -// /Store/... "\x01s"+... -// /RangeID/... "\x01s"+[rangeid] -// /[rangeid]/AbortSpan/[id] "\x01s"+[rangeid]+"abc-"+[id] -// /[rangeid]/Lease "\x01s"+[rangeid]+"rfll" -// /[rangeid]/RaftTombstone "\x01s"+[rangeid]+"rftb" -// /[rangeid]/RaftHardState "\x01s"+[rangeid]+"rfth" -// /[rangeid]/RaftAppliedIndex "\x01s"+[rangeid]+"rfta" -// /[rangeid]/RaftLog/logIndex:[logIndex] "\x01s"+[rangeid]+"rftl"+[logIndex] -// /[rangeid]/LegacyRaftTruncatedState "\x01s"+[rangeid]+"rftt" -// /[rangeid]/RaftLastIndex "\x01s"+[rangeid]+"rfti" -// /[rangeid]/RangeLastReplicaGCTimestamp "\x01s"+[rangeid]+"rlrt" -// /[rangeid]/RangeLastVerificationTimestamp "\x01s"+[rangeid]+"rlvt" -// /[rangeid]/RangeStats "\x01s"+[rangeid]+"stat" -// /Range/... "\x01k"+... -// [key]/RangeDescriptor "\x01k"+[key]+"rdsc" -// [key]/Transaction/[id] "\x01k"+[key]+"txn-"+[txn-id] -// [key]/QueueLastProcessed/[queue] "\x01k"+[key]+"qlpt"+[queue] -// /Local/Max "\x02" -// -// /Meta1/[key] "\x02"+[key] -// /Meta2/[key] "\x03"+[key] -// /System/... "\x04" -// /NodeLiveness/[key] "\x04\0x00liveness-"+[key] -// /StatusNode/[key] "\x04status-node-"+[key] -// /System/Max "\x05" -// -// /Table/[key] [key] -// -// /Min "" -// /Max "\xff\xff" +// PrettyPrint prints the key in a human readable format, see TestPrettyPrint. +// The output does not indicate whether a key is part of the replicated or un- +// replicated keyspace. 
// // valDirs correspond to the encoding direction of each encoded value in key. // For example, table keys could have column values encoded in ascending or diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index d9b449f39460..4e3790be87cb 100644 --- a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -184,6 +184,11 @@ func TestPrettyPrint(t *testing.T) { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { keyInfo := MassagePrettyPrintedSpanForTest(PrettyPrint(nil /* valDirs */, test.key), nil) exp := MassagePrettyPrintedSpanForTest(test.exp, nil) + t.Logf(`---- test case #%d: +input: %q +output: %s +exp: %s +`, i+1, []byte(test.key), keyInfo, exp) if exp != keyInfo { t.Errorf("%d: expected %s, got %s", i, exp, keyInfo) } From 3878b12698dbbc422e007160dcb6712432c2c082 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 30 Jan 2019 16:32:35 +0100 Subject: [PATCH 4/5] storage: add Store.VisitReplicas Release note: None --- pkg/storage/store.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index ed2925e8e95e..c5497e144017 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1788,6 +1788,13 @@ func (s *Store) GossipDeadReplicas(ctx context.Context) error { return s.cfg.Gossip.AddInfoProto(key, &deadReplicas, gossip.StoreTTL) } +// VisitReplicas invokes the visitor on the Store's Replicas until the visitor returns false. +// Replicas which are added to the Store after iteration begins may or may not be observed. +func (s *Store) VisitReplicas(visitor func(*Replica) bool) { + v := newStoreReplicaVisitor(s) + v.Visit(visitor) +} + // Bootstrap writes a new store ident to the underlying engine. To // ensure that no crufty data already exists in the engine, it scans // the engine contents before writing the new store ident. 
The engine From d0aa09e6afc802ee9d02f367925220c0e1f9a0aa Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 11 Feb 2019 22:26:45 +0100 Subject: [PATCH 5/5] storage: make RaftTruncatedState unreplicated MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See #34287. Today, Raft (or preemptive) snapshots include the past Raft log, that is, log entries which are already reflected in the state of the snapshot. Fundamentally, this is because we have historically used a replicated TruncatedState. TruncatedState essentially tells us what the first index in the log is (though it also includes a Term). If the TruncatedState cannot diverge across replicas, we *must* send the whole log in snapshots, as the first log index must match what the TruncatedState claims it is. The Raft log is typically, but not necessarily, small. Log truncations are driven by a queue and use a complex decision process. That decision process can be faulty, and even if it isn't, the queue could be held up. Besides, even when the Raft log contains only very few entries, these entries may be quite large (see SSTable ingestion during RESTORE). All of this is why we don't want to (be forced to) send the Raft log as part of snapshots, and in turn we need the TruncatedState to be unreplicated. This change migrates the TruncatedState into unreplicated keyspace. It does not yet allow snapshots to avoid sending the past Raft log, but that is a relatively straightforward follow-up change. VersionUnreplicatedRaftTruncatedState, when active, moves the truncated state into unreplicated keyspace on log truncations. The migration works as follows: 1. At any log position, the replicas of a Range either use the new (unreplicated) key or the old one, and exactly one of them exists. 2. When a log truncation evaluates under the new cluster version, it initiates the migration by deleting the old key. 
Under the old cluster version, it behaves as it does today, updating the replicated truncated state. 3. The deletion signals new code downstream of Raft and triggers a write to the new, unreplicated, key (atomic with the deletion of the old key). 4. Future log truncations don't write any replicated data any more, but (as before) send along the TruncatedState, which is written downstream of Raft atomically with the deletion of the log entries. This actually uses the same code as step 3. What's new is that the truncated state needs to be verified before replacing a previous one. If replicas disagree about their truncated state, it's possible for replica X at FirstIndex=100 to apply a truncated state update that sets FirstIndex to, say, 50 (proposed by a replica with a "longer" historical log). In that case, the truncated state update must be ignored (this is straightforward downstream-of-Raft code). 5. When a split trigger evaluates, it seeds the RHS with the legacy key iff the LHS uses the legacy key, and the unreplicated key otherwise. This makes sure that the invariant that all replicas agree on the state of the migration is upheld. 6. When a snapshot is applied, the receiver is told whether the snapshot contains a legacy key. If not, it writes the truncated state (which is part of the snapshot metadata) in its unreplicated version. Otherwise it doesn't have to do anything (the range will migrate later). The following diagram visualizes the above. Note that it abuses sequence diagrams to get a nice layout; the vertical lines belonging to NewState and OldState don't imply any particular ordering of operations. 
``` ┌────────┐ ┌────────┐ │OldState│ │NewState│ └───┬────┘ └───┬────┘ │ Bootstrap under old version │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ │ Bootstrap under new version │ │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │─ ─ ┐ │ | Log truncation under old version │< ─ ┘ │ │ │─ ─ ┐ │ │ | Snapshot │ │< ─ ┘ │ │ │ │ │─ ─ ┐ │ │ | Snapshot │ │< ─ ┘ │ │ │ Log truncation under new version │ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─>│ │ │ │ │─ ─ ┐ │ │ | Log truncation under new version │ │< ─ ┘ │ │ │ │─ ─ ┐ │ │ | Log truncation under old version │ │< ─ ┘ (necessarily running new binary) ``` Source: http://www.plantuml.com/plantuml/uml/ and the following input: @startuml scale 600 width OldState <--] : Bootstrap under old version NewState <--] : Bootstrap under new version OldState --> OldState : Log truncation under old version OldState --> OldState : Snapshot NewState --> NewState : Snapshot OldState --> NewState : Log truncation under new version NewState --> NewState : Log truncation under new version NewState --> NewState : Log truncation under old version\n(necessarily running new binary) @enduml Release note: None --- docs/generated/settings/settings.html | 2 +- pkg/keys/constants.go | 3 +- pkg/keys/keys.go | 11 + pkg/keys/printer.go | 2 +- pkg/keys/printer_test.go | 3 +- pkg/server/version_cluster_test.go | 172 ++++++++++++++ pkg/settings/cluster/cockroach_versions.go | 93 ++++++++ .../testdata/logic_test/crdb_internal | 4 +- pkg/storage/batcheval/cmd_end_transaction.go | 24 ++ .../batcheval/cmd_resolve_intent_test.go | 19 +- pkg/storage/batcheval/cmd_truncate_log.go | 55 ++++- pkg/storage/batcheval/truncate_log_test.go | 200 ++++++++++++++++ pkg/storage/below_raft_protos_test.go | 11 + pkg/storage/engine/engine.go | 10 +- pkg/storage/main_test.go | 2 +- pkg/storage/raft.pb.go | 223 +++++++++++------- pkg/storage/raft.proto | 11 + pkg/storage/replica_command.go | 22 ++ pkg/storage/replica_raft.go | 135 ++++++++--- 
pkg/storage/replica_raft_truncation_test.go | 119 ++++++++++ pkg/storage/replica_raftstorage.go | 32 ++- pkg/storage/replica_test.go | 6 +- pkg/storage/stateloader/initial.go | 11 +- pkg/storage/stateloader/stateloader.go | 87 +++++-- pkg/storage/stats_test.go | 4 +- pkg/storage/store.go | 15 +- pkg/storage/store_snapshot.go | 11 +- pkg/storage/store_test.go | 2 + .../truncated_state_migration/migration | 25 ++ .../truncated_state_migration/pre_migration | 26 ++ 30 files changed, 1165 insertions(+), 175 deletions(-) create mode 100644 pkg/storage/batcheval/truncate_log_test.go create mode 100644 pkg/storage/replica_raft_truncation_test.go create mode 100644 pkg/storage/testdata/truncated_state_migration/migration create mode 100644 pkg/storage/testdata/truncated_state_migration/pre_migration diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index d8f8c6ba6121..914d334755a5 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -95,6 +95,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen in the /debug page trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set. -versioncustom validation2.1-5set the active cluster version in the format '.'. +versioncustom validation2.1-6set the active cluster version in the format '.'. diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index ebee2fe9ea8a..6bd5e0bcfcf3 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -121,7 +121,8 @@ var ( LocalRaftAppliedIndexLegacySuffix = []byte("rfta") // LocalRaftTombstoneSuffix is the suffix for the raft tombstone. LocalRaftTombstoneSuffix = []byte("rftb") - // LocalRaftTruncatedStateLegacySuffix is the suffix for the RaftTruncatedState. 
+ // LocalRaftTruncatedStateLegacySuffix is the suffix for the legacy RaftTruncatedState. + // See VersionUnreplicatedRaftTruncatedState. LocalRaftTruncatedStateLegacySuffix = []byte("rftt") // LocalRangeLeaseSuffix is the suffix for a range lease. LocalRangeLeaseSuffix = []byte("rll-") diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index b9af3cb223fe..8d0d1e91f8f0 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -253,6 +253,7 @@ func LeaseAppliedIndexLegacyKey(rangeID roachpb.RangeID) roachpb.Key { } // RaftTruncatedStateLegacyKey returns a system-local key for a RaftTruncatedState. +// See VersionUnreplicatedRaftTruncatedState. func RaftTruncatedStateLegacyKey(rangeID roachpb.RangeID) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateLegacyKey() } @@ -314,6 +315,11 @@ func RaftTombstoneKey(rangeID roachpb.RangeID) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftTombstoneKey() } +// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState. +func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateKey() +} + // RaftHardStateKey returns a system-local key for a Raft HardState. func RaftHardStateKey(rangeID roachpb.RangeID) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftHardStateKey() @@ -935,6 +941,11 @@ func (b RangeIDPrefixBuf) RaftTombstoneKey() roachpb.Key { return append(b.unreplicatedPrefix(), LocalRaftTombstoneSuffix...) } +// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState. +func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key { + return append(b.unreplicatedPrefix(), LocalRaftTruncatedStateLegacySuffix...) +} + // RaftHardStateKey returns a system-local key for a Raft HardState. func (b RangeIDPrefixBuf) RaftHardStateKey() roachpb.Key { return append(b.unreplicatedPrefix(), LocalRaftHardStateSuffix...) 
diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index 33210ead7897..3fe3a46e1c15 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -150,7 +150,7 @@ var ( ppFunc: raftLogKeyPrint, psFunc: raftLogKeyParse, }, - {name: "LegacyRaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix}, + {name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix}, {name: "RaftLastIndex", suffix: LocalRaftLastIndexSuffix}, {name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix}, {name: "RangeLastVerificationTimestamp", suffix: LocalRangeLastVerificationTimestampSuffixDeprecated}, diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index 4e3790be87cb..f3b679766d2e 100644 --- a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -58,7 +58,8 @@ func TestPrettyPrint(t *testing.T) { {RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState"}, {RaftAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftAppliedIndex"}, {LeaseAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LeaseAppliedIndex"}, - {RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LegacyRaftTruncatedState"}, + {RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"}, + {RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState"}, {RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease"}, {RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats"}, {RangeTxnSpanGCThresholdKey(roachpb.RangeID(1000001)), `/Local/RangeID/1000001/r/RangeTxnSpanGCThreshold`}, diff --git a/pkg/server/version_cluster_test.go b/pkg/server/version_cluster_test.go index afca6ccfd360..bc00d92ef0cb 100644 --- a/pkg/server/version_cluster_test.go +++ b/pkg/server/version_cluster_test.go @@ -24,16 +24,20 @@ import ( "testing" 
"github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) type testClusterWithHelpers struct { @@ -191,6 +195,174 @@ func TestClusterVersionPersistedOnJoin(t *testing.T) { } } +// TestClusterVersionUnreplicatedRaftTruncatedState exercises the +// VersionUnreplicatedRaftTruncatedState migration in as much detail as possible +// in a unit test. +// +// It starts a four node cluster with a pre-migration version and upgrades into +// the new version while traffic and scattering are active, verifying that the +// truncated states are rewritten. +func TestClusterVersionUnreplicatedRaftTruncatedState(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + dir, finish := testutils.TempDir(t) + defer finish() + + oldVersion := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState - 1) + oldVersionS := oldVersion.String() + newVersionS := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState).String() + + // Four node cluster in which all versions support newVersion (i.e. would in + // principle upgrade to it) but are bootstrapped at oldVersion. 
+ versions := [][2]string{ + {oldVersionS, newVersionS}, + {oldVersionS, newVersionS}, + {oldVersionS, newVersionS}, + {oldVersionS, newVersionS}, + } + + bootstrapVersion := cluster.ClusterVersion{Version: oldVersion} + + knobs := base.TestingKnobs{ + Store: &storage.StoreTestingKnobs{ + BootstrapVersion: &bootstrapVersion, + }, + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + }, + } + + tc := setupMixedCluster(t, knobs, versions, dir) + defer tc.TestCluster.Stopper().Stop(ctx) + + if _, err := tc.ServerConn(0).Exec(` +CREATE TABLE kv (id INT PRIMARY KEY, v INT); +ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i); +`); err != nil { + t.Fatal(err) + } + + scatter := func() { + t.Helper() + if _, err := tc.ServerConn(0).Exec( + `ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i%$1+1], i FROM generate_series(0, 9) AS g(i)`, len(versions), + ); err != nil { + t.Log(err) + } + } + + var n int + insert := func() { + t.Helper() + n++ + // Write only to a subset of our ranges to guarantee log truncations there. 
+ _, err := tc.ServerConn(0).Exec(`UPSERT INTO kv VALUES($1, $2)`, n%2, n) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 500; i++ { + insert() + } + scatter() + + for _, server := range tc.Servers { + assert.NoError(t, server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error { + s.VisitReplicas(func(r *storage.Replica) bool { + key := keys.RaftTruncatedStateKey(r.RangeID) + var truncState roachpb.RaftTruncatedState + found, err := engine.MVCCGetProto( + context.Background(), s.Engine(), key, + hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, + ) + if err != nil { + t.Fatal(err) + } + if found { + t.Errorf("unexpectedly found unreplicated TruncatedState at %s", key) + } + return true // want more + }) + return nil + })) + } + + if v := tc.getVersionFromSelect(0); v != oldVersionS { + t.Fatalf("running %s, wanted %s", v, oldVersionS) + } + + assert.NoError(t, tc.setVersion(0, newVersionS)) + for i := 0; i < 500; i++ { + insert() + } + scatter() + + for _, server := range tc.Servers { + testutils.SucceedsSoon(t, func() error { + err := server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error { + // We scattered and so old copies of replicas may be lying around. + // If we're not proactive about removing them, the test gets pretty + // slow because those replicas aren't caught up any more. 
+ s.MustForceReplicaGCScanAndProcess() + var err error + s.VisitReplicas(func(r *storage.Replica) bool { + snap := s.Engine().NewSnapshot() + defer snap.Close() + + keyLegacy := keys.RaftTruncatedStateLegacyKey(r.RangeID) + keyUnreplicated := keys.RaftTruncatedStateKey(r.RangeID) + + if found, innerErr := engine.MVCCGetProto( + context.Background(), snap, keyLegacy, + hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, + ); innerErr != nil { + t.Fatal(innerErr) + } else if found { + if err == nil { + err = errors.New("found legacy TruncatedState") + } + err = errors.Wrap(err, r.String()) + + // Force a log truncation to prove that this rectifies + // the situation. + status := r.RaftStatus() + if status != nil { + desc := r.Desc() + truncate := &roachpb.TruncateLogRequest{} + truncate.Key = desc.StartKey.AsRawKey() + truncate.RangeID = desc.RangeID + truncate.Index = status.HardState.Commit + var ba roachpb.BatchRequest + ba.RangeID = r.RangeID + ba.Add(truncate) + if _, err := s.DB().NonTransactionalSender().Send(ctx, ba); err != nil { + t.Fatal(err) + } + } + return true // want more + } + + if found, err := engine.MVCCGetProto( + context.Background(), snap, keyUnreplicated, + hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, + ); err != nil { + t.Fatal(err) + } else if !found { + // We can't have neither of the keys present. 
+ t.Fatalf("%s: unexpectedly did not find unreplicated TruncatedState at %s", r, keyUnreplicated) + } + + return true // want more + }) + return err + }) + return err + }) + } +} + func TestClusterVersionUpgrade(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() diff --git a/pkg/settings/cluster/cockroach_versions.go b/pkg/settings/cluster/cockroach_versions.go index cd151f156b2a..5601ca32d40e 100644 --- a/pkg/settings/cluster/cockroach_versions.go +++ b/pkg/settings/cluster/cockroach_versions.go @@ -71,6 +71,7 @@ const ( VersionExportStorageWorkload VersionLazyTxnRecord VersionSequencedReads + VersionUnreplicatedRaftTruncatedState // see versionsSingleton for details // Add new versions here (step one of two). @@ -315,6 +316,98 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Key: VersionSequencedReads, Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 5}, }, + { + // VersionUnreplicatedRaftTruncatedState is https://github.com/cockroachdb/cockroach/pull/34660. + // When active, it moves the truncated state into unreplicated keyspace + // on log truncations. + // + // The migration works as follows: + // + // 1. at any log position, the replicas of a Range either use the new + // (unreplicated) key or the old one, and exactly one of them exists. + // + // 2. When a log truncation evaluates under the new cluster version, + // it initiates the migration by deleting the old key. Under the old cluster + // version, it behaves like today, updating the replicated truncated state. + // + // 3. The deletion signals new code downstream of Raft and triggers a write + // to the new, unreplicated, key (atomic with the deletion of the old key). + // + // 4. Future log truncations don't write any replicated data any more, but + // (like before) send along the TruncatedState which is written downstream + // of Raft atomically with the deletion of the log entries. This actually + // uses the same code as 3. 
+ // What's new is that the truncated state needs to be verified before + // replacing a previous one. If replicas disagree about their truncated + // state, it's possible for replica X at FirstIndex=100 to apply a + // truncated state update that sets FirstIndex to, say, 50 (proposed by a + // replica with a "longer" historical log). In that case, the truncated + // state update must be ignored (this is straightforward downstream-of-Raft + // code). + // + // 5. When a split trigger evaluates, it seeds the RHS with the legacy + // key iff the LHS uses the legacy key, and the unreplicated key otherwise. + // This makes sure that the invariant that all replicas agree on the + // state of the migration is upheld. + // + // 6. When a snapshot is applied, the receiver is told whether the snapshot + // contains a legacy key. If not, it writes the truncated state (which is + // part of the snapshot metadata) in its unreplicated version. Otherwise + // it doesn't have to do anything (the range will migrate later). + // + // The following diagram visualizes the above. Note that it abuses sequence + // diagrams to get a nice layout; the vertical lines belonging to NewState + // and OldState don't imply any particular ordering of operations. 
+ // + // ┌────────┐ ┌────────┐ + // │OldState│ │NewState│ + // └───┬────┘ └───┬────┘ + // │ Bootstrap under old version + // │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + // │ │ + // │ │ Bootstrap under new version + // │ │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + // │ │ + // │─ ─ ┐ + // │ | Log truncation under old version + // │< ─ ┘ + // │ │ + // │─ ─ ┐ │ + // │ | Snapshot │ + // │< ─ ┘ │ + // │ │ + // │ │─ ─ ┐ + // │ │ | Snapshot + // │ │< ─ ┘ + // │ │ + // │ Log truncation under new version │ + // │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─>│ + // │ │ + // │ │─ ─ ┐ + // │ │ | Log truncation under new version + // │ │< ─ ┘ + // │ │ + // │ │─ ─ ┐ + // │ │ | Log truncation under old version + // │ │< ─ ┘ (necessarily running new binary) + // + // Source: http://www.plantuml.com/plantuml/uml/ and the following input: + // + // @startuml + // scale 600 width + // + // OldState <--] : Bootstrap under old version + // NewState <--] : Bootstrap under new version + // OldState --> OldState : Log truncation under old version + // OldState --> OldState : Snapshot + // NewState --> NewState : Snapshot + // OldState --> NewState : Log truncation under new version + // NewState --> NewState : Log truncation under new version + // NewState --> NewState : Log truncation under old version\n(necessarily running new binary) + // @enduml + Key: VersionUnreplicatedRaftTruncatedState, + Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 6}, + }, // Add new versions here (step two of two). 
diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index c035e310449e..7f956f1f4ea5 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -268,7 +268,7 @@ select crdb_internal.set_vmodule('') query T select crdb_internal.node_executable_version() ---- -2.1-5 +2.1-6 query ITTT colnames select node_id, component, field, regexp_replace(regexp_replace(value, '^\d+$', ''), e':\\d+', ':') as value from crdb_internal.node_runtime_info @@ -365,7 +365,7 @@ select * from crdb_internal.gossip_alerts query T select crdb_internal.node_executable_version() ---- -2.1-5 +2.1-6 user root diff --git a/pkg/storage/batcheval/cmd_end_transaction.go b/pkg/storage/batcheval/cmd_end_transaction.go index 82a432c4884d..49fb8970c5c3 100644 --- a/pkg/storage/batcheval/cmd_end_transaction.go +++ b/pkg/storage/batcheval/cmd_end_transaction.go @@ -911,6 +911,29 @@ func splitTrigger( log.VEventf(ctx, 1, "LHS's TxnSpanGCThreshold of split is not set") } + // We're about to write the initial state for the replica. We migrated + // the formerly replicated truncated state into unreplicated keyspace + // in 2.2, but this range may still be using the replicated version + // and we need to make a decision about what to use for the RHS that + // is consistent across the followers. We do for the RHS what the LHS + // does: if the LHS has the legacy key, initialize the RHS with a + // legacy key as well. + // + // See VersionUnreplicatedRaftTruncatedState. 
+ truncStateType := stateloader.TruncatedStateUnreplicated + if found, err := engine.MVCCGetProto( + ctx, + batch, + keys.RaftTruncatedStateLegacyKey(rec.GetRangeID()), + hlc.Timestamp{}, + nil, + engine.MVCCGetOptions{}, + ); err != nil { + return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load legacy truncated state") + } else if found { + truncStateType = stateloader.TruncatedStateLegacyReplicated + } + // Writing the initial state is subtle since this also seeds the Raft // group. It becomes more subtle due to proposer-evaluated Raft. // @@ -944,6 +967,7 @@ func splitTrigger( ctx, batch, rightMS, split.RightDesc, rightLease, *gcThreshold, *txnSpanGCThreshold, rec.ClusterSettings().Version.Version().Version, + truncStateType, ) if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state") diff --git a/pkg/storage/batcheval/cmd_resolve_intent_test.go b/pkg/storage/batcheval/cmd_resolve_intent_test.go index 505eafe10119..d4d935fb5525 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent_test.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_test.go @@ -36,13 +36,14 @@ import ( ) type mockEvalCtx struct { - clusterSettings *cluster.Settings - desc *roachpb.RangeDescriptor - clock *hlc.Clock - stats enginepb.MVCCStats - qps float64 - abortSpan *abortspan.AbortSpan - gcThreshold hlc.Timestamp + clusterSettings *cluster.Settings + desc *roachpb.RangeDescriptor + clock *hlc.Clock + stats enginepb.MVCCStats + qps float64 + abortSpan *abortspan.AbortSpan + gcThreshold hlc.Timestamp + term, firstIndex uint64 } func (m *mockEvalCtx) String() string { @@ -85,10 +86,10 @@ func (m *mockEvalCtx) IsFirstRange() bool { panic("unimplemented") } func (m *mockEvalCtx) GetFirstIndex() (uint64, error) { - panic("unimplemented") + return m.firstIndex, nil } func (m *mockEvalCtx) GetTerm(uint64) (uint64, error) { - panic("unimplemented") + return m.term, nil } func (m *mockEvalCtx) GetLeaseAppliedIndex() 
uint64 { panic("unimplemented") diff --git a/pkg/storage/batcheval/cmd_truncate_log.go b/pkg/storage/batcheval/cmd_truncate_log.go index ae61e3d53eb4..1564f849baa4 100644 --- a/pkg/storage/batcheval/cmd_truncate_log.go +++ b/pkg/storage/batcheval/cmd_truncate_log.go @@ -19,10 +19,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) @@ -57,13 +59,33 @@ func TruncateLog( return result.Result{}, nil } - // Have we already truncated this log? If so, just return without an error. - // TODO(tbg): remove this once followers can have divergent truncated states. - firstIndex, err := cArgs.EvalCtx.GetFirstIndex() + var legacyTruncatedState roachpb.RaftTruncatedState + legacyKeyFound, err := engine.MVCCGetProto( + ctx, batch, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()), + hlc.Timestamp{}, &legacyTruncatedState, engine.MVCCGetOptions{}, + ) if err != nil { return result.Result{}, err } + // See the comment on the cluster version for all the moving parts involved + // in migrating into this cluster version. Note that if the legacy key is + // missing, the cluster version has been bumped (though we may not know it + // yet) and we keep using the unreplicated key. + useNewUnreplicatedTruncatedStateKey := cArgs.EvalCtx.ClusterSettings().Version.IsActive(cluster.VersionUnreplicatedRaftTruncatedState) || !legacyKeyFound + + firstIndex, err := cArgs.EvalCtx.GetFirstIndex() + if err != nil { + return result.Result{}, errors.Wrap(err, "getting first index") + } + // Have we already truncated this log? 
If so, just return without an error. + // Note that there may in principle be followers whose Raft log is longer + // than this node's, but to issue a truncation we also need the *term* for + // the new truncated state, which we can't obtain if we don't have the log + // entry ourselves. + // + // TODO(tbg): think about synthesizing a valid term. Can we use the next + // existing entry's term? if firstIndex >= args.Index { if log.V(3) { log.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d", @@ -75,9 +97,16 @@ func TruncateLog( // args.Index is the first index to keep. term, err := cArgs.EvalCtx.GetTerm(args.Index - 1) if err != nil { - return result.Result{}, err + return result.Result{}, errors.Wrap(err, "getting term") } + // Compute the number of bytes freed by this truncation. Note that this will + // only make sense for the leaseholder as we base this off its own first + // index (other replicas may have other first indexes assuming we're not + // still using the legacy truncated state key). In principle, this could be + // off either way, though in practice we don't expect followers to have + // a first index smaller than the leaseholder's (see #34287), and most of + // the time everyone's first index should be the same. start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(rangeID, firstIndex)) end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(rangeID, args.Index)) @@ -110,7 +139,23 @@ func TruncateLog( pd.Replicated.State = &storagepb.ReplicaState{ TruncatedState: tState, } + pd.Replicated.RaftLogDelta = ms.SysBytes - return pd, MakeStateLoader(cArgs.EvalCtx).SetLegacyRaftTruncatedState(ctx, batch, cArgs.Stats, tState) + if !useNewUnreplicatedTruncatedStateKey { + return pd, MakeStateLoader(cArgs.EvalCtx).SetLegacyRaftTruncatedState(ctx, batch, cArgs.Stats, tState) + } + if legacyKeyFound { + // Time to migrate by deleting the legacy key. 
The downstream-of-Raft + // code will atomically rewrite the truncated state (supplied via the + // side effect) into the new unreplicated key. + if err := engine.MVCCDelete( + ctx, batch, cArgs.Stats, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()), + hlc.Timestamp{}, nil, /* txn */ + ); err != nil { + return result.Result{}, err + } + } + + return pd, nil } diff --git a/pkg/storage/batcheval/truncate_log_test.go b/pkg/storage/batcheval/truncate_log_test.go new file mode 100644 index 000000000000..e9915697d89d --- /dev/null +++ b/pkg/storage/batcheval/truncate_log_test.go @@ -0,0 +1,200 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package batcheval + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func putTruncatedState( + t *testing.T, + eng engine.Engine, + rangeID roachpb.RangeID, + truncState roachpb.RaftTruncatedState, + legacy bool, +) { + key := keys.RaftTruncatedStateKey(rangeID) + if legacy { + key = keys.RaftTruncatedStateLegacyKey(rangeID) + } + if err := engine.MVCCPutProto( + context.Background(), eng, nil, key, + hlc.Timestamp{}, nil /* txn */, &truncState, + ); err != nil { + t.Fatal(err) + } +} + +func readTruncStates( + t *testing.T, eng engine.Engine, rangeID roachpb.RangeID, +) (legacy roachpb.RaftTruncatedState, unreplicated roachpb.RaftTruncatedState) { + t.Helper() + legacyFound, err := engine.MVCCGetProto( + context.Background(), eng, keys.RaftTruncatedStateLegacyKey(rangeID), + hlc.Timestamp{}, &legacy, engine.MVCCGetOptions{}, + ) + if err != nil { + t.Fatal(err) + } + if legacyFound != (legacy != roachpb.RaftTruncatedState{}) { + t.Fatalf("legacy key found=%t but state is %+v", legacyFound, legacy) + } + + unreplicatedFound, err := engine.MVCCGetProto( + context.Background(), eng, keys.RaftTruncatedStateKey(rangeID), + hlc.Timestamp{}, &unreplicated, engine.MVCCGetOptions{}, + ) + if err != nil { + t.Fatal(err) + } + if unreplicatedFound != (unreplicated != roachpb.RaftTruncatedState{}) { + t.Fatalf("unreplicated key found=%t but state is %+v", unreplicatedFound, unreplicated) + } + return +} + +const ( + expectationNeither = iota + expectationLegacy + expectationUnreplicated +) + +type unreplicatedTruncStateTest struct { + startsWithLegacy bool + hasVersionBumped bool + exp int // see consts above +} + +func 
TestTruncateLogUnreplicatedTruncatedState(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Follow the reference below for more information on what's being tested. + _ = cluster.VersionUnreplicatedRaftTruncatedState + + const ( + startsLegacy = true + startsUnreplicated = false + newVersion = true + oldVersion = false + ) + + testCases := []unreplicatedTruncStateTest{ + // Steady states: we have one type of TruncatedState and will end up with + // the same type: either we've already migrated, or we haven't but aren't + // allowed to migrate yet. + {startsUnreplicated, oldVersion, expectationUnreplicated}, + {startsUnreplicated, newVersion, expectationUnreplicated}, + {startsLegacy, oldVersion, expectationLegacy}, + // This is the case in which the migration is triggered. As a result, + // we see neither of the keys written. The new key will be written + // atomically as a side effect (outside of the scope of this test). + {startsLegacy, newVersion, expectationNeither}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) { + runUnreplicatedTruncatedState(t, tc) + }) + } +} + +func runUnreplicatedTruncatedState(t *testing.T, tc unreplicatedTruncStateTest) { + ctx := context.Background() + versionOff := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState - 1) + versionOn := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState) + st := cluster.MakeClusterSettings(versionOff, versionOn) + + if tc.hasVersionBumped { + assert.NoError(t, st.InitializeVersion(cluster.ClusterVersion{Version: versionOn})) + } else { + assert.NoError(t, st.InitializeVersion(cluster.ClusterVersion{Version: versionOff})) + } + + const ( + rangeID = 12 + term = 10 + firstIndex = 100 + ) + + evalCtx := mockEvalCtx{ + clusterSettings: st, + desc: &roachpb.RangeDescriptor{RangeID: rangeID}, + term: term, + firstIndex: firstIndex, + } + + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + defer eng.Close() + + truncState := 
roachpb.RaftTruncatedState{ + Index: firstIndex + 1, + Term: term, + } + + // Put down the TruncatedState specified by the test case. + putTruncatedState(t, eng, rangeID, truncState, tc.startsWithLegacy) + + // Send a truncation request. + req := roachpb.TruncateLogRequest{ + RangeID: rangeID, + Index: firstIndex + 7, + } + cArgs := CommandArgs{ + EvalCtx: &evalCtx, + Args: &req, + } + resp := &roachpb.TruncateLogResponse{} + res, err := TruncateLog(ctx, eng, cArgs, resp) + if err != nil { + t.Fatal(err) + } + + expTruncState := roachpb.RaftTruncatedState{ + Index: req.Index - 1, + Term: term, + } + + legacy, unreplicated := readTruncStates(t, eng, rangeID) + + switch tc.exp { + case expectationLegacy: + assert.Equal(t, expTruncState, legacy) + assert.Zero(t, unreplicated) + case expectationUnreplicated: + // The unreplicated key that we see should be the initial truncated + // state (it's only updated below Raft). + assert.Equal(t, truncState, unreplicated) + assert.Zero(t, legacy) + case expectationNeither: + assert.Zero(t, unreplicated) + assert.Zero(t, legacy) + default: + t.Fatalf("unknown expectation %d", tc.exp) + } + + assert.NotNil(t, res.Replicated.State) + assert.NotNil(t, res.Replicated.State.TruncatedState) + assert.Equal(t, expTruncState, *res.Replicated.State.TruncatedState) +} diff --git a/pkg/storage/below_raft_protos_test.go b/pkg/storage/below_raft_protos_test.go index c2e8f4833d54..69a56db7e817 100644 --- a/pkg/storage/below_raft_protos_test.go +++ b/pkg/storage/below_raft_protos_test.go @@ -118,6 +118,17 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ emptySum: 892800390935990883, populatedSum: 16231745342114354146, }, + // This is used downstream of Raft only to write it into unreplicated keyspace + // as part of VersionUnreplicatedRaftTruncatedState. + // However, it has been sent through Raft for a long time, as part of + // ReplicatedEvalResult. 
+ reflect.TypeOf(&roachpb.RaftTruncatedState{}): { + populatedConstructor: func(r *rand.Rand) protoutil.Message { + return roachpb.NewPopulatedRaftTruncatedState(r, false) + }, + emptySum: 5531676819244041709, + populatedSum: 14781226418259198098, + }, } func TestBelowRaftProtos(t *testing.T) { diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index 00932fe2668e..b0ffa5e33bcb 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -352,11 +352,19 @@ type Batch interface { // Distinct returns a view of the existing batch which only sees writes that // were performed before the Distinct batch was created. That is, the // returned batch will not read its own writes, but it will read writes to - // the parent batch performed before the call to Distinct(). The returned + // the parent batch performed before the call to Distinct(), except if the + // parent batch is a WriteOnlyBatch, in which case the Distinct() batch will + // read from the underlying engine. + // + // The returned // batch needs to be closed before using the parent batch again. This is used // as an optimization to avoid flushing mutations buffered by the batch in // situations where we know all of the batched operations are for distinct // keys. + // + // TODO(tbg): it seems insane that you cannot read from a WriteOnlyBatch but + // you can read from a Distinct on top of a WriteOnlyBatch but randomly don't + // see the batch at all. I was personally just bitten by this. Distinct() ReadWriter // Empty returns whether the batch has been written to or not. Empty() bool diff --git a/pkg/storage/main_test.go b/pkg/storage/main_test.go index f333f5f48d67..551d87a8616b 100644 --- a/pkg/storage/main_test.go +++ b/pkg/storage/main_test.go @@ -68,7 +68,7 @@ func TestMain(m *testing.M) { delete(notBelowRaftProtos, typ) } else { failed = true - fmt.Printf("%s: missing fixture!\n", typ) + fmt.Printf("%s: missing fixture! 
Please adjust belowRaftGoldenProtos if necessary\n", typ) } } diff --git a/pkg/storage/raft.pb.go b/pkg/storage/raft.pb.go index d231257348f1..80b533dc71f9 100644 --- a/pkg/storage/raft.pb.go +++ b/pkg/storage/raft.pb.go @@ -70,7 +70,7 @@ func (x *SnapshotRequest_Priority) UnmarshalJSON(data []byte) error { return nil } func (SnapshotRequest_Priority) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5, 0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5, 0} } type SnapshotRequest_Strategy int32 @@ -107,7 +107,7 @@ func (x *SnapshotRequest_Strategy) UnmarshalJSON(data []byte) error { return nil } func (SnapshotRequest_Strategy) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5, 1} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5, 1} } type SnapshotResponse_Status int32 @@ -152,7 +152,7 @@ func (x *SnapshotResponse_Status) UnmarshalJSON(data []byte) error { return nil } func (SnapshotResponse_Status) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{6, 0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{6, 0} } // RaftHeartbeat is a request that contains the barebones information for a @@ -174,7 +174,7 @@ func (m *RaftHeartbeat) Reset() { *m = RaftHeartbeat{} } func (m *RaftHeartbeat) String() string { return proto.CompactTextString(m) } func (*RaftHeartbeat) ProtoMessage() {} func (*RaftHeartbeat) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{0} } func (m *RaftHeartbeat) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -232,7 +232,7 @@ func (m *RaftMessageRequest) Reset() { *m = RaftMessageRequest{} } func (m *RaftMessageRequest) String() string { return proto.CompactTextString(m) } func (*RaftMessageRequest) ProtoMessage() {} func (*RaftMessageRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, 
[]int{1} + return fileDescriptor_raft_06448cf81da2fcd5, []int{1} } func (m *RaftMessageRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -267,7 +267,7 @@ func (m *RaftMessageRequestBatch) Reset() { *m = RaftMessageRequestBatch func (m *RaftMessageRequestBatch) String() string { return proto.CompactTextString(m) } func (*RaftMessageRequestBatch) ProtoMessage() {} func (*RaftMessageRequestBatch) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{2} + return fileDescriptor_raft_06448cf81da2fcd5, []int{2} } func (m *RaftMessageRequestBatch) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -302,7 +302,7 @@ func (m *RaftMessageResponseUnion) Reset() { *m = RaftMessageResponseUni func (m *RaftMessageResponseUnion) String() string { return proto.CompactTextString(m) } func (*RaftMessageResponseUnion) ProtoMessage() {} func (*RaftMessageResponseUnion) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{3} + return fileDescriptor_raft_06448cf81da2fcd5, []int{3} } func (m *RaftMessageResponseUnion) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -347,7 +347,7 @@ func (m *RaftMessageResponse) Reset() { *m = RaftMessageResponse{} } func (m *RaftMessageResponse) String() string { return proto.CompactTextString(m) } func (*RaftMessageResponse) ProtoMessage() {} func (*RaftMessageResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{4} + return fileDescriptor_raft_06448cf81da2fcd5, []int{4} } func (m *RaftMessageResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -390,7 +390,7 @@ func (m *SnapshotRequest) Reset() { *m = SnapshotRequest{} } func (m *SnapshotRequest) String() string { return proto.CompactTextString(m) } func (*SnapshotRequest) ProtoMessage() {} func (*SnapshotRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5} } func 
(m *SnapshotRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -433,16 +433,26 @@ type SnapshotRequest_Header struct { // The priority of the snapshot. Priority SnapshotRequest_Priority `protobuf:"varint,6,opt,name=priority,enum=cockroach.storage.SnapshotRequest_Priority" json:"priority"` // The strategy of the snapshot. - Strategy SnapshotRequest_Strategy `protobuf:"varint,7,opt,name=strategy,enum=cockroach.storage.SnapshotRequest_Strategy" json:"strategy"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + Strategy SnapshotRequest_Strategy `protobuf:"varint,7,opt,name=strategy,enum=cockroach.storage.SnapshotRequest_Strategy" json:"strategy"` + // Whether the snapshot uses the unreplicated RaftTruncatedState or not. + // This is generally always true at 2.2 and above outside of the migration + // phase, though theoretically it could take a long time for all ranges + // to update to the new mechanism. This bool is true iff the Raft log at + // the snapshot's applied index is using the new key. In particular, it + // is true if the index itself carries out the migration (in which case + // the data in the snapshot contains neither key). + // + // See VersionUnreplicatedRaftTruncatedState. 
+ UnreplicatedTruncatedState bool `protobuf:"varint,8,opt,name=unreplicated_truncated_state,json=unreplicatedTruncatedState" json:"unreplicated_truncated_state"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SnapshotRequest_Header) Reset() { *m = SnapshotRequest_Header{} } func (m *SnapshotRequest_Header) String() string { return proto.CompactTextString(m) } func (*SnapshotRequest_Header) ProtoMessage() {} func (*SnapshotRequest_Header) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{5, 0} + return fileDescriptor_raft_06448cf81da2fcd5, []int{5, 0} } func (m *SnapshotRequest_Header) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -478,7 +488,7 @@ func (m *SnapshotResponse) Reset() { *m = SnapshotResponse{} } func (m *SnapshotResponse) String() string { return proto.CompactTextString(m) } func (*SnapshotResponse) ProtoMessage() {} func (*SnapshotResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{6} + return fileDescriptor_raft_06448cf81da2fcd5, []int{6} } func (m *SnapshotResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -519,7 +529,7 @@ func (m *ConfChangeContext) Reset() { *m = ConfChangeContext{} } func (m *ConfChangeContext) String() string { return proto.CompactTextString(m) } func (*ConfChangeContext) ProtoMessage() {} func (*ConfChangeContext) Descriptor() ([]byte, []int) { - return fileDescriptor_raft_460a63b017d715a3, []int{7} + return fileDescriptor_raft_06448cf81da2fcd5, []int{7} } func (m *ConfChangeContext) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1053,6 +1063,14 @@ func (m *SnapshotRequest_Header) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x38 i++ i = encodeVarintRaft(dAtA, i, uint64(m.Strategy)) + dAtA[i] = 0x40 + i++ + if m.UnreplicatedTruncatedState { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ return i, nil } @@ -1256,6 +1274,7 @@ func (m *SnapshotRequest_Header) Size() (n int) 
{ n += 1 + l + sovRaft(uint64(l)) n += 1 + sovRaft(uint64(m.Priority)) n += 1 + sovRaft(uint64(m.Strategy)) + n += 2 return n } @@ -2406,6 +2425,26 @@ func (m *SnapshotRequest_Header) Unmarshal(dAtA []byte) error { break } } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UnreplicatedTruncatedState", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.UnreplicatedTruncatedState = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -2770,80 +2809,82 @@ var ( ErrIntOverflowRaft = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("storage/raft.proto", fileDescriptor_raft_460a63b017d715a3) } - -var fileDescriptor_raft_460a63b017d715a3 = []byte{ - // 1147 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0xc5, - 0x17, 0xf7, 0xc6, 0xdf, 0xc7, 0x76, 0xb3, 0x9d, 0x7f, 0xf5, 0x67, 0x65, 0xc0, 0x36, 0x5b, 0x5a, - 0x99, 0x22, 0xad, 0x8b, 0x55, 0xb8, 0xe0, 0xce, 0x1f, 0xdb, 0xc6, 0x4d, 0xf3, 0xa1, 0x4d, 0x5b, - 0x04, 0x52, 0x65, 0x8d, 0xd7, 0x63, 0x7b, 0x15, 0x7b, 0x67, 0xbb, 0x3b, 0x2e, 0xb8, 0x4f, 0xc1, - 0x13, 0x20, 0x6e, 0xb8, 0xe1, 0x05, 0x78, 0x85, 0xdc, 0x20, 0x71, 0x59, 0x09, 0x14, 0x41, 0x78, - 0x8b, 0x5e, 0xa1, 0x99, 0x9d, 0x71, 0x9c, 0xc4, 0xd0, 0x04, 0x21, 0x6e, 0xb8, 0x49, 0xbc, 0xe7, - 0xcc, 0xef, 0x77, 0xf6, 0x9c, 0xdf, 0x39, 0x67, 0x16, 0x50, 0xc4, 0x68, 0x88, 0xc7, 0xa4, 0x11, - 0xe2, 0x11, 0xb3, 0x82, 0x90, 0x32, 0x8a, 0xae, 0xbb, 0xd4, 0x3d, 0x0c, 0x29, 0x76, 0x27, 0x96, - 0xf4, 0x96, 0x6f, 0x88, 0xc7, 0x60, 0xd0, 0x20, 0x61, 0x48, 0xc3, 0x28, 0x3e, 0x58, 0xfe, 0xbf, - 0xb2, 0xce, 0x08, 0xc3, 0x43, 0xcc, 0xb0, 0xb4, 0xbf, 0xab, 0x48, 0xe5, 0xff, 0x60, 0xd0, 0x88, - 0x18, 0x66, 
0x44, 0xba, 0xdf, 0x26, 0xcc, 0x1d, 0x8a, 0x80, 0xe2, 0x4f, 0x30, 0x58, 0x09, 0x5e, - 0xbe, 0x31, 0xa6, 0x63, 0x2a, 0x7e, 0x36, 0xf8, 0xaf, 0xd8, 0x6a, 0xfe, 0x90, 0x84, 0x92, 0x83, - 0x47, 0x6c, 0x8b, 0xe0, 0x90, 0x0d, 0x08, 0x66, 0x68, 0x00, 0xb9, 0x10, 0xfb, 0x63, 0xd2, 0xf7, - 0x86, 0x86, 0x56, 0xd3, 0xea, 0xa9, 0xf6, 0x83, 0xa3, 0xe3, 0x6a, 0xe2, 0xe4, 0xb8, 0x9a, 0x75, - 0xb8, 0xbd, 0xd7, 0x7d, 0x7d, 0x5c, 0xbd, 0x37, 0xf6, 0xd8, 0x64, 0x3e, 0xb0, 0x5c, 0x3a, 0x6b, - 0x2c, 0x93, 0x1a, 0x0e, 0x4e, 0x7f, 0x37, 0x82, 0xc3, 0x71, 0x43, 0x66, 0x61, 0x49, 0x9c, 0x93, - 0x15, 0xc4, 0xbd, 0x21, 0xfa, 0x12, 0x36, 0x47, 0x21, 0x9d, 0xf5, 0x43, 0x12, 0x4c, 0x3d, 0x17, - 0xf3, 0x50, 0x1b, 0x35, 0xad, 0x5e, 0x6a, 0xef, 0xc9, 0x50, 0xa5, 0xfb, 0x21, 0x9d, 0x39, 0xb1, - 0x57, 0x04, 0xfc, 0xe4, 0x6a, 0x01, 0x15, 0xd2, 0x29, 0x8d, 0x56, 0x88, 0x86, 0xe8, 0x39, 0x94, - 0x18, 0x5d, 0x0d, 0x9b, 0x14, 0x61, 0x77, 0x64, 0xd8, 0xc2, 0x63, 0xfa, 0x4f, 0x04, 0x2d, 0x30, - 0x7a, 0x1a, 0xd2, 0x80, 0x14, 0x23, 0xe1, 0xcc, 0x48, 0x89, 0x5a, 0xa6, 0x78, 0x24, 0x47, 0x58, - 0xd0, 0x3b, 0x90, 0x71, 0xe9, 0x6c, 0xe6, 0x31, 0x23, 0xbd, 0xe2, 0x93, 0x36, 0x54, 0x81, 0xec, - 0xf3, 0xb9, 0x47, 0x22, 0x97, 0x18, 0x99, 0x9a, 0x56, 0xcf, 0x49, 0xb7, 0x32, 0x9a, 0x3f, 0xa7, - 0x00, 0x71, 0xe5, 0x76, 0x48, 0x14, 0xe1, 0x31, 0x71, 0xc8, 0xf3, 0x39, 0x89, 0xfe, 0x1d, 0xf9, - 0x76, 0xa0, 0xb8, 0x2a, 0x9f, 0xd0, 0xae, 0xd0, 0x7c, 0xdf, 0x3a, 0x6d, 0xef, 0x73, 0x35, 0xe9, - 0x92, 0xc8, 0x0d, 0xbd, 0x80, 0xd1, 0x50, 0x66, 0x51, 0x58, 0x91, 0x05, 0xf5, 0x00, 0x4e, 0x45, - 0x11, 0x8a, 0x5c, 0x8d, 0x2c, 0xbf, 0x2c, 0x37, 0x6a, 0x40, 0x76, 0x16, 0xd7, 0x43, 0xd4, 0xbb, - 0xd0, 0xdc, 0xb4, 0xe2, 0x49, 0xb0, 0x64, 0x99, 0x54, 0x15, 0xe5, 0xa9, 0xd5, 0x2a, 0xa7, 0xd7, - 0x54, 0x19, 0xdd, 0x07, 0x98, 0xa8, 0xd1, 0x88, 0x8c, 0x4c, 0x2d, 0x59, 0x2f, 0x34, 0x6b, 0xd6, - 0x85, 0x39, 0xb6, 0xce, 0xcc, 0x90, 0x24, 0x59, 0x41, 0xa2, 0x3d, 0xd8, 0x5c, 0x3e, 0xf5, 0x43, - 0x12, 0x05, 0x91, 0x91, 0xbd, 0x12, 0xd9, 0xb5, 
0x25, 0xdc, 0xe1, 0x68, 0xf4, 0x0c, 0x36, 0x63, - 0x9d, 0x23, 0x86, 0x43, 0xd6, 0x3f, 0x24, 0x0b, 0x23, 0x57, 0xd3, 0xea, 0xc5, 0xf6, 0xc7, 0xaf, - 0x8f, 0xab, 0x1f, 0x5d, 0x4d, 0xdf, 0x6d, 0xb2, 0x70, 0x4a, 0x82, 0xed, 0x80, 0x93, 0x6d, 0x93, - 0x85, 0x39, 0x80, 0xb7, 0x2e, 0x36, 0x57, 0x1b, 0x33, 0x77, 0x82, 0x1e, 0x40, 0x2e, 0x8c, 0x9f, - 0x23, 0x43, 0x13, 0x39, 0xdc, 0xfa, 0x93, 0x1c, 0xce, 0xa1, 0xe3, 0x44, 0x96, 0x60, 0x73, 0x1f, - 0x8c, 0x33, 0xa7, 0xa2, 0x80, 0xfa, 0x11, 0x79, 0xe2, 0x7b, 0xd4, 0x47, 0x16, 0xa4, 0xc5, 0x46, - 0x14, 0x3d, 0x5c, 0x68, 0x1a, 0x6b, 0xda, 0xc1, 0xe6, 0x7e, 0x27, 0x3e, 0xf6, 0x69, 0xea, 0xe8, - 0xdb, 0xaa, 0x66, 0xfe, 0xb2, 0x01, 0xff, 0x5b, 0x43, 0xf9, 0x1f, 0x1f, 0x8a, 0x07, 0x90, 0x9e, - 0xf3, 0xa2, 0xca, 0x91, 0xf8, 0xf0, 0x4d, 0x6a, 0xad, 0xe8, 0x20, 0xc9, 0x62, 0xbc, 0xf9, 0x7d, - 0x1a, 0x36, 0x0f, 0x7c, 0x1c, 0x44, 0x13, 0xca, 0xd4, 0xbe, 0x69, 0x41, 0x66, 0x42, 0xf0, 0x90, - 0x28, 0xa5, 0x3e, 0x58, 0xc3, 0x7e, 0x0e, 0x63, 0x6d, 0x09, 0x80, 0x23, 0x81, 0xe8, 0x36, 0xe4, - 0x0e, 0x5f, 0xf4, 0x07, 0xbc, 0xb9, 0x44, 0xd5, 0x8a, 0xed, 0x02, 0x57, 0x66, 0xfb, 0xa9, 0xe8, - 0x37, 0x27, 0x7b, 0xf8, 0x22, 0x6e, 0xbc, 0x2a, 0x14, 0xa6, 0x74, 0xdc, 0x27, 0x3e, 0x0b, 0x3d, - 0x12, 0x19, 0xc9, 0x5a, 0xb2, 0x5e, 0x74, 0x60, 0x4a, 0xc7, 0x76, 0x6c, 0x41, 0x65, 0x48, 0x8f, - 0x3c, 0x1f, 0x4f, 0x45, 0xa2, 0x6a, 0x94, 0x63, 0x53, 0xf9, 0x9b, 0x24, 0x64, 0xe2, 0xb8, 0xe8, - 0x19, 0xdc, 0xe0, 0x4b, 0xa1, 0x2f, 0x77, 0x40, 0x5f, 0x36, 0xa4, 0x54, 0xec, 0x4a, 0xcd, 0x8c, - 0xc2, 0x8b, 0x1b, 0xf8, 0x26, 0x80, 0x9c, 0x4c, 0xef, 0x25, 0x11, 0xca, 0x25, 0x95, 0x26, 0xf1, - 0x8c, 0x79, 0x2f, 0x09, 0xba, 0x05, 0x05, 0x17, 0xfb, 0xfd, 0x21, 0x71, 0xa7, 0x9e, 0x4f, 0xce, - 0xbc, 0x30, 0xb8, 0xd8, 0xef, 0xc6, 0x76, 0x64, 0x43, 0x5a, 0x5c, 0xf0, 0x62, 0x39, 0xad, 0x2f, - 0xee, 0xf2, 0x53, 0x40, 0xb5, 0xc2, 0x01, 0x07, 0xa8, 0xe4, 0x05, 0x1a, 0xed, 0x40, 0x2e, 0x08, - 0x3d, 0x1a, 0x7a, 0x6c, 0x21, 0x2e, 0x93, 0x6b, 0x6b, 0x9b, 0xe0, 0xbc, 0x4c, 0xfb, 
0x12, 0xa2, - 0x06, 0x57, 0x51, 0x70, 0xba, 0x88, 0x85, 0x98, 0x91, 0xf1, 0xc2, 0xc8, 0x5e, 0x9a, 0xee, 0x40, - 0x42, 0x14, 0x9d, 0xa2, 0x78, 0x98, 0xca, 0x69, 0xfa, 0x86, 0x79, 0x0f, 0x72, 0x2a, 0x20, 0x2a, - 0x40, 0xf6, 0xc9, 0xee, 0xf6, 0xee, 0xde, 0x67, 0xbb, 0x7a, 0x02, 0x15, 0x21, 0xe7, 0xd8, 0x9d, - 0xbd, 0xa7, 0xb6, 0xf3, 0xb9, 0xae, 0xa1, 0x12, 0xe4, 0x1d, 0xbb, 0xdd, 0x7a, 0xd4, 0xda, 0xed, - 0xd8, 0xfa, 0x86, 0x69, 0x40, 0x4e, 0xf1, 0xf2, 0x83, 0xdb, 0x4f, 0xfb, 0xed, 0xd6, 0xe3, 0xce, - 0x96, 0x9e, 0x30, 0x7f, 0xd4, 0x40, 0x3f, 0x7d, 0x05, 0xb9, 0x08, 0xb6, 0x20, 0xc3, 0x2b, 0x32, - 0x8f, 0x44, 0xb7, 0x5e, 0x6b, 0xde, 0xf9, 0xcb, 0xf7, 0x8e, 0x41, 0xd6, 0x81, 0x40, 0xa8, 0xeb, - 0x39, 0xc6, 0xf3, 0x8b, 0x43, 0xdd, 0x34, 0xbc, 0x6f, 0xf2, 0xe7, 0x2e, 0x16, 0xb3, 0x07, 0x99, - 0x18, 0x77, 0x21, 0x99, 0x56, 0xa7, 0x63, 0xef, 0x3f, 0xb6, 0xbb, 0xba, 0xc6, 0x5d, 0xad, 0xfd, - 0xfd, 0x47, 0x3d, 0xbb, 0xab, 0x6f, 0xa0, 0x3c, 0xa4, 0x6d, 0xc7, 0xd9, 0x73, 0xf4, 0x24, 0x3f, - 0xd5, 0xb5, 0x3b, 0x8f, 0x7a, 0xbb, 0x76, 0x57, 0x4f, 0x3d, 0x4c, 0xe5, 0x92, 0x7a, 0xca, 0xfc, - 0x4e, 0x83, 0xeb, 0x1d, 0xea, 0x8f, 0x3a, 0x13, 0xde, 0x44, 0x1d, 0xea, 0x33, 0xf2, 0x15, 0x43, - 0x77, 0x01, 0xf8, 0xf7, 0x02, 0xf6, 0x87, 0x6a, 0xb7, 0xe5, 0xdb, 0xd7, 0xe5, 0x6e, 0xcb, 0x77, - 0x62, 0x4f, 0xaf, 0xeb, 0xe4, 0xe5, 0x21, 0xf1, 0x3d, 0x92, 0x0d, 0xf0, 0x62, 0x4a, 0x71, 0xfc, - 0xcd, 0x55, 0x74, 0xd4, 0x23, 0xea, 0x42, 0xf6, 0xef, 0xef, 0x1b, 0x05, 0x6d, 0xbe, 0xd2, 0x20, - 0xbf, 0x33, 0x9f, 0x32, 0x8f, 0x0f, 0x0d, 0x9a, 0x82, 0xbe, 0x32, 0x3c, 0xf1, 0x1c, 0xdf, 0xb9, - 0xdc, 0x84, 0xf1, 0xb3, 0xe5, 0xdb, 0x97, 0x5b, 0x56, 0x66, 0xa2, 0xae, 0xdd, 0xd5, 0xd0, 0x33, - 0x28, 0x72, 0xa7, 0x52, 0x10, 0x99, 0x6f, 0x6e, 0xcb, 0xf2, 0xcd, 0x4b, 0xb4, 0x40, 0x4c, 0xdf, - 0x7e, 0xef, 0xe8, 0xb7, 0x4a, 0xe2, 0xe8, 0xa4, 0xa2, 0xfd, 0x74, 0x52, 0xd1, 0x5e, 0x9d, 0x54, - 0xb4, 0x5f, 0x4f, 0x2a, 0xda, 0xd7, 0xbf, 0x57, 0x12, 0x5f, 0x64, 0x25, 0xf2, 0x8f, 0x00, 0x00, - 0x00, 0xff, 0xff, 0xd4, 
0x72, 0xff, 0x08, 0xf7, 0x0b, 0x00, 0x00, +func init() { proto.RegisterFile("storage/raft.proto", fileDescriptor_raft_06448cf81da2fcd5) } + +var fileDescriptor_raft_06448cf81da2fcd5 = []byte{ + // 1172 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xdd, 0x6e, 0x1b, 0x55, + 0x10, 0xce, 0xc6, 0xff, 0x63, 0xbb, 0x71, 0x0f, 0x15, 0xac, 0x4c, 0x71, 0xcc, 0x96, 0x56, 0xa6, + 0x48, 0x76, 0x89, 0x0a, 0x17, 0xdc, 0xc5, 0xf6, 0xb6, 0x71, 0xd3, 0xfc, 0x68, 0x93, 0x16, 0x81, + 0x54, 0x59, 0xc7, 0xeb, 0x13, 0x7b, 0x15, 0x7b, 0xcf, 0xf6, 0xec, 0x71, 0x21, 0x7d, 0x0a, 0x1e, + 0x81, 0x1b, 0x9e, 0x81, 0x57, 0xc8, 0x0d, 0x12, 0x97, 0x95, 0x40, 0x11, 0x04, 0x89, 0x07, 0xe0, + 0xb2, 0x57, 0xe8, 0xfc, 0x25, 0x9b, 0xc4, 0xd0, 0x04, 0x21, 0x6e, 0xb8, 0xb1, 0x77, 0x67, 0xe6, + 0x9b, 0xd9, 0x99, 0x6f, 0x66, 0xce, 0x01, 0x14, 0x73, 0xca, 0xf0, 0x88, 0xb4, 0x18, 0xde, 0xe3, + 0xcd, 0x88, 0x51, 0x4e, 0xd1, 0x75, 0x9f, 0xfa, 0xfb, 0x8c, 0x62, 0x7f, 0xdc, 0xd4, 0xda, 0xea, + 0x0d, 0xf9, 0x1a, 0x0d, 0x5a, 0x84, 0x31, 0xca, 0x62, 0x65, 0x58, 0x7d, 0xdb, 0x48, 0xa7, 0x84, + 0xe3, 0x21, 0xe6, 0x58, 0xcb, 0xdf, 0x33, 0x4e, 0xf5, 0x7f, 0x34, 0x68, 0xc5, 0x1c, 0x73, 0xa2, + 0xd5, 0xef, 0x12, 0xee, 0x0f, 0x65, 0x40, 0xf9, 0x13, 0x0d, 0x12, 0xc1, 0xab, 0x37, 0x46, 0x74, + 0x44, 0xe5, 0x63, 0x4b, 0x3c, 0x29, 0xa9, 0xf3, 0x7d, 0x0a, 0xca, 0x1e, 0xde, 0xe3, 0x6b, 0x04, + 0x33, 0x3e, 0x20, 0x98, 0xa3, 0x01, 0xe4, 0x19, 0x0e, 0x47, 0xa4, 0x1f, 0x0c, 0x6d, 0xab, 0x6e, + 0x35, 0xd2, 0xed, 0x87, 0x87, 0x47, 0xcb, 0x0b, 0xc7, 0x47, 0xcb, 0x39, 0x4f, 0xc8, 0x7b, 0xdd, + 0xd7, 0x47, 0xcb, 0xf7, 0x47, 0x01, 0x1f, 0xcf, 0x06, 0x4d, 0x9f, 0x4e, 0x5b, 0x27, 0x49, 0x0d, + 0x07, 0xa7, 0xcf, 0xad, 0x68, 0x7f, 0xd4, 0xd2, 0x59, 0x34, 0x35, 0xce, 0xcb, 0x49, 0xc7, 0xbd, + 0x21, 0xfa, 0x0a, 0x96, 0xf6, 0x18, 0x9d, 0xf6, 0x19, 0x89, 0x26, 0x81, 0x8f, 0x45, 0xa8, 0xc5, + 0xba, 0xd5, 0x28, 0xb7, 0xb7, 0x74, 0xa8, 0xf2, 0x03, 0x46, 0xa7, 0x9e, 0xd2, 0xca, 0x80, 
0x9f, + 0x5e, 0x2d, 0xa0, 0x41, 0x7a, 0xe5, 0xbd, 0x84, 0xa3, 0x21, 0x7a, 0x0e, 0x65, 0x4e, 0x93, 0x61, + 0x53, 0x32, 0xec, 0x86, 0x0e, 0x5b, 0xdc, 0xa5, 0xff, 0x46, 0xd0, 0x22, 0xa7, 0xa7, 0x21, 0x6d, + 0x48, 0x73, 0xc2, 0xa6, 0x76, 0x5a, 0xd6, 0x32, 0x2d, 0x22, 0x79, 0x52, 0x82, 0x6e, 0x42, 0xd6, + 0xa7, 0xd3, 0x69, 0xc0, 0xed, 0x4c, 0x42, 0xa7, 0x65, 0xa8, 0x06, 0xb9, 0xe7, 0xb3, 0x80, 0xc4, + 0x3e, 0xb1, 0xb3, 0x75, 0xab, 0x91, 0xd7, 0x6a, 0x23, 0x74, 0x7e, 0x4a, 0x03, 0x12, 0xcc, 0x6d, + 0x90, 0x38, 0xc6, 0x23, 0xe2, 0x91, 0xe7, 0x33, 0x12, 0xff, 0x37, 0xf4, 0x6d, 0x40, 0x29, 0x49, + 0x9f, 0xe4, 0xae, 0xb8, 0xf2, 0x41, 0xf3, 0xb4, 0xbd, 0xcf, 0xd5, 0xa4, 0x4b, 0x62, 0x9f, 0x05, + 0x11, 0xa7, 0x4c, 0x67, 0x51, 0x4c, 0xd0, 0x82, 0x7a, 0x00, 0xa7, 0xa4, 0x48, 0x46, 0xae, 0xe6, + 0xac, 0x70, 0x52, 0x6e, 0xd4, 0x82, 0xdc, 0x54, 0xd5, 0x43, 0xd6, 0xbb, 0xb8, 0xb2, 0xd4, 0x54, + 0x93, 0xd0, 0xd4, 0x65, 0x32, 0x55, 0xd4, 0x56, 0xc9, 0x2a, 0x67, 0xe6, 0x54, 0x19, 0x3d, 0x00, + 0x18, 0x9b, 0xd1, 0x88, 0xed, 0x6c, 0x3d, 0xd5, 0x28, 0xae, 0xd4, 0x9b, 0x17, 0xe6, 0xb8, 0x79, + 0x66, 0x86, 0xb4, 0x93, 0x04, 0x12, 0x6d, 0xc1, 0xd2, 0xc9, 0x5b, 0x9f, 0x91, 0x38, 0x8a, 0xed, + 0xdc, 0x95, 0x9c, 0x5d, 0x3b, 0x81, 0x7b, 0x02, 0x8d, 0x9e, 0xc1, 0x92, 0xe2, 0x39, 0xe6, 0x98, + 0xf1, 0xfe, 0x3e, 0x39, 0xb0, 0xf3, 0x75, 0xab, 0x51, 0x6a, 0x7f, 0xf2, 0xfa, 0x68, 0xf9, 0xe3, + 0xab, 0xf1, 0xbb, 0x4e, 0x0e, 0xbc, 0xb2, 0xf4, 0xb6, 0x23, 0x9c, 0xad, 0x93, 0x03, 0x67, 0x00, + 0xef, 0x5c, 0x6c, 0xae, 0x36, 0xe6, 0xfe, 0x18, 0x3d, 0x84, 0x3c, 0x53, 0xef, 0xb1, 0x6d, 0xc9, + 0x1c, 0x6e, 0xff, 0x45, 0x0e, 0xe7, 0xd0, 0x2a, 0x91, 0x13, 0xb0, 0xb3, 0x0d, 0xf6, 0x19, 0xab, + 0x38, 0xa2, 0x61, 0x4c, 0x9e, 0x84, 0x01, 0x0d, 0x51, 0x13, 0x32, 0x72, 0x23, 0xca, 0x1e, 0x2e, + 0xae, 0xd8, 0x73, 0xda, 0xc1, 0x15, 0x7a, 0x4f, 0x99, 0x7d, 0x96, 0x3e, 0xfc, 0x76, 0xd9, 0x72, + 0x7e, 0x5e, 0x84, 0xb7, 0xe6, 0xb8, 0xfc, 0x9f, 0x0f, 0xc5, 0x43, 0xc8, 0xcc, 0x44, 0x51, 0xf5, + 0x48, 0x7c, 0xf4, 0x26, 0xb6, 
0x12, 0x3c, 0x68, 0x67, 0x0a, 0xef, 0xfc, 0x91, 0x81, 0xa5, 0x9d, + 0x10, 0x47, 0xf1, 0x98, 0x72, 0xb3, 0x6f, 0x56, 0x21, 0x3b, 0x26, 0x78, 0x48, 0x0c, 0x53, 0x1f, + 0xce, 0xf1, 0x7e, 0x0e, 0xd3, 0x5c, 0x93, 0x00, 0x4f, 0x03, 0xd1, 0x1d, 0xc8, 0xef, 0xbf, 0xe8, + 0x0f, 0x44, 0x73, 0xc9, 0xaa, 0x95, 0xda, 0x45, 0xc1, 0xcc, 0xfa, 0x53, 0xd9, 0x6f, 0x5e, 0x6e, + 0xff, 0x85, 0x6a, 0xbc, 0x65, 0x28, 0x4e, 0xe8, 0xa8, 0x4f, 0x42, 0xce, 0x02, 0x12, 0xdb, 0xa9, + 0x7a, 0xaa, 0x51, 0xf2, 0x60, 0x42, 0x47, 0xae, 0x92, 0xa0, 0x2a, 0x64, 0xf6, 0x82, 0x10, 0x4f, + 0x64, 0xa2, 0x66, 0x94, 0x95, 0xa8, 0xfa, 0x7b, 0x0a, 0xb2, 0x2a, 0x2e, 0x7a, 0x06, 0x37, 0xc4, + 0x52, 0xe8, 0xeb, 0x1d, 0xd0, 0xd7, 0x0d, 0xa9, 0x19, 0xbb, 0x52, 0x33, 0x23, 0x76, 0x71, 0x03, + 0xdf, 0x02, 0xd0, 0x93, 0x19, 0xbc, 0x24, 0x92, 0xb9, 0x94, 0xe1, 0x44, 0xcd, 0x58, 0xf0, 0x92, + 0xa0, 0xdb, 0x50, 0xf4, 0x71, 0xd8, 0x1f, 0x12, 0x7f, 0x12, 0x84, 0xe4, 0xcc, 0x07, 0x83, 0x8f, + 0xc3, 0xae, 0x92, 0x23, 0x17, 0x32, 0xf2, 0x80, 0x97, 0xcb, 0x69, 0x7e, 0x71, 0x4f, 0xae, 0x02, + 0xa6, 0x15, 0x76, 0x04, 0xc0, 0x24, 0x2f, 0xd1, 0x68, 0x03, 0xf2, 0x11, 0x0b, 0x28, 0x0b, 0xf8, + 0x81, 0x3c, 0x4c, 0xae, 0xcd, 0x6d, 0x82, 0xf3, 0x34, 0x6d, 0x6b, 0x88, 0x19, 0x5c, 0xe3, 0x42, + 0xb8, 0x8b, 0x39, 0xc3, 0x9c, 0x8c, 0x0e, 0xec, 0xdc, 0xa5, 0xdd, 0xed, 0x68, 0x88, 0x71, 0x67, + 0x5c, 0xa0, 0x07, 0x70, 0x73, 0x16, 0xea, 0x4e, 0xe7, 0x64, 0xd8, 0xe7, 0x6c, 0x16, 0xaa, 0x27, + 0x95, 0x7b, 0x3e, 0x51, 0x9c, 0x6a, 0xd2, 0x72, 0xd7, 0x18, 0xca, 0x94, 0x1f, 0xa5, 0xf3, 0x56, + 0x65, 0xd1, 0xb9, 0x0f, 0x79, 0xf3, 0xe1, 0xa8, 0x08, 0xb9, 0x27, 0x9b, 0xeb, 0x9b, 0x5b, 0x9f, + 0x6f, 0x56, 0x16, 0x50, 0x09, 0xf2, 0x9e, 0xdb, 0xd9, 0x7a, 0xea, 0x7a, 0x5f, 0x54, 0x2c, 0x54, + 0x86, 0x82, 0xe7, 0xb6, 0x57, 0x1f, 0xaf, 0x6e, 0x76, 0xdc, 0xca, 0xa2, 0x63, 0x43, 0xde, 0x7c, + 0x9f, 0x30, 0x5c, 0x7f, 0xda, 0x6f, 0xaf, 0xee, 0x76, 0xd6, 0x2a, 0x0b, 0xce, 0x0f, 0x16, 0x54, + 0x4e, 0x53, 0xd1, 0x0b, 0x65, 0x0d, 0xb2, 0xe2, 0xdb, 0x66, 0xb1, 
0xec, 0xfa, 0x6b, 0x2b, 0x77, + 0xff, 0x36, 0x7f, 0x05, 0x6a, 0xee, 0x48, 0x84, 0x39, 0xe6, 0x15, 0x5e, 0x1c, 0x40, 0xe6, 0xc4, + 0x12, 0xfd, 0x57, 0x38, 0x77, 0x40, 0x39, 0x3d, 0xc8, 0x2a, 0xdc, 0x85, 0x64, 0x56, 0x3b, 0x1d, + 0x77, 0x7b, 0xd7, 0xed, 0x56, 0x2c, 0xa1, 0x5a, 0xdd, 0xde, 0x7e, 0xdc, 0x73, 0xbb, 0x95, 0x45, + 0x54, 0x80, 0x8c, 0xeb, 0x79, 0x5b, 0x5e, 0x25, 0x25, 0xac, 0xba, 0x6e, 0xe7, 0x71, 0x6f, 0xd3, + 0xed, 0x56, 0xd2, 0x8f, 0xd2, 0xf9, 0x54, 0x25, 0xed, 0x7c, 0x67, 0xc1, 0xf5, 0x0e, 0x0d, 0xf7, + 0x3a, 0x63, 0xd1, 0x8c, 0x1d, 0x1a, 0x72, 0xf2, 0x35, 0x47, 0xf7, 0x00, 0xc4, 0xbd, 0x03, 0x87, + 0x43, 0xb3, 0x23, 0x0b, 0xed, 0xeb, 0x7a, 0x47, 0x16, 0x3a, 0x4a, 0xd3, 0xeb, 0x7a, 0x05, 0x6d, + 0x24, 0xef, 0x35, 0xb9, 0x08, 0x1f, 0x4c, 0x28, 0x56, 0x77, 0xb7, 0x92, 0x67, 0x5e, 0x51, 0x17, + 0x72, 0xff, 0x7c, 0x6f, 0x19, 0xe8, 0xca, 0x2b, 0x0b, 0x0a, 0x1b, 0xb3, 0x09, 0x0f, 0xc4, 0xf0, + 0xa1, 0x09, 0x54, 0x12, 0x43, 0xa8, 0xf6, 0xc1, 0xdd, 0xcb, 0x4d, 0xaa, 0xb0, 0xad, 0xde, 0xb9, + 0xdc, 0xd2, 0x73, 0x16, 0x1a, 0xd6, 0x3d, 0x0b, 0x3d, 0x83, 0x92, 0x50, 0x1a, 0x06, 0x91, 0xf3, + 0xe6, 0xf6, 0xae, 0xde, 0xba, 0x44, 0x0b, 0x28, 0xf7, 0xed, 0xf7, 0x0f, 0x7f, 0xad, 0x2d, 0x1c, + 0x1e, 0xd7, 0xac, 0x1f, 0x8f, 0x6b, 0xd6, 0xab, 0xe3, 0x9a, 0xf5, 0xcb, 0x71, 0xcd, 0xfa, 0xe6, + 0xb7, 0xda, 0xc2, 0x97, 0x39, 0x8d, 0xfc, 0x33, 0x00, 0x00, 0xff, 0xff, 0xde, 0x5c, 0x11, 0x4b, + 0x3f, 0x0c, 0x00, 0x00, } diff --git a/pkg/storage/raft.proto b/pkg/storage/raft.proto index 6a9faa355c01..9457f95a67c0 100644 --- a/pkg/storage/raft.proto +++ b/pkg/storage/raft.proto @@ -148,6 +148,17 @@ message SnapshotRequest { // The strategy of the snapshot. optional Strategy strategy = 7 [(gogoproto.nullable) = false]; + + // Whether the snapshot uses the unreplicated RaftTruncatedState or not. + // This is generally always true at 2.2 and above outside of the migration + // phase, though theoretically it could take a long time for all ranges + // to update to the new mechanism. 
This bool is true iff the Raft log at + // the snapshot's applied index is using the new key. In particular, it + // is true if the index itself carries out the migration (in which case + // the data in the snapshot contains neither key). + // + // See VersionUnreplicatedRaftTruncatedState. + optional bool unreplicated_truncated_state = 8 [(gogoproto.nullable) = false]; } optional Header header = 1; diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 22996086e46e..c25de5761740 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/causer" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -898,8 +899,29 @@ func (r *Replica) sendSnapshot( return &benignError{errors.New("raft status not initialized")} } + // TODO(tbg): send snapshots without the past raft log. This means replacing + // the state's truncated state with one whose index and term equal that of + // the RaftAppliedIndex of the snapshot. It looks like the code sending out + // the actual entries will do the right thing from then on (see anchor + // below). + _ = (*kvBatchSnapshotStrategy)(nil).Send + usesReplicatedTruncatedState, err := engine.MVCCGetProto( + ctx, snap.EngineSnap, keys.RaftTruncatedStateLegacyKey(r.RangeID), hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, + ) + if err != nil { + return errors.Wrap(err, "loading legacy truncated state") + } + req := SnapshotRequest_Header{ State: snap.State, + // Tell the recipient whether it needs to synthesize the new + // unreplicated TruncatedState. 
It could tell by itself by peeking into + // the data, but it uses a write only batch for performance which + // doesn't support that; this is easier. Notably, this is true if the + // snap index itself is the one at which the migration happens. + // + // See VersionUnreplicatedRaftTruncatedState. + UnreplicatedTruncatedState: !usesReplicatedTruncatedState, RaftMessageRequest: RaftMessageRequest{ RangeID: r.RangeID, FromReplica: fromRepDesc, diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index dd2541ee8ab6..13f34b45c9b9 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2241,7 +2241,16 @@ func (r *Replica) applyRaftCommand( oldRaftAppliedIndex, raftAppliedIndex) } - batch := r.store.Engine().NewWriteOnlyBatch() + haveTruncatedState := rResult.State != nil && rResult.State.TruncatedState != nil + var batch engine.Batch + if !haveTruncatedState { + batch = r.store.Engine().NewWriteOnlyBatch() + } else { + // When we update the truncated state, we may need to read the batch + // and can't use a WriteOnlyBatch. This is fine since log truncations + // are tiny batches. + batch = r.store.Engine().NewBatch() + } defer batch.Close() if writeBatch != nil { @@ -2251,8 +2260,7 @@ func (r *Replica) applyRaftCommand( } // The only remaining use of the batch is for range-local keys which we know - // have not been previously written within this batch. Currently the only - // remaining writes are the raft applied index and the updated MVCC stats. + // have not been previously written within this batch. writer := batch.Distinct() // Special-cased MVCC stats handling to exploit commutativity of stats delta @@ -2304,30 +2312,27 @@ func (r *Replica) applyRaftCommand( } } - if rResult.State != nil && rResult.State.TruncatedState != nil { - newTruncatedState := rResult.State.TruncatedState - - // Truncate the Raft log from the entry after the previous - // truncation index to the new truncation index. 
This is performed - // atomically with the raft command application so that the - // TruncatedState index is always consistent with the state of the - // Raft log itself. We can use the distinct writer because we know - // all writes will be to distinct keys. - // - // Intentionally don't use range deletion tombstones (ClearRange()) - // due to performance concerns connected to having many range - // deletion tombstones. There is a chance that ClearRange will - // perform well here because the tombstones could be "collapsed", - // but it is hardly worth the risk at this point. - prefixBuf := &r.raftMu.stateLoader.RangeIDPrefixBuf - for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ { - // NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to - // avoid allocating when constructing Raft log keys (16 bytes). - unsafeKey := prefixBuf.RaftLogKey(idx) - if err := writer.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil { - err = errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState) - return enginepb.MVCCStats{}, err - } + if haveTruncatedState { + apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer) + if err != nil { + return enginepb.MVCCStats{}, err + } + if !apply { + // TODO(tbg): As written, there is low confidence that nil'ing out + // the truncated state has the desired effect as our caller actually + // applies the side effects. It may have taken a copy and won't + // observe what we did. + // + // It's very difficult to test this functionality because of how + // difficult it is to test (*Replica).processRaftCommand (and this + // method). Instead of adding yet another terrible test that bends + // reality to its will in some clunky way, assert that we're never + // hitting this branch, which is supposed to be true until we stop + // sending the raft log in snapshots (#34287).
+ // Morally we would want to drop the command in checkForcedErrLocked, + // but that may be difficult to achieve. + log.Fatal(ctx, log.Safe(fmt.Sprintf("TruncatedState regressed:\nold: %+v\nnew: %+v", oldTruncatedState, rResult.State.TruncatedState))) + rResult.State.TruncatedState = nil } } @@ -2370,3 +2375,79 @@ func (r *Replica) applyRaftCommand( r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) return deltaStats, nil } + +func handleTruncatedStateBelowRaft( + ctx context.Context, + oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState, + loader stateloader.StateLoader, + distinctEng engine.ReadWriter, +) (_apply bool, _ error) { + // If this is a log truncation, load the resulting unreplicated or legacy + // replicated truncated state (in that order). If the migration is happening + // in this command, the result will be an empty message. In steady state + // after the migration, it's the unreplicated truncated state not taking + // into account the current truncation (since the key is unreplicated). + // Either way, we'll update it below. + // + // See VersionUnreplicatedRaftTruncatedState for details. + truncStatePostApply, truncStateIsLegacy, err := loader.LoadRaftTruncatedState(ctx, distinctEng) + if err != nil { + return false, errors.Wrap(err, "loading truncated state") + } + + // Truncate the Raft log from the entry after the previous + // truncation index to the new truncation index. This is performed + // atomically with the raft command application so that the + // TruncatedState index is always consistent with the state of the + // Raft log itself. We can use the distinct writer because we know + // all writes will be to distinct keys. + // + // Intentionally don't use range deletion tombstones (ClearRange()) + // due to performance concerns connected to having many range + // deletion tombstones. 
There is a chance that ClearRange will + // perform well here because the tombstones could be "collapsed", + // but it is hardly worth the risk at this point. + prefixBuf := &loader.RangeIDPrefixBuf + for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ { + // NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to + // avoid allocating when constructing Raft log keys (16 bytes). + unsafeKey := prefixBuf.RaftLogKey(idx) + if err := distinctEng.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil { + return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState) + } + } + + if !truncStateIsLegacy { + if truncStatePostApply.Index < newTruncatedState.Index { + // There are two cases here (though handled just the same). In the + // first case, the Raft command has just deleted the legacy + // replicated truncated state key as part of the migration (so + // truncStateIsLegacy is now false for the first time and + // truncStatePostApply is zero) and we need to atomically write the + // new, unreplicated, key. Or we've already migrated earlier, in + // which case truncStatePostApply equals the current value of the + // new key (which wasn't touched by the batch), and we need to + // overwrite it if this truncation "moves it forward". + + // NB: before the first log truncation evaluated under the + // cluster version which activates this code (see anchor below) this + // block is never reached as truncStatePostApply will equal newTruncatedState. + _ = cluster.VersionUnreplicatedRaftTruncatedState + + if err := engine.MVCCPutProto( + ctx, distinctEng, nil /* ms */, prefixBuf.RaftTruncatedStateKey(), + hlc.Timestamp{}, nil /* txn */, newTruncatedState, + ); err != nil { + return false, errors.Wrap(err, "unable to migrate RaftTruncatedState") + } + // Have migrated and this new truncated state is moving us forward. + // Tell caller that we applied it and that so should they. 
+ return true, nil + } + // Have migrated, but this truncated state moves the existing one + // backwards, so instruct caller to not update in-memory state. + return false, nil + } + // Haven't migrated yet, don't ever discard the update. + return true, nil +} diff --git a/pkg/storage/replica_raft_truncation_test.go b/pkg/storage/replica_raft_truncation_test.go new file mode 100644 index 000000000000..81caed41acc4 --- /dev/null +++ b/pkg/storage/replica_raft_truncation_test.go @@ -0,0 +1,119 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package storage + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/stateloader" + "github.com/cockroachdb/cockroach/pkg/testutils/datadriven" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +func TestHandleTruncatedStateBelowRaft(t *testing.T) { + defer leaktest.AfterTest(t)() + + // This test verifies the expected behavior of the downstream-of-Raft log + // truncation code, in particular regarding the migration below: + _ = cluster.VersionUnreplicatedRaftTruncatedState + + ctx := context.Background() + + // neither exists (migration) + // old one exists (no migration) + // new one exists (migrated already) + // truncstate regresses + + var prevTruncatedState roachpb.RaftTruncatedState + datadriven.Walk(t, "testdata/truncated_state_migration", func(t *testing.T, path string) { + const rangeID = 12 + loader := stateloader.Make(rangeID) + eng := engine.NewInMem(roachpb.Attributes{}, 1<<20) + defer eng.Close() + + datadriven.RunTest(t, path, func(d *datadriven.TestData) string { + switch d.Cmd { + case "prev": + d.ScanArgs(t, "index", &prevTruncatedState.Index) + d.ScanArgs(t, "term", &prevTruncatedState.Term) + return "" + case "put": + var index uint64 + var term uint64 + var legacy bool + d.ScanArgs(t, "index", &index) + d.ScanArgs(t, "term", &term) + d.ScanArgs(t, "legacy", &legacy) + + truncState := &roachpb.RaftTruncatedState{ + Index: index, + Term: term, + } + + if legacy { + assert.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState)) + } else { + assert.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState)) + } + return "" + case "handle": + var buf bytes.Buffer + + var 
index uint64 + var term uint64 + d.ScanArgs(t, "index", &index) + d.ScanArgs(t, "term", &term) + + newTruncatedState := &roachpb.RaftTruncatedState{ + Index: index, + Term: term, + } + + apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng) + if err != nil { + return err.Error() + } + fmt.Fprintf(&buf, "apply: %t\n", apply) + + for _, key := range []roachpb.Key{ + keys.RaftTruncatedStateLegacyKey(rangeID), + keys.RaftTruncatedStateKey(rangeID), + } { + var truncatedState roachpb.RaftTruncatedState + ok, err := engine.MVCCGetProto(ctx, eng, key, hlc.Timestamp{}, &truncatedState, engine.MVCCGetOptions{}) + if err != nil { + t.Fatal(err) + } + if !ok { + continue + } + fmt.Fprintf(&buf, "%s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term) + } + return buf.String() + default: + } + return fmt.Sprintf("unsupported: %s", d.Cmd) + }) + }) +} diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 4e1a6b88eb7c..695942ae56b3 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -212,7 +212,7 @@ func entries( } // No results, was it due to unavailability or truncation? - ts, err := rsl.LoadLegacyRaftTruncatedState(ctx, e) + ts, _, err := rsl.LoadRaftTruncatedState(ctx, e) if err != nil { return nil, err } @@ -281,7 +281,7 @@ func term( // sideloaded entries. We only need the term, so this is what we do. 
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */) if err == raft.ErrCompacted { - ts, err := rsl.LoadLegacyRaftTruncatedState(ctx, eng) + ts, _, err := rsl.LoadRaftTruncatedState(ctx, eng) if err != nil { return 0, err } @@ -318,7 +318,7 @@ func (r *Replica) raftTruncatedStateLocked( if r.mu.state.TruncatedState != nil { return *r.mu.state.TruncatedState, nil } - ts, err := r.mu.stateLoader.LoadLegacyRaftTruncatedState(ctx, r.store.Engine()) + ts, _, err := r.mu.stateLoader.LoadRaftTruncatedState(ctx, r.store.Engine()) if err != nil { return ts, err } @@ -488,8 +488,17 @@ type IncomingSnapshot struct { // The Raft log entries for this snapshot. LogEntries [][]byte // The replica state at the time the snapshot was generated (never nil). - State *storagepb.ReplicaState - snapType string + State *storagepb.ReplicaState + // + // When true, this snapshot contains an unreplicated TruncatedState. When + // false, the TruncatedState is replicated (see the reference below) and the + // recipient must avoid also writing the unreplicated TruncatedState. The + // migration to an unreplicated TruncatedState will be carried out during + // the next log truncation (assuming cluster version is bumped at that + // point). + // See the comment on VersionUnreplicatedRaftTruncatedState for details. + UsesUnreplicatedTruncatedState bool + snapType string } // snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the @@ -861,6 +870,19 @@ func (r *Replica) applySnapshot( distinctBatch := batch.Distinct() stats.batch = timeutil.Now() + if inSnap.UsesUnreplicatedTruncatedState { + // We're using the unreplicated truncated state, which we need to + // manually persist to disk. 
If we're not taking this branch, the + // snapshot contains a legacy TruncatedState and we don't need to do + // anything (in fact, must not -- the invariant is that exactly one of + // them exists at any given point in the state machine). + if err := stateloader.Make(s.Desc.RangeID).SetRaftTruncatedState( + ctx, distinctBatch, s.TruncatedState, + ); err != nil { + return err + } + } + logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) for i, bytes := range inSnap.LogEntries { if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 1467bdc3d855..66752beac9bb 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -257,6 +257,7 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper, hlc.Timestamp{}, hlc.Timestamp{}, bootstrapVersion.Version, + stateloader.TruncatedStateUnreplicated, ); err != nil { t.Fatal(err) } @@ -9829,7 +9830,10 @@ func TestReplicaBootstrapRangeAppliedStateKey(t *testing.T) { // Save the ReplicaState and perform persistent assertions again. repl.raftMu.Lock() repl.mu.Lock() - if _, err := repl.mu.stateLoader.Save(ctx, tc.engine, repl.mu.state); err != nil { + if _, err := repl.mu.stateLoader.Save( + ctx, tc.engine, repl.mu.state, + stateloader.TruncatedStateUnreplicated, + ); err != nil { t.Fatalf("could not save ReplicaState: %v", err) } repl.mu.Unlock() diff --git a/pkg/storage/stateloader/initial.go b/pkg/storage/stateloader/initial.go index 3027c5a481b5..aaaa58764b71 100644 --- a/pkg/storage/stateloader/initial.go +++ b/pkg/storage/stateloader/initial.go @@ -55,8 +55,14 @@ func WriteInitialReplicaState( gcThreshold hlc.Timestamp, txnSpanGCThreshold hlc.Timestamp, activeVersion roachpb.Version, + truncStateType TruncatedStateType, ) (enginepb.MVCCStats, error) { rsl := Make(desc.RangeID) + // NB: be careful using activeVersion here. 
One caller of this code is the + // split trigger, and the version with which the split trigger is called can + // vary across followers. Thus, actions which require coordination cannot + // use the version as a trigger (this is why this method takes a + // truncStateType argument). var s storagepb.ReplicaState s.TruncatedState = &roachpb.RaftTruncatedState{ @@ -101,7 +107,7 @@ func WriteInitialReplicaState( log.Fatalf(ctx, "expected trivial TxnSpanGCThreshold, but found %+v", existingTxnSpanGCThreshold) } - newMS, err := rsl.Save(ctx, eng, s) + newMS, err := rsl.Save(ctx, eng, s, truncStateType) if err != nil { return enginepb.MVCCStats{}, err } @@ -125,9 +131,10 @@ func WriteInitialState( gcThreshold hlc.Timestamp, txnSpanGCThreshold hlc.Timestamp, bootstrapVersion roachpb.Version, + truncStateType TruncatedStateType, ) (enginepb.MVCCStats, error) { newMS, err := WriteInitialReplicaState( - ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold, bootstrapVersion) + ctx, eng, ms, desc, lease, gcThreshold, txnSpanGCThreshold, bootstrapVersion, truncStateType) if err != nil { return enginepb.MVCCStats{}, err } diff --git a/pkg/storage/stateloader/stateloader.go b/pkg/storage/stateloader/stateloader.go index 995b19311136..14f43cc36070 100644 --- a/pkg/storage/stateloader/stateloader.go +++ b/pkg/storage/stateloader/stateloader.go @@ -105,7 +105,7 @@ func (rsl StateLoader) Load( // The truncated state should not be optional (i.e. the pointer is // pointless), but it is and the migration is not worth it. - truncState, err := rsl.LoadLegacyRaftTruncatedState(ctx, reader) + truncState, _, err := rsl.LoadRaftTruncatedState(ctx, reader) if err != nil { return storagepb.ReplicaState{}, err } @@ -114,6 +114,17 @@ func (rsl StateLoader) Load( return s, nil } +// TruncatedStateType determines whether to use a replicated (legacy) or an +// unreplicated TruncatedState. See VersionUnreplicatedRaftTruncatedState.
+type TruncatedStateType int + +const ( + // TruncatedStateLegacyReplicated means use the legacy (replicated) key. + TruncatedStateLegacyReplicated TruncatedStateType = iota + // TruncatedStateUnreplicated means use the new (unreplicated) key. + TruncatedStateUnreplicated +) + // Save persists the given ReplicaState to disk. It assumes that the contained // Stats are up-to-date and returns the stats which result from writing the // updated State. @@ -126,7 +137,10 @@ func (rsl StateLoader) Load( // missing whenever save is called. Optional values should be reserved // strictly for use in Result. Do before merge. func (rsl StateLoader) Save( - ctx context.Context, eng engine.ReadWriter, state storagepb.ReplicaState, + ctx context.Context, + eng engine.ReadWriter, + state storagepb.ReplicaState, + truncStateType TruncatedStateType, ) (enginepb.MVCCStats, error) { ms := state.Stats if err := rsl.SetLease(ctx, eng, ms, *state.Lease); err != nil { @@ -138,8 +152,14 @@ func (rsl StateLoader) Save( if err := rsl.SetTxnSpanGCThreshold(ctx, eng, ms, state.TxnSpanGCThreshold); err != nil { return enginepb.MVCCStats{}, err } - if err := rsl.SetLegacyRaftTruncatedState(ctx, eng, ms, state.TruncatedState); err != nil { - return enginepb.MVCCStats{}, err + if truncStateType == TruncatedStateLegacyReplicated { + if err := rsl.SetLegacyRaftTruncatedState(ctx, eng, ms, state.TruncatedState); err != nil { + return enginepb.MVCCStats{}, err + } + } else { + if err := rsl.SetRaftTruncatedState(ctx, eng, state.TruncatedState); err != nil { + return enginepb.MVCCStats{}, err + } } if state.UsingAppliedStateKey { rai, lai := state.RaftAppliedIndex, state.LeaseAppliedIndex @@ -416,19 +436,6 @@ func (rsl StateLoader) SetMVCCStats( return rsl.writeLegacyMVCCStatsInternal(ctx, eng, newMS) } -// LoadLegacyRaftTruncatedState loads the truncated state. 
-func (rsl StateLoader) LoadLegacyRaftTruncatedState( - ctx context.Context, reader engine.Reader, -) (roachpb.RaftTruncatedState, error) { - var truncState roachpb.RaftTruncatedState - if _, err := engine.MVCCGetProto( - ctx, reader, rsl.RaftTruncatedStateLegacyKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, - ); err != nil { - return roachpb.RaftTruncatedState{}, err - } - return truncState, nil -} - // SetLegacyRaftTruncatedState overwrites the truncated state. func (rsl StateLoader) SetLegacyRaftTruncatedState( ctx context.Context, @@ -508,7 +515,7 @@ func (rsl StateLoader) LoadLastIndex(ctx context.Context, reader engine.Reader) if lastIndex == 0 { // The log is empty, which means we are either starting from scratch // or the entire log has been truncated away. - lastEnt, err := rsl.LoadLegacyRaftTruncatedState(ctx, reader) + lastEnt, _, err := rsl.LoadRaftTruncatedState(ctx, reader) if err != nil { return 0, err } @@ -517,6 +524,48 @@ func (rsl StateLoader) LoadLastIndex(ctx context.Context, reader engine.Reader) return lastIndex, nil } +// LoadRaftTruncatedState loads the truncated state. The returned boolean reports +// whether the result was read from the TruncatedStateLegacyKey. If both keys +// are missing, it is false which is used to migrate into the unreplicated key. +// +// See VersionUnreplicatedRaftTruncatedState. +func (rsl StateLoader) LoadRaftTruncatedState( + ctx context.Context, reader engine.Reader, +) (_ roachpb.RaftTruncatedState, isLegacy bool, _ error) { + var truncState roachpb.RaftTruncatedState + if found, err := engine.MVCCGetProto( + ctx, reader, rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, + ); err != nil { + return roachpb.RaftTruncatedState{}, false, err + } else if found { + return truncState, false, nil + } + + // If the "new" truncated state isn't there (yet), fall back to the legacy + // truncated state.
The next log truncation will atomically rewrite them + // assuming the cluster version has advanced sufficiently. + // + // See VersionUnreplicatedRaftTruncatedState. + legacyFound, err := engine.MVCCGetProto( + ctx, reader, rsl.RaftTruncatedStateLegacyKey(), hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{}, + ) + if err != nil { + return roachpb.RaftTruncatedState{}, false, err + } + return truncState, legacyFound, nil +} + +// SetRaftTruncatedState overwrites the truncated state. +func (rsl StateLoader) SetRaftTruncatedState( + ctx context.Context, eng engine.ReadWriter, truncState *roachpb.RaftTruncatedState, +) error { + if (*truncState == roachpb.RaftTruncatedState{}) { + return errors.New("cannot persist empty RaftTruncatedState") + } + return engine.MVCCPutProto(ctx, eng, nil, /* ms */ + rsl.RaftTruncatedStateKey(), hlc.Timestamp{}, nil, truncState) +} + // LoadReplicaDestroyedError loads the replica destroyed error for the specified // range. If there is no error, nil is returned. func (rsl StateLoader) LoadReplicaDestroyedError( @@ -575,7 +624,7 @@ func (rsl StateLoader) SynthesizeRaftState(ctx context.Context, eng engine.ReadW if err != nil { return err } - truncState, err := rsl.LoadLegacyRaftTruncatedState(ctx, eng) + truncState, _, err := rsl.LoadRaftTruncatedState(ctx, eng) if err != nil { return err } diff --git a/pkg/storage/stats_test.go b/pkg/storage/stats_test.go index 050170c1593a..fd1bc16af872 100644 --- a/pkg/storage/stats_test.go +++ b/pkg/storage/stats_test.go @@ -31,8 +31,8 @@ import ( // writeInitialState(). 
func initialStats() enginepb.MVCCStats { return enginepb.MVCCStats{ - SysBytes: 130, - SysCount: 4, + SysBytes: 98, + SysCount: 3, } } func TestRangeStatsEmpty(t *testing.T) { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index c5497e144017..79268f0e121a 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2174,12 +2174,25 @@ func (s *Store) WriteInitialData( } } + // See the cluster version for more details. We're basically saying that if the cluster + // is bootstrapped at a version that uses the unreplicated truncated state, initialize + // it with such a truncated state. + truncStateType := stateloader.TruncatedStateUnreplicated + if bootstrapVersion.Less(cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState)) { + truncStateType = stateloader.TruncatedStateLegacyReplicated + } + lease := roachpb.BootstrapLease() _, err := stateloader.WriteInitialState( ctx, batch, enginepb.MVCCStats{}, *desc, - lease, hlc.Timestamp{}, hlc.Timestamp{}, bootstrapVersion) + lease, + hlc.Timestamp{}, + hlc.Timestamp{}, + bootstrapVersion, + truncStateType, + ) if err != nil { return err } diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index e906b82edec2..b14202e65520 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -140,11 +140,12 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } inSnap := IncomingSnapshot{ - SnapUUID: snapUUID, - Batches: batches, - LogEntries: logEntries, - State: &header.State, - snapType: snapTypeRaft, + UsesUnreplicatedTruncatedState: header.UnreplicatedTruncatedState, + SnapUUID: snapUUID, + Batches: batches, + LogEntries: logEntries, + State: &header.State, + snapType: snapTypeRaft, } if header.RaftMessageRequest.ToReplica.ReplicaID == 0 { inSnap.snapType = snapTypePreemptive diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index b63285db8b7d..d8c81ebdd8b7 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1348,6 +1348,7 
@@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep context.Background(), store.engine, enginepb.MVCCStats{}, *desc, roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, store.ClusterSettings().Version.Version().Version, + stateloader.TruncatedStateUnreplicated, ); err != nil { t.Fatal(err) } @@ -2778,6 +2779,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { ctx, s.Engine(), enginepb.MVCCStats{}, *repl1.Desc(), roachpb.Lease{}, hlc.Timestamp{}, hlc.Timestamp{}, s.ClusterSettings().Version.Version().Version, + stateloader.TruncatedStateUnreplicated, ); err != nil { t.Fatal(err) } diff --git a/pkg/storage/testdata/truncated_state_migration/migration b/pkg/storage/testdata/truncated_state_migration/migration new file mode 100644 index 000000000000..d2c6ea1e6d83 --- /dev/null +++ b/pkg/storage/testdata/truncated_state_migration/migration @@ -0,0 +1,25 @@ +# Migrating after VersionUnreplicatedRaftTruncatedState was enabled. During the +# migration, no TruncatedState is on disk, but we write the new, unreplicated, +# one (note the /u/ in the key) + +prev index=100 term=9 +---- + +handle index=150 term=9 +---- +apply: true +/Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9 + +# Simulate another truncation that moves forward. + +handle index=170 term=9 +---- +apply: true +/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 + +# ... and one that moves backwards and should not take effect. + +handle index=150 term=9 +---- +apply: false +/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 diff --git a/pkg/storage/testdata/truncated_state_migration/pre_migration b/pkg/storage/testdata/truncated_state_migration/pre_migration new file mode 100644 index 000000000000..e84177bd0b51 --- /dev/null +++ b/pkg/storage/testdata/truncated_state_migration/pre_migration @@ -0,0 +1,26 @@ +# Mode of operation below VersionUnreplicatedRaftTruncatedState. +# We don't mess with the on-disk state nor do we ever drop updates. 
+ +prev index=100 term=9 +---- + +put legacy=true index=100 term=9 +---- + +handle index=100 term=9 +---- +apply: true +/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 + +# Note that the below aren't actually possible in practice +# as a divergence won't happen before the migration. + +handle index=150 term=9 +---- +apply: true +/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 + +handle index=60 term=9 +---- +apply: true +/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9