Skip to content

Commit

Permalink
chunk: Dedup prefetch/partial read with full block read to reduce read amplification (#5219)
Browse files Browse the repository at this point in the history

Signed-off-by: Changxin Miao <[email protected]>
  • Loading branch information
polyrabbit authored Oct 14, 2024
1 parent c011d62 commit fd191ea
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 11 deletions.
22 changes: 19 additions & 3 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,22 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er
if s.store.downLimit != nil {
s.store.downLimit.Wait(int64(len(p)))
}
fullPage, err := s.store.group.TryPiggyback(key)
if fullPage != nil {
defer fullPage.Release()
if err == nil { // piggybacked a full read
n = copy(p, fullPage.Data[boff:])
return n, nil
}
}
// partial read
st := time.Now()
var (
reqID string
sc = object.DefaultStorageClass
)
page.Acquire()
err := utils.WithTimeout(func() error {
err = utils.WithTimeout(func() error {
defer page.Release()
in, err := s.store.storage.Get(key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
Expand Down Expand Up @@ -762,7 +770,7 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
seekable: compressor.CompressBound(0) == 0,
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
pendingKeys: make(map[string]*pendingItem),
group: &Controller{},
group: NewController(),
}
if config.UploadLimit > 0 {
// there are overheads coming from HTTP/TCP/IP
Expand Down Expand Up @@ -809,7 +817,15 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
}
p := NewOffPage(size)
defer p.Release()
_ = store.load(key, p, true, true)
block, err := store.group.Execute(key, func() (*Page, error) { // dedup requests with full read
p.Acquire()
err := store.load(key, p, false, false) // delay writing cache until singleflight ends to prevent blocking waiters
return p, err
})
defer block.Release()
if err == nil && block == p {
store.bcache.cache(key, block, true, !store.conf.OSCache)
}
})

if store.conf.CacheDir != "memory" && store.conf.Writeback {
Expand Down
6 changes: 6 additions & 0 deletions pkg/chunk/cached_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func testStore(t *testing.T, store ChunkStore) {
} else if string(p.Data[:n]) != "world" {
t.Fatalf("not expected: %s", string(p.Data[:n]))
}
p = NewPage(make([]byte, 5))
if n, err := reader.ReadAt(context.Background(), p, 0); n != 5 || err != nil {
t.Fatalf("read failed: %d %s", n, err)
} else if string(p.Data[:n]) != "hello" {
t.Fatalf("not expected: %s", string(p.Data[:n]))
}
p = NewPage(make([]byte, 20))
if n, err := reader.ReadAt(context.Background(), p, offset); n != 11 || err != nil && err != io.EOF {
t.Fatalf("read failed: %d %s", n, err)
Expand Down
21 changes: 18 additions & 3 deletions pkg/chunk/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ type Controller struct {
rs map[string]*request
}

// NewController returns a Controller whose in-flight request table is
// already initialized, so callers never race on lazy map creation.
func NewController() *Controller {
	rs := make(map[string]*request)
	return &Controller{rs: rs}
}

func (con *Controller) Execute(key string, fn func() (*Page, error)) (*Page, error) {
con.Lock()
if con.rs == nil {
con.rs = make(map[string]*request)
}
if c, ok := con.rs[key]; ok {
c.dups++
con.Unlock()
Expand All @@ -60,3 +63,15 @@ func (con *Controller) Execute(key string, fn func() (*Page, error)) (*Page, err

return c.val, c.err
}

// TryPiggyback joins an in-flight request for key, if one exists, and
// waits for its result instead of issuing a new read. It returns
// (nil, nil) when nothing is currently in flight for key.
func (con *Controller) TryPiggyback(key string) (*Page, error) {
	con.Lock()
	c, inflight := con.rs[key]
	if !inflight {
		con.Unlock()
		return nil, nil
	}
	// Register as a duplicate while still holding the lock, then wait
	// outside the lock for the executing goroutine to finish.
	c.dups++
	con.Unlock()
	c.wg.Wait()
	return c.val, c.err
}
29 changes: 25 additions & 4 deletions pkg/chunk/singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package chunk

import (
"bytes"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -25,35 +26,55 @@ import (
)

func TestSingleFlight(t *testing.T) {
g := &Controller{}
g := NewController()
gp := &sync.WaitGroup{}
var cache sync.Map
var n int32
var piggyback atomic.Int64
iters := 100000
for i := 0; i < iters; i++ {
gp.Add(1)
gp.Add(2)
go func(k int) {
p, _ := g.Execute(strconv.Itoa(k/100), func() (*Page, error) {
time.Sleep(time.Microsecond * 500000) // In most cases 500ms is enough to run 100 goroutines
atomic.AddInt32(&n, 1)
return NewOffPage(100), nil
page := NewOffPage(100)
copy(page.Data, make([]byte, 100)) // zeroed
copy(page.Data, strconv.Itoa(k/100))
return page, nil
})
p.Release()
cache.LoadOrStore(strconv.Itoa(k/100), p)
gp.Done()
}(i)
go func(k int) {
defer gp.Done()
page, _ := g.TryPiggyback(strconv.Itoa(k / 100))
if page != nil {
expected := make([]byte, 100)
copy(expected, strconv.Itoa(k/100))
if bytes.Compare(page.Data, expected) != 0 {
t.Fatalf("got %x, want %x, key: %d", page.Data, expected, k/100)
}
page.Release()
piggyback.Add(1)
}
}(i)
}
gp.Wait()

nv := int(atomic.LoadInt32(&n))
if nv != iters/100 {
t.Fatalf("singleflight doesn't take effect: %v", nv)
}
if piggyback.Load() == 0 {
t.Fatal("never piggybacked?")
}

// verify the ref
cache.Range(func(key any, value any) bool {
if value.(*Page).refs != 0 {
t.Fatal("refs of page is not 0")
t.Fatalf("refs of page is not 0, got: %d, key: %s", value.(*Page).refs, key)
}
return true
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/object/restful.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func GetHttpClient() *http.Client {

func cleanup(response *http.Response) {
if response != nil && response.Body != nil {
_, _ = io.ReadAll(response.Body)
_, _ = io.Copy(io.Discard, response.Body)
_ = response.Body.Close()
}
}
Expand Down

0 comments on commit fd191ea

Please sign in to comment.