Skip to content

Commit

Permalink
storage: account for sideloaded data in Replica.mu.raftLogSize
Browse files Browse the repository at this point in the history
  • Loading branch information
petermattis committed Aug 31, 2017
1 parent d9b67e6 commit 45396f8
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 12 deletions.
5 changes: 4 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3273,15 +3273,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if len(rd.Entries) > 0 {
// All of the entries are appended to distinct keys, returning a new
// last index.
thinEntries, err := r.maybeSideloadEntriesRaftMuLocked(ctx, rd.Entries)
thinEntries, sideLoadedEntriesSize, err := r.maybeSideloadEntriesRaftMuLocked(ctx, rd.Entries)
if err != nil {
return stats, err
}
oldSize := raftLogSize
raftLogSize += sideLoadedEntriesSize
if lastIndex, lastTerm, raftLogSize, err = r.append(
ctx, writer, lastIndex, lastTerm, raftLogSize, thinEntries,
); err != nil {
return stats, err
}
log.Infof(ctx, "raft log delta: %d", raftLogSize-oldSize)
}
if !raft.IsEmptyHardState(rd.HardState) {
if err := r.raftMu.stateLoader.setHardState(ctx, writer, rd.HardState); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,12 @@ func (r *Replica) applySnapshot(
thinEntries := logEntries
if replicaID != 0 {
var err error
thinEntries, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
var sideloadedEntriesSize int64
thinEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
if err != nil {
return err
}
raftLogSize += sideloadedEntriesSize
}

// Write the snapshot's Raft log into the range.
Expand Down
13 changes: 7 additions & 6 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type sideloadStorage interface {
// The passed-in slice is not mutated.
func (r *Replica) maybeSideloadEntriesRaftMuLocked(
ctx context.Context, entriesToAppend []raftpb.Entry,
) ([]raftpb.Entry, error) {
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {
// TODO(tschottdorf): allocating this closure could be expensive. If so make
// it a method on Replica.
maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
Expand All @@ -118,7 +118,7 @@ func maybeSideloadEntriesImpl(
entriesToAppend []raftpb.Entry,
sideloaded sideloadStorage,
maybeRaftCommand func(storagebase.CmdIDKey) (storagebase.RaftCommand, bool),
) ([]raftpb.Entry, error) {
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {

cow := false
for i := range entriesToAppend {
Expand Down Expand Up @@ -153,7 +153,7 @@ func maybeSideloadEntriesImpl(
// have to unmarshal it.
log.Event(ctx, "proposal not already in memory; unmarshaling")
if err := strippedCmd.Unmarshal(data); err != nil {
return nil, err
return nil, 0, err
}
}

Expand All @@ -173,18 +173,19 @@ func maybeSideloadEntriesImpl(
var err error
data, err = strippedCmd.Marshal()
if err != nil {
return nil, errors.Wrap(err, "while marshalling stripped sideloaded command")
return nil, 0, errors.Wrap(err, "while marshalling stripped sideloaded command")
}
}

ent.Data = encodeRaftCommandV2(cmdID, data)
log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
if err = sideloaded.PutIfNotExists(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
return nil, err
return nil, 0, err
}
sideloadedEntriesSize += int64(len(dataToSideload))
}
}
return entriesToAppend, nil
return entriesToAppend, sideloadedEntriesSize, nil
}

func sniffSideloadedRaftCommand(data []byte) (sideloaded bool) {
Expand Down
28 changes: 24 additions & 4 deletions pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,14 +416,17 @@ func TestRaftSSTableSideloadingInflight(t *testing.T) {

// The entry should be recognized as "to be sideloaded", then maybeCmd is
// invoked and supplies the RaftCommand, whose SSTable is then persisted.
postEnts, err := maybeSideloadEntriesImpl(ctx, preEnts, sideloaded, maybeCmd)
postEnts, size, err := maybeSideloadEntriesImpl(ctx, preEnts, sideloaded, maybeCmd)
if err != nil {
t.Fatal(err)
}

if len(postEnts) != 1 {
t.Fatalf("expected exactly one entry: %+v", postEnts)
}
if size != int64(len(origBytes)) {
t.Fatalf("expected %d sideloadedSize, but found %d", len(origBytes), size)
}

if b, err := sideloaded.Get(ctx, preEnts[0].Index, preEnts[0].Term); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -462,31 +465,45 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
name string
preEnts, postEnts []raftpb.Entry
ss []string
size int64
}

// Intentionally ignore the fact that real calls would always have an
// unbroken run of `entry.Index`.
testCases := []tc{
{name: "empty", preEnts: nil, postEnts: nil, ss: nil},
{name: "v1", preEnts: []raftpb.Entry{entV1Reg, entV1SST}, postEnts: []raftpb.Entry{entV1Reg, entV1SST}},
{
name: "empty",
preEnts: nil,
postEnts: nil,
ss: nil,
size: 0,
},
{
name: "v1",
preEnts: []raftpb.Entry{entV1Reg, entV1SST},
postEnts: []raftpb.Entry{entV1Reg, entV1SST},
size: 0,
},
{
name: "v2",
preEnts: []raftpb.Entry{entV2SST, entV2Reg},
postEnts: []raftpb.Entry{entV2SSTStripped, entV2Reg},
ss: []string{"i13t99"},
size: int64(len(addSST.Data)),
},
{
name: "mixed",
preEnts: []raftpb.Entry{entV1Reg, entV1SST, entV2Reg, entV2SST},
postEnts: []raftpb.Entry{entV1Reg, entV1SST, entV2Reg, entV2SSTStripped},
ss: []string{"i13t99"},
size: int64(len(addSST.Data)),
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
sideloaded := mustNewInMemSideloadStorage(roachpb.RangeID(3), roachpb.ReplicaID(17), ".")
postEnts, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded, noCmd)
postEnts, size, err := maybeSideloadEntriesImpl(ctx, test.preEnts, sideloaded, noCmd)
if err != nil {
t.Fatal(err)
}
Expand All @@ -496,6 +513,9 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
if !reflect.DeepEqual(postEnts, test.postEnts) {
t.Fatalf("result differs from expected: %s", pretty.Diff(postEnts, test.postEnts))
}
if test.size != size {
t.Fatalf("expected %d sideloadedSize, but found %d", test.size, size)
}
var actKeys []string
for k := range sideloaded.(*inMemSideloadStorage).m {
actKeys = append(actKeys, fmt.Sprintf("i%dt%d", k.index, k.term))
Expand Down

0 comments on commit 45396f8

Please sign in to comment.