Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.1: storage: remove optimization to use in-memory RaftCommand when sideloading SSTs #36945

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,14 @@ type Replica struct {
//
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first. The notable exception is the
// contained RaftCommand, which we treat as immutable.
// element is removed from the map first.
//
// Due to Raft reproposals, multiple in-flight Raft entries can have
// the same CmdIDKey, all corresponding to the same KV request. However,
// not all Raft entries with a given command ID will correspond directly
// to the *RaftCommand contained in their associated *ProposalData. This
// is because the *RaftCommand can be mutated during reproposals by
// Replica.tryReproposeWithNewLeaseIndex.
proposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,8 @@ func (r *Replica) processRaftCommand(
// a new one. This is important for pipelined writes, since they
// don't have a client watching to retry, so a failure to
// eventually apply the proposal would be a user-visible error.
// TODO(nvanbenschoten): This reproposal is not tracked by the
// quota pool. We should fix that.
if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) {
return false
}
Expand Down Expand Up @@ -2090,7 +2092,9 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
// can happen if there are multiple copies of the command in the
// logs; see TestReplicaRefreshMultiple). We must not create
// multiple copies with multiple lease indexes, so don't repropose
// it again.
// it again. This ensures that at any time, there is at most one
// lease index that has a chance of succeeding in the Raft log for a
// given command.
//
// Note that the caller has already removed the current version of
// the proposal from the pending proposals map. We must re-add it
Expand Down
56 changes: 12 additions & 44 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/raftentry"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -70,36 +69,19 @@ type SideloadStorage interface {
func (r *Replica) maybeSideloadEntriesRaftMuLocked(
ctx context.Context, entriesToAppend []raftpb.Entry,
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {
// TODO(tschottdorf): allocating this closure could be expensive. If so make
// it a method on Replica.
maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) {
r.mu.Lock()
defer r.mu.Unlock()
cmd, ok := r.mu.proposals[cmdID]
if ok {
return *cmd.command, true
}
return storagepb.RaftCommand{}, false
}
return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded, maybeRaftCommand)
return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded)
}

