Skip to content

Commit

Permalink
sstable: add LoadBlockSema option
Browse files Browse the repository at this point in the history
Add an option to use a semaphore to limit the number of blocks that are
loaded in parallel.
  • Loading branch information
RaduBerinde committed Jun 10, 2024
1 parent 47da75f commit 9b0824f
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 14 deletions.
104 changes: 104 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
"fmt"
"io"
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
Expand Down Expand Up @@ -2340,3 +2343,104 @@ func parseOrderingTokens(tokens []string) (orderingNode, int) {
return leaf(v), 1
}
}

// readTrackFS is a vfs.FS wrapper used by tests to observe how many ReadAt
// calls on ".sst" files are in flight at the same time.
type readTrackFS struct {
	vfs.FS

	// currReadCount is the number of tracked ReadAt calls currently in
	// progress; maxReadCount is the largest value currReadCount has been
	// observed to reach since maxReadCount was last reset.
	currReadCount, maxReadCount atomic.Int32
}

// readTrackFile wraps a vfs.File and reports its ReadAt calls to the
// readTrackFS that opened it.
type readTrackFile struct {
	// File is the wrapped file; every method other than ReadAt is promoted
	// from it unchanged.
	vfs.File
	// fs holds the shared read counters updated by ReadAt.
	fs *readTrackFS
}

// Open opens the named file through the wrapped FS. A successfully opened file
// whose name ends in ".sst" is returned wrapped in a readTrackFile so that its
// ReadAt calls are counted; any other file, and any error from the wrapped FS,
// is passed through unchanged.
func (fs *readTrackFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
	inner, err := fs.FS.Open(name, opts...)
	if err != nil {
		return inner, err
	}
	if !strings.HasSuffix(name, ".sst") {
		// Only sstable reads are tracked.
		return inner, nil
	}
	tracked := &readTrackFile{File: inner, fs: fs}
	return tracked, nil
}

// ReadAt counts this call as in flight for its whole duration, raises the
// shared maximum if the in-flight count now exceeds it, and then delegates the
// read to the wrapped file.
func (f *readTrackFile) ReadAt(p []byte, off int64) (n int, err error) {
	tracker := f.fs
	inFlight := tracker.currReadCount.Add(1)
	defer tracker.currReadCount.Add(-1)

	// Publish inFlight as the new maximum unless another reader has already
	// recorded a value at least as large. A failed compare-and-swap means the
	// maximum changed underneath us, so reload it and try again.
	for {
		observed := tracker.maxReadCount.Load()
		if inFlight <= observed {
			break
		}
		if tracker.maxReadCount.CompareAndSwap(observed, inFlight) {
			break
		}
	}
	return f.File.ReadAt(p, off)
}

