cache,db: de-dup concurrent attempts to read the same block
Concurrent reads of the same block have been observed to cause very high
memory usage and significant CPU usage for allocations/deallocations. We
now coordinate multiple concurrent attempts to read the same block via a
readEntry, which makes the readers take turns until one succeeds.

The readEntries are embedded in a map that is part of a readShard; there
is one readShard per cache shard. See the long comment on the readShard
declaration for motivation.

Callers interact with this new behavior via Cache.GetWithReadHandle,
which is only for callers that intend to do a read and then populate
the cache. If this method returns a ReadHandle, the caller has permission
to do a read. See the ReadHandle comment for details of the contract.

Fixes #4138
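
For intuition, here is a minimal, hypothetical Go sketch of the turn-taking idea: a per-key entry that concurrent readers wait on, so at most one read of a given block is in flight at a time. The names and structure are illustrative only — the readEntry/readShard implementation added by this commit differs, notably by handing the turn to the next waiter when a read fails and by respecting context cancellation.

package cacheexample

import "sync"

// readAttempt coordinates concurrent readers of one block (illustrative).
type readAttempt struct {
	done  chan struct{} // closed when the in-flight read finishes
	value []byte        // set on success, before done is closed
}

type readGroup struct {
	mu       sync.Mutex
	inFlight map[string]*readAttempt
}

// get returns the block for key, reading it at most once across concurrent
// callers: the first caller performs read(); the rest block until it is done.
func (g *readGroup) get(key string, read func() []byte) []byte {
	g.mu.Lock()
	if a, ok := g.inFlight[key]; ok {
		g.mu.Unlock()
		<-a.done // wait for the reader that currently holds the turn
		return a.value
	}
	if g.inFlight == nil {
		g.inFlight = make(map[string]*readAttempt)
	}
	a := &readAttempt{done: make(chan struct{})}
	g.inFlight[key] = a
	g.mu.Unlock()

	a.value = read() // we hold the turn; do the actual block read
	g.mu.Lock()
	delete(g.inFlight, key)
	g.mu.Unlock()
	close(a.done) // wake all waiting readers
	return a.value
}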
sumeerbhola committed Dec 10, 2024
1 parent ab9741a commit 4f7bb5f
Showing 10 changed files with 1,043 additions and 38 deletions.
109 changes: 95 additions & 14 deletions internal/cache/clockpro.go
@@ -18,13 +18,15 @@
package cache // import "github.com/cockroachdb/pebble/internal/cache"

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"runtime/debug"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
@@ -114,9 +116,31 @@ type shard struct {
	countHot  int64
	countCold int64
	countTest int64

	// Some fields in readShard are protected by mu. See the comments in the
	// declaration of readShard.
	readShard readShard
}

func (c *shard) init(maxSize int64) {
	*c = shard{
		maxSize:    maxSize,
		coldTarget: maxSize,
	}
	if entriesGoAllocated {
		c.entries = make(map[*entry]struct{})
	}
	c.blocks.Init(16)
	c.files.Init(16)
	c.readShard.Init(c)
}

// getWithMaybeReadEntry is the internal helper for implementing
// Cache.{Get,GetWithReadHandle}. When desireReadEntry is true, and the block
// is not in the cache (!Handle.Valid()), a non-nil readEntry is returned.
func (c *shard) getWithMaybeReadEntry(
	id ID, fileNum base.DiskFileNum, offset uint64, desireReadEntry bool,
) (Handle, *readEntry) {
	c.mu.RLock()
	var value *Value
	if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
@@ -126,12 +150,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
		}
	}
	c.mu.RUnlock()
	var re *readEntry
	if value == nil && desireReadEntry {
		c.mu.Lock()
		// After the c.mu.RUnlock(), someone could have inserted the value in the
		// cache. We could tolerate the race and do a file read, or do another map
		// lookup. We choose to do the latter, since the cost of a map lookup is
		// insignificant compared to the cost of reading a block from a file.
		if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
			value = e.acquireValue()
			if value != nil {
				e.referenced.Store(true)
			}
		}
		if value == nil {
			re = c.readShard.getReadEntryLocked(id, fileNum, offset)
		}
		c.mu.Unlock()
	}
	if value == nil {
		c.misses.Add(1)
	} else {
		c.hits.Add(1)
	}
	return Handle{value: value}, re
}
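
The lock upgrade above — a lookup under RLock, then a second lookup under the exclusive Lock — is the standard double-checked pattern. A generic, self-contained sketch with hypothetical names:

package cacheexample

import "sync"

// lookupOrRegister mirrors the re-check in getWithMaybeReadEntry: between
// RUnlock and Lock another goroutine may have inserted the value, so look it
// up again before registering intent to read it from the file.
func lookupOrRegister(mu *sync.RWMutex, m map[string][]byte, key string) ([]byte, bool) {
	mu.RLock()
	v, ok := m[key]
	mu.RUnlock()
	if ok {
		return v, true
	}
	mu.Lock()
	defer mu.Unlock()
	// Re-check under the exclusive lock; a second map lookup is cheap
	// compared to reading a block from a file.
	if v, ok := m[key]; ok {
		return v, true
	}
	// Still absent: the caller would register a readEntry here.
	return nil, false
}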

func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
@@ -170,6 +212,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value
value.ref.trace("add-hot")
c.sizeHot += delta
} else {
// TODO(sumeer): unclear why we don't set e.ptype to etHot on this path.
// In the default case below, where the state is etTest we set it to
// etHot. But etTest is "colder" than etCold, since the only transition
// into etTest is etCold => etTest, so since etTest transitions to
// etHot, then etCold should also transition.
value.ref.trace("add-cold")
c.sizeCold += delta
}
@@ -737,15 +784,7 @@ func newShards(size int64, shards int) *Cache {
	c.idAlloc.Store(1)
	c.trace("alloc", c.refs.Load())
	for i := range c.shards {
		c.shards[i].init(size / int64(len(c.shards)))
	}

	// Note: this is a no-op if invariants are disabled or race is enabled.
@@ -822,7 +861,49 @@ func (c *Cache) Unref() {
// Get retrieves the cache value for the specified file and offset, returning
// nil if no value is present.
func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
	h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry(
		id, fileNum, offset, false)
	if invariants.Enabled && re != nil {
		panic("readEntry should be nil")
	}
	return h
}

// GetWithReadHandle retrieves the cache value for the specified ID, fileNum
// and offset. If found, a valid Handle is returned (with cacheHit set to
// true), else a valid ReadHandle is returned.
//
// See the ReadHandle declaration for the contract the caller must satisfy
// when getting a valid ReadHandle.
//
// This method can block before returning since multiple concurrent gets for
// the same cache value will take turns getting a ReadHandle, which represents
// permission to do the read. This blocking respects context cancellation, in
// which case an error is returned (and not a valid ReadHandle).
//
// When blocking, the errorDuration return value can be non-zero and is
// populated with the total duration that other readers that observed an error
// (see ReadHandle.SetReadError) spent in doing the read. This duration can be
// greater than the time spent blocked in this method, since some of these
// errors could have occurred prior to this call. But it serves as a rough
// indicator of whether turn taking could have caused higher latency due to
// context cancellation of other readers.
//
// While waiting, someone else may successfully read the value, which results
// in a valid Handle being returned. This is a case where cacheHit=false.
func (c *Cache) GetWithReadHandle(
	ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64,
) (h Handle, rh ReadHandle, errorDuration time.Duration, cacheHit bool, err error) {
	h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry(
		id, fileNum, offset, true)
	if h.Valid() {
		return h, ReadHandle{}, 0, true, nil
	}
	h, errorDuration, err = re.waitForReadPermissionOrHandle(ctx)
	if err != nil || h.Valid() {
		return h, ReadHandle{}, errorDuration, false, err
	}
	return Handle{}, ReadHandle{entry: re}, errorDuration, false, nil
}
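
For illustration, a hedged sketch of the caller contract described in the comment above. SetReadError is referenced in this commit's comments; SetReadValue (assumed here to populate the cache and return a Handle) and readFromFile are hypothetical names used only for this sketch:

// readBlock is a hypothetical caller of GetWithReadHandle.
func readBlock(
	ctx context.Context, c *Cache, id ID, fileNum base.DiskFileNum, offset uint64,
) (Handle, error) {
	h, rh, _, _, err := c.GetWithReadHandle(ctx, id, fileNum, offset)
	if err != nil {
		return Handle{}, err // context canceled while waiting for a turn
	}
	if h.Valid() {
		return h, nil // cache hit, or another reader succeeded while we waited
	}
	// We hold a valid ReadHandle: permission (and obligation) to do the read.
	v, err := readFromFile(fileNum, offset) // hypothetical I/O helper returning *Value
	if err != nil {
		rh.SetReadError(err) // pass the turn to the next waiting reader
		return Handle{}, err
	}
	return rh.SetReadValue(v), nil // populate the cache and wake the waiters
}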

// Set sets the cache value for the specified file and offset, overwriting an