Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache,db: de-dup concurrent attempts to read the same block #4157

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 95 additions & 14 deletions internal/cache/clockpro.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package cache // import "github.com/cockroachdb/pebble/internal/cache"

import (
"context"
"fmt"
"os"
"runtime"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
Expand Down Expand Up @@ -114,9 +116,31 @@ type shard struct {
countHot int64
countCold int64
countTest int64

// Some fields in readShard are protected by mu. See comments in declaration
// of readShard.
readShard readShard
}

func (c *shard) init(maxSize int64) {
*c = shard{
maxSize: maxSize,
coldTarget: maxSize,
}
if entriesGoAllocated {
c.entries = make(map[*entry]struct{})
}
c.blocks.Init(16)
c.files.Init(16)
c.readShard.Init(c)
}

func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
// getWithMaybeReadEntry is the internal helper for implementing
// Cache.{Get,GetWithReadHandle}. When desireReadEntry is true, and the block
// is not in the cache (!Handle.Valid()), a non-nil readEntry is returned.
func (c *shard) getWithMaybeReadEntry(
id ID, fileNum base.DiskFileNum, offset uint64, desireReadEntry bool,
) (Handle, *readEntry) {
c.mu.RLock()
var value *Value
if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
Expand All @@ -126,12 +150,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
}
}
c.mu.RUnlock()
var re *readEntry
if value == nil && desireReadEntry {
c.mu.Lock()
// After the c.mu.RUnlock(), someone could have inserted the value in the
// cache. We could tolerate the race and do a file read, or do another map
// lookup. We choose to do the latter, since the cost of a map lookup is
// insignificant compared to the cost of reading a block from a file.
if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
value = e.acquireValue()
if value != nil {
e.referenced.Store(true)
}
}
if value == nil {
re = c.readShard.getReadEntryLocked(id, fileNum, offset)
}
c.mu.Unlock()
}
if value == nil {
c.misses.Add(1)
return Handle{}
} else {
c.hits.Add(1)
}
c.hits.Add(1)
return Handle{value: value}
return Handle{value: value}, re
}

func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
Expand Down Expand Up @@ -170,6 +212,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value
value.ref.trace("add-hot")
c.sizeHot += delta
} else {
// TODO(sumeer): unclear why we don't set e.ptype to etHot on this path.
// In the default case below, where the state is etTest we set it to
// etHot. But etTest is "colder" than etCold, since the only transition
// into etTest is etCold => etTest, so since etTest transitions to
// etHot, then etCold should also transition.
value.ref.trace("add-cold")
c.sizeCold += delta
}
Expand Down Expand Up @@ -737,15 +784,7 @@ func newShards(size int64, shards int) *Cache {
c.idAlloc.Store(1)
c.trace("alloc", c.refs.Load())
for i := range c.shards {
c.shards[i] = shard{
maxSize: size / int64(len(c.shards)),
coldTarget: size / int64(len(c.shards)),
}
if entriesGoAllocated {
c.shards[i].entries = make(map[*entry]struct{})
}
c.shards[i].blocks.Init(16)
c.shards[i].files.Init(16)
c.shards[i].init(size / int64(len(c.shards)))
}

// Note: this is a no-op if invariants are disabled or race is enabled.
Expand Down Expand Up @@ -822,7 +861,49 @@ func (c *Cache) Unref() {
// Get retrieves the cache value for the specified file and offset, returning
// nil if no value is present.
func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
return c.getShard(id, fileNum, offset).Get(id, fileNum, offset)
h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry(
id, fileNum, offset, false)
if invariants.Enabled && re != nil {
panic("readEntry should be nil")
}
return h
}

// GetWithReadHandle retrieves the cache value for the specified ID, fileNum
// and offset. If found, a valid Handle is returned (with cacheHit set to
// true), else a valid ReadHandle is returned.
//
// See the ReadHandle declaration for the contract the caller must satisfy
// when getting a valid ReadHandle.
//
// This method can block before returning since multiple concurrent gets for
// the same cache value will take turns getting a ReadHandle, which represents
// permission to do the read. This blocking respects context cancellation, in
// which case an error is returned (and not a valid ReadHandle).
//
// When blocking, the errorDuration return value can be non-zero and is
// populated with the total duration that other readers that observed an error
// (see ReadHandle.SetReadError) spent in doing the read. This duration can be
// greater than the time spent blocked in this method, since some of these
// errors could have occurred prior to this call. But it serves as a rough
// indicator of whether turn taking could have caused higher latency due to
// context cancellation of other readers.
//
// While waiting, someone else may successfully read the value, which results
// in a valid Handle being returned. This is a case where cacheHit=false.
func (c *Cache) GetWithReadHandle(
ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64,
) (h Handle, rh ReadHandle, errorDuration time.Duration, cacheHit bool, err error) {
h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry(
id, fileNum, offset, true)
if h.Valid() {
return h, ReadHandle{}, 0, true, nil
}
h, errorDuration, err = re.waitForReadPermissionOrHandle(ctx)
if err != nil || h.Valid() {
return h, ReadHandle{}, errorDuration, false, err
}
return Handle{}, ReadHandle{entry: re}, errorDuration, false, nil
}

// Set sets the cache value for the specified file and offset, overwriting an
Expand Down
Loading
Loading