From 4f7bb5f48114d50d64a6e1f5b79b179aac1d3394 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Fri, 8 Nov 2024 20:57:20 -0500 Subject: [PATCH] cache,db: de-dup concurrent attempts to read the same block Concurrent reads of the same block have been observed to cause very high memory usage, and cause significant CPU usage for allocations/deallocations. We now coordinate across multiple concurrent attempts to read the same block via a readEntry, which makes the readers take turns until one succeeds. The readEntries are embedded in a map that is part of a readShard, where there is a readShard for each cache.Shard. See the long comment in the readShard declaration for motivation. Callers interact with this new behavior via Cache.GetWithReadHandle, which is only for callers that intend to do a read and then populate the cache. If this method returns a ReadHandle, the caller has permission to do a read. See the ReadHandle comment for details of the contract. Fixes #4138 --- internal/cache/clockpro.go | 109 +++++++- internal/cache/read_shard.go | 414 +++++++++++++++++++++++++++++ internal/cache/read_shard_test.go | 264 ++++++++++++++++++ internal/cache/refcnt_normal.go | 13 + internal/cache/refcnt_tracing.go | 14 + internal/cache/testdata/read_shard | 163 ++++++++++++ sstable/block/buffer_pool.go | 14 +- sstable/colblk/index_block_test.go | 2 +- sstable/colblk/keyspan_test.go | 2 +- sstable/reader.go | 86 ++++-- 10 files changed, 1043 insertions(+), 38 deletions(-) create mode 100644 internal/cache/read_shard.go create mode 100644 internal/cache/read_shard_test.go create mode 100644 internal/cache/testdata/read_shard diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 2459ec7b69..53f756cb96 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -18,6 +18,7 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( + "context" "fmt" "os" "runtime" @@ -25,6 +26,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" @@ -114,9 +116,31 @@ type shard struct { countHot int64 countCold int64 countTest int64 + + // Some fields in readShard are protected by mu. See comments in declaration + // of readShard. + readShard readShard +} + +func (c *shard) init(maxSize int64) { + *c = shard{ + maxSize: maxSize, + coldTarget: maxSize, + } + if entriesGoAllocated { + c.entries = make(map[*entry]struct{}) + } + c.blocks.Init(16) + c.files.Init(16) + c.readShard.Init(c) } -func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { +// getWithMaybeReadEntry is the internal helper for implementing +// Cache.{Get,GetWithReadHandle}. When desireReadEntry is true, and the block +// is not in the cache (!Handle.Valid()), a non-nil readEntry is returned. +func (c *shard) getWithMaybeReadEntry( + id ID, fileNum base.DiskFileNum, offset uint64, desireReadEntry bool, +) (Handle, *readEntry) { c.mu.RLock() var value *Value if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { @@ -126,12 +150,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { } } c.mu.RUnlock() + var re *readEntry + if value == nil && desireReadEntry { + c.mu.Lock() + // After the c.mu.RUnlock(), someone could have inserted the value in the + // cache. We could tolerate the race and do a file read, or do another map + // lookup. We choose to do the latter, since the cost of a map lookup is + // insignificant compared to the cost of reading a block from a file. + if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { + value = e.acquireValue() + if value != nil { + e.referenced.Store(true) + } + } + if value == nil { + re = c.readShard.getReadEntryLocked(id, fileNum, offset) + } + c.mu.Unlock() + } if value == nil { c.misses.Add(1) - return Handle{} + } else { + c.hits.Add(1) } - c.hits.Add(1) - return Handle{value: value} + return Handle{value: value}, re } func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle { @@ -170,6 +212,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value value.ref.trace("add-hot") c.sizeHot += delta } else { + // TODO(sumeer): unclear why we don't set e.ptype to etHot on this path. + // In the default case below, where the state is etTest we set it to + // etHot. But etTest is "colder" than etCold, since the only transition + // into etTest is etCold => etTest, so since etTest transitions to + // etHot, then etCold should also transition. value.ref.trace("add-cold") c.sizeCold += delta } @@ -737,15 +784,7 @@ func newShards(size int64, shards int) *Cache { c.idAlloc.Store(1) c.trace("alloc", c.refs.Load()) for i := range c.shards { - c.shards[i] = shard{ - maxSize: size / int64(len(c.shards)), - coldTarget: size / int64(len(c.shards)), - } - if entriesGoAllocated { - c.shards[i].entries = make(map[*entry]struct{}) - } - c.shards[i].blocks.Init(16) - c.shards[i].files.Init(16) + c.shards[i].init(size / int64(len(c.shards))) } // Note: this is a no-op if invariants are disabled or race is enabled. @@ -822,7 +861,49 @@ func (c *Cache) Unref() { // Get retrieves the cache value for the specified file and offset, returning // nil if no value is present. func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { - return c.getShard(id, fileNum, offset).Get(id, fileNum, offset) + h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry( + id, fileNum, offset, false) + if invariants.Enabled && re != nil { + panic("readEntry should be nil") + } + return h +} + +// GetWithReadHandle retrieves the cache value for the specified ID, fileNum +// and offset. If found, a valid Handle is returned (with cacheHit set to +// true), else a valid ReadHandle is returned. +// +// See the ReadHandle declaration for the contract the caller must satisfy +// when getting a valid ReadHandle. +// +// This method can block before returning since multiple concurrent gets for +// the same cache value will take turns getting a ReadHandle, which represents +// permission to do the read. This blocking respects context cancellation, in +// which case an error is returned (and not a valid ReadHandle). +// +// When blocking, the errorDuration return value can be non-zero and is +// populated with the total duration that other readers that observed an error +// (see ReadHandle.SetReadError) spent in doing the read. This duration can be +// greater than the time spent blocked in this method, since some of these +// errors could have occurred prior to this call. But it serves as a rough +// indicator of whether turn taking could have caused higher latency due to +// context cancellation of other readers. +// +// While waiting, someone else may successfully read the value, which results +// in a valid Handle being returned. This is a case where cacheHit=false. +func (c *Cache) GetWithReadHandle( + ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64, +) (h Handle, rh ReadHandle, errorDuration time.Duration, cacheHit bool, err error) { + h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry( + id, fileNum, offset, true) + if h.Valid() { + return h, ReadHandle{}, 0, true, nil + } + h, errorDuration, err = re.waitForReadPermissionOrHandle(ctx) + if err != nil || h.Valid() { + return h, ReadHandle{}, errorDuration, false, err + } + return Handle{}, ReadHandle{entry: re}, errorDuration, false, nil } // Set sets the cache value for the specified file and offset, overwriting an diff --git a/internal/cache/read_shard.go b/internal/cache/read_shard.go new file mode 100644 index 0000000000..432edc1e4c --- /dev/null +++ b/internal/cache/read_shard.go @@ -0,0 +1,414 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/swiss" +) + +// readShard coordinates the read of a block that will be put in the cache. It +// ensures only one goroutine is reading a block, and other callers block +// until that goroutine is done (with success or failure). In the case of +// success, the other goroutines will use the value that was read, even if it +// is too large to be placed in the cache, or got evicted from the cache +// before they got scheduled. In the case of a failure (read error or context +// cancellation), one of the waiters will be given a turn to do the read. +// +// This turn-taking ensures that a large number of concurrent attempts to read +// the same block that is not in the cache does not result in the same number +// of reads from the filesystem (or remote storage). We have seen large spikes +// in memory usage (and CPU usage for memory allocation/deallocation) without +// this turn-taking. +// +// It introduces a small risk related to context cancellation -- if many +// readers assigned a turn exceed their deadline while doing the read and +// report an error, a reader with a longer deadline can unnecessarily wait. We +// accept this risk for now since the primary production use in CockroachDB is +// filesystem reads, where context cancellation is not respected. We do +// introduce an error duration metric emitted in traces that can be used to +// quantify such wasteful waiting. Note that this same risk extends to waiting +// on the Options.LoadBlockSema, so the error duration metric includes the +// case of an error when waiting on the semaphore (as a side effect of that +// waiting happening in the caller, sstable.Reader). +// +// Design choices and motivation: +// +// - readShard is tightly integrated with a cache shard: At its core, +// readShard is a map with synchronization. For the same reason the cache is +// sharded (for higher concurrency by sharding the mutex), it is beneficial +// to shard synchronization on readShard. By making readShard a member of +// shard, this sharding is trivially accomplished. Additionally, the code +// feels cleaner when there isn't a race between a cache miss, followed by +// creating a readEntry that is no longer needed because someone else has +// done the read since the miss and inserted into the cache. By making the +// readShard use shard.mu, such a race is avoided. A side benefit is that +// the cache interaction can be hidden behind readEntry.SetReadValue. One +// disadvantage of this tightly integrated design is that it does not +// encompass readers that will put the read value into a block.BufferPool -- +// we don't worry about those since block.BufferPool is only used for +// compactions and there is at most one compaction reader of a block. There +// is the possibility that the compaction reader and a user-facing iterator +// reader will do duplicate reads, but we accept that deficiency. +// +// - readMap is separate from shard.blocks map: One could have a design which +// extends the cache entry and unifies the two maps. However, we never want +// to evict a readEntry while there are readers waiting for the block read +// (including the case where the corresponding file is being removed from +// shard.files). Also, the number of stable cache entries is huge and +// therefore is manually allocated, while the number of readEntries is small +// (so manual allocation isn't necessary). For these reasons we maintain a +// separate map. This separation also results in more modular code, instead +// of piling more stuff into shard. +type readShard struct { + // shard is only used for locking, and calling shard.Set. + shard *shard + // Protected by shard.mu. + // + // shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared + // resource and must be released quickly. + shardMu struct { + readMap swiss.Map[key, *readEntry] + } +} + +func (rs *readShard) Init(shard *shard) *readShard { + *rs = readShard{ + shard: shard, + } + // Choice of 16 is arbitrary. + rs.shardMu.readMap.Init(16) + return rs +} + +// getReadEntryLocked gets a *readEntry for (id, fileNum, offset). shard.mu is +// already write locked. +func (rs *readShard) getReadEntryLocked(id ID, fileNum base.DiskFileNum, offset uint64) *readEntry { + k := key{fileKey{id, fileNum}, offset} + e, ok := rs.shardMu.readMap.Get(k) + if !ok { + e = newReadEntry(rs, id, fileNum, offset) + rs.shardMu.readMap.Put(k, e) + } else { + e.refCount.acquireAllowZero() + } + return e +} + +func (rs *readShard) lenForTesting() int { + rs.shard.mu.Lock() + defer rs.shard.mu.Unlock() + return rs.shardMu.readMap.Len() +} + +// readEntry is used to coordinate between concurrent attempted readers of the +// same block. +type readEntry struct { + readShard *readShard + id ID + fileNum base.DiskFileNum + offset uint64 + mu struct { + sync.RWMutex + // v, when non-nil, has a ref from readEntry, which is unreffed when + // readEntry is deleted from the readMap. + v *Value + // isReading and ch together capture the state of whether someone has been + // granted a turn to read, and of readers waiting for that read to finish. + // ch is lazily allocated since most readEntries will not see concurrent + // readers. This lazy allocation results in one transition of ch from nil + // to non-nil, so waiters can read this non-nil ch and block on reading + // from it without holding mu. + // + // ch is written to, to signal one waiter to start doing the read. ch is + // closed when the value is successfully read and has been stored in v, so + // that all waiters wake up and read v. ch is a buffered channel with a + // capacity of 1. + // + // State transitions when trying to wait for turn: + // Case !isReading: + // set isReading=true; Drain the ch if non-nil and non-empty; proceed + // with turn to do the read. + // Case isReading: + // allocate ch if nil; wait on ch + // Finished reading successfully: + // set isReading=false; if ch is non-nil, close ch. + // Finished reading with failure: + // set isReading=false; if ch is non-nil, write to ch. + // + // INVARIANT: + // isReading => ch is nil or ch is empty. + isReading bool + ch chan struct{} + // Total duration of reads and semaphore waiting that resulted in error. + errorDuration time.Duration + readStart time.Time + } + // Count of ReadHandles that refer to this readEntry. Increments always hold + // shard.mu. So if this is found to be 0 while holding shard.mu, it is safe + // to delete readEntry from readShard.shardMu.readMap. + refCount refcnt +} + +var readEntryPool = sync.Pool{ + New: func() interface{} { + return &readEntry{} + }, +} + +func newReadEntry(rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64) *readEntry { + e := readEntryPool.Get().(*readEntry) + *e = readEntry{ + readShard: rs, + id: id, + fileNum: fileNum, + offset: offset, + } + e.refCount.init(1) + return e +} + +// waitForReadPermissionOrHandle returns either an already read value (in +// Handle), an error (if the context was cancelled), or neither, which is a +// directive to the caller to do the read. In this last case the caller must +// call either setReadValue or setReadError. +// +// In all cases, errorDuration is populated with the total duration that +// readers that observed an error (setReadError) spent in doing the read. This +// duration can be greater than the time spend in waitForReadPermissionHandle, +// since some of these errors could have occurred prior to this call. But it +// serves as a rough indicator of whether turn taking could have caused higher +// latency due to context cancellation. +func (e *readEntry) waitForReadPermissionOrHandle( + ctx context.Context, +) (h Handle, errorDuration time.Duration, err error) { + constructHandleLocked := func() Handle { + if e.mu.v == nil { + panic("value is nil") + } + e.mu.v.acquire() + return Handle{value: e.mu.v} + } + becomeReaderLocked := func() { + if e.mu.v != nil { + panic("value is non-nil") + } + if e.mu.isReading { + panic("isReading is already true") + } + e.mu.isReading = true + if e.mu.ch != nil { + // Drain the channel, so that no one else mistakenly believes they + // should read. + select { + case <-e.mu.ch: + default: + } + } + e.mu.readStart = time.Now() + } + unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) { + removeState := e.makeTryRemoveStateLocked() + errorDuration = e.mu.errorDuration + if readLock { + e.mu.RUnlock() + } else { + e.mu.Unlock() + } + unrefAndTryRemoveFromMap(removeState) + return errorDuration + } + + for { + e.mu.Lock() + if e.mu.v != nil { + // Value has already been read. + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return h, errorDuration, nil + } + // Not already read. Wait for turn to do the read or for someone else to do + // the read. + if !e.mu.isReading { + // Have permission to do the read. + becomeReaderLocked() + errorDuration = e.mu.errorDuration + e.mu.Unlock() + return Handle{}, errorDuration, nil + } + if e.mu.ch == nil { + // Rare case when multiple readers are concurrently trying to read. If + // this turns out to be common enough we could use a sync.Pool. + e.mu.ch = make(chan struct{}, 1) + } + ch := e.mu.ch + e.mu.Unlock() + select { + case <-ctx.Done(): + e.mu.Lock() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return Handle{}, errorDuration, ctx.Err() + case _, ok := <-ch: + if !ok { + // Channel closed, so value was read. + e.mu.RLock() + if e.mu.v == nil { + panic("value is nil") + } + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(true) + return h, errorDuration, nil + } + // Else, probably granted permission to do the read. NB: since isReading + // is false, someone else can slip through before this thread acquires + // e.mu, and take the turn. So try to actually get the turn by trying + // again in the loop. + } + } +} + +// tryRemoveState captures the state needed by tryRemoveFromMap. The caller +// constructs it before calling unrefAndTryRemoveFromMap since it typically +// held readEntry.mu, which avoids acquiring it again in +// unrefAndTryRemoveFromMap. +type tryRemoveState struct { + rs *readShard + k key + e *readEntry +} + +// makeTryRemoveStateLocked initializes tryRemoveState. +func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState { + return tryRemoveState{ + rs: e.readShard, + k: key{fileKey{e.id, e.fileNum}, e.offset}, + e: e, + } +} + +// unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs. +// It is possible that after unreffing that s.e has already been removed, and +// is now back in the sync.Pool, or being reused (for the same or different +// key). This is because after unreffing, which caused the s.e.refCount to +// become zero, but before acquiring shard.mu, it could have been incremented +// and decremented concurrently, and some other goroutine could have observed +// a different decrement to 0, and raced ahead and deleted s.e from the +// readMap. +func unrefAndTryRemoveFromMap(s tryRemoveState) { + if !s.e.refCount.release() { + return + } + s.rs.shard.mu.Lock() + e2, ok := s.rs.shardMu.readMap.Get(s.k) + if !ok || e2 != s.e { + // Already removed. + s.rs.shard.mu.Unlock() + return + } + if s.e.refCount.value() != 0 { + s.rs.shard.mu.Unlock() + return + } + // k => e and e.refCount == 0. And it cannot be incremented since + // shard.mu.Lock() is held. So remove from map. + s.rs.shardMu.readMap.Delete(s.k) + s.rs.shard.mu.Unlock() + + // Free s.e. + s.e.mu.Lock() + if s.e.mu.v != nil { + s.e.mu.v.release() + s.e.mu.v = nil + } + s.e.mu.Unlock() + *s.e = readEntry{} + readEntryPool.Put(s.e) +} + +func (e *readEntry) setReadValue(v *Value) Handle { + // Add to the cache before taking another ref for readEntry, since the cache + // expects ref=1 when it is called. + // + // TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure + // that it is added as etHot. The common case will be e.refCount = 1, and we + // don't want to acquire e.mu twice, so one way to do this would be relax + // the invariant in shard.Set that requires Value.refs() == 1. Then we can + // do the work under e.mu before calling shard.Set. + h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v) + e.mu.Lock() + // Acquire a ref for readEntry, since we are going to remember it in e.mu.v. + v.acquire() + e.mu.v = v + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + // Inform all waiters so they can use e.mu.v. Not all readers have called + // readEntry.waitForReadPermissionOrHandle, and those will also use + // e.mu.v. + close(e.mu.ch) + } + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) + return h +} + +func (e *readEntry) setReadError(err error) { + e.mu.Lock() + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + select { + case e.mu.ch <- struct{}{}: + default: + panic("channel is not empty") + } + } + e.mu.errorDuration += time.Since(e.mu.readStart) + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) +} + +// ReadHandle represents a contract with a caller that had a miss when doing a +// cache lookup, and wants to do a read and insert the read block into the +// cache. The contract applies when ReadHandle.Valid returns true, in which +// case the caller has been assigned the turn to do the read (and others are +// potentially waiting for it). +// +// Contract: +// +// The caller must immediately start doing a read, or can first wait on a +// shared resource that would also block a different reader if it was assigned +// the turn instead (specifically, this refers to Options.LoadBlockSema). +// After the read, it must either call SetReadValue or SetReadError depending +// on whether the read succeeded or failed. +type ReadHandle struct { + entry *readEntry +} + +// Valid returns true for a valid ReadHandle. +func (rh ReadHandle) Valid() bool { + return rh.entry != nil +} + +// SetReadValue provides the Value that the caller has read. The caller is +// responsible for releasing the returned Handle when it is no longer needed. +func (rh ReadHandle) SetReadValue(v *Value) Handle { + return rh.entry.setReadValue(v) +} + +// SetReadError specifies that the caller has encountered a read error. +func (rh ReadHandle) SetReadError(err error) { + rh.entry.setReadError(err) +} diff --git a/internal/cache/read_shard_test.go b/internal/cache/read_shard_test.go new file mode 100644 index 0000000000..d6c1821633 --- /dev/null +++ b/internal/cache/read_shard_test.go @@ -0,0 +1,264 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "context" + crand "crypto/rand" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" + "github.com/stretchr/testify/require" +) + +type testReader struct { + ctx context.Context + id ID + fileNum base.DiskFileNum + offset uint64 + re *readEntry + mu struct { + *sync.Mutex + finishedWait bool + waitedNeedToRead bool + waitedErr error + waitedValue string + + cond *sync.Cond + } +} + +func newTestReader( + ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64, mu *sync.Mutex, +) *testReader { + r := &testReader{ + ctx: ctx, + id: id, + fileNum: fileNum, + offset: offset, + } + r.mu.Mutex = mu + r.mu.cond = sync.NewCond(mu) + return r +} + +func (r *testReader) getAsync(shard *shard) *string { + h, re := shard.getWithMaybeReadEntry(r.id, r.fileNum, r.offset, true) + if h.Valid() { + v := string(h.RawBuffer()) + h.Release() + return &v + } + r.re = re + go func() { + h, _, err := re.waitForReadPermissionOrHandle(r.ctx) + r.mu.Lock() + defer r.mu.Unlock() + r.mu.finishedWait = true + r.mu.cond.Signal() + if h.Valid() { + r.mu.waitedValue = string(h.RawBuffer()) + h.Release() + return + } + if err != nil { + r.mu.waitedErr = err + return + } + r.mu.waitedNeedToRead = true + }() + return nil +} + +func (r *testReader) waitUntilFinishedWait() (*string, error) { + r.mu.Lock() + defer r.mu.Unlock() + for !r.mu.finishedWait { + r.mu.cond.Wait() + } + if r.mu.waitedNeedToRead { + return nil, nil + } + if r.mu.waitedErr != nil { + return nil, r.mu.waitedErr + } + return &r.mu.waitedValue, nil +} + +func (r *testReader) setReadValue(t *testing.T, v string) { + val := Alloc(len(v)) + copy(val.Buf(), []byte(v)) + h := ReadHandle{entry: r.re}.SetReadValue(val) + require.Equal(t, v, string(h.RawBuffer())) + h.Release() +} + +func (r *testReader) setError(err error) { + ReadHandle{entry: r.re}.SetReadError(err) +} + +func TestReadShard(t *testing.T) { + var c *shard + var readers map[string]*testReader + var mu sync.Mutex + + datadriven.RunTest(t, "testdata/read_shard", + func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "init": + var maxSize int64 + td.ScanArgs(t, "max-size", &maxSize) + c = &shard{} + c.init(maxSize) + if len(readers) > 0 { + t.Fatalf("have %d readers that have not completed", len(readers)) + } + readers = map[string]*testReader{} + return "" + + case "get": + var name string + td.ScanArgs(t, "name", &name) + if _, ok := readers[name]; ok { + t.Fatalf("reader with name %s already exists", name) + } + var id, fileNum, offset int + td.ScanArgs(t, "id", &id) + td.ScanArgs(t, "file-num", &fileNum) + td.ScanArgs(t, "offset", &offset) + ctx := context.Background() + if td.HasArg("cancelled-context") { + var cancelFunc context.CancelFunc + ctx, cancelFunc = context.WithCancel(ctx) + cancelFunc() + } + r := newTestReader(ctx, ID(id), base.DiskFileNum(fileNum), uint64(offset), &mu) + val := r.getAsync(c) + if val != nil { + return fmt.Sprintf("val: %s", *val) + } + readers[name] = r + time.Sleep(10 * time.Millisecond) + return fmt.Sprintf("waiting\nmap-len: %d", c.readShard.lenForTesting()) + + case "wait": + var name string + td.ScanArgs(t, "name", &name) + val, err := readers[name].waitUntilFinishedWait() + if val != nil || err != nil { + delete(readers, name) + if val != nil { + return fmt.Sprintf("val: %s\nmap-len: %d", *val, c.readShard.lenForTesting()) + } + if err != nil { + return fmt.Sprintf("err: %s\nmap-len: %d", err.Error(), c.readShard.lenForTesting()) + } + } + return fmt.Sprintf("turn to read\nmap-len: %d", c.readShard.lenForTesting()) + + case "set-read-value": + var name string + td.ScanArgs(t, "name", &name) + var val string + td.ScanArgs(t, "val", &val) + readers[name].setReadValue(t, val) + delete(readers, name) + time.Sleep(10 * time.Millisecond) + return fmt.Sprintf("map-len: %d", c.readShard.lenForTesting()) + + case "set-error": + var name string + td.ScanArgs(t, "name", &name) + readers[name].setError(errors.Errorf("read error: %s", name)) + delete(readers, name) + time.Sleep(10 * time.Millisecond) + return fmt.Sprintf("map-len: %d", c.readShard.lenForTesting()) + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + + } + }) +} + +// testSyncReaders is the config for multiple readers concurrently reading the +// same block. +type testSyncReaders struct { + // id, fileNum, offset are the key. + id ID + fileNum base.DiskFileNum + offset uint64 + // val will be the value read, if not found in the cache. + val []byte + // numReaders is the number of concurrent readers. + numReaders int + // readerWithErrIndex is a reader that will have a read error and hand a + // turn to another reader. + readerWithErrIndex int + // sleepDuration is the duration that the reader with the turns sleeps + // before setting the value or error. + sleepDuration time.Duration + // wg is used to wait for all reader goroutines to be done. + wg sync.WaitGroup +} + +func TestReadShardConcurrent(t *testing.T) { + cache := New(rand.Int63n(20 << 10)) + defer cache.Unref() + var differentReaders []*testSyncReaders + // 50 blocks are read. + for i := 0; i < 50; i++ { + valLen := rand.Intn(100) + 1 + val := make([]byte, valLen) + crand.Read(val) + readers := &testSyncReaders{ + id: ID(rand.Uint64()), + fileNum: base.DiskFileNum(rand.Uint64()), + offset: rand.Uint64(), + val: val, + numReaders: 5, + readerWithErrIndex: rand.Intn(5), + sleepDuration: time.Duration(rand.Intn(2)) * time.Millisecond, + } + readers.wg.Add(readers.numReaders) + differentReaders = append(differentReaders, readers) + } + for _, r := range differentReaders { + for j := 0; j < r.numReaders; j++ { + go func(r *testSyncReaders, index int) { + h, rh, _, _, err := cache.GetWithReadHandle(context.Background(), r.id, r.fileNum, r.offset) + require.NoError(t, err) + if h.Valid() { + require.Equal(t, r.val, h.RawBuffer()) + h.Release() + r.wg.Done() + return + } + if r.sleepDuration > 0 { + time.Sleep(r.sleepDuration) + } + if r.readerWithErrIndex == index { + rh.SetReadError(errors.Errorf("error")) + r.wg.Done() + return + } + v := Alloc(len(r.val)) + copy(v.Buf(), r.val) + h = rh.SetReadValue(v) + h.Release() + r.wg.Done() + }(r, j) + } + } + for _, r := range differentReaders { + r.wg.Wait() + } +} diff --git a/internal/cache/refcnt_normal.go b/internal/cache/refcnt_normal.go index 9ab3348613..bf1a475691 100644 --- a/internal/cache/refcnt_normal.go +++ b/internal/cache/refcnt_normal.go @@ -37,6 +37,15 @@ func (v *refcnt) acquire() { } } +// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be +// called with a zero refcnt. This is useful for cases where the entry which +// is being reference counted is inside a container and the container does not +// hold a reference. The container uses release() returning true to attempt to +// do a cleanup from the container. +func (v *refcnt) acquireAllowZero() { + v.val.Add(1) +} + func (v *refcnt) release() bool { switch v := v.val.Add(-1); { case v < 0: @@ -48,6 +57,10 @@ func (v *refcnt) release() bool { } } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { } diff --git a/internal/cache/refcnt_tracing.go b/internal/cache/refcnt_tracing.go index 1d5e6c0219..dd94088e24 100644 --- a/internal/cache/refcnt_tracing.go +++ b/internal/cache/refcnt_tracing.go @@ -41,6 +41,16 @@ func (v *refcnt) acquire() { v.trace("acquire") } +// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be +// called with a zero refcnt. This is useful for cases where the entry which +// is being reference counted is inside a container and the container does not +// hold a reference. The container uses release() returning true to attempt to +// do a cleanup from the container. +func (v *refcnt) acquireAllowZero() { + v.val.Add(1) + v.trace("acquire") +} + func (v *refcnt) release() bool { n := v.val.Add(-1) switch { @@ -51,6 +61,10 @@ func (v *refcnt) release() bool { return n == 0 } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack()) v.Lock() diff --git a/internal/cache/testdata/read_shard b/internal/cache/testdata/read_shard new file mode 100644 index 0000000000..48b0321ba2 --- /dev/null +++ b/internal/cache/testdata/read_shard @@ -0,0 +1,163 @@ +# Cache cannot store anything. +init max-size=0 +---- + +# Get misses the cache and has turn to read. +get name=s1_f2_off3 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s1_f2_off3 +---- +turn to read +map-len: 1 + +# Sets the value read, but will be forgotten by the cache. +set-read-value name=s1_f2_off3 val=potato +---- +map-len: 0 + +# Another get for the same block, which gets the turn to read. +get name=s1_f2_off3 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s1_f2_off3 +---- +turn to read +map-len: 1 + +# A concurrent get for the same block. +get name=another_s1_f2_off3 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +# A get with a different id but the same file-num and offset. It is a +# different block. +get name=s2_f2_off3 id=2 file-num=2 offset=3 +---- +waiting +map-len: 2 + +# The get with id=2 also gets a turn to read, and sets an error. +wait name=s2_f2_off3 +---- +turn to read +map-len: 2 + +set-error name=s2_f2_off3 +---- +map-len: 1 + +# The first get reads and sets a value. +set-read-value name=s1_f2_off3 val=eggplant +---- +map-len: 0 + +# The concurrent get sees this value, which was never successfully stored in +# the cache. +wait name=another_s1_f2_off3 +---- +val: eggplant +map-len: 0 + +# Get with id=2 gets a turn to read. +get name=s2_f2_off3 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s2_f2_off3 +---- +turn to read +map-len: 1 + +# Two more concurrent gets, one with a cancelled context. +get name=s2_f2_off3_2 id=2 file-num=2 offset=3 cancelled-context +---- +waiting +map-len: 1 + +get name=s2_f2_off3_3 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s2_f2_off3_2 +---- +err: context canceled +map-len: 1 + +# The get with the read turn sets an error. +set-error name=s2_f2_off3 +---- +map-len: 1 + +# The waiting reader gets a turn. +wait name=s2_f2_off3_3 +---- +turn to read +map-len: 1 + +# Two more concurrent gets. +get name=s2_f2_off3_4 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +get name=s2_f2_off3_5 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +# The get with a read turn sets a value. +set-read-value name=s2_f2_off3_3 val=aubergine +---- +map-len: 0 + +wait name=s2_f2_off3_4 +---- +val: aubergine +map-len: 0 + +wait name=s2_f2_off3_5 +---- +val: aubergine +map-len: 0 + +# Cache with some storage capacity +init max-size=100 +---- + +# Get misses the cache and has turn to read. +get name=s1_f2_off3 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s1_f2_off3 +---- +turn to read +map-len: 1 + +# Concurrent reader. +get name=s1_f2_off3_2 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +set-read-value name=s1_f2_off3 val=brinjal +---- +map-len: 0 + +wait name=s1_f2_off3_2 +---- +val: brinjal +map-len: 0 + +get name=s1_f2_off3_3 id=1 file-num=2 offset=3 +---- +val: brinjal diff --git a/sstable/block/buffer_pool.go b/sstable/block/buffer_pool.go index 38809b41ed..4df870fe81 100644 --- a/sstable/block/buffer_pool.go +++ b/sstable/block/buffer_pool.go @@ -8,6 +8,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" + "github.com/cockroachdb/pebble/internal/invariants" ) // Alloc allocates a new Value for a block of length n (excluding the block @@ -52,12 +53,21 @@ func (b Value) BlockMetadata() *Metadata { // backed by a buffer pool, MakeHandle inserts the value into the block cache, // returning a handle to the now resident value. func (b Value) MakeHandle( - c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, + crh cache.ReadHandle, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, ) BufferHandle { if b.buf.Valid() { + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle was valid") + } return BufferHandle{b: b.buf} } - return BufferHandle{h: c.Set(cacheID, fileNum, offset, b.v)} + return BufferHandle{h: crh.SetReadValue(b.v)} +} + +func (b Value) SetInCacheForTesting( + c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, +) cache.Handle { + return c.Set(cacheID, fileNum, offset, b.v) } // Release releases the handle. diff --git a/sstable/colblk/index_block_test.go b/sstable/colblk/index_block_test.go index 87f5d6a6bf..3be6ff408e 100644 --- a/sstable/colblk/index_block_test.go +++ b/sstable/colblk/index_block_test.go @@ -125,7 +125,7 @@ func TestIndexIterInitHandle(t *testing.T) { d := (*IndexBlockDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(blockData) - h := v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0) + h := v.SetInCacheForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) defer h.Release() getBlockAndIterate := func(it *IndexIter) { diff --git a/sstable/colblk/keyspan_test.go b/sstable/colblk/keyspan_test.go index 9592dabcc1..9f7c99d870 100644 --- a/sstable/colblk/keyspan_test.go +++ b/sstable/colblk/keyspan_test.go @@ -88,7 +88,7 @@ func TestKeyspanBlockPooling(t *testing.T) { copy(v.BlockData(), b) d := (*KeyspanDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(v.BlockData()) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheForTesting(c, cache.ID(1), base.DiskFileNum(1), 0).Release() getBlockAndIterate := func() { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/reader.go b/sstable/reader.go index 8d4f96dfb7..48853d980d 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -507,17 +507,48 @@ func (r *Reader) readBlockInternal( bh block.Handle, initBlockMetadataFn func(*block.Metadata, []byte) error, ) (handle block.BufferHandle, _ error) { - if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Valid() { - // Cache hit. - if readHandle != nil { - readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + var ch cache.Handle + var crh cache.ReadHandle + hit := true + if env.BufferPool == nil { + var errorDuration time.Duration + var err error + ch, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle( + ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) { + r.logger.Eventf( + ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String()) + } + // TODO(sumeer): consider tracing when waited longer than some duration + // for turn to do the read. + if err != nil { + return block.BufferHandle{}, err + } + } else { + // The compaction path uses env.BufferPool, and does not coordinate read + // using a cache.ReadHandle. This is ok since only a single compaction is + // reading a block. + ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if ch.Valid() { + hit = true } - env.BlockServedFromCache(bh.Length) - return block.CacheBufferHandle(h), nil + } + // INVARIANT: hit => ch.Valid() + if ch.Valid() { + if hit { + // Cache hit. + if readHandle != nil { + readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + } + env.BlockServedFromCache(bh.Length) + } + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle must not be valid") + } + return block.CacheBufferHandle(ch), nil } - // Cache miss. - + // Need to read. First acquire loadBlockSema, if needed. if sema := r.loadBlockSema; sema != nil { if err := sema.Acquire(ctx, 1); err != nil { // An error here can only come from the context. @@ -525,7 +556,26 @@ func (r *Reader) readBlockInternal( } defer sema.Release(1) } + value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn) + if err != nil { + if crh.Valid() { + crh.SetReadError(err) + } + return block.BufferHandle{}, err + } + h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + return h, nil +} +// doRead is a helper for readBlockInternal that does the read, checksum +// check, decompression, and returns either a block.Value or an error. +func (r *Reader) doRead( + ctx context.Context, + env readBlockEnv, + readHandle objstorage.ReadHandle, + bh block.Handle, + initBlockMetadataFn func(*block.Metadata, []byte) error, +) (block.Value, error) { compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool) readStopwatch := makeStopwatch() var err error @@ -548,17 +598,15 @@ func (r *Reader) readBlockInternal( } if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } env.BlockRead(bh.Length, readDuration) - if err := checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { + if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - typ := block.CompressionIndicator(compressed.BlockData()[bh.Length]) compressed.Truncate(int(bh.Length)) - var decompressed block.Value if typ == block.NoCompressionIndicator { decompressed = compressed @@ -567,23 +615,21 @@ func (r *Reader) readBlockInternal( decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData()) if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - decompressed = block.Alloc(decodedLen, env.BufferPool) err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData()) compressed.Release() if err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } } - if err := initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { + if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) - return h, nil + return decompressed, nil } func (r *Reader) readMetaindex(