From 4a53cdf5df3aaf654bd6318c1392308b28b7354b Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Mon, 11 Nov 2024 08:52:35 -0800
Subject: [PATCH] table cache: use a long-lived ReaderProvider

An `sstable.ReaderProvider` is necessary whenever an sstable has value
blocks. Currently, we allocate one every time we create a point
iterator. This commit makes the `tableCacheShardReaderProvider` usable
in parallel and maintains a long-lived instance in the
`tableCacheValue`.
---
 table_cache.go   | 105 +++++++++++++++++++++++++++++++----------------
 testdata/ingest  |   2 +-
 testdata/metrics |  18 ++++----
 3 files changed, 80 insertions(+), 45 deletions(-)

diff --git a/table_cache.go b/table_cache.go
index 848b202361..fd8f8ce6e5 100644
--- a/table_cache.go
+++ b/table_cache.go
@@ -557,10 +557,6 @@ func (c *tableCacheShard) newPointIter(
 	if err != nil {
 		return nil, err
 	}
-	var rp sstable.ReaderProvider
-	if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
-		rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
-	}
 
 	if v.isShared && file.SyntheticSeqNum() != 0 {
 		if tableFormat < sstable.TableFormatPebblev4 {
@@ -577,11 +573,11 @@ func (c *tableCacheShard) newPointIter(
 			uint64(uintptr(unsafe.Pointer(v.reader))), opts.Category)
 	}
 	if internalOpts.compaction {
-		iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, rp, internalOpts.bufferPool)
+		iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
 	} else {
 		iter, err = cr.NewPointIter(
 			ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
-			internalOpts.stats, iterStatsAccum, rp)
+			internalOpts.stats, iterStatsAccum, &v.readerProvider)
 	}
 	if err != nil {
 		return nil, err
@@ -652,15 +648,36 @@ func (c *tableCacheShard) newRangeKeyIter(
 	return cr.NewRawRangeKeyIter(ctx, transforms)
 }
 
+// tableCacheShardReaderProvider implements sstable.ReaderProvider for a
+// specific table.
 type tableCacheShardReaderProvider struct {
-	c      *tableCacheShard
-	file   *manifest.FileMetadata
-	dbOpts *tableCacheOpts
-	v      *tableCacheValue
+	c              *tableCacheShard
+	dbOpts         *tableCacheOpts
+	backingFileNum base.DiskFileNum
+
+	mu struct {
+		sync.Mutex
+		// v is the result of findNode. Whenever it is not nil, we hold a refcount
+		// on the tableCacheValue.
+		v *tableCacheValue
+		// refCount is the number of GetReader() calls that have not received a
+		// corresponding Close().
+		refCount int
+	}
 }
 
 var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
 
+func (rp *tableCacheShardReaderProvider) init(
+	c *tableCacheShard, dbOpts *tableCacheOpts, backingFileNum base.DiskFileNum,
+) {
+	rp.c = c
+	rp.dbOpts = dbOpts
+	rp.backingFileNum = backingFileNum
+	rp.mu.v = nil
+	rp.mu.refCount = 0
+}
+
 // GetReader implements sstable.ReaderProvider. Note that it is not the
 // responsibility of tableCacheShardReaderProvider to ensure that the file
 // continues to exist. The ReaderProvider is used in iterators where the
@@ -676,21 +693,38 @@ var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
 // TODO(bananabrick): We could return a wrapper over the Reader to ensure
 // that the reader isn't used for other purposes.
 func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	if rp.mu.v != nil {
+		rp.mu.refCount++
+		return rp.mu.v.reader, nil
+	}
+
 	// Calling findNode gives us the responsibility of decrementing v's
 	// refCount.
-	v := rp.c.findNode(ctx, rp.file.FileBacking, rp.dbOpts)
+	v := rp.c.findNodeInternal(ctx, rp.backingFileNum, rp.dbOpts)
 	if v.err != nil {
 		defer rp.c.unrefValue(v)
 		return nil, v.err
 	}
-	rp.v = v
+	rp.mu.v = v
+	rp.mu.refCount = 1
 	return v.reader, nil
 }
 
 // Close implements sstable.ReaderProvider.
 func (rp *tableCacheShardReaderProvider) Close() {
-	rp.c.unrefValue(rp.v)
-	rp.v = nil
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+	rp.mu.refCount--
+	if rp.mu.refCount <= 0 {
+		if rp.mu.refCount < 0 {
+			panic("pebble: sstable.ReaderProvider misuse")
+		}
+		rp.c.unrefValue(rp.mu.v)
+		rp.mu.v = nil
+	}
 }
 
 // getTableProperties returns the sstable properties for the target file.
@@ -782,20 +816,18 @@ func (c *tableCacheShard) findNode(
 	// Caution! Here fileMetadata can be a physical or virtual table. Table cache
 	// readers are associated with the physical backings. All virtual tables with
 	// the same backing will use the same reader from the cache; so no information
-	// that can differ among these virtual tables can be plumbed into loadInfo.
-	info := loadInfo{
-		backingFileNum: b.DiskFileNum,
-	}
+	// that can differ among these virtual tables can be passed to findNodeInternal.
+	backingFileNum := b.DiskFileNum
 
-	return c.findNodeInternal(ctx, info, dbOpts)
+	return c.findNodeInternal(ctx, backingFileNum, dbOpts)
 }
 
 func (c *tableCacheShard) findNodeInternal(
-	ctx context.Context, loadInfo loadInfo, dbOpts *tableCacheOpts,
+	ctx context.Context, backingFileNum base.DiskFileNum, dbOpts *tableCacheOpts,
 ) *tableCacheValue {
 	// Fast-path for a hit in the cache.
 	c.mu.RLock()
-	key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
+	key := tableCacheKey{dbOpts.cacheID, backingFileNum}
 	if n := c.mu.nodes[key]; n != nil && n.value != nil {
 		// Fast-path hit.
 		//
@@ -817,7 +849,7 @@ func (c *tableCacheShard) findNodeInternal(
 	case n == nil:
 		// Slow-path miss of a non-existent node.
 		n = &tableCacheNode{
-			fileNum: loadInfo.backingFileNum,
+			fileNum: backingFileNum,
 			ptype:   tableCacheNodeCold,
 		}
 		c.addNode(n, dbOpts)
@@ -854,6 +886,7 @@ func (c *tableCacheShard) findNodeInternal(
 	v := &tableCacheValue{
 		loaded: make(chan struct{}),
 	}
+	v.readerProvider.init(c, dbOpts, backingFileNum)
 	v.refCount.Store(2)
 	// Cache the closure invoked when an iterator is closed. This avoids an
 	// allocation on every call to newIters.
@@ -875,7 +908,7 @@ func (c *tableCacheShard) findNodeInternal(
 	// Note adding to the cache lists must complete before we begin loading the
 	// table as a failure during load will result in the node being unlinked.
 	pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
-		v.load(ctx, loadInfo, c, dbOpts)
+		v.load(ctx, backingFileNum, c, dbOpts)
 	})
 	return v
 }
@@ -1094,51 +1127,53 @@ type tableCacheValue struct {
 	closeHook func(i sstable.Iterator) error
 	reader    *sstable.Reader
 	err       error
-	isShared  bool
 	loaded    chan struct{}
 
 	// Reference count for the value. The reader is closed when the reference
 	// count drops to zero.
 	refCount atomic.Int32
-}
-
-// loadInfo contains the information needed to populate a new cache entry.
-type loadInfo struct {
-	backingFileNum base.DiskFileNum
+	isShared bool
+
+	// readerProvider is embedded here so that we only allocate it once as long as
+	// the table stays in the cache. Its state is not always logically tied to
+	// this specific tableCacheValue - if a table goes out of the cache and then
+	// comes back in, the readerProvider in a now-defunct tableCacheValue can
+	// still be used and will internally refer to the new tableCacheValue.
+	readerProvider tableCacheShardReaderProvider
 }
 
 func (v *tableCacheValue) load(
-	ctx context.Context, loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts,
+	ctx context.Context, backingFileNum base.DiskFileNum, c *tableCacheShard, dbOpts *tableCacheOpts,
 ) {
 	// Try opening the file first.
 	var f objstorage.Readable
 	var err error
 	f, err = dbOpts.objProvider.OpenForReading(
-		ctx, fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
+		ctx, fileTypeTable, backingFileNum, objstorage.OpenOptions{MustExist: true},
 	)
 	if err == nil {
 		o := dbOpts.readerOpts
 		o.SetInternalCacheOpts(sstableinternal.CacheOptions{
 			Cache:   dbOpts.cache,
 			CacheID: dbOpts.cacheID,
-			FileNum: loadInfo.backingFileNum,
+			FileNum: backingFileNum,
 		})
 		v.reader, err = sstable.NewReader(ctx, f, o)
 	}
 	if err == nil {
 		var objMeta objstorage.ObjectMetadata
-		objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
+		objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, backingFileNum)
 		v.isShared = objMeta.IsShared()
 	}
 	if err != nil {
 		v.err = errors.Wrapf(
-			err, "pebble: backing file %s error", loadInfo.backingFileNum)
+			err, "pebble: backing file %s error", backingFileNum)
 	}
 	if v.err != nil {
 		c.mu.Lock()
 		defer c.mu.Unlock()
 		// Lookup the node in the cache again as it might have already been
 		// removed.
-		key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
+		key := tableCacheKey{dbOpts.cacheID, backingFileNum}
 		n := c.mu.nodes[key]
 		if n != nil && n.value == v {
 			c.releaseNode(n)
diff --git a/testdata/ingest b/testdata/ingest
index b902079149..65b0e536dd 100644
--- a/testdata/ingest
+++ b/testdata/ingest
@@ -54,7 +54,7 @@ Virtual tables: 0 (0B)
 Local tables size: 569B
 Compression types: snappy: 1
 Block cache: 3 entries (1.1KB) hit rate: 18.2%
-Table cache: 1 entries (864B) hit rate: 50.0%
+Table cache: 1 entries (904B) hit rate: 50.0%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 0
diff --git a/testdata/metrics b/testdata/metrics
index 37c32771b7..13c3747959 100644
--- a/testdata/metrics
+++ b/testdata/metrics
@@ -76,7 +76,7 @@ Virtual tables: 0 (0B)
 Local tables size: 589B
 Compression types: snappy: 1
 Block cache: 2 entries (716B) hit rate: 0.0%
-Table cache: 1 entries (864B) hit rate: 0.0%
+Table cache: 1 entries (904B) hit rate: 0.0%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 1
@@ -136,7 +136,7 @@ Virtual tables: 0 (0B)
 Local tables size: 595B
 Compression types: snappy: 1
 Block cache: 2 entries (716B) hit rate: 33.3%
-Table cache: 2 entries (1.7KB) hit rate: 66.7%
+Table cache: 2 entries (1.8KB) hit rate: 66.7%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 2
@@ -183,7 +183,7 @@ Virtual tables: 0 (0B)
 Local tables size: 595B
 Compression types: snappy: 1
 Block cache: 2 entries (716B) hit rate: 33.3%
-Table cache: 2 entries (1.7KB) hit rate: 66.7%
+Table cache: 2 entries (1.8KB) hit rate: 66.7%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 2
@@ -227,7 +227,7 @@ Virtual tables: 0 (0B)
 Local tables size: 595B
 Compression types: snappy: 1
 Block cache: 2 entries (716B) hit rate: 33.3%
-Table cache: 1 entries (864B) hit rate: 66.7%
+Table cache: 1 entries (904B) hit rate: 66.7%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 1
@@ -510,7 +510,7 @@ Virtual tables: 0 (0B)
 Local tables size: 4.3KB
 Compression types: snappy: 7
 Block cache: 8 entries (2.8KB) hit rate: 9.1%
-Table cache: 1 entries (864B) hit rate: 53.8%
+Table cache: 1 entries (904B) hit rate: 53.8%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 0
@@ -575,7 +575,7 @@ Virtual tables: 0 (0B)
 Local tables size: 6.1KB
 Compression types: snappy: 10
 Block cache: 8 entries (2.8KB) hit rate: 9.1%
-Table cache: 1 entries (864B) hit rate: 53.8%
+Table cache: 1 entries (904B) hit rate: 53.8%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 0
@@ -853,7 +853,7 @@ Virtual tables: 0 (0B)
 Local tables size: 0B
 Compression types: snappy: 1
 Block cache: 0 entries (0B) hit rate: 0.0%
-Table cache: 1 entries (864B) hit rate: 0.0%
+Table cache: 1 entries (904B) hit rate: 0.0%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 0
@@ -901,7 +901,7 @@ Virtual tables: 0 (0B)
 Local tables size: 0B
 Compression types: snappy: 2
 Block cache: 4 entries (1.4KB) hit rate: 0.0%
-Table cache: 1 entries (864B) hit rate: 50.0%
+Table cache: 1 entries (904B) hit rate: 50.0%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 0
@@ -950,7 +950,7 @@ Virtual tables: 0 (0B)
 Local tables size: 589B
 Compression types: snappy: 3
 Block cache: 4 entries (1.4KB) hit rate: 0.0%
-Table cache: 1 entries (864B) hit rate: 50.0%
+Table cache: 1 entries (904B) hit rate: 50.0%
 Secondary cache: 0 entries (0B) hit rate: 0.0%
 Snapshots: 0 earliest seq num: 0
 Table iters: 0
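
The refcounting contract implemented by GetReader and Close above is easiest
to see in isolation: the first GetReader call pins a table-cache entry, later
calls reuse it, and the last Close releases the provider's single cache
reference. The standalone Go sketch below renders just that shape. It is a
minimal illustration, not Pebble code: the Reader, entry, cache, findNode,
and unref names are simplified stand-ins invented for this example, and error
handling, eviction, and context plumbing are omitted.

package main

import (
	"fmt"
	"sync"
)

// Reader stands in for an open sstable reader (hypothetical type).
type Reader struct {
	fileNum uint64
}

// entry is a simplified, refcounted table-cache value.
type entry struct {
	reader *Reader
	refs   int
}

// cache is a minimal stand-in for a table-cache shard keyed by file number.
type cache struct {
	mu      sync.Mutex
	entries map[uint64]*entry
}

// findNode returns the entry for fileNum, creating it on a miss. The caller
// assumes responsibility for one reference and must release it via unref.
func (c *cache) findNode(fileNum uint64) *entry {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := c.entries[fileNum]
	if e == nil {
		e = &entry{reader: &Reader{fileNum: fileNum}}
		c.entries[fileNum] = e
	}
	e.refs++
	return e
}

// unref drops one reference; a real cache would evict the entry at zero.
func (c *cache) unref(e *entry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e.refs--
}

// readerProvider mirrors the locking-and-refcount shape of
// tableCacheShardReaderProvider: it is long-lived, safe for concurrent use,
// and holds at most one cache reference no matter how many GetReader calls
// are outstanding.
type readerProvider struct {
	c       *cache
	fileNum uint64

	mu struct {
		sync.Mutex
		v        *entry // non-nil iff we currently hold a cache reference
		refCount int    // GetReader calls without a matching Close
	}
}

// GetReader pins the cache entry on first use and reuses it afterwards.
func (rp *readerProvider) GetReader() *Reader {
	rp.mu.Lock()
	defer rp.mu.Unlock()
	if rp.mu.v != nil {
		rp.mu.refCount++
		return rp.mu.v.reader
	}
	// First caller: findNode hands us one cache reference, which the last
	// Close releases.
	rp.mu.v = rp.c.findNode(rp.fileNum)
	rp.mu.refCount = 1
	return rp.mu.v.reader
}

// Close undoes one GetReader; the last Close unpins the cache entry.
func (rp *readerProvider) Close() {
	rp.mu.Lock()
	defer rp.mu.Unlock()
	rp.mu.refCount--
	if rp.mu.refCount < 0 {
		panic("Close called without a matching GetReader")
	}
	if rp.mu.refCount == 0 {
		rp.c.unref(rp.mu.v)
		rp.mu.v = nil
	}
}

func main() {
	c := &cache{entries: make(map[uint64]*entry)}
	rp := &readerProvider{c: c, fileNum: 7}

	r1 := rp.GetReader()           // pins the entry; refCount is now 1
	r2 := rp.GetReader()           // reuses the pinned entry; refCount is now 2
	fmt.Println(r1 == r2)          // true: both callers share one reader
	rp.Close()                     // refCount 1; entry stays pinned
	rp.Close()                     // refCount 0; cache reference released
	fmt.Println(c.entries[7].refs) // 0
}

The design point the sketch makes concrete: pinning is lazy, so embedding a
long-lived provider in every cache value costs nothing until a value block is
actually read, and once a reader is pinned, concurrent GetReader calls on the
same table share one reader and one cache reference instead of each paying
for a findNode lookup.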