diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go
index 2459ec7b69..c7cc870427 100644
--- a/internal/cache/clockpro.go
+++ b/internal/cache/clockpro.go
@@ -26,6 +26,7 @@ import (
 	"sync"
 	"sync/atomic"
 
+	"github.com/cockroachdb/fifo"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/invariants"
 )
@@ -114,9 +115,22 @@ type shard struct {
 	countHot  int64
 	countCold int64
 	countTest int64
-}
 
-func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
+	// Some fields in readShard are protected by mu. See comments in declaration
+	// of readShard.
+	readShard readShard
+}
+
+// GetWithMaybeReadHandle is the internal helper for implementing
+// Cache.{Get,GetWithReadHandle}. loadBlockSema can be nil, and a non-nil
+// value is only relevant when desireReadHandle is true.
+func (c *shard) GetWithMaybeReadHandle(
+	id ID,
+	fileNum base.DiskFileNum,
+	offset uint64,
+	desireReadHandle bool,
+	loadBlockSema *fifo.Semaphore,
+) (Handle, ReadHandle) {
 	c.mu.RLock()
 	var value *Value
 	if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
@@ -126,12 +140,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
 		}
 	}
 	c.mu.RUnlock()
+	var rh ReadHandle
+	if value == nil && desireReadHandle {
+		c.mu.Lock()
+		// After the c.mu.RUnlock(), someone could have inserted the value in the
+		// cache. We could tolerate the race and do a file read, or do another map
+		// lookup. We choose to do the latter, since the cost of a map lookup is
+		// insignificant compared to the cost of reading a block from a file.
+		if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
+			value = e.acquireValue()
+			if value != nil {
+				e.referenced.Store(true)
+			}
+		}
+		if value == nil {
+			rh = c.readShard.getReadHandleLocked(id, fileNum, offset, loadBlockSema)
+		}
+		c.mu.Unlock()
+	}
 	if value == nil {
 		c.misses.Add(1)
-		return Handle{}
+	} else {
+		c.hits.Add(1)
 	}
-	c.hits.Add(1)
-	return Handle{value: value}
+	return Handle{value: value}, rh
 }
 
 func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
@@ -170,6 +202,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value
 		value.ref.trace("add-hot")
 		c.sizeHot += delta
 	} else {
+		// TODO(sumeer): unclear why we don't set e.ptype to etHot on this path.
+		// In the default case below, where the state is etTest we set it to
+		// etHot. But etTest is "colder" than etCold, since the only transition
+		// into etTest is etCold => etTest, so if etTest transitions to etHot,
+		// then etCold should also transition.
 		value.ref.trace("add-cold")
 		c.sizeCold += delta
 	}
@@ -746,6 +783,7 @@ func newShards(size int64, shards int) *Cache {
 		}
 		c.shards[i].blocks.Init(16)
 		c.shards[i].files.Init(16)
+		c.shards[i].readShard.Init(&c.shards[i])
 	}
 
 	// Note: this is a no-op if invariants are disabled or race is enabled.
@@ -822,7 +860,24 @@ func (c *Cache) Unref() {
 // Get retrieves the cache value for the specified file and offset, returning
 // nil if no value is present.
 func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
-	return c.getShard(id, fileNum, offset).Get(id, fileNum, offset)
+	h, rh := c.getShard(id, fileNum, offset).GetWithMaybeReadHandle(
+		id, fileNum, offset, false, nil)
+	if invariants.Enabled && rh.Valid() {
+		panic("ReadHandle should not be valid")
+	}
+	return h
+}
+
+// GetWithReadHandle retrieves the cache value for the specified ID, fileNum
+// and offset. If found, a valid Handle is returned; otherwise a valid
+// ReadHandle is returned. See the ReadHandle declaration for the contract the
+// caller must satisfy when getting a valid ReadHandle. loadBlockSema, if
+// non-nil, is used by the ReadHandle -- see Options.LoadBlockSema for details.
+func (c *Cache) GetWithReadHandle(
+	id ID, fileNum base.DiskFileNum, offset uint64, loadBlockSema *fifo.Semaphore,
+) (Handle, ReadHandle) {
+	return c.getShard(id, fileNum, offset).GetWithMaybeReadHandle(
+		id, fileNum, offset, true, loadBlockSema)
+}
 
 // Set sets the cache value for the specified file and offset, overwriting an
diff --git a/internal/cache/read_shard.go b/internal/cache/read_shard.go
new file mode 100644
index 0000000000..fc9d7528d6
--- /dev/null
+++ b/internal/cache/read_shard.go
@@ -0,0 +1,447 @@
+// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+package cache
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/cockroachdb/fifo"
+	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/swiss"
+)
+
+// readShard coordinates the read of a block that will be put in the cache. It
+// ensures only one goroutine is reading a block, and other callers block
+// until that goroutine is done (with success or failure). In the case of
+// success, the other goroutines will use the value that was read, even if it
+// is too large to be placed in the cache, or got evicted from the cache
+// before they got scheduled. In the case of a failure (read error or context
+// cancellation), one of the waiters will be given a turn to do the read.
+//
+// This turn-taking ensures that a large number of concurrent attempts to read
+// the same block that is not in the cache does not result in the same number
+// of reads from the filesystem (or remote storage). We have seen large spikes
+// in memory usage and read bandwidth without this turn-taking.
+//
+// It also introduces a small risk related to context cancellation -- if many
+// readers assigned a turn exceed their deadline while doing the read and
+// report an error, a reader with a longer deadline can unnecessarily wait. We
+// accept this risk for now since the primary production use in CockroachDB is
+// filesystem reads, where context cancellation is not respected. We do
+// introduce an error duration metric emitted in traces that can be used to
+// quantify such wasteful waiting. Note that this same risk extends to waiting
+// on the Options.LoadBlockSema, so the error duration metric includes the
+// case of an error when waiting on the semaphore.
+//
+// Design choices and motivation:
+//
+// - readShard is tightly integrated with a cache shard: At its core,
+// readShard is a map with synchronization. For the same reason the cache is
+// sharded (for higher concurrency by sharding the mutex), it is beneficial
+// to shard synchronization on readShard. By making readShard a member of
+// shard, this sharding is trivially accomplished. Additionally, the code
+// feels cleaner when there isn't a race between a cache miss, followed by
+// creating a readEntry that is no longer needed because someone else has
+// done the read since the miss and inserted into the cache. By making the
+// readShard use shard.mu, such a race is avoided. A side benefit is that
+// the cache interaction can be hidden behind readEntry.SetReadValue.
+// One disadvantage of this tightly integrated design is that it does not
+// encompass readers that will put the read value into a block.BufferPool --
+// we don't worry about those since block.BufferPool is only used for
+// compactions and there is at most one compaction reader of a block. There
+// is the possibility that the compaction reader and a user-facing iterator
+// reader will do duplicate reads, but we accept that deficiency.
+//
+// - readMap is separate from shard.blocks map: One could have a design which
+// extends the cache entry and unifies the two maps. However, we never want
+// to evict a readEntry while there are readers waiting for the block read
+// (including the case where the corresponding file is being removed from
+// shard.files). Also, the number of stable cache entries is huge and
+// therefore is manually allocated, while the number of readEntries is small
+// (so manual allocation isn't necessary). For these reasons we maintain a
+// separate map. This separation also results in more modular code, instead
+// of piling more stuff into shard.
+type readShard struct {
+	shard *shard
+	// Protected by shard.mu.
+	//
+	// shard.mu must never be held when acquiring readEntry.mu since
+	// readEntry.mu can be held while waiting on readEntry.loadBlockSema.
+	// shard.mu is a shared resource and must be released quickly (also it can
+	// cause subtle deadlocks).
+	shardMu struct {
+		readMap swiss.Map[key, *readEntry]
+	}
+}
+
+func (rs *readShard) Init(shard *shard) *readShard {
+	*rs = readShard{
+		shard: shard,
+	}
+	// Choice of 16 is arbitrary.
+	rs.shardMu.readMap.Init(16)
+	return rs
+}
+
+// getReadHandleLocked gets a ReadHandle for (id, fileNum, offset). shard.mu
+// is already write-locked. All callers with the same ID represent the same
+// DB, and should pass the same loadBlockSema. NB: we cannot place the
+// loadBlockSema in the readShard since the readShard can be shared across
+// DBs, just like the block cache.
+func (rs *readShard) getReadHandleLocked(
+	id ID, fileNum base.DiskFileNum, offset uint64, loadBlockSema *fifo.Semaphore,
+) ReadHandle {
+	k := key{fileKey{id, fileNum}, offset}
+	e, ok := rs.shardMu.readMap.Get(k)
+	if !ok {
+		e = newReadEntry(rs, id, fileNum, offset, loadBlockSema)
+		rs.shardMu.readMap.Put(k, e)
+	} else {
+		e.refCount.acquireAllowZero()
+	}
+	return ReadHandle{entry: e}
+}
+
+// readEntry is used to coordinate between concurrent attempted readers of the
+// same block.
+type readEntry struct {
+	readShard     *readShard
+	id            ID
+	fileNum       base.DiskFileNum
+	offset        uint64
+	loadBlockSema *fifo.Semaphore
+	mu            struct {
+		// Mutex is held while waiting on loadBlockSema, and naturally protects
+		// everything in this struct.
+		sync.RWMutex
+		// If readEntry.loadBlockSema != nil, this represents whether the
+		// semaphore is held or not. It will transition at most once from false to
+		// true and back to false. While there are waiting readers and the
+		// reader(s) assigned the turn are failing (SetReadError), this will
+		// continue to be held.
+		loadBlockSemaHeld bool
+		// v, when non-nil, has a ref from readEntry, which is unreffed when
+		// readEntry is deleted from the readMap.
+		v *Value
+		// isReading and ch together capture the state of whether someone has been
+		// granted a turn to read, and of readers waiting for that read to finish.
+		// ch is lazily allocated since most readEntries will not see concurrent
+		// readers.
+		// This lazy allocation results in one transition of ch from nil
+		// to non-nil, so waiters can read this non-nil ch and block on reading
+		// from it without holding mu.
+		//
+		// ch is written to, to signal one waiter to start doing the read. ch is
+		// closed when the value is successfully read and has been stored in v, so
+		// that all waiters wake up and read v. ch is a buffered channel with a
+		// capacity of 1.
+		//
+		// State transitions when trying to wait for turn:
+		// Case !isReading:
+		//   set isReading=true; Drain the ch if non-nil and non-empty; proceed
+		//   with turn to do the read.
+		// Case isReading:
+		//   allocate ch if nil; wait on ch
+		// Finished reading successfully:
+		//   set isReading=false; if ch is non-nil, close ch.
+		// Finished reading with failure:
+		//   set isReading=false; if ch is non-nil, write to ch.
+		//
+		// INVARIANT:
+		// isReading => ch is nil or ch is empty.
+		isReading bool
+		ch        chan struct{}
+		// Total duration of reads and semaphore waiting that resulted in error.
+		errorDuration time.Duration
+		readStart     time.Time
+	}
+	// Count of ReadHandles that refer to this readEntry. Increments always hold
+	// shard.mu. So if this is found to be 0 while holding shard.mu, it is safe
+	// to delete readEntry from readShard.shardMu.readMap.
+	refCount refcnt
+}
+
+var readEntryPool = sync.Pool{
+	New: func() interface{} {
+		return &readEntry{}
+	},
+}
+
+func newReadEntry(
+	rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64, loadBlockSema *fifo.Semaphore,
+) *readEntry {
+	e := readEntryPool.Get().(*readEntry)
+	*e = readEntry{
+		readShard:     rs,
+		id:            id,
+		fileNum:       fileNum,
+		offset:        offset,
+		loadBlockSema: loadBlockSema,
+	}
+	e.refCount.init(1)
+	return e
+}
+
+func (e *readEntry) waitForReadPermissionOrHandle(
+	ctx context.Context,
+) (h Handle, errorDuration time.Duration, err error) {
+	constructHandleLocked := func() Handle {
+		if e.mu.v == nil {
+			panic("value is nil")
+		}
+		e.mu.v.acquire()
+		return Handle{value: e.mu.v}
+	}
+	becomeReaderLocked := func() {
+		if e.mu.v != nil {
+			panic("value is non-nil")
+		}
+		e.mu.isReading = true
+		if e.mu.ch != nil {
+			// Drain the channel, so that no one else mistakenly believes they
+			// should read.
+			select {
+			case <-e.mu.ch:
+			default:
+			}
+		}
+		e.mu.readStart = time.Now()
+	}
+	unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) {
+		removeState := e.makeTryRemoveStateLocked()
+		errorDuration = e.mu.errorDuration
+		if readLock {
+			e.mu.RUnlock()
+		} else {
+			e.mu.Unlock()
+		}
+		unrefAndTryRemoveFromMap(removeState)
+		return errorDuration
+	}
+	e.mu.Lock()
+	if e.mu.v != nil {
+		// Value has already been read.
+		h := constructHandleLocked()
+		errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
+		return h, errorDuration, nil
+	}
+	// Not already read.
+	if e.loadBlockSema != nil && !e.mu.loadBlockSemaHeld {
+		waitStart := time.Now()
+		err := e.loadBlockSema.Acquire(ctx, 1)
+		if err != nil {
+			e.mu.errorDuration += time.Since(waitStart)
+			errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
+			return Handle{}, errorDuration, err
+		}
+		e.mu.loadBlockSemaHeld = true
+	}
+	// Wait for turn to do the read or for someone else to do the read.
+	if !e.mu.isReading {
+		// Have permission to do the read.
+		becomeReaderLocked()
+		errorDuration = e.mu.errorDuration
+		e.mu.Unlock()
+		return Handle{}, errorDuration, nil
+	}
+	if e.mu.ch == nil {
+		// Rare case when multiple readers are concurrently trying to read.
+		// If this turns out to be common enough we could use a sync.Pool.
+		e.mu.ch = make(chan struct{}, 1)
+	}
+	ch := e.mu.ch
+	e.mu.Unlock()
+	select {
+	case <-ctx.Done():
+		e.mu.Lock()
+		errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
+		return Handle{}, errorDuration, ctx.Err()
+	case _, ok := <-ch:
+		if ok {
+			// Granted permission to do the read.
+			e.mu.Lock()
+			becomeReaderLocked()
+			errorDuration = e.mu.errorDuration
+			e.mu.Unlock()
+			return Handle{}, errorDuration, nil
+		} else {
+			// Channel closed, so value was read.
+			e.mu.RLock()
+			if e.mu.v == nil {
+				panic("value is nil")
+			}
+			h := constructHandleLocked()
+			errorDuration = unlockAndUnrefAndTryRemoveFromMap(true)
+			return h, errorDuration, nil
+		}
+	}
+}
+
+// tryRemoveState captures the state needed by unrefAndTryRemoveFromMap. The
+// caller constructs it before calling unrefAndTryRemoveFromMap since it
+// typically holds readEntry.mu at that point, which avoids acquiring it
+// again in unrefAndTryRemoveFromMap.
+type tryRemoveState struct {
+	rs *readShard
+	k  key
+	e  *readEntry
+}
+
+// makeTryRemoveStateLocked initializes tryRemoveState.
+func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState {
+	return tryRemoveState{
+		rs: e.readShard,
+		k:  key{fileKey{e.id, e.fileNum}, e.offset},
+		e:  e,
+	}
+}
+
+// unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs.
+// It is possible that, after unreffing, s.e has already been removed, and is
+// now back in the sync.Pool, or being reused (for the same or a different
+// key). This is because after the unref that dropped s.e.refCount to zero,
+// but before shard.mu was acquired, the refcount could have been incremented
+// and decremented concurrently, and some other goroutine could have observed
+// a different decrement to 0 and raced ahead, deleting s.e from the readMap.
+func unrefAndTryRemoveFromMap(s tryRemoveState) {
+	if !s.e.refCount.release() {
+		return
+	}
+	s.rs.shard.mu.Lock()
+	e2, ok := s.rs.shardMu.readMap.Get(s.k)
+	if !ok || e2 != s.e {
+		// Already removed.
+		s.rs.shard.mu.Unlock()
+		return
+	}
+	if s.e.refCount.value() != 0 {
+		s.rs.shard.mu.Unlock()
+		return
+	}
+	// k => e and e.refCount == 0. And it cannot be incremented since
+	// shard.mu.Lock() is held. So remove from map.
+	s.rs.shardMu.readMap.Delete(s.k)
+	s.rs.shard.mu.Unlock()
+
+	// Free s.e.
+	s.e.mu.Lock()
+	if s.e.mu.loadBlockSemaHeld {
+		s.e.loadBlockSema.Release(1)
+		s.e.mu.loadBlockSemaHeld = false
+	}
+	if s.e.mu.v != nil {
+		s.e.mu.v.release()
+		s.e.mu.v = nil
+	}
+	s.e.mu.Unlock()
+	*s.e = readEntry{}
+	readEntryPool.Put(s.e)
+}
+
+func (e *readEntry) setReadValue(v *Value) Handle {
+	// Add to the cache before taking another ref for readEntry, since the cache
+	// expects ref=1 when it is called.
+	//
+	// TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure
+	// that it is added as etHot. The common case will be e.refCount = 1, and we
+	// don't want to acquire e.mu twice, so one way to do this would be to relax
+	// the invariant in shard.Set that requires Value.refs() == 1. Then we can
+	// do the work under e.mu before calling shard.Set.
+	h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v)
+	e.mu.Lock()
+	// Acquire a ref for readEntry, since we are going to remember it in e.mu.v.
+	v.acquire()
+	e.mu.v = v
+	if !e.mu.isReading {
+		panic("isReading is false")
+	}
+	e.mu.isReading = false
+	if e.mu.ch != nil {
+		// Inform all waiters so they can use e.mu.v.
+		// Not all readers have called readEntry.waitForReadPermissionOrHandle,
+		// and those will also use e.mu.v.
+		close(e.mu.ch)
+	}
+	if e.mu.loadBlockSemaHeld {
+		// Release the loadBlockSema now since it is a shared resource, and we
+		// don't want to wait until refCount drops to 0.
+		e.loadBlockSema.Release(1)
+		e.mu.loadBlockSemaHeld = false
+	}
+	removeState := e.makeTryRemoveStateLocked()
+	e.mu.Unlock()
+	unrefAndTryRemoveFromMap(removeState)
+	return h
+}
+
+func (e *readEntry) setReadError(err error) {
+	e.mu.Lock()
+	if !e.mu.isReading {
+		panic("isReading is false")
+	}
+	e.mu.isReading = false
+	if e.mu.ch != nil {
+		select {
+		case e.mu.ch <- struct{}{}:
+		default:
+			panic("channel is not empty")
+		}
+	}
+	e.mu.errorDuration += time.Since(e.mu.readStart)
+	removeState := e.makeTryRemoveStateLocked()
+	e.mu.Unlock()
+	unrefAndTryRemoveFromMap(removeState)
+}
+
+// ReadHandle represents a contract with a caller that had a miss when doing a
+// cache lookup, and wants to do a read and insert the read block into the
+// cache. The contract applies when ReadHandle.Valid returns true.
+//
+// Contract:
+//
+// The caller must call WaitForReadPermissionOrHandle. If the method returns
+// with an error, or a valid Handle, the caller is done. In the latter case,
+// someone else did the read on behalf of the caller. If neither, the caller
+// has been granted a turn to do the read. It must immediately attempt to do
+// a read (other readers are potentially waiting for it), and then call
+// SetReadValue or SetReadError depending on whether the read succeeded or
+// failed.
+type ReadHandle struct {
+	entry *readEntry
+}
+
+// Valid returns true for a valid ReadHandle.
+func (rh ReadHandle) Valid() bool {
+	return rh.entry != nil
+}
+
+// WaitForReadPermissionOrHandle is called on a valid ReadHandle and returns
+// either an already read value (in Handle), an error (if the context was
+// cancelled), or neither, which is a directive to the caller to do the read.
+// In this last case the caller must call either SetReadValue or SetReadError.
+//
+// In all cases, errorDuration is populated with the total duration that
+// readers that observed an error (SetReadError) spent in doing the read. This
+// duration can be greater than the time spent in WaitForReadPermissionOrHandle,
+// since some of these errors could have occurred prior to this call. But it
+// serves as a rough indicator of whether turn-taking could have caused higher
+// latency due to context cancellation.
+func (rh ReadHandle) WaitForReadPermissionOrHandle(
+	ctx context.Context,
+) (h Handle, errorDuration time.Duration, err error) {
+	return rh.entry.waitForReadPermissionOrHandle(ctx)
+}
+
+// SetReadValue provides the Value that the caller has read. The caller is
+// responsible for releasing the returned Handle when it is no longer needed.
+func (rh ReadHandle) SetReadValue(v *Value) Handle {
+	return rh.entry.setReadValue(v)
+}
+
+// SetReadError specifies that the caller has encountered a read error.
+func (rh ReadHandle) SetReadError(err error) {
+	rh.entry.setReadError(err)
+}
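For readers of this diff, the ReadHandle contract above boils down to roughly the following caller-side shape. This is an illustrative sketch only, not part of the change: getOrReadBlock and readBlockFromFile are hypothetical names, and the loadBlockSema is omitted by passing nil.

func getOrReadBlock(
	ctx context.Context, c *cache.Cache, id cache.ID, fileNum base.DiskFileNum, offset uint64,
) (cache.Handle, error) {
	h, rh := c.GetWithReadHandle(id, fileNum, offset, nil /* loadBlockSema */)
	if h.Valid() {
		return h, nil // cache hit
	}
	// Cache miss: wait for another reader to finish, or be granted the turn.
	h, _, err := rh.WaitForReadPermissionOrHandle(ctx)
	if err != nil || h.Valid() {
		// Context error, or someone else completed the read on our behalf.
		return h, err
	}
	// We have the turn: do the read, then publish the value or the error so
	// that any waiters are woken up.
	v, err := readBlockFromFile(ctx, fileNum, offset) // hypothetical; returns *cache.Value
	if err != nil {
		rh.SetReadError(err)
		return cache.Handle{}, err
	}
	return rh.SetReadValue(v), nil
}

The change to sstable/reader.go later in this diff follows the same shape, with an additional branch for the BufferPool-backed compaction path and a deferred SetReadError.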
diff --git a/internal/cache/refcnt_normal.go b/internal/cache/refcnt_normal.go
index 9ab3348613..94beb232f0 100644
--- a/internal/cache/refcnt_normal.go
+++ b/internal/cache/refcnt_normal.go
@@ -37,6 +37,15 @@ func (v *refcnt) acquire() {
 	}
 }
 
+// acquireAllowZero is the same as acquire, but allows it to be called with a
+// zero refcnt. This is useful for cases where the entry being reference
+// counted is inside a container and the container does not hold a reference.
+// The container uses release() returning true as the signal to attempt a
+// cleanup from the map.
+func (v *refcnt) acquireAllowZero() {
+	v.val.Add(1)
+}
+
 func (v *refcnt) release() bool {
 	switch v := v.val.Add(-1); {
 	case v < 0:
@@ -48,6 +57,10 @@ func (v *refcnt) release() bool {
 	}
 }
 
+func (v *refcnt) value() int32 {
+	return v.val.Load()
+}
+
 func (v *refcnt) trace(msg string) {
 }
 
diff --git a/internal/cache/refcnt_tracing.go b/internal/cache/refcnt_tracing.go
index 1d5e6c0219..a0329c69a7 100644
--- a/internal/cache/refcnt_tracing.go
+++ b/internal/cache/refcnt_tracing.go
@@ -51,6 +51,10 @@ func (v *refcnt) release() bool {
 	return n == 0
 }
 
+func (v *refcnt) value() int32 {
+	return v.val.Load()
+}
+
 func (v *refcnt) trace(msg string) {
 	s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack())
 	v.Lock()
diff --git a/sstable/block/buffer_pool.go b/sstable/block/buffer_pool.go
index 38809b41ed..80bf550cd0 100644
--- a/sstable/block/buffer_pool.go
+++ b/sstable/block/buffer_pool.go
@@ -8,6 +8,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/cache"
+	"github.com/cockroachdb/pebble/internal/invariants"
 )
 
 // Alloc allocates a new Value for a block of length n (excluding the block
@@ -52,12 +53,22 @@ func (b Value) BlockMetadata() *Metadata {
 // backed by a buffer pool, MakeHandle inserts the value into the block cache,
 // returning a handle to the now resident value.
 func (b Value) MakeHandle(
-	c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64,
+	crh cache.ReadHandle, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64,
 ) BufferHandle {
 	if b.buf.Valid() {
+		if invariants.Enabled && crh.Valid() {
+			panic("cache.ReadHandle was valid")
+		}
 		return BufferHandle{b: b.buf}
 	}
-	return BufferHandle{h: c.Set(cacheID, fileNum, offset, b.v)}
+	return BufferHandle{h: crh.SetReadValue(b.v)}
+}
+
+func (b Value) SetInCacheAndReleaseForTesting(
+	c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64,
+) {
+	h := c.Set(cacheID, fileNum, offset, b.v)
+	h.Release()
 }
 
 // Release releases the handle.
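The acquireAllowZero/value/release trio added above supports the lookup-then-cleanup pattern used by readShard. A minimal sketch of that pattern follows; it is hypothetical (the container type and its methods are invented for illustration, and it is assumed to live in the internal/cache package so it can use refcnt directly), not part of the change.

// Hypothetical container that does not hold a reference of its own.
type entry struct {
	refCount refcnt
}

type container struct {
	mu sync.Mutex
	m  map[string]*entry
}

// get returns an entry with a reference taken. The count may be zero at this
// point if the last holder is concurrently releasing, hence acquireAllowZero.
func (c *container) get(k string) *entry {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := c.m[k]
	if e != nil {
		e.refCount.acquireAllowZero()
	}
	return e
}

// put releases the reference; release() returning true nominates this caller
// to attempt removal, which must be re-verified under the container lock
// (another goroutine may have re-acquired, or already removed, the entry).
func (c *container) put(k string, e *entry) {
	if !e.refCount.release() {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.m[k] == e && e.refCount.value() == 0 {
		delete(c.m, k)
	}
}

unrefAndTryRemoveFromMap in read_shard.go is the real instance of this pattern, with the additional step of recycling the entry through a sync.Pool.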
diff --git a/sstable/colblk/index_block_test.go b/sstable/colblk/index_block_test.go
index c7e7d6a3dc..970535455e 100644
--- a/sstable/colblk/index_block_test.go
+++ b/sstable/colblk/index_block_test.go
@@ -125,7 +125,7 @@ func TestIndexIterInitHandle(t *testing.T) {
 	d := (*IndexBlockDecoder)(unsafe.Pointer(v.BlockMetadata()))
 	d.Init(blockData)
 
-	v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release()
+	v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0)
 
 	getBlockAndIterate := func(it *IndexIter) {
 		h := c.Get(cache.ID(1), base.DiskFileNum(1), 0)
diff --git a/sstable/colblk/keyspan_test.go b/sstable/colblk/keyspan_test.go
index 9592dabcc1..a306e7f192 100644
--- a/sstable/colblk/keyspan_test.go
+++ b/sstable/colblk/keyspan_test.go
@@ -88,7 +88,7 @@ func TestKeyspanBlockPooling(t *testing.T) {
 	copy(v.BlockData(), b)
 	d := (*KeyspanDecoder)(unsafe.Pointer(v.BlockMetadata()))
 	d.Init(v.BlockData())
-	v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release()
+	v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0)
 
 	getBlockAndIterate := func() {
 		h := c.Get(cache.ID(1), base.DiskFileNum(1), 0)
diff --git a/sstable/reader.go b/sstable/reader.go
index 093b31865c..851ec5d7c5 100644
--- a/sstable/reader.go
+++ b/sstable/reader.go
@@ -498,28 +498,67 @@ func (r *Reader) readBlockInternal(
 	bh block.Handle,
 	initBlockMetadataFn func(*block.Metadata, []byte) error,
 ) (handle block.BufferHandle, _ error) {
-	if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Valid() {
+	var ch cache.Handle
+	var crh cache.ReadHandle
+	if env.BufferPool == nil {
+		ch, crh = r.cacheOpts.Cache.GetWithReadHandle(
+			r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset, r.loadBlockSema)
+	} else {
+		ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
+	}
+	if ch.Valid() {
 		// Cache hit.
 		if readHandle != nil {
 			readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
 		}
 		env.BlockServedFromCache(bh.Length)
-		return block.CacheBufferHandle(h), nil
+		if invariants.Enabled && crh.Valid() {
+			panic("cache.ReadHandle must not be valid")
+		}
+		return block.CacheBufferHandle(ch), nil
 	}
 
 	// Cache miss.
-
-	if sema := r.loadBlockSema; sema != nil {
-		if err := sema.Acquire(ctx, 1); err != nil {
-			// An error here can only come from the context.
+	if crh.Valid() {
+		var err error
+		var errorDuration time.Duration
+		ch, errorDuration, err = crh.WaitForReadPermissionOrHandle(ctx)
+		if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) {
+			r.logger.Eventf(
+				ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String())
+		}
+		if err != nil {
 			return block.BufferHandle{}, err
 		}
-		defer sema.Release(1)
+		if ch.Valid() {
+			return block.CacheBufferHandle(ch), nil
+		}
+		// TODO(sumeer): consider tracing when we waited longer than some duration
+		// for the turn to do the read.
+	} else {
+		// The compaction path uses env.BufferPool, and does not coordinate the
+		// read using a cache.ReadHandle. This is ok since only a single
+		// compaction is reading a block.
+		if sema := r.loadBlockSema; sema != nil {
+			if err := sema.Acquire(ctx, 1); err != nil {
+				// An error here can only come from the context.
+				return block.BufferHandle{}, err
+			}
+			defer sema.Release(1)
+		}
 	}
+	// INVARIANT: !ch.Valid().
 	compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool)
 	readStopwatch := makeStopwatch()
 	var err error
+	defer func() {
+		if err != nil {
+			if crh.Valid() {
+				crh.SetReadError(err)
+			}
+		}
+	}()
 	if readHandle != nil {
 		err = readHandle.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
 	} else {
@@ -542,7 +581,7 @@ func (r *Reader) readBlockInternal(
 		return block.BufferHandle{}, err
 	}
 	env.BlockRead(bh.Length, readDuration)
-	if err := checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil {
+	if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil {
 		compressed.Release()
 		return block.BufferHandle{}, err
 	}
@@ -555,7 +594,8 @@ func (r *Reader) readBlockInternal(
 		decompressed = compressed
 	} else {
 		// Decode the length of the decompressed value.
-		decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData())
+		var decodedLen, prefixLen int
+		decodedLen, prefixLen, err = block.DecompressedLen(typ, compressed.BlockData())
 		if err != nil {
 			compressed.Release()
 			return block.BufferHandle{}, err
 		}
@@ -569,11 +609,11 @@ func (r *Reader) readBlockInternal(
 			return block.BufferHandle{}, err
 		}
 	}
-	if err := initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil {
+	if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil {
 		decompressed.Release()
 		return block.BufferHandle{}, err
 	}
-	h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
+	h := decompressed.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
 	return h, nil
 }
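As a rough illustration of the behavior this diff is after (a hypothetical test-style sketch, not part of the change; c, id, fileNum, offset are assumed to be in scope and readValueFromDisk is an invented stand-in for the read path), many goroutines missing on the same block now result in a single underlying read, with the rest waiting in WaitForReadPermissionOrHandle and reusing the value published via SetReadValue:

var reads atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 16; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		h, rh := c.GetWithReadHandle(id, fileNum, offset, nil)
		if h.Valid() {
			h.Release()
			return
		}
		h, _, err := rh.WaitForReadPermissionOrHandle(context.Background())
		if err != nil {
			return
		}
		if h.Valid() {
			// Another goroutine did the read; reuse its value.
			h.Release()
			return
		}
		// Our turn to read.
		reads.Add(1)
		v := readValueFromDisk() // hypothetical; returns a *cache.Value
		rh.SetReadValue(v).Release()
	}()
}
wg.Wait()
// reads.Load() is typically 1: waiters reuse the single read (absent errors
// or an eviction between the read and a later lookup).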