Skip to content

Commit

Permalink
cache,db: de-dup concurrent attempts to read the same block
Browse files Browse the repository at this point in the history
Concurrent reads of the same block have been observed to cause very high
memory usage, and are wasteful of disk bandwidth. We now coordinate
across multiple concurrent attempts to read the same block via a
readEntry, which makes the readers take turns until one succeeds.

The readEntries are embedded in a map that is part of a readShard, where
there is a readShard for each cache.Shard. See the long comment in the
readShard declaration for motivation. The Options.LoadBlockSema is
integrated into the readEntry, to simplify the waiting logic in the
caller.

Callers interact with this new behavior via Cache.GetWithReadHandle,
which is only for callers that intend to do a read and then populate
the cache. If this method returns a ReadHandle, the caller must
first wait for permission to do a read. See the ReadHandle comment for
details of the contract.

Fixes #4138
  • Loading branch information
sumeerbhola committed Nov 11, 2024
1 parent 9f68a21 commit 0be60e5
Show file tree
Hide file tree
Showing 8 changed files with 599 additions and 18 deletions.
67 changes: 61 additions & 6 deletions internal/cache/clockpro.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"sync/atomic"

"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
)
Expand Down Expand Up @@ -114,9 +115,22 @@ type shard struct {
countHot int64
countCold int64
countTest int64
}

func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
// Some fields in readShard are protected by mu. See comments in declaration
// of readShard.
readShard readShard
}

// GetWithMaybeReadHandle is the internal helper for implementing
// Cache.{Get,GetWithReadHandle}. loadBlockSema can be nil, and a non-nil
// value is only relevant when desireReadHandle is true.
func (c *shard) GetWithMaybeReadHandle(
id ID,
fileNum base.DiskFileNum,
offset uint64,
desireReadHandle bool,
loadBlockSema *fifo.Semaphore,
) (Handle, ReadHandle) {
c.mu.RLock()
var value *Value
if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
Expand All @@ -126,12 +140,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
}
}
c.mu.RUnlock()
var rh ReadHandle
if value == nil && desireReadHandle {
c.mu.Lock()
// After the c.mu.RUnlock(), someone could have inserted the value in the
// cache. We could tolerate the race and do a file read, or do another map
// lookup. We choose to do the latter, since the cost of a map lookup is
// insignificant compared to the cost of reading a block from a file.
if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
value = e.acquireValue()
if value != nil {
e.referenced.Store(true)
}
}
if value == nil {
rh = c.readShard.getReadHandleLocked(id, fileNum, offset, loadBlockSema)
}
c.mu.Unlock()
}
if value == nil {
c.misses.Add(1)
return Handle{}
} else {
c.hits.Add(1)
}
c.hits.Add(1)
return Handle{value: value}
return Handle{value: value}, rh
}

func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
Expand Down Expand Up @@ -170,6 +202,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value
value.ref.trace("add-hot")
c.sizeHot += delta
} else {
// TODO(sumeer): unclear why we don't set e.ptype to etHot on this path.
// In the default case below, where the state is etTest we set it to
// etHot. But etTest is "colder" than etCold, since the only transition
// into etTest is etCold => etTest, so since etTest transitions to
// etHot, then etCold should also transition.
value.ref.trace("add-cold")
c.sizeCold += delta
}
Expand Down Expand Up @@ -746,6 +783,7 @@ func newShards(size int64, shards int) *Cache {
}
c.shards[i].blocks.Init(16)
c.shards[i].files.Init(16)
c.shards[i].readShard.Init(&c.shards[i])
}

// Note: this is a no-op if invariants are disabled or race is enabled.
Expand Down Expand Up @@ -822,7 +860,24 @@ func (c *Cache) Unref() {
// Get retrieves the cache value for the specified file and offset, returning
// nil if no value is present.
func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
return c.getShard(id, fileNum, offset).Get(id, fileNum, offset)
h, rh := c.getShard(id, fileNum, offset).GetWithMaybeReadHandle(
id, fileNum, offset, false, nil)
if invariants.Enabled && rh.Valid() {
panic("ReadHandle should not be valid")
}
return h
}

// GetWithReadHandle retrieves the cache value for the specified ID, fileNum
// and offset. If found, a valid Handle is returned, else a valid ReadHandle
// is returned. See the ReadHandle declaration for the contract must satisfy
// when getting a valid ReadHandle. The loadBlockSema if non-nil, is used by
// the ReadHandle -- see Options.LoadBlockSema for details.
func (c *Cache) GetWithReadHandle(
id ID, fileNum base.DiskFileNum, offset uint64, loadBlockSema *fifo.Semaphore,
) (Handle, ReadHandle) {
return c.getShard(id, fileNum, offset).GetWithMaybeReadHandle(
id, fileNum, offset, true, loadBlockSema)
}

// Set sets the cache value for the specified file and offset, overwriting an
Expand Down
Loading

0 comments on commit 0be60e5

Please sign in to comment.