From 9b0824f2cdaa819c3526e12917010bb765b94af5 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 28 May 2024 18:44:14 -0700 Subject: [PATCH] sstable: add LoadBlockSema option Add option to use a semaphore to limit the number of block loads in parallel. --- db_test.go | 104 +++++++++++++++++++++++++++++++++++++++++++++ go.mod | 5 ++- go.sum | 10 +++-- options.go | 7 +++ sstable/options.go | 6 +++ sstable/reader.go | 9 ++++ testdata/ingest | 2 +- testdata/metrics | 14 +++--- 8 files changed, 143 insertions(+), 14 deletions(-) diff --git a/db_test.go b/db_test.go index ecb0230501..79dd337ac8 100644 --- a/db_test.go +++ b/db_test.go @@ -10,15 +10,18 @@ import ( "fmt" "io" "path/filepath" + "runtime" "slices" "strconv" "strings" "sync" + "sync/atomic" "testing" "time" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" + "github.com/cockroachdb/fifo" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" @@ -2340,3 +2343,104 @@ func parseOrderingTokens(tokens []string) (orderingNode, int) { return leaf(v), 1 } } + +type readTrackFS struct { + vfs.FS + + currReadCount atomic.Int32 + maxReadCount atomic.Int32 +} + +type readTrackFile struct { + vfs.File + fs *readTrackFS +} + +func (fs *readTrackFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) { + file, err := fs.FS.Open(name, opts...) + if err != nil || !strings.HasSuffix(name, ".sst") { + return file, err + } + return &readTrackFile{ + File: file, + fs: fs, + }, nil +} + +func (f *readTrackFile) ReadAt(p []byte, off int64) (n int, err error) { + val := f.fs.currReadCount.Add(1) + defer f.fs.currReadCount.Add(-1) + for maxVal := f.fs.maxReadCount.Load(); val > maxVal; maxVal = f.fs.maxReadCount.Load() { + if f.fs.maxReadCount.CompareAndSwap(maxVal, val) { + break + } + } + return f.File.ReadAt(p, off) +} + +func TestLoadBlockSema(t *testing.T) { + fs := &readTrackFS{FS: vfs.NewMem()} + sema := fifo.NewSemaphore(100) + db, err := Open("", testingRandomized(t, &Options{ + Cache: cache.New(1), + FS: fs, + LoadBlockSema: sema, + })) + require.NoError(t, err) + + key := func(i, j int) []byte { + return []byte(fmt.Sprintf("%02d/%02d", i, j)) + } + + // Create 20 regions and compact them separately, so we end up with 20 + // disjoint tables. + const numRegions = 20 + const numKeys = 20 + for i := 0; i < numRegions; i++ { + for j := 0; j < numKeys; j++ { + require.NoError(t, db.Set(key(i, j), []byte("value"), nil)) + } + require.NoError(t, db.Compact(key(i, 0), key(i, numKeys-1), false)) + } + + // Read all regions to warm up the table cache. + for i := 0; i < numRegions; i++ { + val, closer, err := db.Get(key(i, 1)) + require.NoError(t, err) + require.Equal(t, []byte("value"), val) + if closer != nil { + closer.Close() + } + } + + for _, n := range []int64{1, 2, 4} { + t.Run(fmt.Sprintf("%d", n), func(t *testing.T) { + sema.UpdateCapacity(n) + fs.maxReadCount.Store(0) + var wg sync.WaitGroup + // Spin up workers that perform random reads. + const numWorkers = 20 + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + const numQueries = 100 + for i := 0; i < numQueries; i++ { + val, closer, err := db.Get(key(rand.Intn(numRegions), rand.Intn(numKeys))) + require.NoError(t, err) + require.Equal(t, []byte("value"), val) + if closer != nil { + closer.Close() + } + runtime.Gosched() + } + }() + } + wg.Wait() + // Verify the maximum read count did not exceed the limit. + maxReadCount := fs.maxReadCount.Load() + require.Greater(t, maxReadCount, int32(0)) + require.LessOrEqual(t, maxReadCount, int32(n)) + }) + } +} diff --git a/go.mod b/go.mod index 8c4ec8f690..579a4f0d87 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 github.com/cockroachdb/errors v1.11.1 + github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 github.com/cockroachdb/redact v1.1.5 github.com/cockroachdb/swiss v0.0.0-20240605133600-232b93a2b829 @@ -20,10 +21,10 @@ require ( github.com/prometheus/client_golang v1.12.0 github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a github.com/spf13/cobra v1.0.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df golang.org/x/perf v0.0.0-20230113213139-801c7ef9e5c5 - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 + golang.org/x/sync v0.7.0 golang.org/x/sys v0.17.0 ) diff --git a/go.sum b/go.sum index 9f8d4858d7..541e76e30a 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 h1:slXych github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.11.1 h1:xSEW75zKaKCWzR3OfxXUxgrk/NtT4G1MiOv5lWZazG8= github.com/cockroachdb/errors v1.11.1/go.mod h1:8MUxA3Gi6b25tYlFEBGLf+D8aISL+M4MIpiWMSNRfxw= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= @@ -323,8 +325,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= @@ -444,8 +446,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/options.go b/options.go index 6fbb7d1b69..77849003d6 100644 --- a/options.go +++ b/options.go @@ -15,6 +15,7 @@ import ( "unicode" "github.com/cockroachdb/errors" + "github.com/cockroachdb/fifo" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/humanize" @@ -483,6 +484,11 @@ type Options struct { // The default cache size is 8 MB. Cache *cache.Cache + // LoadBlockSema, if set, is used to limit the number of blocks that can be + // loaded (i.e. read from the filesystem) in parallel. Each load acquires one + // unit from the semaphore for the duration of the read. + LoadBlockSema *fifo.Semaphore + // Cleaner cleans obsolete files. // // The default cleaner uses the DeleteCleaner. @@ -1934,6 +1940,7 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions { var readerOpts sstable.ReaderOptions if o != nil { readerOpts.Cache = o.Cache + readerOpts.LoadBlockSema = o.LoadBlockSema readerOpts.Comparer = o.Comparer readerOpts.Filters = o.Filters if o.Merger != nil { diff --git a/sstable/options.go b/sstable/options.go index f94ec995c1..9801373970 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -5,6 +5,7 @@ package sstable import ( + "github.com/cockroachdb/fifo" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" ) @@ -85,6 +86,11 @@ type ReaderOptions struct { // The default cache size is a zero-size cache. Cache *cache.Cache + // LoadBlockSema, if set, is used to limit the number of blocks that can be + // loaded (i.e. read from the filesystem) in parallel. Each load acquires one + // unit from the semaphore for the duration of the read. + LoadBlockSema *fifo.Semaphore + // User properties specified in this map will not be added to sst.Properties.UserProperties. DeniedUserProperties map[string]struct{} diff --git a/sstable/reader.go b/sstable/reader.go index d1e047c4f5..f9d8d46676 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -629,6 +629,15 @@ func (r *Reader) readBlock( } // Cache miss. + + if sema := r.opts.LoadBlockSema; sema != nil { + if err := sema.Acquire(ctx, 1); err != nil { + // An error here can only come from the context. + return bufferHandle{}, err + } + defer sema.Release(1) + } + var compressed cacheValueOrBuf if bufferPool != nil { compressed = cacheValueOrBuf{ diff --git a/testdata/ingest b/testdata/ingest index 6675dee09d..165fe26c4e 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -54,7 +54,7 @@ Virtual tables: 0 (0B) Local tables size: 569B Compression types: snappy: 1 Block cache: 6 entries (945B) hit rate: 30.8% -Table cache: 1 entries (760B) hit rate: 50.0% +Table cache: 1 entries (768B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 diff --git a/testdata/metrics b/testdata/metrics index 6776db1cd8..ebd7787aaa 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -75,7 +75,7 @@ Virtual tables: 0 (0B) Local tables size: 589B Compression types: snappy: 1 Block cache: 3 entries (484B) hit rate: 0.0% -Table cache: 1 entries (760B) hit rate: 0.0% +Table cache: 1 entries (768B) hit rate: 0.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 1 @@ -214,7 +214,7 @@ Virtual tables: 0 (0B) Local tables size: 595B Compression types: snappy: 1 Block cache: 3 entries (484B) hit rate: 33.3% -Table cache: 1 entries (760B) hit rate: 66.7% +Table cache: 1 entries (768B) hit rate: 66.7% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 1 @@ -488,7 +488,7 @@ Virtual tables: 0 (0B) Local tables size: 4.3KB Compression types: snappy: 7 Block cache: 12 entries (1.9KB) hit rate: 9.1% -Table cache: 1 entries (760B) hit rate: 53.8% +Table cache: 1 entries (768B) hit rate: 53.8% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -551,7 +551,7 @@ Virtual tables: 0 (0B) Local tables size: 6.1KB Compression types: snappy: 10 Block cache: 12 entries (1.9KB) hit rate: 9.1% -Table cache: 1 entries (760B) hit rate: 53.8% +Table cache: 1 entries (768B) hit rate: 53.8% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -822,7 +822,7 @@ Virtual tables: 0 (0B) Local tables size: 0B Compression types: snappy: 1 Block cache: 1 entries (440B) hit rate: 0.0% -Table cache: 1 entries (760B) hit rate: 0.0% +Table cache: 1 entries (768B) hit rate: 0.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -869,7 +869,7 @@ Virtual tables: 0 (0B) Local tables size: 0B Compression types: snappy: 2 Block cache: 6 entries (996B) hit rate: 0.0% -Table cache: 1 entries (760B) hit rate: 50.0% +Table cache: 1 entries (768B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -917,7 +917,7 @@ Virtual tables: 0 (0B) Local tables size: 589B Compression types: snappy: 3 Block cache: 6 entries (996B) hit rate: 0.0% -Table cache: 1 entries (760B) hit rate: 50.0% +Table cache: 1 entries (768B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0