From 2c29e0a006a4bf196d4251231ab78ff7b1770d42 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 18 Apr 2019 16:13:07 -0400 Subject: [PATCH] storage: remove optimization to use in-memory RaftCommand when sideloading SSTs Fixes #36861. This optimization relied on the fact that `RaftCommands` in `Replica.mu.proposals` were immutable over the lifetime of a Raft proposal. This invariant was violated by #35261, which allowed a lease index error to trigger an immediate reproposal. This reproposal mutated the corresponding `RaftCommand` in `Replica.mu.proposals`. Combined with aliasing between multiple Raft proposals due to reproposals due to ticks, this resulted in cases where a leaseholder's Raft logs could diverge from its followers and cause Raft groups to become inconsistent. Release note: None --- pkg/storage/replica.go | 10 ++++- pkg/storage/replica_raft.go | 6 ++- pkg/storage/replica_sideload.go | 56 ++++++------------------- pkg/storage/replica_sideload_test.go | 61 +--------------------------- 4 files changed, 27 insertions(+), 106 deletions(-) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 6bda6334a0a3..1f4bade76c25 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -296,8 +296,14 @@ type Replica struct { // // The *ProposalData in the map are "owned" by it. Elements from the // map must only be referenced while Replica.mu is held, except if the - // element is removed from the map first. The notable exception is the - // contained RaftCommand, which we treat as immutable. + // element is removed from the map first. + // + // Due to Raft reproposals, multiple in-flight Raft entries can have + // the same CmdIDKey, all corresponding to the same KV request. However, + // not all Raft entries with a given command ID will correspond directly + // to the *RaftCommand contained in its associated *ProposalData. This + // is because the *RaftCommand can be mutated during reproposals by + // Replica.tryReproposeWithNewLeaseIndex. proposals map[storagebase.CmdIDKey]*ProposalData internalRaftGroup *raft.RawNode // The ID of the replica within the Raft group. May be 0 if the replica has diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index b8491d1a4283..f3584a3c54af 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2054,6 +2054,8 @@ func (r *Replica) processRaftCommand( // a new one. This is important for pipelined writes, since they // don't have a client watching to retry, so a failure to // eventually apply the proposal would be a user-visible error. + // TODO(nvanbenschoten): This reproposal is not tracked by the + // quota pool. We should fix that. if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { return false } @@ -2090,7 +2092,9 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { // can happen if there are multiple copies of the command in the // logs; see TestReplicaRefreshMultiple). We must not create // multiple copies with multiple lease indexes, so don't repropose - // it again. + // it again. This ensures that at any time, there is only up to a + // single lease index that has a chance of succeeding in the Raft + // log for a given command. // // Note that the caller has already removed the current version of // the proposal from the pending proposals map. We must re-add it diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index 63b373443ae6..d86cd4e4abfb 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/raftentry" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -70,36 +69,19 @@ type SideloadStorage interface { func (r *Replica) maybeSideloadEntriesRaftMuLocked( ctx context.Context, entriesToAppend []raftpb.Entry, ) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) { - // TODO(tschottdorf): allocating this closure could be expensive. If so make - // it a method on Replica. - maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) { - r.mu.Lock() - defer r.mu.Unlock() - cmd, ok := r.mu.proposals[cmdID] - if ok { - return *cmd.command, true - } - return storagepb.RaftCommand{}, false - } - return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded, maybeRaftCommand) + return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded) } // maybeSideloadEntriesImpl iterates through the provided slice of entries. If // no sideloadable entries are found, it returns the same slice. Otherwise, it // returns a new slice in which all applicable entries have been sideloaded to -// the specified SideloadStorage. maybeRaftCommand is called when sideloading is -// necessary and can optionally supply a pre-Unmarshaled RaftCommand (which -// usually is provided by the Replica in-flight proposal map. +// the specified SideloadStorage. func maybeSideloadEntriesImpl( - ctx context.Context, - entriesToAppend []raftpb.Entry, - sideloaded SideloadStorage, - maybeRaftCommand func(storagebase.CmdIDKey) (storagepb.RaftCommand, bool), + ctx context.Context, entriesToAppend []raftpb.Entry, sideloaded SideloadStorage, ) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) { cow := false for i := range entriesToAppend { - var err error if sniffSideloadedRaftCommand(entriesToAppend[i].Data) { log.Event(ctx, "sideloading command in append") if !cow { @@ -112,31 +94,16 @@ func maybeSideloadEntriesImpl( ent := &entriesToAppend[i] cmdID, data := DecodeRaftCommand(ent.Data) // cheap - strippedCmd, ok := maybeRaftCommand(cmdID) - if ok { - // Happy case: we have this proposal locally (i.e. we proposed - // it). In this case, we can save unmarshalling the fat proposal - // because it's already in-memory. - if strippedCmd.ReplicatedEvalResult.AddSSTable == nil { - log.Fatalf(ctx, "encountered sideloaded non-AddSSTable command: %+v", strippedCmd) - } - log.Eventf(ctx, "command already in memory") - // The raft proposal is immutable. To respect that, shallow-copy - // the (nullable) AddSSTable struct which we intend to modify. - addSSTableCopy := *strippedCmd.ReplicatedEvalResult.AddSSTable - strippedCmd.ReplicatedEvalResult.AddSSTable = &addSSTableCopy - } else { - // Bad luck: we didn't have the proposal in-memory, so we'll - // have to unmarshal it. - log.Event(ctx, "proposal not already in memory; unmarshaling") - if err := protoutil.Unmarshal(data, &strippedCmd); err != nil { - return nil, 0, err - } + + // Unmarshal the command into an object that we can mutate. + var strippedCmd storagepb.RaftCommand + if err := protoutil.Unmarshal(data, &strippedCmd); err != nil { + return nil, 0, err } if strippedCmd.ReplicatedEvalResult.AddSSTable == nil { // Still no AddSSTable; someone must've proposed a v2 command - // but not becaused it contains an inlined SSTable. Strange, but + // but not because it contains an inlined SSTable. Strange, but // let's be future proof. log.Warning(ctx, "encountered sideloaded Raft command without inlined payload") continue @@ -146,8 +113,9 @@ func maybeSideloadEntriesImpl( dataToSideload := strippedCmd.ReplicatedEvalResult.AddSSTable.Data strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil + // Marshal the command and attach to the Raft entry. { - data = make([]byte, raftCommandPrefixLen+strippedCmd.Size()) + data := make([]byte, raftCommandPrefixLen+strippedCmd.Size()) encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID) _, err := protoutil.MarshalToWithoutFuzzing(&strippedCmd, data[raftCommandPrefixLen:]) if err != nil { @@ -157,7 +125,7 @@ func maybeSideloadEntriesImpl( } log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term) - if err = sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil { + if err := sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil { return nil, 0, err } sideloadedEntriesSize += int64(len(dataToSideload)) diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index 177f9671db87..9e85f0c343e5 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -569,67 +569,9 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { } } -func TestRaftSSTableSideloadingInflight(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording") - defer cancel() - - sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(5), roachpb.ReplicaID(7), ".") - - // We'll set things up so that while sideloading this entry, there - // unmarshaled one is already in memory (so the payload here won't even be - // looked at). - preEnts := []raftpb.Entry{mkEnt(raftVersionSideloaded, 7, 1, &storagepb.ReplicatedEvalResult_AddSSTable{ - Data: []byte("not the payload you're looking for"), - CRC32: 0, // not checked - })} - - origBytes := []byte("compare me") - - // Pretend there's an inflight command that actually has an SSTable in it. - var pendingCmd storagepb.RaftCommand - pendingCmd.ReplicatedEvalResult.AddSSTable = &storagepb.ReplicatedEvalResult_AddSSTable{ - Data: origBytes, CRC32: 0, // not checked - } - maybeCmd := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) { - return pendingCmd, true - } - - // The entry should be recognized as "to be sideloaded", then maybeCmd is - // invoked and supplies the RaftCommand, whose SSTable is then persisted. - postEnts, size, err := maybeSideloadEntriesImpl(ctx, preEnts, sideloaded, maybeCmd) - if err != nil { - t.Fatal(err) - } - - if len(postEnts) != 1 { - t.Fatalf("expected exactly one entry: %+v", postEnts) - } - if size != int64(len(origBytes)) { - t.Fatalf("expected %d sideloadedSize, but found %d", len(origBytes), size) - } - - if b, err := sideloaded.Get(ctx, preEnts[0].Index, preEnts[0].Term); err != nil { - t.Fatal(err) - } else if !bytes.Equal(b, origBytes) { - t.Fatalf("expected payload %s, got %s", origBytes, b) - } - - re := regexp.MustCompile(`(?ms)copying entries slice of length 1.*command already in memory.*writing payload`) - if trace := tracing.FormatRecordedSpans(collect()); !re.MatchString(trace) { - t.Fatalf("trace did not match %s:\n%s", re, trace) - } -} - func TestRaftSSTableSideloadingSideload(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() - noCmd := func(storagebase.CmdIDKey) (cmd storagepb.RaftCommand, ok bool) { - return - } - addSST := storagepb.ReplicatedEvalResult_AddSSTable{ Data: []byte("foo"), CRC32: 0, // not checked } @@ -684,8 +626,9 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { + ctx := context.Background() sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(3), roachpb.ReplicaID(17), ".") - postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded, noCmd) + postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded) if err != nil { t.Fatal(err) }