// TestLoadBlockSema checks that Options.LoadBlockSema bounds the number of
// sstable reads that are in flight concurrently. It writes several disjoint
// tables, then for each semaphore capacity runs many concurrent Gets through a
// readTrackFS and asserts that the observed maximum number of simultaneous
// ReadAt calls is positive and does not exceed the capacity.
func TestLoadBlockSema(t *testing.T) {
	fs := &readTrackFS{FS: vfs.NewMem()}
	// Start with a generous capacity so that setup is not throttled; each
	// subtest lowers it to the value under test.
	sema := fifo.NewSemaphore(100)
	db, err := Open("", testingRandomized(t, &Options{
		// NOTE(review): a 1-byte cache presumably forces nearly every block
		// read to miss the cache and reach the filesystem — confirm.
		Cache:         cache.New(1),
		FS:            fs,
		LoadBlockSema: sema,
	}))
	require.NoError(t, err)
	// Close the DB when the test ends so it is not leaked.
	defer func() {
		require.NoError(t, db.Close())
	}()

	key := func(i, j int) []byte {
		return []byte(fmt.Sprintf("%02d/%02d", i, j))
	}

	// Create 20 regions and compact them separately, so we end up with 20
	// disjoint tables.
	const numRegions = 20
	const numKeys = 20
	for i := 0; i < numRegions; i++ {
		for j := 0; j < numKeys; j++ {
			require.NoError(t, db.Set(key(i, j), []byte("value"), nil))
		}
		require.NoError(t, db.Compact(key(i, 0), key(i, numKeys-1), false))
	}

	// Read all regions to warm up the table cache.
	for i := 0; i < numRegions; i++ {
		val, closer, err := db.Get(key(i, 1))
		require.NoError(t, err)
		require.Equal(t, []byte("value"), val)
		if closer != nil {
			require.NoError(t, closer.Close())
		}
	}

	for _, n := range []int64{1, 2, 4} {
		t.Run(fmt.Sprintf("%d", n), func(t *testing.T) {
			sema.UpdateCapacity(n)
			fs.maxReadCount.Store(0)
			var wg sync.WaitGroup
			// Spin up workers that perform random reads.
			const numWorkers = 20
			for i := 0; i < numWorkers; i++ {
				wg.Add(1)
				go func() {
					defer wg.Done()
					const numQueries = 100
					for q := 0; q < numQueries; q++ {
						k := key(rand.Intn(numRegions), rand.Intn(numKeys))
						val, closer, err := db.Get(k)
						// Failures are reported with t.Errorf rather than
						// require.*: require stops the test via t.FailNow,
						// which must only be called from the goroutine running
						// the test function, not from a worker goroutine.
						if err != nil {
							t.Errorf("Get(%q): %v", k, err)
							return
						}
						ok := string(val) == "value"
						if !ok {
							t.Errorf("Get(%q) = %q, want %q", k, val, "value")
						}
						if closer != nil {
							if err := closer.Close(); err != nil {
								t.Errorf("closing Get(%q) result: %v", k, err)
								ok = false
							}
						}
						if !ok {
							return
						}
						// Yield to encourage interleaving between workers.
						runtime.Gosched()
					}
				}()
			}
			wg.Wait()
			// Verify the maximum read count did not exceed the limit.
			maxReadCount := fs.maxReadCount.Load()
			require.Greater(t, maxReadCount, int32(0))
			require.LessOrEqual(t, maxReadCount, int32(n))
		})
	}
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056
github.com/cockroachdb/errors v1.11.1
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895
github.com/cockroachdb/redact v1.1.5
github.com/cockroachdb/swiss v0.0.0-20240605133600-232b93a2b829
Expand All @@ -20,10 +21,10 @@ require (
github.com/prometheus/client_golang v1.12.0
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/perf v0.0.0-20230113213139-801c7ef9e5c5
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sync v0.7.0
golang.org/x/sys v0.17.0
)

Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 h1:slXych
github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
github.com/cockroachdb/errors v1.11.1 h1:xSEW75zKaKCWzR3OfxXUxgrk/NtT4G1MiOv5lWZazG8=
github.com/cockroachdb/errors v1.11.1/go.mod h1:8MUxA3Gi6b25tYlFEBGLf+D8aISL+M4MIpiWMSNRfxw=
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4=
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA=
Expand Down Expand Up @@ -323,8 +325,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
Expand Down Expand Up @@ -444,8 +446,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
7 changes: 7 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"unicode"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/humanize"
Expand Down Expand Up @@ -483,6 +484,11 @@ type Options struct {
// The default cache size is 8 MB.
Cache *cache.Cache

// LoadBlockSema, if set, is used to limit the number of blocks that can be
// loaded (i.e. read from the filesystem) in parallel. Each load acquires one
// unit from the semaphore for the duration of the read.
LoadBlockSema *fifo.Semaphore

// Cleaner cleans obsolete files.
//
// The default cleaner uses the DeleteCleaner.
Expand Down Expand Up @@ -1934,6 +1940,7 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
var readerOpts sstable.ReaderOptions
if o != nil {
readerOpts.Cache = o.Cache
readerOpts.LoadBlockSema = o.LoadBlockSema
readerOpts.Comparer = o.Comparer
readerOpts.Filters = o.Filters
if o.Merger != nil {
Expand Down
6 changes: 6 additions & 0 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package sstable

import (
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
)
Expand Down Expand Up @@ -85,6 +86,11 @@ type ReaderOptions struct {
// The default cache size is a zero-size cache.
Cache *cache.Cache

// LoadBlockSema, if set, is used to limit the number of blocks that can be
// loaded (i.e. read from the filesystem) in parallel. Each load acquires one
// unit from the semaphore for the duration of the read.
LoadBlockSema *fifo.Semaphore

// User properties specified in this map will not be added to sst.Properties.UserProperties.
DeniedUserProperties map[string]struct{}

Expand Down
9 changes: 9 additions & 0 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,15 @@ func (r *Reader) readBlock(
}

// Cache miss.

if sema := r.opts.LoadBlockSema; sema != nil {
if err := sema.Acquire(ctx, 1); err != nil {
// An error here can only come from the context.
return bufferHandle{}, err
}
defer sema.Release(1)
}

var compressed cacheValueOrBuf
if bufferPool != nil {
compressed = cacheValueOrBuf{
Expand Down
2 changes: 1 addition & 1 deletion testdata/ingest
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Virtual tables: 0 (0B)
Local tables size: 569B
Compression types: snappy: 1
Block cache: 6 entries (945B) hit rate: 30.8%
Table cache: 1 entries (760B) hit rate: 50.0%
Table cache: 1 entries (768B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down
14 changes: 7 additions & 7 deletions testdata/metrics
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 1
Block cache: 3 entries (484B) hit rate: 0.0%
Table cache: 1 entries (760B) hit rate: 0.0%
Table cache: 1 entries (768B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -214,7 +214,7 @@ Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 3 entries (484B) hit rate: 33.3%
Table cache: 1 entries (760B) hit rate: 66.7%
Table cache: 1 entries (768B) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -488,7 +488,7 @@ Virtual tables: 0 (0B)
Local tables size: 4.3KB
Compression types: snappy: 7
Block cache: 12 entries (1.9KB) hit rate: 9.1%
Table cache: 1 entries (760B) hit rate: 53.8%
Table cache: 1 entries (768B) hit rate: 53.8%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -551,7 +551,7 @@ Virtual tables: 0 (0B)
Local tables size: 6.1KB
Compression types: snappy: 10
Block cache: 12 entries (1.9KB) hit rate: 9.1%
Table cache: 1 entries (760B) hit rate: 53.8%
Table cache: 1 entries (768B) hit rate: 53.8%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -822,7 +822,7 @@ Virtual tables: 0 (0B)
Local tables size: 0B
Compression types: snappy: 1
Block cache: 1 entries (440B) hit rate: 0.0%
Table cache: 1 entries (760B) hit rate: 0.0%
Table cache: 1 entries (768B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -869,7 +869,7 @@ Virtual tables: 0 (0B)
Local tables size: 0B
Compression types: snappy: 2
Block cache: 6 entries (996B) hit rate: 0.0%
Table cache: 1 entries (760B) hit rate: 50.0%
Table cache: 1 entries (768B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -917,7 +917,7 @@ Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 3
Block cache: 6 entries (996B) hit rate: 0.0%
Table cache: 1 entries (760B) hit rate: 50.0%
Table cache: 1 entries (768B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down

0 comments on commit 9b0824f

Please sign in to comment.