diff --git a/pkg/storage/entry_cache.go b/pkg/storage/entry_cache.go index 6ab43cd0c626..b385359b088e 100644 --- a/pkg/storage/entry_cache.go +++ b/pkg/storage/entry_cache.go @@ -137,15 +137,16 @@ func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint6 // getEntries returns entries between [lo, hi) for specified range. // If any entries are returned for the specified indexes, they will // start with index lo and proceed sequentially without gaps until -// 1) all entries exclusive of hi are fetched, 2) > maxBytes of -// entries data is fetched, or 3) a cache miss occurs. +// 1) all entries exclusive of hi are fetched, 2) fetching another entry +// would add up to more than maxBytes of data, or 3) a cache miss occurs. +// The returned size reflects the size of the returned entries. func (rec *raftEntryCache) getEntries( ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64, -) ([]raftpb.Entry, uint64, uint64) { +) (_ []raftpb.Entry, size uint64, nextIndex uint64, exceededMaxBytes bool) { rec.Lock() defer rec.Unlock() var bytes uint64 - nextIndex := lo + nextIndex = lo rec.fromKey = entryCacheKey{RangeID: rangeID, Index: lo} rec.toKey = entryCacheKey{RangeID: rangeID, Index: hi} @@ -155,16 +156,20 @@ func (rec *raftEntryCache) getEntries( return true } ent := v.(*raftpb.Entry) - ents = append(ents, *ent) - bytes += uint64(ent.Size()) - nextIndex++ - if maxBytes > 0 && bytes > maxBytes { - return true + size := uint64(ent.Size()) + if bytes+size > maxBytes { + exceededMaxBytes = true + if len(ents) > 0 { + return true + } } - return false + nextIndex++ + bytes += size + ents = append(ents, *ent) + return exceededMaxBytes }, &rec.fromKey, &rec.toKey) - return ents, bytes, nextIndex + return ents, bytes, nextIndex, exceededMaxBytes } // delEntries deletes entries between [lo, hi) for specified range. diff --git a/pkg/storage/entry_cache_test.go b/pkg/storage/entry_cache_test.go index 3fec6d6e72c0..2429279ee428 100644 --- a/pkg/storage/entry_cache_test.go +++ b/pkg/storage/entry_cache_test.go @@ -15,6 +15,7 @@ package storage import ( + "math" "reflect" "testing" @@ -24,6 +25,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) +const noLimit = math.MaxUint64 + func newEntry(index, size uint64) raftpb.Entry { return raftpb.Entry{ Index: index, @@ -48,12 +51,12 @@ func verifyGet( expEnts []raftpb.Entry, expNextIndex uint64, ) { - ents, _, nextIndex := rec.getEntries(nil, rangeID, lo, hi, 0) + ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, noLimit) if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) { t.Fatalf("expected entries %+v; got %+v", expEnts, ents) } if nextIndex != expNextIndex { - t.Fatalf("expected next index %d; got %d", nextIndex, expNextIndex) + t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex) } for _, e := range ents { term, ok := rec.getTerm(rangeID, e.Index) @@ -115,10 +118,10 @@ func TestEntryCacheClearTo(t *testing.T) { rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)}) rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)}) rec.clearTo(rangeID, 21) - if ents, _, _ := rec.getEntries(nil, rangeID, 2, 21, 0); len(ents) != 0 { + if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, noLimit); len(ents) != 0 { t.Errorf("expected no entries after clearTo") } - if ents, _, _ := rec.getEntries(nil, rangeID, 21, 22, 0); len(ents) != 1 { + if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, noLimit); len(ents) != 1 { t.Errorf("expected entry 22 to remain in the cache clearTo") } } @@ -128,13 +131,13 @@ func TestEntryCacheEviction(t *testing.T) { rangeID := roachpb.RangeID(1) rec := newRaftEntryCache(100) rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)}) - ents, _, hi := rec.getEntries(nil, rangeID, 1, 3, 0) + ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, noLimit) if len(ents) != 2 || hi != 3 { t.Errorf("expected both entries; got %+v, %d", ents, hi) } // Add another entry to evict first. rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)}) - ents, _, hi = rec.getEntries(nil, rangeID, 2, 4, 0) + ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, noLimit) if len(ents) != 2 || hi != 4 { t.Errorf("expected only two entries; got %+v, %d", ents, hi) } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index f629b3b18aff..5d137ce4a52a 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -17,6 +17,7 @@ package storage import ( "context" "fmt" + "math" "time" "github.com/coreos/etcd/raft" @@ -76,8 +77,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, // Entries implements the raft.Storage interface. Note that maxBytes is advisory // and this method will always return at least one entry even if it exceeds -// maxBytes. Passing maxBytes equal to zero disables size checking. Sideloaded -// proposals count towards maxBytes with their payloads inlined. +// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined. func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) { readonly := r.store.Engine().NewReadOnly() defer readonly.Close() @@ -114,10 +114,11 @@ func entries( } ents := make([]raftpb.Entry, 0, n) - ents, size, hitIndex := eCache.getEntries(ents, rangeID, lo, hi, maxBytes) + ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes) + // Return results if the correct number of results came back or if // we ran into the max bytes limit. - if uint64(len(ents)) == hi-lo || (maxBytes > 0 && size > maxBytes) { + if uint64(len(ents)) == hi-lo || exceededMaxBytes { return ents, nil } @@ -130,7 +131,6 @@ func entries( canCache := true var ent raftpb.Entry - exceededMaxBytes := false scanFunc := func(kv roachpb.KeyValue) (bool, error) { if err := kv.Value.GetProto(&ent); err != nil { return false, err @@ -158,9 +158,13 @@ func entries( // Note that we track the size of proposals with payloads inlined. size += uint64(ent.Size()) - + if size > maxBytes { + exceededMaxBytes = true + if len(ents) > 0 { + return exceededMaxBytes, nil + } + } ents = append(ents, ent) - exceededMaxBytes = maxBytes > 0 && size > maxBytes return exceededMaxBytes, nil } @@ -274,7 +278,7 @@ func term( ) (uint64, error) { // entries() accepts a `nil` sideloaded storage and will skip inlining of // sideloaded entries. We only need the term, so this is what we do. - ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, 0) + ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */) if err == raft.ErrCompacted { ts, err := rsl.LoadTruncatedState(ctx, eng) if err != nil { diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index 667c23e33e71..6cf21e4eea4f 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -185,7 +185,7 @@ func maybeInlineSideloadedRaftCommand( // We could unmarshal this yet again, but if it's committed we // are very likely to have appended it recently, in which case // we can save work. - cachedSingleton, _, _ := entryCache.getEntries( + cachedSingleton, _, _, _ := entryCache.getEntries( nil, rangeID, ent.Index, ent.Index+1, 1<<20, ) diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index 4599a689f64e..c8caabd5725d 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -823,7 +823,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { if len(entries) != 1 { t.Fatalf("no or too many entries returned from cache: %+v", entries) } - ents, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20) + ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20) if withSS { // We passed the sideload storage, so we expect to get our // inlined index back from the cache. diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 399f84d0ad68..fc56056d8962 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -6997,8 +6997,8 @@ func TestEntries(t *testing.T) { }}, // Case 5: Get a single entry from cache. {lo: indexes[5], hi: indexes[6], expResultCount: 1, expCacheCount: 1, setup: nil}, - // Case 6: Use MaxUint64 instead of 0 for maxBytes. - {lo: indexes[5], hi: indexes[9], maxBytes: math.MaxUint64, expResultCount: 4, expCacheCount: 4, setup: nil}, + // Case 6: Get range without size limitation. (Like case 4, without truncating). + {lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: nil}, // Case 7: maxBytes is set low so only a single value should be // returned. {lo: indexes[5], hi: indexes[9], maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil}, @@ -7044,7 +7044,10 @@ func TestEntries(t *testing.T) { if tc.setup != nil { tc.setup() } - cacheEntries, _, _ := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) + if tc.maxBytes == 0 { + tc.maxBytes = math.MaxUint64 + } + cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes) if len(cacheEntries) != tc.expCacheCount { t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries)) } @@ -7060,12 +7063,17 @@ func TestEntries(t *testing.T) { } if len(ents) != tc.expResultCount { t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents)) + } else if tc.expResultCount > 0 { + expHitLimit := ents[len(ents)-1].Index < tc.hi-1 + if hitLimit != expHitLimit { + t.Errorf("%d: unexpected hit limit: %t", i, hitLimit) + } } } // Case 23: Lo must be less than or equal to hi. repl.mu.Lock() - if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], 0); err == nil { + if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], math.MaxUint64); err == nil { t.Errorf("23: error expected, got none") } repl.mu.Unlock() @@ -7078,21 +7086,34 @@ func TestEntries(t *testing.T) { repl.mu.Lock() defer repl.mu.Unlock() - if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], 0); err == nil { + if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64); err == nil { t.Errorf("24: error expected, got none") } - // Case 25: don't hit the gap due to maxBytes. - ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1) - if err != nil { - t.Errorf("25: expected no error, got %s", err) + // Case 25a: don't hit the gap due to maxBytes, cache populated. + { + ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1) + if err != nil { + t.Errorf("25: expected no error, got %s", err) + } + if len(ents) != 1 { + t.Errorf("25: expected 1 entry, got %d", len(ents)) + } } - if len(ents) != 1 { - t.Errorf("25: expected 1 entry, got %d", len(ents)) + // Case 25b: don't hit the gap due to maxBytes, cache cleared. + { + repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1) + ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1) + if err != nil { + t.Errorf("25: expected no error, got %s", err) + } + if len(ents) != 1 { + t.Errorf("25: expected 1 entry, got %d", len(ents)) + } } // Case 26: don't hit the gap due to truncation. - if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], 0); err != raft.ErrCompacted { + if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64); err != raft.ErrCompacted { t.Errorf("26: expected error %s , got %s", raft.ErrCompacted, err) } }