Skip to content

Commit

Permalink
storage: remove optimization to use in-memory RaftCommand when sidelo…
Browse files Browse the repository at this point in the history
…ading SSTs

Fixes #36861.

This optimization relied on the fact that `RaftCommands` in `Replica.mu.proposals`
were immutable over the lifetime of a Raft proposal. This invariant was violated
by #35261, which allowed a lease index error to trigger an immediate reproposal.
This reproposal mutated the corresponding `RaftCommand` in `Replica.mu.proposals`.
Combined with aliasing between multiple Raft proposals due to reproposals due to
ticks, this resulted in cases where a leaseholder's Raft logs could diverge from
its followers and cause Raft groups to become inconsistent.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 19, 2019
1 parent b47269e commit d6fb710
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 107 deletions.
23 changes: 22 additions & 1 deletion pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,21 @@ func registerTPCC(r *registry) {
CPUs: 4,

LoadWarehouses: 1000,
EstimatedMax: 300,
EstimatedMax: gceOrAws(cloud, 400, 600),
})
registerTPCCBenchSpec(r, tpccBenchSpec{
Nodes: 3,
CPUs: 16,

LoadWarehouses: gceOrAws(cloud, 2000, 2500),
EstimatedMax: gceOrAws(cloud, 1600, 2300),
})
registerTPCCBenchSpec(r, tpccBenchSpec{
Nodes: 12,
CPUs: 16,

LoadWarehouses: gceOrAws(cloud, 8000, 10000),
EstimatedMax: gceOrAws(cloud, 7000, 7000),
})
registerTPCCBenchSpec(r, tpccBenchSpec{
Nodes: 6,
Expand Down Expand Up @@ -370,6 +384,13 @@ func maxVersion(vers ...string) string {
return max.String()
}

func gceOrAws(cloud string, gce, aws int) int {
if cloud == "aws" {
return aws
}
return gce
}

func maybeMinVersionForFixturesImport(cloud string) string {
const minVersionForFixturesImport = "v2.2.0"
if cloud == "aws" {
Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,14 @@ type Replica struct {
//
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first. The notable exception is the
// contained RaftCommand, which we treat as immutable.
// element is removed from the map first.
//
// Due to Raft reproposals, multiple in-flight Raft entries can have
// the same CmdIDKey, all corresponding to the same KV request. However,
// not all Raft entries with a given command ID will correspond directly
// to the *RaftCommand contained in its associated *ProposalData. This
// is because the *RaftCommand can be mutated during reproposals by
// Replica.tryReproposeWithNewLeaseIndex.
proposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2052,6 +2052,8 @@ func (r *Replica) processRaftCommand(
// a new one. This is important for pipelined writes, since they
// don't have a client watching to retry, so a failure to
// eventually apply the proposal would be a user-visible error.
// TODO(nvanbenschoten): This reproposal is not tracked by the
// quota pool. We should fix that.
if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) {
return false
}
Expand Down Expand Up @@ -2088,7 +2090,9 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
// can happen if there are multiple copies of the command in the
// logs; see TestReplicaRefreshMultiple). We must not create
// multiple copies with multiple lease indexes, so don't repropose
// it again.
// it again. This ensures that at any time, there is only up to a
// single lease index that has a chance of succeeding in the Raft
// log for a given command.
//
// Note that the caller has already removed the current version of
// the proposal from the pending proposals map. We must re-add it
Expand Down
56 changes: 12 additions & 44 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/raftentry"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -70,36 +69,19 @@ type SideloadStorage interface {
func (r *Replica) maybeSideloadEntriesRaftMuLocked(
ctx context.Context, entriesToAppend []raftpb.Entry,
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {
// TODO(tschottdorf): allocating this closure could be expensive. If so make
// it a method on Replica.
maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) {
r.mu.Lock()
defer r.mu.Unlock()
cmd, ok := r.mu.proposals[cmdID]
if ok {
return *cmd.command, true
}
return storagepb.RaftCommand{}, false
}
return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded, maybeRaftCommand)
return maybeSideloadEntriesImpl(ctx, entriesToAppend, r.raftMu.sideloaded)
}

// maybeSideloadEntriesImpl iterates through the provided slice of entries. If
// no sideloadable entries are found, it returns the same slice. Otherwise, it
// returns a new slice in which all applicable entries have been sideloaded to
// the specified SideloadStorage. maybeRaftCommand is called when sideloading is
// necessary and can optionally supply a pre-Unmarshaled RaftCommand (which
// usually is provided by the Replica in-flight proposal map.
// the specified SideloadStorage.
func maybeSideloadEntriesImpl(
ctx context.Context,
entriesToAppend []raftpb.Entry,
sideloaded SideloadStorage,
maybeRaftCommand func(storagebase.CmdIDKey) (storagepb.RaftCommand, bool),
ctx context.Context, entriesToAppend []raftpb.Entry, sideloaded SideloadStorage,
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {

cow := false
for i := range entriesToAppend {
var err error
if sniffSideloadedRaftCommand(entriesToAppend[i].Data) {
log.Event(ctx, "sideloading command in append")
if !cow {
Expand All @@ -112,31 +94,16 @@ func maybeSideloadEntriesImpl(

ent := &entriesToAppend[i]
cmdID, data := DecodeRaftCommand(ent.Data) // cheap
strippedCmd, ok := maybeRaftCommand(cmdID)
if ok {
// Happy case: we have this proposal locally (i.e. we proposed
// it). In this case, we can save unmarshalling the fat proposal
// because it's already in-memory.
if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
log.Fatalf(ctx, "encountered sideloaded non-AddSSTable command: %+v", strippedCmd)
}
log.Eventf(ctx, "command already in memory")
// The raft proposal is immutable. To respect that, shallow-copy
// the (nullable) AddSSTable struct which we intend to modify.
addSSTableCopy := *strippedCmd.ReplicatedEvalResult.AddSSTable
strippedCmd.ReplicatedEvalResult.AddSSTable = &addSSTableCopy
} else {
// Bad luck: we didn't have the proposal in-memory, so we'll
// have to unmarshal it.
log.Event(ctx, "proposal not already in memory; unmarshaling")
if err := protoutil.Unmarshal(data, &strippedCmd); err != nil {
return nil, 0, err
}

// Unmarshal the command into an object that we can mutate.
var strippedCmd storagepb.RaftCommand
if err := protoutil.Unmarshal(data, &strippedCmd); err != nil {
return nil, 0, err
}

if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
// Still no AddSSTable; someone must've proposed a v2 command
// but not becaused it contains an inlined SSTable. Strange, but
// but not because it contains an inlined SSTable. Strange, but
// let's be future proof.
log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
continue
Expand All @@ -146,8 +113,9 @@ func maybeSideloadEntriesImpl(
dataToSideload := strippedCmd.ReplicatedEvalResult.AddSSTable.Data
strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil

// Marshal the command and attach to the Raft entry.
{
data = make([]byte, raftCommandPrefixLen+strippedCmd.Size())
data := make([]byte, raftCommandPrefixLen+strippedCmd.Size())
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID)
_, err := protoutil.MarshalToWithoutFuzzing(&strippedCmd, data[raftCommandPrefixLen:])
if err != nil {
Expand All @@ -157,7 +125,7 @@ func maybeSideloadEntriesImpl(
}

log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
if err = sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
if err := sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
return nil, 0, err
}
sideloadedEntriesSize += int64(len(dataToSideload))
Expand Down
61 changes: 2 additions & 59 deletions pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,67 +569,9 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
}
}

func TestRaftSSTableSideloadingInflight(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
defer cancel()

sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(5), roachpb.ReplicaID(7), ".")

// We'll set things up so that while sideloading this entry, there
// unmarshaled one is already in memory (so the payload here won't even be
// looked at).
preEnts := []raftpb.Entry{mkEnt(raftVersionSideloaded, 7, 1, &storagepb.ReplicatedEvalResult_AddSSTable{
Data: []byte("not the payload you're looking for"),
CRC32: 0, // not checked
})}

origBytes := []byte("compare me")

// Pretend there's an inflight command that actually has an SSTable in it.
var pendingCmd storagepb.RaftCommand
pendingCmd.ReplicatedEvalResult.AddSSTable = &storagepb.ReplicatedEvalResult_AddSSTable{
Data: origBytes, CRC32: 0, // not checked
}
maybeCmd := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) {
return pendingCmd, true
}

// The entry should be recognized as "to be sideloaded", then maybeCmd is
// invoked and supplies the RaftCommand, whose SSTable is then persisted.
postEnts, size, err := maybeSideloadEntriesImpl(ctx, preEnts, sideloaded, maybeCmd)
if err != nil {
t.Fatal(err)
}

if len(postEnts) != 1 {
t.Fatalf("expected exactly one entry: %+v", postEnts)
}
if size != int64(len(origBytes)) {
t.Fatalf("expected %d sideloadedSize, but found %d", len(origBytes), size)
}

if b, err := sideloaded.Get(ctx, preEnts[0].Index, preEnts[0].Term); err != nil {
t.Fatal(err)
} else if !bytes.Equal(b, origBytes) {
t.Fatalf("expected payload %s, got %s", origBytes, b)
}

re := regexp.MustCompile(`(?ms)copying entries slice of length 1.*command already in memory.*writing payload`)
if trace := tracing.FormatRecordedSpans(collect()); !re.MatchString(trace) {
t.Fatalf("trace did not match %s:\n%s", re, trace)
}
}

func TestRaftSSTableSideloadingSideload(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
noCmd := func(storagebase.CmdIDKey) (cmd storagepb.RaftCommand, ok bool) {
return
}

addSST := storagepb.ReplicatedEvalResult_AddSSTable{
Data: []byte("foo"), CRC32: 0, // not checked
}
Expand Down Expand Up @@ -684,8 +626,9 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(3), roachpb.ReplicaID(17), ".")
postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded, noCmd)
postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit d6fb710

Please sign in to comment.