global sort: speed up 2-level iterator (#49832)
ref #49717
lance6716 authored Dec 29, 2023
1 parent 36cafb2 commit 8a79c0d
Showing 2 changed files with 256 additions and 25 deletions.
157 changes: 132 additions & 25 deletions br/pkg/lightning/backend/external/iter.go
@@ -19,6 +19,7 @@ import (
"container/heap"
"context"
"io"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/membuf"
@@ -168,7 +169,7 @@ func openAndGetFirstElem[
}

// newMergeIter creates a merge iterator for multiple sorted reader opener
// functions.
// functions. mergeIter.readers will have the same order as the input readerOpeners.
func newMergeIter[
T heapElem,
R sortedReader[T],
@@ -373,37 +374,38 @@ func newLimitSizeMergeIter[
if err != nil {
return nil, errors.Trace(err)
}
i := &limitSizeMergeIter[T, R]{
ret := &limitSizeMergeIter[T, R]{
mergeIter: iter,
readerOpeners: readerOpeners,
weights: weights,
weightSum: cur,
nextReaderIdx: end,
limit: limit,
}
return i, nil
}

func (i *limitSizeMergeIter[T, R]) next() (ok bool, closeReaderIdx int) {
closeReaderIdx, ok = i.mergeIter.next()
if closeReaderIdx == -1 {
return
// newMergeIter may close readers that have no content, so we need to
// refund their weights and open more readers
for i, rp := range iter.readers {
if rp != nil {
continue
}
ret.weightSum -= weights[i]
}
// limitSizeMergeIter will try to open next reader when one reader is closed.
i.weightSum -= i.weights[closeReaderIdx]

return ret, ret.tryOpenMoreReaders()
}

func (i *limitSizeMergeIter[T, R]) tryOpenMoreReaders() error {
for i.nextReaderIdx < len(i.readerOpeners) {
weight := i.weights[i.nextReaderIdx]
if i.weightSum+weight > i.limit {
return
return nil
}

opener := i.readerOpeners[i.nextReaderIdx]
i.nextReaderIdx++
newReaders, firstElements, err := openAndGetFirstElem(opener)
if err != nil {
i.mergeIter.err = err
return false, closeReaderIdx
return err
}
newReader := newReaders[0]
newReaderIdx := len(i.mergeIter.readers)
@@ -419,11 +421,29 @@ func (i *limitSizeMergeIter[T, R]) next() (ok bool, closeReaderIdx int) {
})
heap.Fix(&i.mergeIter.h, len(i.mergeIter.h)-1)
i.weightSum += weight
// we need to call next once because mergeIter doesn't use h[0] as current value,
// but a separate curr field
if !ok && i.mergeIter.h.Len() == 1 {
_, ok = i.mergeIter.next()
}
}
return nil
}
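
A minimal standalone sketch of the weight-budget admission that tryOpenMoreReaders implements (hypothetical values, not code from this patch): a reader may only be opened while the summed weight of open readers stays within the limit, which is why a reader closed as empty has to refund its weight first.

package main

import "fmt"

func main() {
	weights := []int64{1, 1, 1}
	limit := int64(2)

	weightSum := weights[0] + weights[1] // readers 0 and 1 were opened at init

	// reader 1 turned out to be empty and was closed during init;
	// refund its weight so the budget is not consumed forever
	weightSum -= weights[1]

	// the admission check used before opening the next reader
	canOpen := weightSum+weights[2] <= limit
	fmt.Println(canOpen) // true, but only because of the refund above
}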

func (i *limitSizeMergeIter[T, R]) next() (ok bool, closeReaderIdx int) {
closeReaderIdx, ok = i.mergeIter.next()
if closeReaderIdx == -1 {
return
}

mergeIterDrained := !ok && i.mergeIter.h.Len() == 0

// limitSizeMergeIter will try to open next reader when one reader is closed.
i.weightSum -= i.weights[closeReaderIdx]
if err := i.tryOpenMoreReaders(); err != nil {
i.mergeIter.err = err
return false, closeReaderIdx
}

// we need to call next once because mergeIter doesn't use h[0] as current value,
// but a separate curr field
if mergeIterDrained && i.mergeIter.h.Len() > 0 {
_, ok = i.mergeIter.next()
}
return
}
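
A toy sketch of why the extra next() call above is needed (hypothetical type, not this package's mergeIter): the iterator exposes its current element through a separate curr field rather than h[0], so an element pushed onto a drained heap only becomes visible after the iterator advances once more.

package main

import "fmt"

type toyIter struct {
	heap []int // stand-in for the merge heap
	curr int   // the current value lives outside the heap
	ok   bool
}

func (it *toyIter) next() bool {
	if len(it.heap) == 0 {
		it.ok = false
		return false
	}
	it.curr, it.heap = it.heap[0], it.heap[1:]
	it.ok = true
	return true
}

func main() {
	it := &toyIter{}
	drained := !it.next()        // heap is empty, so curr holds nothing
	it.heap = append(it.heap, 7) // a late-opened reader contributes an element
	if drained && len(it.heap) > 0 {
		it.next() // advance once so curr is populated, as in the code above
	}
	fmt.Println(it.curr, it.ok) // prints: 7 true
}
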
@@ -594,8 +614,17 @@ func (p statReaderProxy) close() error {
type mergePropBaseIter struct {
iter *limitSizeMergeIter[*rangeProperty, statReaderProxy]
closeReaderFlag *bool
closeCh chan struct{}
wg *sync.WaitGroup
}

type readerAndError struct {
r *statReaderProxy
err error
}

var errMergePropBaseIterClosed = errors.New("mergePropBaseIter is closed")

func newMergePropBaseIter(
ctx context.Context,
multiStat MultipleFilesStat,
@@ -619,24 +648,100 @@ func newMergePropBaseIter(
// to support this.
limit = multiStat.MaxOverlappingNum + 1
}
limit = min(limit, int64(len(multiStat.Filenames)))

// We rely on the caller having reduced the overall overlapping of
// []MultipleFilesStat to below MergeSortOverlapThreshold, so we are going to
// open about 8000 connections at most to read files.
preOpenLimit := limit * 8
preOpenLimit = min(preOpenLimit, int64(len(multiStat.Filenames)))
preOpenCh := make(chan chan readerAndError, preOpenLimit-limit)
closeCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer close(preOpenCh)
defer wg.Done()
// newLimitSizeMergeIter will open #limit readers at the beginning; the rest of
// the readers are opened in advance here to reduce blocking when they are needed.
for i := int(limit); i < len(multiStat.Filenames); i++ {
filePair := multiStat.Filenames[i]
path := filePair[1]
asyncTask := make(chan readerAndError, 1)
wg.Add(1)
go func() {
defer close(asyncTask)
defer wg.Done()
rd, err := newStatsReader(ctx, exStorage, path, 500*1024)
select {
case <-closeCh:
_ = rd.Close()
return
case asyncTask <- readerAndError{r: &statReaderProxy{p: path, r: rd}, err: err}:
}
}()
select {
case <-closeCh:
// when closing, no other method is called simultaneously, so this goroutine
// can check the size of the channel and drain it all
for j := len(preOpenCh); j > 0; j-- {
asyncTask2 := <-preOpenCh
t, ok := <-asyncTask2
if !ok {
continue
}
_ = t.r.close()
}
t, ok := <-asyncTask
if ok {
_ = t.r.close()
}
return
case preOpenCh <- asyncTask:
}
}
}()

readerOpeners := make([]readerOpenerFn[*rangeProperty, statReaderProxy], 0, len(multiStat.Filenames))
for _, filePair := range multiStat.Filenames {
path := filePair[1]
// the first `limit` readers will be opened by newLimitSizeMergeIter
for i := 0; i < int(limit); i++ {
path := multiStat.Filenames[i][1]
readerOpeners = append(readerOpeners, func() (*statReaderProxy, error) {
rd, err := newStatsReader(ctx, exStorage, path, 4096)
rd, err := newStatsReader(ctx, exStorage, path, 500*1024)
if err != nil {
return nil, err
}
return &statReaderProxy{p: filePair[1], r: rd}, nil
return &statReaderProxy{p: path, r: rd}, nil
})
}
// the remaining readers are opened by the goroutine above; just read them from the channel
for i := int(limit); i < len(multiStat.Filenames); i++ {
readerOpeners = append(readerOpeners, func() (*statReaderProxy, error) {
select {
case <-closeCh:
return nil, errMergePropBaseIterClosed
case asyncTask, ok := <-preOpenCh:
if !ok {
return nil, errMergePropBaseIterClosed
}
select {
case <-closeCh:
return nil, errMergePropBaseIterClosed
case t, ok := <-asyncTask:
if !ok {
return nil, errMergePropBaseIterClosed
}
return t.r, t.err
}
}
})
}
weight := make([]int64, len(readerOpeners))
for i := range weight {
weight[i] = 1
}
i, err := newLimitSizeMergeIter(ctx, readerOpeners, weight, limit)
return &mergePropBaseIter{iter: i}, err
return &mergePropBaseIter{iter: i, closeCh: closeCh, wg: wg}, err
}

func (m mergePropBaseIter) path() string {
@@ -649,7 +754,6 @@ func (m mergePropBaseIter) next() (*rangeProperty, error) {
*m.closeReaderFlag = true
}
if !ok {
// TODO(lance6716): explain it??
if m.iter.err == nil {
return nil, io.EOF
}
Expand All @@ -662,7 +766,10 @@ func (m mergePropBaseIter) switchConcurrentMode(bool) error {
return nil
}

// close should not be called concurrently with next.
func (m mergePropBaseIter) close() error {
close(m.closeCh)
m.wg.Wait()
return m.iter.close()
}

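The speedup in this file comes from the pre-opening pipeline in newMergePropBaseIter: a producer goroutine opens files ahead of time, each behind a one-shot result channel, so the consumer rarely blocks on object-storage open latency. A condensed, self-contained sketch of the pattern (hypothetical names, shutdown handling elided):

package main

import (
	"fmt"
	"time"
)

type reader struct{ path string }

// slowOpen stands in for opening a remote file, e.g. on S3.
func slowOpen(path string) *reader {
	time.Sleep(10 * time.Millisecond)
	return &reader{path: path}
}

func main() {
	paths := []string{"/f1", "/f2", "/f3", "/f4", "/f5"}
	// the buffer bounds how many opens may run ahead of the consumer,
	// playing the role of preOpenLimit-limit above
	preOpenCh := make(chan chan *reader, 2)

	go func() {
		defer close(preOpenCh)
		for _, p := range paths {
			task := make(chan *reader, 1)
			go func(p string) { task <- slowOpen(p) }(p) // open concurrently
			preOpenCh <- task // blocks once enough opens are in flight
		}
	}()

	for task := range preOpenCh {
		r := <-task // usually already resolved, so open latency is hidden
		fmt.Println(r.path)
	}
}
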
124 changes: 124 additions & 0 deletions br/pkg/lightning/backend/external/iter_test.go
@@ -646,3 +646,127 @@ func TestLimitSizeMergeIterDiffWeight(t *testing.T) {
require.NoError(t, iter.err)
require.Equal(t, int64(0), refCnt.Load())
}

type slowOpenStorage struct {
*storage.MemStorage
sleep time.Duration
openCnt atomic.Int32
}

func (s *slowOpenStorage) Open(
ctx context.Context,
filePath string,
o *storage.ReaderOption,
) (storage.ExternalFileReader, error) {
time.Sleep(s.sleep)
s.openCnt.Inc()
return s.MemStorage.Open(ctx, filePath, o)
}

func TestMergePropBaseIter(t *testing.T) {
// this test should finish in around 1 second. However, since CI is not
// stable, we don't check the time.
oneOpenSleep := time.Second

fileNum := 16
filenames := make([]string, fileNum)
for i := range filenames {
filenames[i] = fmt.Sprintf("/test%06d", i)
}
ctx := context.Background()
store := &slowOpenStorage{
MemStorage: storage.NewMemStorage(),
sleep: oneOpenSleep,
}
for i, filename := range filenames {
writer, err := store.Create(ctx, filename, nil)
require.NoError(t, err)
prop := &rangeProperty{firstKey: []byte{byte(i)}}
buf := encodeMultiProps(nil, []*rangeProperty{prop})
_, err = writer.Write(ctx, buf)
require.NoError(t, err)
err = writer.Close(ctx)
require.NoError(t, err)
}

multiStat := MultipleFilesStat{MaxOverlappingNum: 1}
for _, f := range filenames {
multiStat.Filenames = append(multiStat.Filenames, [2]string{"", f})
}
iter, err := newMergePropBaseIter(ctx, multiStat, store)
require.NoError(t, err)
for i := 0; i < fileNum; i++ {
p, err := iter.next()
require.NoError(t, err)
require.EqualValues(t, i, p.firstKey[0])
}

_, err = iter.next()
require.ErrorIs(t, err, io.EOF)
require.EqualValues(t, fileNum, store.openCnt.Load())
}

func TestEmptyBaseReader4LimitSizeMergeIter(t *testing.T) {
fileNum := 100
filenames := make([]string, fileNum)
for i := range filenames {
filenames[i] = fmt.Sprintf("/test%06d", i)
}
ctx := context.Background()
store := &slowOpenStorage{
MemStorage: storage.NewMemStorage(),
}
// empty files so readers will be closed at init
for _, filename := range filenames {
writer, err := store.Create(ctx, filename, nil)
require.NoError(t, err)
err = writer.Close(ctx)
require.NoError(t, err)
}

multiStat := MultipleFilesStat{MaxOverlappingNum: 1}
for _, f := range filenames {
multiStat.Filenames = append(multiStat.Filenames, [2]string{"", f})
}
iter, err := newMergePropBaseIter(ctx, multiStat, store)
require.NoError(t, err)

_, err = iter.next()
require.ErrorIs(t, err, io.EOF)
require.EqualValues(t, fileNum, store.openCnt.Load())
}

func TestCloseLimitSizeMergeIterHalfway(t *testing.T) {
fileNum := 10000
filenames := make([]string, fileNum)
for i := range filenames {
filenames[i] = fmt.Sprintf("/test%06d", i)
}
ctx := context.Background()
store := &trackOpenMemStorage{MemStorage: storage.NewMemStorage()}

for i, filename := range filenames {
writer, err := store.Create(ctx, filename, nil)
require.NoError(t, err)
prop := &rangeProperty{firstKey: []byte{byte(i)}}
buf := encodeMultiProps(nil, []*rangeProperty{prop})
_, err = writer.Write(ctx, buf)
require.NoError(t, err)
err = writer.Close(ctx)
require.NoError(t, err)
}

multiStat := MultipleFilesStat{MaxOverlappingNum: 1}
for _, f := range filenames {
multiStat.Filenames = append(multiStat.Filenames, [2]string{"", f})
}
iter, err := newMergePropBaseIter(ctx, multiStat, store)
require.NoError(t, err)

_, err = iter.next()
require.NoError(t, err)

err = iter.close()
require.NoError(t, err)
require.EqualValues(t, 0, store.opened.Load())
}
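
Distilled from the tests above, a typical lifecycle of the iterator looks as follows. This is illustrative only: it uses unexported identifiers from this diff, and process is a hypothetical consumer.

	iter, err := newMergePropBaseIter(ctx, multiStat, store)
	if err != nil {
		return err
	}
	for {
		prop, err := iter.next()
		if err == io.EOF {
			break // all range properties have been consumed
		}
		if err != nil {
			return err
		}
		process(prop) // hypothetical consumer of *rangeProperty
	}
	// close stops the pre-opening goroutines and waits for them;
	// per the comment above, it must not be called concurrently with next.
	return iter.close()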