// maybeSideloadEntriesImpl iterates through the provided slice of entries. If
// no sideloadable entries are found, it returns the same slice. Otherwise, it
// returns a new slice in which all applicable entries have been sideloaded to
// the specified SideloadStorage. maybeRaftCommand is called when sideloading is
// necessary and can optionally supply a pre-Unmarshaled RaftCommand (which
// usually is provided by the Replica in-flight proposal map).
// the specified SideloadStorage.
func maybeSideloadEntriesImpl(
ctx context.Context,
entriesToAppend []raftpb.Entry,
sideloaded SideloadStorage,
maybeRaftCommand func(storagebase.CmdIDKey) (storagepb.RaftCommand, bool),
ctx context.Context, entriesToAppend []raftpb.Entry, sideloaded SideloadStorage,
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {

cow := false
for i := range entriesToAppend {
var err error
if sniffSideloadedRaftCommand(entriesToAppend[i].Data) {
log.Event(ctx, "sideloading command in append")
if !cow {
Expand All @@ -112,31 +94,16 @@ func maybeSideloadEntriesImpl(

ent := &entriesToAppend[i]
cmdID, data := DecodeRaftCommand(ent.Data) // cheap
strippedCmd, ok := maybeRaftCommand(cmdID)
if ok {
// Happy case: we have this proposal locally (i.e. we proposed
// it). In this case, we can save unmarshalling the fat proposal
// because it's already in-memory.
if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
log.Fatalf(ctx, "encountered sideloaded non-AddSSTable command: %+v", strippedCmd)
}
log.Eventf(ctx, "command already in memory")
// The raft proposal is immutable. To respect that, shallow-copy
// the (nullable) AddSSTable struct which we intend to modify.
addSSTableCopy := *strippedCmd.ReplicatedEvalResult.AddSSTable
strippedCmd.ReplicatedEvalResult.AddSSTable = &addSSTableCopy
} else {
// Bad luck: we didn't have the proposal in-memory, so we'll
// have to unmarshal it.
log.Event(ctx, "proposal not already in memory; unmarshaling")
if err := protoutil.Unmarshal(data, &strippedCmd); err != nil {
return nil, 0, err
}

// Unmarshal the command into an object that we can mutate.
var strippedCmd storagepb.RaftCommand
if err := protoutil.Unmarshal(data, &strippedCmd); err != nil {
return nil, 0, err
}

if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
// Still no AddSSTable; someone must've proposed a v2 command
// but not becaused it contains an inlined SSTable. Strange, but
// but not because it contains an inlined SSTable. Strange, but
// let's be future proof.
log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
continue
Expand All @@ -146,8 +113,9 @@ func maybeSideloadEntriesImpl(
dataToSideload := strippedCmd.ReplicatedEvalResult.AddSSTable.Data
strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil

// Marshal the command and attach to the Raft entry.
{
data = make([]byte, raftCommandPrefixLen+strippedCmd.Size())
data := make([]byte, raftCommandPrefixLen+strippedCmd.Size())
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID)
_, err := protoutil.MarshalToWithoutFuzzing(&strippedCmd, data[raftCommandPrefixLen:])
if err != nil {
Expand All @@ -157,7 +125,7 @@ func maybeSideloadEntriesImpl(
}

log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
if err = sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
if err := sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
return nil, 0, err
}
sideloadedEntriesSize += int64(len(dataToSideload))
Expand Down
61 changes: 2 additions & 59 deletions pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,67 +569,9 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
}
}

func TestRaftSSTableSideloadingInflight(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
defer cancel()

sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(5), roachpb.ReplicaID(7), ".")

// We'll set things up so that while sideloading this entry, the
// unmarshaled one is already in memory (so the payload here won't even be
// looked at).
preEnts := []raftpb.Entry{mkEnt(raftVersionSideloaded, 7, 1, &storagepb.ReplicatedEvalResult_AddSSTable{
Data: []byte("not the payload you're looking for"),
CRC32: 0, // not checked
})}

origBytes := []byte("compare me")

// Pretend there's an inflight command that actually has an SSTable in it.
var pendingCmd storagepb.RaftCommand
pendingCmd.ReplicatedEvalResult.AddSSTable = &storagepb.ReplicatedEvalResult_AddSSTable{
Data: origBytes, CRC32: 0, // not checked
}
maybeCmd := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) {
return pendingCmd, true
}

// The entry should be recognized as "to be sideloaded", then maybeCmd is
// invoked and supplies the RaftCommand, whose SSTable is then persisted.
postEnts, size, err := maybeSideloadEntriesImpl(ctx, preEnts, sideloaded, maybeCmd)
if err != nil {
t.Fatal(err)
}

if len(postEnts) != 1 {
t.Fatalf("expected exactly one entry: %+v", postEnts)
}
if size != int64(len(origBytes)) {
t.Fatalf("expected %d sideloadedSize, but found %d", len(origBytes), size)
}

if b, err := sideloaded.Get(ctx, preEnts[0].Index, preEnts[0].Term); err != nil {
t.Fatal(err)
} else if !bytes.Equal(b, origBytes) {
t.Fatalf("expected payload %s, got %s", origBytes, b)
}

re := regexp.MustCompile(`(?ms)copying entries slice of length 1.*command already in memory.*writing payload`)
if trace := tracing.FormatRecordedSpans(collect()); !re.MatchString(trace) {
t.Fatalf("trace did not match %s:\n%s", re, trace)
}
}

func TestRaftSSTableSideloadingSideload(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
noCmd := func(storagebase.CmdIDKey) (cmd storagepb.RaftCommand, ok bool) {
return
}

addSST := storagepb.ReplicatedEvalResult_AddSSTable{
Data: []byte("foo"), CRC32: 0, // not checked
}
Expand Down Expand Up @@ -684,8 +626,9 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(3), roachpb.ReplicaID(17), ".")
postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded, noCmd)
postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded)
if err != nil {
t.Fatal(err)
}
Expand Down