diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 2459ec7b69..53f756cb96 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -18,6 +18,7 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( + "context" "fmt" "os" "runtime" @@ -25,6 +26,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" @@ -114,9 +116,31 @@ type shard struct { countHot int64 countCold int64 countTest int64 + + // Some fields in readShard are protected by mu. See comments in declaration + // of readShard. + readShard readShard +} + +func (c *shard) init(maxSize int64) { + *c = shard{ + maxSize: maxSize, + coldTarget: maxSize, + } + if entriesGoAllocated { + c.entries = make(map[*entry]struct{}) + } + c.blocks.Init(16) + c.files.Init(16) + c.readShard.Init(c) } -func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { +// getWithMaybeReadEntry is the internal helper for implementing +// Cache.{Get,GetWithReadHandle}. When desireReadEntry is true, and the block +// is not in the cache (!Handle.Valid()), a non-nil readEntry is returned. +func (c *shard) getWithMaybeReadEntry( + id ID, fileNum base.DiskFileNum, offset uint64, desireReadEntry bool, +) (Handle, *readEntry) { c.mu.RLock() var value *Value if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { @@ -126,12 +150,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { } } c.mu.RUnlock() + var re *readEntry + if value == nil && desireReadEntry { + c.mu.Lock() + // After the c.mu.RUnlock(), someone could have inserted the value in the + // cache. We could tolerate the race and do a file read, or do another map + // lookup. We choose to do the latter, since the cost of a map lookup is + // insignificant compared to the cost of reading a block from a file. + if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { + value = e.acquireValue() + if value != nil { + e.referenced.Store(true) + } + } + if value == nil { + re = c.readShard.getReadEntryLocked(id, fileNum, offset) + } + c.mu.Unlock() + } if value == nil { c.misses.Add(1) - return Handle{} + } else { + c.hits.Add(1) } - c.hits.Add(1) - return Handle{value: value} + return Handle{value: value}, re } func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle { @@ -170,6 +212,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value value.ref.trace("add-hot") c.sizeHot += delta } else { + // TODO(sumeer): unclear why we don't set e.ptype to etHot on this path. + // In the default case below, where the state is etTest we set it to + // etHot. But etTest is "colder" than etCold, since the only transition + // into etTest is etCold => etTest, so since etTest transitions to + // etHot, then etCold should also transition. value.ref.trace("add-cold") c.sizeCold += delta } @@ -737,15 +784,7 @@ func newShards(size int64, shards int) *Cache { c.idAlloc.Store(1) c.trace("alloc", c.refs.Load()) for i := range c.shards { - c.shards[i] = shard{ - maxSize: size / int64(len(c.shards)), - coldTarget: size / int64(len(c.shards)), - } - if entriesGoAllocated { - c.shards[i].entries = make(map[*entry]struct{}) - } - c.shards[i].blocks.Init(16) - c.shards[i].files.Init(16) + c.shards[i].init(size / int64(len(c.shards))) } // Note: this is a no-op if invariants are disabled or race is enabled. 
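Reviewer note (not part of the patch): the comment in getWithMaybeReadEntry describes a double-checked lookup, where the block map is consulted a second time under the write lock because another goroutine may have inserted the value between RUnlock and Lock. A minimal standalone sketch of that pattern, with illustrative names (cache, getOrStart) that are not Pebble APIs:

```go
package cachesketch

import "sync"

type cache[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

func newCache[K comparable, V any]() *cache[K, V] {
	return &cache[K, V]{m: make(map[K]V)}
}

// getOrStart returns the cached value, or invokes start to create one. The
// second lookup under the exclusive lock is what guarantees at most one
// value (e.g. one readEntry) is created per key.
func (c *cache[K, V]) getOrStart(k K, start func() V) (v V, hit bool) {
	c.mu.RLock()
	v, ok := c.m[k]
	c.mu.RUnlock()
	if ok {
		return v, true // fast path under the shared lock
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	// Re-check: someone may have inserted between RUnlock and Lock. A map
	// lookup is cheap relative to the file read we would otherwise repeat.
	if v, ok := c.m[k]; ok {
		return v, true
	}
	v = start()
	c.m[k] = v
	return v, false
}
```

The same reasoning appears verbatim in the patch: the second lookup is cheap compared to reading a block from a file, so tolerating the race is not worth it.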
@@ -822,7 +861,49 @@ func (c *Cache) Unref() { // Get retrieves the cache value for the specified file and offset, returning // nil if no value is present. func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { - return c.getShard(id, fileNum, offset).Get(id, fileNum, offset) + h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry( + id, fileNum, offset, false) + if invariants.Enabled && re != nil { + panic("readEntry should be nil") + } + return h +} + +// GetWithReadHandle retrieves the cache value for the specified ID, fileNum +// and offset. If found, a valid Handle is returned (with cacheHit set to +// true), else a valid ReadHandle is returned. +// +// See the ReadHandle declaration for the contract the caller must satisfy +// when getting a valid ReadHandle. +// +// This method can block before returning since multiple concurrent gets for +// the same cache value will take turns getting a ReadHandle, which represents +// permission to do the read. This blocking respects context cancellation, in +// which case an error is returned (and not a valid ReadHandle). +// +// When blocking, the errorDuration return value can be non-zero and is +// populated with the total duration that other readers that observed an error +// (see ReadHandle.SetReadError) spent in doing the read. This duration can be +// greater than the time spent blocked in this method, since some of these +// errors could have occurred prior to this call. But it serves as a rough +// indicator of whether turn taking could have caused higher latency due to +// context cancellation of other readers. +// +// While waiting, someone else may successfully read the value, which results +// in a valid Handle being returned. This is a case where cacheHit=false. +func (c *Cache) GetWithReadHandle( + ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64, +) (h Handle, rh ReadHandle, errorDuration time.Duration, cacheHit bool, err error) { + h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry( + id, fileNum, offset, true) + if h.Valid() { + return h, ReadHandle{}, 0, true, nil + } + h, errorDuration, err = re.waitForReadPermissionOrHandle(ctx) + if err != nil || h.Valid() { + return h, ReadHandle{}, errorDuration, false, err + } + return Handle{}, ReadHandle{entry: re}, errorDuration, false, nil } // Set sets the cache value for the specified file and offset, overwriting an diff --git a/internal/cache/read_shard.go b/internal/cache/read_shard.go new file mode 100644 index 0000000000..432edc1e4c --- /dev/null +++ b/internal/cache/read_shard.go @@ -0,0 +1,414 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/swiss" +) + +// readShard coordinates the read of a block that will be put in the cache. It +// ensures only one goroutine is reading a block, and other callers block +// until that goroutine is done (with success or failure). In the case of +// success, the other goroutines will use the value that was read, even if it +// is too large to be placed in the cache, or got evicted from the cache +// before they got scheduled. In the case of a failure (read error or context +// cancellation), one of the waiters will be given a turn to do the read. 
+// +// This turn-taking ensures that a large number of concurrent attempts to read +// the same block that is not in the cache does not result in the same number +// of reads from the filesystem (or remote storage). We have seen large spikes +// in memory usage (and CPU usage for memory allocation/deallocation) without +// this turn-taking. +// +// It introduces a small risk related to context cancellation -- if many +// readers assigned a turn exceed their deadline while doing the read and +// report an error, a reader with a longer deadline can unnecessarily wait. We +// accept this risk for now since the primary production use in CockroachDB is +// filesystem reads, where context cancellation is not respected. We do +// introduce an error duration metric emitted in traces that can be used to +// quantify such wasteful waiting. Note that this same risk extends to waiting +// on the Options.LoadBlockSema, so the error duration metric includes the +// case of an error when waiting on the semaphore (as a side effect of that +// waiting happening in the caller, sstable.Reader). +// +// Design choices and motivation: +// +// - readShard is tightly integrated with a cache shard: At its core, +// readShard is a map with synchronization. For the same reason the cache is +// sharded (for higher concurrency by sharding the mutex), it is beneficial +// to shard synchronization on readShard. By making readShard a member of +// shard, this sharding is trivially accomplished. Additionally, the code +// feels cleaner when there isn't a race between a cache miss, followed by +// creating a readEntry that is no longer needed because someone else has +// done the read since the miss and inserted into the cache. By making the +// readShard use shard.mu, such a race is avoided. A side benefit is that +// the cache interaction can be hidden behind readEntry.SetReadValue. One +// disadvantage of this tightly integrated design is that it does not +// encompass readers that will put the read value into a block.BufferPool -- +// we don't worry about those since block.BufferPool is only used for +// compactions and there is at most one compaction reader of a block. There +// is the possibility that the compaction reader and a user-facing iterator +// reader will do duplicate reads, but we accept that deficiency. +// +// - readMap is separate from shard.blocks map: One could have a design which +// extends the cache entry and unifies the two maps. However, we never want +// to evict a readEntry while there are readers waiting for the block read +// (including the case where the corresponding file is being removed from +// shard.files). Also, the number of stable cache entries is huge and +// therefore is manually allocated, while the number of readEntries is small +// (so manual allocation isn't necessary). For these reasons we maintain a +// separate map. This separation also results in more modular code, instead +// of piling more stuff into shard. +type readShard struct { + // shard is only used for locking, and calling shard.Set. + shard *shard + // Protected by shard.mu. + // + // shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared + // resource and must be released quickly. + shardMu struct { + readMap swiss.Map[key, *readEntry] + } +} + +func (rs *readShard) Init(shard *shard) *readShard { + *rs = readShard{ + shard: shard, + } + // Choice of 16 is arbitrary. + rs.shardMu.readMap.Init(16) + return rs +} + +// getReadEntryLocked gets a *readEntry for (id, fileNum, offset). 
shard.mu is
+// already write locked.
+func (rs *readShard) getReadEntryLocked(id ID, fileNum base.DiskFileNum, offset uint64) *readEntry {
+	k := key{fileKey{id, fileNum}, offset}
+	e, ok := rs.shardMu.readMap.Get(k)
+	if !ok {
+		e = newReadEntry(rs, id, fileNum, offset)
+		rs.shardMu.readMap.Put(k, e)
+	} else {
+		e.refCount.acquireAllowZero()
+	}
+	return e
+}
+
+func (rs *readShard) lenForTesting() int {
+	rs.shard.mu.Lock()
+	defer rs.shard.mu.Unlock()
+	return rs.shardMu.readMap.Len()
+}
+
+// readEntry is used to coordinate between concurrent attempted readers of the
+// same block.
+type readEntry struct {
+	readShard *readShard
+	id        ID
+	fileNum   base.DiskFileNum
+	offset    uint64
+	mu        struct {
+		sync.RWMutex
+		// v, when non-nil, has a ref from readEntry, which is unreffed when
+		// readEntry is deleted from the readMap.
+		v *Value
+		// isReading and ch together capture the state of whether someone has been
+		// granted a turn to read, and of readers waiting for that read to finish.
+		// ch is lazily allocated since most readEntries will not see concurrent
+		// readers. This lazy allocation results in one transition of ch from nil
+		// to non-nil, so waiters can read this non-nil ch and block on reading
+		// from it without holding mu.
+		//
+		// ch is written to, to signal one waiter to start doing the read. ch is
+		// closed when the value is successfully read and has been stored in v, so
+		// that all waiters wake up and read v. ch is a buffered channel with a
+		// capacity of 1.
+		//
+		// State transitions when trying to wait for turn:
+		//  Case !isReading:
+		//    set isReading=true; drain the ch if non-nil and non-empty; proceed
+		//    with turn to do the read.
+		//  Case isReading:
+		//    allocate ch if nil; wait on ch.
+		//  Finished reading successfully:
+		//    set isReading=false; if ch is non-nil, close ch.
+		//  Finished reading with failure:
+		//    set isReading=false; if ch is non-nil, write to ch.
+		//
+		// INVARIANT:
+		// isReading => ch is nil or ch is empty.
+		isReading bool
+		ch        chan struct{}
+		// Total duration of reads and semaphore waiting that resulted in error.
+		errorDuration time.Duration
+		readStart     time.Time
+	}
+	// Count of ReadHandles that refer to this readEntry. Increments always
+	// happen while holding shard.mu, so if this is found to be 0 while holding
+	// shard.mu, it is safe to delete readEntry from readShard.shardMu.readMap.
+	refCount refcnt
+}
+
+var readEntryPool = sync.Pool{
+	New: func() interface{} {
+		return &readEntry{}
+	},
+}
+
+func newReadEntry(rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64) *readEntry {
+	e := readEntryPool.Get().(*readEntry)
+	*e = readEntry{
+		readShard: rs,
+		id:        id,
+		fileNum:   fileNum,
+		offset:    offset,
+	}
+	e.refCount.init(1)
+	return e
+}
+
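Reviewer note (not part of the patch): the state transitions above rely on the two wake-up modes of a capacity-1 channel: a buffered send wakes exactly one waiter (the failure path, handing the turn to a single retrier), while close wakes every waiter at once (the success path, where all of them read e.mu.v). A runnable toy showing just those channel semantics, independent of the cache:

```go
package main

import "fmt"

func main() {
	// Failure path: a non-blocking send on a capacity-1 channel wakes
	// exactly one waiter, which then retries and takes the read turn.
	turn := make(chan struct{}, 1)
	select {
	case turn <- struct{}{}:
	default:
		panic("channel is not empty") // mirrors setReadError's invariant
	}
	_, ok := <-turn
	fmt.Println("granted turn:", ok) // ok == true: try to become the reader

	// Success path: closing the channel unblocks all waiters; each receive
	// reports ok == false, meaning the value is ready to use.
	done := make(chan struct{}, 1)
	close(done)
	_, ok = <-done
	fmt.Println("value ready:", !ok)
}
```

This is why waitForReadPermissionOrHandle below treats a closed channel (!ok) as "value available", and a received token as merely a hint to loop and re-attempt the turn.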
+// waitForReadPermissionOrHandle returns either an already read value (in
+// Handle), an error (if the context was cancelled), or neither, which is a
+// directive to the caller to do the read. In this last case the caller must
+// call either setReadValue or setReadError.
+//
+// In all cases, errorDuration is populated with the total duration that
+// readers that observed an error (setReadError) spent in doing the read. This
+// duration can be greater than the time spent in
+// waitForReadPermissionOrHandle, since some of these errors could have
+// occurred prior to this call. But it serves as a rough indicator of whether
+// turn taking could have caused higher latency due to context cancellation.
+func (e *readEntry) waitForReadPermissionOrHandle(
+	ctx context.Context,
+) (h Handle, errorDuration time.Duration, err error) {
+	constructHandleLocked := func() Handle {
+		if e.mu.v == nil {
+			panic("value is nil")
+		}
+		e.mu.v.acquire()
+		return Handle{value: e.mu.v}
+	}
+	becomeReaderLocked := func() {
+		if e.mu.v != nil {
+			panic("value is non-nil")
+		}
+		if e.mu.isReading {
+			panic("isReading is already true")
+		}
+		e.mu.isReading = true
+		if e.mu.ch != nil {
+			// Drain the channel, so that no one else mistakenly believes they
+			// should read.
+			select {
+			case <-e.mu.ch:
+			default:
+			}
+		}
+		e.mu.readStart = time.Now()
+	}
+	unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) {
+		removeState := e.makeTryRemoveStateLocked()
+		errorDuration = e.mu.errorDuration
+		if readLock {
+			e.mu.RUnlock()
+		} else {
+			e.mu.Unlock()
+		}
+		unrefAndTryRemoveFromMap(removeState)
+		return errorDuration
+	}
+
+	for {
+		e.mu.Lock()
+		if e.mu.v != nil {
+			// Value has already been read.
+			h := constructHandleLocked()
+			errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
+			return h, errorDuration, nil
+		}
+		// Not already read. Wait for turn to do the read or for someone else to do
+		// the read.
+		if !e.mu.isReading {
+			// Have permission to do the read.
+			becomeReaderLocked()
+			errorDuration = e.mu.errorDuration
+			e.mu.Unlock()
+			return Handle{}, errorDuration, nil
+		}
+		if e.mu.ch == nil {
+			// Rare case when multiple readers are concurrently trying to read. If
+			// this turns out to be common enough we could use a sync.Pool.
+			e.mu.ch = make(chan struct{}, 1)
+		}
+		ch := e.mu.ch
+		e.mu.Unlock()
+		select {
+		case <-ctx.Done():
+			e.mu.Lock()
+			errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
+			return Handle{}, errorDuration, ctx.Err()
+		case _, ok := <-ch:
+			if !ok {
+				// Channel closed, so value was read.
+				e.mu.RLock()
+				if e.mu.v == nil {
+					panic("value is nil")
+				}
+				h := constructHandleLocked()
+				errorDuration = unlockAndUnrefAndTryRemoveFromMap(true)
+				return h, errorDuration, nil
+			}
+			// Else, probably granted permission to do the read. NB: since isReading
+			// is false, someone else can slip through before this thread acquires
+			// e.mu, and take the turn. So try to actually get the turn by trying
+			// again in the loop.
+		}
+	}
+}
+
+// tryRemoveState captures the state needed by unrefAndTryRemoveFromMap. The
+// caller constructs it before calling unrefAndTryRemoveFromMap, while it
+// typically still holds readEntry.mu; this avoids acquiring readEntry.mu
+// again inside unrefAndTryRemoveFromMap.
+type tryRemoveState struct {
+	rs *readShard
+	k  key
+	e  *readEntry
+}
+
+// makeTryRemoveStateLocked initializes tryRemoveState.
+func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState {
+	return tryRemoveState{
+		rs: e.readShard,
+		k:  key{fileKey{e.id, e.fileNum}, e.offset},
+		e:  e,
+	}
+}
+
+// unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs.
+// It is possible that, after the unref, s.e has already been removed and is
+// now back in the sync.Pool, or is being reused (for the same or a different
+// key). This can happen because, after an unref that drops s.e.refCount to
+// zero but before shard.mu is acquired, the refcount can be concurrently
+// incremented and decremented again; another goroutine may observe its own
+// decrement to 0, race ahead, and delete s.e from the readMap.
+func unrefAndTryRemoveFromMap(s tryRemoveState) { + if !s.e.refCount.release() { + return + } + s.rs.shard.mu.Lock() + e2, ok := s.rs.shardMu.readMap.Get(s.k) + if !ok || e2 != s.e { + // Already removed. + s.rs.shard.mu.Unlock() + return + } + if s.e.refCount.value() != 0 { + s.rs.shard.mu.Unlock() + return + } + // k => e and e.refCount == 0. And it cannot be incremented since + // shard.mu.Lock() is held. So remove from map. + s.rs.shardMu.readMap.Delete(s.k) + s.rs.shard.mu.Unlock() + + // Free s.e. + s.e.mu.Lock() + if s.e.mu.v != nil { + s.e.mu.v.release() + s.e.mu.v = nil + } + s.e.mu.Unlock() + *s.e = readEntry{} + readEntryPool.Put(s.e) +} + +func (e *readEntry) setReadValue(v *Value) Handle { + // Add to the cache before taking another ref for readEntry, since the cache + // expects ref=1 when it is called. + // + // TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure + // that it is added as etHot. The common case will be e.refCount = 1, and we + // don't want to acquire e.mu twice, so one way to do this would be relax + // the invariant in shard.Set that requires Value.refs() == 1. Then we can + // do the work under e.mu before calling shard.Set. + h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v) + e.mu.Lock() + // Acquire a ref for readEntry, since we are going to remember it in e.mu.v. + v.acquire() + e.mu.v = v + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + // Inform all waiters so they can use e.mu.v. Not all readers have called + // readEntry.waitForReadPermissionOrHandle, and those will also use + // e.mu.v. + close(e.mu.ch) + } + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) + return h +} + +func (e *readEntry) setReadError(err error) { + e.mu.Lock() + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + select { + case e.mu.ch <- struct{}{}: + default: + panic("channel is not empty") + } + } + e.mu.errorDuration += time.Since(e.mu.readStart) + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) +} + +// ReadHandle represents a contract with a caller that had a miss when doing a +// cache lookup, and wants to do a read and insert the read block into the +// cache. The contract applies when ReadHandle.Valid returns true, in which +// case the caller has been assigned the turn to do the read (and others are +// potentially waiting for it). +// +// Contract: +// +// The caller must immediately start doing a read, or can first wait on a +// shared resource that would also block a different reader if it was assigned +// the turn instead (specifically, this refers to Options.LoadBlockSema). +// After the read, it must either call SetReadValue or SetReadError depending +// on whether the read succeeded or failed. +type ReadHandle struct { + entry *readEntry +} + +// Valid returns true for a valid ReadHandle. +func (rh ReadHandle) Valid() bool { + return rh.entry != nil +} + +// SetReadValue provides the Value that the caller has read. The caller is +// responsible for releasing the returned Handle when it is no longer needed. +func (rh ReadHandle) SetReadValue(v *Value) Handle { + return rh.entry.setReadValue(v) +} + +// SetReadError specifies that the caller has encountered a read error. 
+func (rh ReadHandle) SetReadError(err error) { + rh.entry.setReadError(err) +} diff --git a/internal/cache/read_shard_test.go b/internal/cache/read_shard_test.go new file mode 100644 index 0000000000..d6c1821633 --- /dev/null +++ b/internal/cache/read_shard_test.go @@ -0,0 +1,264 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "context" + crand "crypto/rand" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" + "github.com/stretchr/testify/require" +) + +type testReader struct { + ctx context.Context + id ID + fileNum base.DiskFileNum + offset uint64 + re *readEntry + mu struct { + *sync.Mutex + finishedWait bool + waitedNeedToRead bool + waitedErr error + waitedValue string + + cond *sync.Cond + } +} + +func newTestReader( + ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64, mu *sync.Mutex, +) *testReader { + r := &testReader{ + ctx: ctx, + id: id, + fileNum: fileNum, + offset: offset, + } + r.mu.Mutex = mu + r.mu.cond = sync.NewCond(mu) + return r +} + +func (r *testReader) getAsync(shard *shard) *string { + h, re := shard.getWithMaybeReadEntry(r.id, r.fileNum, r.offset, true) + if h.Valid() { + v := string(h.RawBuffer()) + h.Release() + return &v + } + r.re = re + go func() { + h, _, err := re.waitForReadPermissionOrHandle(r.ctx) + r.mu.Lock() + defer r.mu.Unlock() + r.mu.finishedWait = true + r.mu.cond.Signal() + if h.Valid() { + r.mu.waitedValue = string(h.RawBuffer()) + h.Release() + return + } + if err != nil { + r.mu.waitedErr = err + return + } + r.mu.waitedNeedToRead = true + }() + return nil +} + +func (r *testReader) waitUntilFinishedWait() (*string, error) { + r.mu.Lock() + defer r.mu.Unlock() + for !r.mu.finishedWait { + r.mu.cond.Wait() + } + if r.mu.waitedNeedToRead { + return nil, nil + } + if r.mu.waitedErr != nil { + return nil, r.mu.waitedErr + } + return &r.mu.waitedValue, nil +} + +func (r *testReader) setReadValue(t *testing.T, v string) { + val := Alloc(len(v)) + copy(val.Buf(), []byte(v)) + h := ReadHandle{entry: r.re}.SetReadValue(val) + require.Equal(t, v, string(h.RawBuffer())) + h.Release() +} + +func (r *testReader) setError(err error) { + ReadHandle{entry: r.re}.SetReadError(err) +} + +func TestReadShard(t *testing.T) { + var c *shard + var readers map[string]*testReader + var mu sync.Mutex + + datadriven.RunTest(t, "testdata/read_shard", + func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "init": + var maxSize int64 + td.ScanArgs(t, "max-size", &maxSize) + c = &shard{} + c.init(maxSize) + if len(readers) > 0 { + t.Fatalf("have %d readers that have not completed", len(readers)) + } + readers = map[string]*testReader{} + return "" + + case "get": + var name string + td.ScanArgs(t, "name", &name) + if _, ok := readers[name]; ok { + t.Fatalf("reader with name %s already exists", name) + } + var id, fileNum, offset int + td.ScanArgs(t, "id", &id) + td.ScanArgs(t, "file-num", &fileNum) + td.ScanArgs(t, "offset", &offset) + ctx := context.Background() + if td.HasArg("cancelled-context") { + var cancelFunc context.CancelFunc + ctx, cancelFunc = context.WithCancel(ctx) + cancelFunc() + } + r := newTestReader(ctx, ID(id), base.DiskFileNum(fileNum), uint64(offset), &mu) + val := r.getAsync(c) + if val != nil { + return fmt.Sprintf("val: 
%s", *val) + } + readers[name] = r + time.Sleep(10 * time.Millisecond) + return fmt.Sprintf("waiting\nmap-len: %d", c.readShard.lenForTesting()) + + case "wait": + var name string + td.ScanArgs(t, "name", &name) + val, err := readers[name].waitUntilFinishedWait() + if val != nil || err != nil { + delete(readers, name) + if val != nil { + return fmt.Sprintf("val: %s\nmap-len: %d", *val, c.readShard.lenForTesting()) + } + if err != nil { + return fmt.Sprintf("err: %s\nmap-len: %d", err.Error(), c.readShard.lenForTesting()) + } + } + return fmt.Sprintf("turn to read\nmap-len: %d", c.readShard.lenForTesting()) + + case "set-read-value": + var name string + td.ScanArgs(t, "name", &name) + var val string + td.ScanArgs(t, "val", &val) + readers[name].setReadValue(t, val) + delete(readers, name) + time.Sleep(10 * time.Millisecond) + return fmt.Sprintf("map-len: %d", c.readShard.lenForTesting()) + + case "set-error": + var name string + td.ScanArgs(t, "name", &name) + readers[name].setError(errors.Errorf("read error: %s", name)) + delete(readers, name) + time.Sleep(10 * time.Millisecond) + return fmt.Sprintf("map-len: %d", c.readShard.lenForTesting()) + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + + } + }) +} + +// testSyncReaders is the config for multiple readers concurrently reading the +// same block. +type testSyncReaders struct { + // id, fileNum, offset are the key. + id ID + fileNum base.DiskFileNum + offset uint64 + // val will be the value read, if not found in the cache. + val []byte + // numReaders is the number of concurrent readers. + numReaders int + // readerWithErrIndex is a reader that will have a read error and hand a + // turn to another reader. + readerWithErrIndex int + // sleepDuration is the duration that the reader with the turns sleeps + // before setting the value or error. + sleepDuration time.Duration + // wg is used to wait for all reader goroutines to be done. + wg sync.WaitGroup +} + +func TestReadShardConcurrent(t *testing.T) { + cache := New(rand.Int63n(20 << 10)) + defer cache.Unref() + var differentReaders []*testSyncReaders + // 50 blocks are read. 
+
+func TestReadShardConcurrent(t *testing.T) {
+	cache := New(rand.Int63n(20 << 10))
+	defer cache.Unref()
+	var differentReaders []*testSyncReaders
+	// 50 blocks are read.
+	for i := 0; i < 50; i++ {
+		valLen := rand.Intn(100) + 1
+		val := make([]byte, valLen)
+		crand.Read(val)
+		readers := &testSyncReaders{
+			id:                 ID(rand.Uint64()),
+			fileNum:            base.DiskFileNum(rand.Uint64()),
+			offset:             rand.Uint64(),
+			val:                val,
+			numReaders:         5,
+			readerWithErrIndex: rand.Intn(5),
+			sleepDuration:      time.Duration(rand.Intn(2)) * time.Millisecond,
+		}
+		readers.wg.Add(readers.numReaders)
+		differentReaders = append(differentReaders, readers)
+	}
+	for _, r := range differentReaders {
+		for j := 0; j < r.numReaders; j++ {
+			go func(r *testSyncReaders, index int) {
+				h, rh, _, _, err := cache.GetWithReadHandle(context.Background(), r.id, r.fileNum, r.offset)
+				require.NoError(t, err)
+				if h.Valid() {
+					require.Equal(t, r.val, h.RawBuffer())
+					h.Release()
+					r.wg.Done()
+					return
+				}
+				if r.sleepDuration > 0 {
+					time.Sleep(r.sleepDuration)
+				}
+				if r.readerWithErrIndex == index {
+					rh.SetReadError(errors.Errorf("error"))
+					r.wg.Done()
+					return
+				}
+				v := Alloc(len(r.val))
+				copy(v.Buf(), r.val)
+				h = rh.SetReadValue(v)
+				h.Release()
+				r.wg.Done()
+			}(r, j)
+		}
+	}
+	for _, r := range differentReaders {
+		r.wg.Wait()
+	}
+}
diff --git a/internal/cache/refcnt_normal.go b/internal/cache/refcnt_normal.go
index 9ab3348613..bf1a475691 100644
--- a/internal/cache/refcnt_normal.go
+++ b/internal/cache/refcnt_normal.go
@@ -37,6 +37,15 @@ func (v *refcnt) acquire() {
 	}
 }
 
+// acquireAllowZero is the same as acquire, but may be called when the
+// refcnt is currently zero. This is useful when the entry being reference
+// counted lives inside a container that does not itself hold a reference.
+// The container uses release() returning true as the signal to attempt
+// removing the entry from the container.
+func (v *refcnt) acquireAllowZero() {
+	v.val.Add(1)
+}
+
 func (v *refcnt) release() bool {
 	switch v := v.val.Add(-1); {
 	case v < 0:
@@ -48,6 +57,10 @@ func (v *refcnt) release() bool {
 	}
 }
 
+func (v *refcnt) value() int32 {
+	return v.val.Load()
+}
+
 func (v *refcnt) trace(msg string) {
 }
 
diff --git a/internal/cache/refcnt_tracing.go b/internal/cache/refcnt_tracing.go
index 1d5e6c0219..dd94088e24 100644
--- a/internal/cache/refcnt_tracing.go
+++ b/internal/cache/refcnt_tracing.go
@@ -41,6 +41,16 @@ func (v *refcnt) acquire() {
 	v.trace("acquire")
 }
 
+// acquireAllowZero is the same as acquire, but may be called when the
+// refcnt is currently zero. This is useful when the entry being reference
+// counted lives inside a container that does not itself hold a reference.
+// The container uses release() returning true as the signal to attempt
+// removing the entry from the container.
+func (v *refcnt) acquireAllowZero() {
+	v.val.Add(1)
+	v.trace("acquire")
+}
+
 func (v *refcnt) release() bool {
 	n := v.val.Add(-1)
 	switch {
@@ -51,6 +61,10 @@ func (v *refcnt) release() bool {
 	return n == 0
 }
 
+func (v *refcnt) value() int32 {
+	return v.val.Load()
+}
+
 func (v *refcnt) trace(msg string) {
 	s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack())
 	v.Lock()
diff --git a/internal/cache/testdata/read_shard b/internal/cache/testdata/read_shard
new file mode 100644
index 0000000000..48b0321ba2
--- /dev/null
+++ b/internal/cache/testdata/read_shard
@@ -0,0 +1,163 @@
+# Cache cannot store anything.
+init max-size=0
+----
+
+# Get misses the cache and has turn to read.
+get name=s1_f2_off3 id=1 file-num=2 offset=3
+----
+waiting
+map-len: 1
+
+wait name=s1_f2_off3
+----
+turn to read
+map-len: 1
+
+# Sets the value read, but will be forgotten by the cache.
+set-read-value name=s1_f2_off3 val=potato +---- +map-len: 0 + +# Another get for the same block, which gets the turn to read. +get name=s1_f2_off3 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s1_f2_off3 +---- +turn to read +map-len: 1 + +# A concurrent get for the same block. +get name=another_s1_f2_off3 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +# A get with a different id but the same file-num and offset. It is a +# different block. +get name=s2_f2_off3 id=2 file-num=2 offset=3 +---- +waiting +map-len: 2 + +# The get with id=2 also gets a turn to read, and sets an error. +wait name=s2_f2_off3 +---- +turn to read +map-len: 2 + +set-error name=s2_f2_off3 +---- +map-len: 1 + +# The first get reads and sets a value. +set-read-value name=s1_f2_off3 val=eggplant +---- +map-len: 0 + +# The concurrent get sees this value, which was never successfully stored in +# the cache. +wait name=another_s1_f2_off3 +---- +val: eggplant +map-len: 0 + +# Get with id=2 gets a turn to read. +get name=s2_f2_off3 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s2_f2_off3 +---- +turn to read +map-len: 1 + +# Two more concurrent gets, one with a cancelled context. +get name=s2_f2_off3_2 id=2 file-num=2 offset=3 cancelled-context +---- +waiting +map-len: 1 + +get name=s2_f2_off3_3 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s2_f2_off3_2 +---- +err: context canceled +map-len: 1 + +# The get with the read turn sets an error. +set-error name=s2_f2_off3 +---- +map-len: 1 + +# The waiting reader gets a turn. +wait name=s2_f2_off3_3 +---- +turn to read +map-len: 1 + +# Two more concurrent gets. +get name=s2_f2_off3_4 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +get name=s2_f2_off3_5 id=2 file-num=2 offset=3 +---- +waiting +map-len: 1 + +# The get with a read turn sets a value. +set-read-value name=s2_f2_off3_3 val=aubergine +---- +map-len: 0 + +wait name=s2_f2_off3_4 +---- +val: aubergine +map-len: 0 + +wait name=s2_f2_off3_5 +---- +val: aubergine +map-len: 0 + +# Cache with some storage capacity +init max-size=100 +---- + +# Get misses the cache and has turn to read. +get name=s1_f2_off3 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +wait name=s1_f2_off3 +---- +turn to read +map-len: 1 + +# Concurrent reader. +get name=s1_f2_off3_2 id=1 file-num=2 offset=3 +---- +waiting +map-len: 1 + +set-read-value name=s1_f2_off3 val=brinjal +---- +map-len: 0 + +wait name=s1_f2_off3_2 +---- +val: brinjal +map-len: 0 + +get name=s1_f2_off3_3 id=1 file-num=2 offset=3 +---- +val: brinjal diff --git a/sstable/block/buffer_pool.go b/sstable/block/buffer_pool.go index 38809b41ed..4df870fe81 100644 --- a/sstable/block/buffer_pool.go +++ b/sstable/block/buffer_pool.go @@ -8,6 +8,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" + "github.com/cockroachdb/pebble/internal/invariants" ) // Alloc allocates a new Value for a block of length n (excluding the block @@ -52,12 +53,21 @@ func (b Value) BlockMetadata() *Metadata { // backed by a buffer pool, MakeHandle inserts the value into the block cache, // returning a handle to the now resident value. 
func (b Value) MakeHandle( - c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, + crh cache.ReadHandle, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, ) BufferHandle { if b.buf.Valid() { + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle was valid") + } return BufferHandle{b: b.buf} } - return BufferHandle{h: c.Set(cacheID, fileNum, offset, b.v)} + return BufferHandle{h: crh.SetReadValue(b.v)} +} + +func (b Value) SetInCacheForTesting( + c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, +) cache.Handle { + return c.Set(cacheID, fileNum, offset, b.v) } // Release releases the handle. diff --git a/sstable/colblk/index_block_test.go b/sstable/colblk/index_block_test.go index 87f5d6a6bf..3be6ff408e 100644 --- a/sstable/colblk/index_block_test.go +++ b/sstable/colblk/index_block_test.go @@ -125,7 +125,7 @@ func TestIndexIterInitHandle(t *testing.T) { d := (*IndexBlockDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(blockData) - h := v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0) + h := v.SetInCacheForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) defer h.Release() getBlockAndIterate := func(it *IndexIter) { diff --git a/sstable/colblk/keyspan_test.go b/sstable/colblk/keyspan_test.go index 9592dabcc1..9f7c99d870 100644 --- a/sstable/colblk/keyspan_test.go +++ b/sstable/colblk/keyspan_test.go @@ -88,7 +88,7 @@ func TestKeyspanBlockPooling(t *testing.T) { copy(v.BlockData(), b) d := (*KeyspanDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(v.BlockData()) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheForTesting(c, cache.ID(1), base.DiskFileNum(1), 0).Release() getBlockAndIterate := func() { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/reader.go b/sstable/reader.go index 8d4f96dfb7..48853d980d 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -507,17 +507,48 @@ func (r *Reader) readBlockInternal( bh block.Handle, initBlockMetadataFn func(*block.Metadata, []byte) error, ) (handle block.BufferHandle, _ error) { - if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Valid() { - // Cache hit. - if readHandle != nil { - readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + var ch cache.Handle + var crh cache.ReadHandle + hit := true + if env.BufferPool == nil { + var errorDuration time.Duration + var err error + ch, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle( + ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) { + r.logger.Eventf( + ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String()) + } + // TODO(sumeer): consider tracing when waited longer than some duration + // for turn to do the read. + if err != nil { + return block.BufferHandle{}, err + } + } else { + // The compaction path uses env.BufferPool, and does not coordinate read + // using a cache.ReadHandle. This is ok since only a single compaction is + // reading a block. + ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if ch.Valid() { + hit = true } - env.BlockServedFromCache(bh.Length) - return block.CacheBufferHandle(h), nil + } + // INVARIANT: hit => ch.Valid() + if ch.Valid() { + if hit { + // Cache hit. 
+ if readHandle != nil { + readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + } + env.BlockServedFromCache(bh.Length) + } + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle must not be valid") + } + return block.CacheBufferHandle(ch), nil } - // Cache miss. - + // Need to read. First acquire loadBlockSema, if needed. if sema := r.loadBlockSema; sema != nil { if err := sema.Acquire(ctx, 1); err != nil { // An error here can only come from the context. @@ -525,7 +556,26 @@ func (r *Reader) readBlockInternal( } defer sema.Release(1) } + value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn) + if err != nil { + if crh.Valid() { + crh.SetReadError(err) + } + return block.BufferHandle{}, err + } + h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + return h, nil +} +// doRead is a helper for readBlockInternal that does the read, checksum +// check, decompression, and returns either a block.Value or an error. +func (r *Reader) doRead( + ctx context.Context, + env readBlockEnv, + readHandle objstorage.ReadHandle, + bh block.Handle, + initBlockMetadataFn func(*block.Metadata, []byte) error, +) (block.Value, error) { compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool) readStopwatch := makeStopwatch() var err error @@ -548,17 +598,15 @@ func (r *Reader) readBlockInternal( } if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } env.BlockRead(bh.Length, readDuration) - if err := checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { + if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - typ := block.CompressionIndicator(compressed.BlockData()[bh.Length]) compressed.Truncate(int(bh.Length)) - var decompressed block.Value if typ == block.NoCompressionIndicator { decompressed = compressed @@ -567,23 +615,21 @@ func (r *Reader) readBlockInternal( decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData()) if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - decompressed = block.Alloc(decodedLen, env.BufferPool) err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData()) compressed.Release() if err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } } - if err := initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { + if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) - return h, nil + return decompressed, nil } func (r *Reader) readMetaindex(
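Reviewer note (not part of the patch): the refcnt additions (acquireAllowZero, value) exist to support the "release, then re-check under the container lock" deletion in unrefAndTryRemoveFromMap: the readMap holds no reference of its own, so a zero count observed while holding shard.mu proves no ReadHandle can resurrect the entry. A standalone sketch of that deletion pattern, with illustrative names (container, entry) that are not Pebble APIs:

```go
package sketch

import (
	"sync"
	"sync/atomic"
)

type entry struct{ refs atomic.Int32 }

type container struct {
	mu sync.Mutex
	m  map[string]*entry
}

// acquireLocked mirrors acquireAllowZero: callers hold c.mu and may
// increment a count that is currently zero.
func (c *container) acquireLocked(e *entry) { e.refs.Add(1) }

func (c *container) release(k string, e *entry) {
	if e.refs.Add(-1) != 0 {
		return // other holders remain
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	// Between our decrement and taking mu, the entry may have been
	// re-acquired, or another releaser may have deleted (and even reused)
	// it. Re-check identity and count before removing.
	if cur, ok := c.m[k]; !ok || cur != e || e.refs.Load() != 0 {
		return
	}
	delete(c.m, k)
}
```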