From 47ef7fdb62a3461eb3aae927661e6d3865be4ffd Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 9 Feb 2023 15:33:41 +0100 Subject: [PATCH 1/4] kvserver: indent unreplicatedSST creation This will be fully extracted mechanically in the next commit. Review with whitespace suppressed for best results. The only real change is that we're dropping the raft entry cache a tad later now. But, the order here is completely irrelevant; the raft entry cache doesn't care if we've created (not ingested) some SSTs yet. Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/replica.go | 5 ++ pkg/kv/kvserver/replica_raftstorage.go | 91 ++++++++++++++------------ 2 files changed, 53 insertions(+), 43 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 4043399b974b..60be58efe729 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -863,6 +863,11 @@ func (r *Replica) ReplicaID() roachpb.ReplicaID { return r.replicaID } +// ID returns the FullReplicaID for the Replica. +func (r *Replica) ID() storage.FullReplicaID { + return storage.FullReplicaID{RangeID: r.RangeID, ReplicaID: r.replicaID} +} + // cleanupFailedProposal cleans up after a proposal that has failed. It // clears any references to the proposal and releases associated quota. // It requires that Replica.mu is exclusively held. diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 5a28563740e8..5a359d1a64cc 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -482,59 +482,64 @@ func (r *Replica) applySnapshot( log.Infof(ctx, "applied %s (%s)", inSnap, logDetails) }(timeutil.Now()) - unreplicatedSSTFile := &storage.MemFile{} - unreplicatedSST := storage.MakeIngestionSSTWriter( - ctx, r.ClusterSettings(), unreplicatedSSTFile, - ) - defer unreplicatedSST.Close() - - // Clearing the unreplicated state. 
- // - // NB: We do not expect to see range keys in the unreplicated state, so - // we don't drop a range tombstone across the range key space. - unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) - unreplicatedStart := unreplicatedPrefixKey - unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() - if err = unreplicatedSST.ClearRawRange( - unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */ - ); err != nil { - return errors.Wrapf(err, "error clearing range of unreplicated SST writer") - } - - // Update HardState. - if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { - return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") - } - // We've cleared all the raft state above, so we are forced to write the - // RaftReplicaID again here. - if err := r.raftMu.stateLoader.SetRaftReplicaID( - ctx, &unreplicatedSST, r.replicaID); err != nil { - return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") - } + writeUnreplicatedSST := func(ctx context.Context, id storage.FullReplicaID, st *cluster.Settings, meta raftpb.SnapshotMetadata, hs raftpb.HardState, sl *logstore.StateLoader) (*storage.MemFile, bool, error) { + unreplicatedSSTFile := &storage.MemFile{} + unreplicatedSST := storage.MakeIngestionSSTWriter( + ctx, st, unreplicatedSSTFile, + ) + defer unreplicatedSST.Close() + + // Clearing the unreplicated state. + // + // NB: We do not expect to see range keys in the unreplicated state, so + // we don't drop a range tombstone across the range key space. 
+ unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID) + unreplicatedStart := unreplicatedPrefixKey + unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() + if err := unreplicatedSST.ClearRawRange( + unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */ + ); err != nil { + return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer") + } - // Update Raft entries. - r.store.raftEntryCache.Drop(r.RangeID) + // Update HardState. + if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil { + return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + // We've cleared all the raft state above, so we are forced to write the + // RaftReplicaID again here. + if err := sl.SetRaftReplicaID( + ctx, &unreplicatedSST, id.ReplicaID); err != nil { + return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + } - if err := r.raftMu.stateLoader.SetRaftTruncatedState( - ctx, &unreplicatedSST, - &roachpb.RaftTruncatedState{ - Index: nonemptySnap.Metadata.Index, - Term: nonemptySnap.Metadata.Term, - }, - ); err != nil { - return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer") - } + if err := sl.SetRaftTruncatedState( + ctx, &unreplicatedSST, + &roachpb.RaftTruncatedState{ + Index: meta.Index, + Term: meta.Term, + }, + ); err != nil { + return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer") + } - if err := unreplicatedSST.Finish(); err != nil { - return err + if err := unreplicatedSST.Finish(); err != nil { + return nil, false, err + } + return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil } - if unreplicatedSST.DataSize > 0 { + unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST( + ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader, + ) + if nonempty { // TODO(itsbilal): Write to SST directly in 
unreplicatedSST rather than // buffering in a MemFile first. if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil { return err } } + // Update Raft entries. + r.store.raftEntryCache.Drop(r.RangeID) // If we're subsuming a replica below, we don't have its last NextReplicaID, // nor can we obtain it. That's OK: we can just be conservative and use the From 9d18c630ca19d5e55a2ee2e8cf670ac2fa918da6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 9 Feb 2023 15:53:46 +0100 Subject: [PATCH 2/4] kvserver: extract writeUnreplicatedSST CTRL-X CTRL-V Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/replica_raftstorage.go | 100 +++++++++++++------------ 1 file changed, 54 insertions(+), 46 deletions(-) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 5a359d1a64cc..0f037790698c 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -482,52 +482,6 @@ func (r *Replica) applySnapshot( log.Infof(ctx, "applied %s (%s)", inSnap, logDetails) }(timeutil.Now()) - writeUnreplicatedSST := func(ctx context.Context, id storage.FullReplicaID, st *cluster.Settings, meta raftpb.SnapshotMetadata, hs raftpb.HardState, sl *logstore.StateLoader) (*storage.MemFile, bool, error) { - unreplicatedSSTFile := &storage.MemFile{} - unreplicatedSST := storage.MakeIngestionSSTWriter( - ctx, st, unreplicatedSSTFile, - ) - defer unreplicatedSST.Close() - - // Clearing the unreplicated state. - // - // NB: We do not expect to see range keys in the unreplicated state, so - // we don't drop a range tombstone across the range key space. 
- unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID) - unreplicatedStart := unreplicatedPrefixKey - unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() - if err := unreplicatedSST.ClearRawRange( - unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */ - ); err != nil { - return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer") - } - - // Update HardState. - if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil { - return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") - } - // We've cleared all the raft state above, so we are forced to write the - // RaftReplicaID again here. - if err := sl.SetRaftReplicaID( - ctx, &unreplicatedSST, id.ReplicaID); err != nil { - return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") - } - - if err := sl.SetRaftTruncatedState( - ctx, &unreplicatedSST, - &roachpb.RaftTruncatedState{ - Index: meta.Index, - Term: meta.Term, - }, - ); err != nil { - return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer") - } - - if err := unreplicatedSST.Finish(); err != nil { - return nil, false, err - } - return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil - } unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST( ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader, ) @@ -704,6 +658,60 @@ func (r *Replica) applySnapshot( return nil } +func writeUnreplicatedSST( + ctx context.Context, + id storage.FullReplicaID, + st *cluster.Settings, + meta raftpb.SnapshotMetadata, + hs raftpb.HardState, + sl *logstore.StateLoader, +) (*storage.MemFile, bool, error) { + unreplicatedSSTFile := &storage.MemFile{} + unreplicatedSST := storage.MakeIngestionSSTWriter( + ctx, st, unreplicatedSSTFile, + ) + defer unreplicatedSST.Close() + + // Clearing the unreplicated state. 
+ // + // NB: We do not expect to see range keys in the unreplicated state, so + // we don't drop a range tombstone across the range key space. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID) + unreplicatedStart := unreplicatedPrefixKey + unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() + if err := unreplicatedSST.ClearRawRange( + unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */ + ); err != nil { + return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer") + } + + // Update HardState. + if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil { + return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + // We've cleared all the raft state above, so we are forced to write the + // RaftReplicaID again here. + if err := sl.SetRaftReplicaID( + ctx, &unreplicatedSST, id.ReplicaID); err != nil { + return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + } + + if err := sl.SetRaftTruncatedState( + ctx, &unreplicatedSST, + &roachpb.RaftTruncatedState{ + Index: meta.Index, + Term: meta.Term, + }, + ); err != nil { + return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer") + } + + if err := unreplicatedSST.Finish(); err != nil { + return nil, false, err + } + return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil +} + // clearSubsumedReplicaDiskData clears the on disk data of the subsumed // replicas by creating SSTs with range deletion tombstones. 
We have to be // careful here not to have overlapping ranges with the SSTs we have already From 43248a50a4c8447cb15432c18c9b2af1754fedee Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Feb 2023 13:09:22 +0100 Subject: [PATCH 3/4] kvserver: add missing err check in writeUnreplicatedSST Epic: CRDB-8035 Release note: None --- pkg/kv/kvserver/replica_raftstorage.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 0f037790698c..1d3bfa8b9ef9 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -485,6 +485,9 @@ func (r *Replica) applySnapshot( unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST( ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader, ) + if err != nil { + return err + } if nonempty { // TODO(itsbilal): Write to SST directly in unreplicatedSST rather than // buffering in a MemFile first. From 31ae36f18a0eeb5310636d481f9d0a4e5e33c3ea Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Feb 2023 13:12:42 +0100 Subject: [PATCH 4/4] kvserver: add doc for writeUnreplicatedSST Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/replica_raftstorage.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 1d3bfa8b9ef9..9507093fca78 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -661,6 +661,13 @@ func (r *Replica) applySnapshot( return nil } +// writeUnreplicatedSST creates an SST for snapshot application that +// covers the RangeID-unreplicated keyspace. A range tombstone is +// laid down and the Raft state provided by the arguments is overlaid +// onto it. +// +// TODO(sep-raft-log): when is `nonempty` ever false? We always +// perform a number of writes to this SST. 
func writeUnreplicatedSST( ctx context.Context, id storage.FullReplicaID, @@ -668,7 +675,7 @@ func writeUnreplicatedSST( meta raftpb.SnapshotMetadata, hs raftpb.HardState, sl *logstore.StateLoader, -) (*storage.MemFile, bool, error) { +) (_ *storage.MemFile, nonempty bool, _ error) { unreplicatedSSTFile := &storage.MemFile{} unreplicatedSST := storage.MakeIngestionSSTWriter( ctx, st, unreplicatedSSTFile,