external: use 2 membuf pools of different block sizes (#51006)
ref #50752
lance6716 authored Feb 7, 2024
1 parent bbfea62 commit 8988b4c
Showing 8 changed files with 365 additions and 174 deletions.
299 changes: 189 additions & 110 deletions br/pkg/lightning/backend/external/bench_test.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions br/pkg/lightning/backend/external/byte_reader.go
@@ -31,8 +31,10 @@ import (
var (
// ConcurrentReaderBufferSizePerConc is the buffer size for concurrent reader per
// concurrency.
ConcurrentReaderBufferSizePerConc = int(2 * size.MB)
readAllDataConcThreshold = uint64(16)
ConcurrentReaderBufferSizePerConc = int(8 * size.MB)
// In readAllData, an expected concurrency below this value will not use the
// concurrent reader.
readAllDataConcThreshold = uint64(4)
)
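
For a sense of what this retuning changes: elsewhere in this diff the expected concurrency of a file is computed as span/bufferSize + 1, and the concurrent reader is only used once that value reaches the threshold. The sketch below works out the enable point under the old and new values; the figures are arithmetic under those formulas, not measurements.

```go
package main

import "fmt"

func main() {
	type cfg struct {
		name       string
		bufPerConc uint64
		threshold  uint64
	}
	for _, c := range []cfg{
		{"old (2 MiB buffer, threshold 16)", 2 << 20, 16},
		{"new (8 MiB buffer, threshold 4)", 8 << 20, 4},
	} {
		// Smallest stats range for which expectedConc = span/bufPerConc + 1
		// reaches the threshold, i.e. concurrent reading is enabled.
		minSpan := (c.threshold - 1) * c.bufPerConc
		// Read-buffer footprint at that enable point: threshold readers,
		// each with bufPerConc bytes.
		minBuffer := c.threshold * c.bufPerConc
		fmt.Printf("%s: concurrent read from %d MiB ranges, using at least %d MiB of read buffers\n",
			c.name, minSpan>>20, minBuffer>>20)
	}
}
```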

// byteReader provides structured reading on a byte stream of external storage.
96 changes: 67 additions & 29 deletions br/pkg/lightning/backend/external/engine.go
@@ -50,9 +50,11 @@ import (
const maxCloudStorageConnections = 1000

type memKVsAndBuffers struct {
mu sync.Mutex
keys [][]byte
values [][]byte
mu sync.Mutex
keys [][]byte
values [][]byte
// memKVBuffers contains two types of buffers: the first half are small-block
// buffers, the second half are large-block ones.
memKVBuffers []*membuf.Buffer
size int
droppedSize int
@@ -92,14 +94,15 @@ func (b *memKVsAndBuffers) build(ctx context.Context) {

// Engine stores sorted key/value pairs in an external storage.
type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
startKey []byte
endKey []byte
splitKeys [][]byte
regionSplitSize int64
bufPool *membuf.Pool
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
startKey []byte
endKey []byte
splitKeys [][]byte
regionSplitSize int64
smallBlockBufPool *membuf.Pool
largeBlockBufPool *membuf.Pool

memKVsAndBuffers memKVsAndBuffers

@@ -126,7 +129,10 @@ type Engine struct {
importedKVCount *atomic.Int64
}

const memLimit = 12 * units.GiB
const (
memLimit = 12 * units.GiB
smallBlockSize = units.MiB
)
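
A minimal sketch of the two-pool layout the engine now keeps, assembled from the membuf options used in this diff; the membuf and go-units import paths and the example program around them are assumptions for illustration, not part of this commit.

```go
package main

import (
	"github.com/docker/go-units"            // referenced as units, as in engine.go (assumed import)
	"github.com/pingcap/tidb/br/pkg/membuf" // assumed import path for the membuf package
)

func main() {
	// A single limiter is shared by both pools, so their combined usage is
	// capped by one budget (memLimit = 12 GiB in engine.go).
	limiter := membuf.NewLimiter(12 * units.GiB)

	// Small-block pool: 1 MiB blocks (smallBlockSize), holding the copied
	// key/value bytes.
	smallPool := membuf.NewPool(
		membuf.WithBlockNum(0),
		membuf.WithPoolMemoryLimiter(limiter),
		membuf.WithBlockSize(units.MiB),
	)
	defer smallPool.Destroy()

	// Large-block pool: 8 MiB blocks (ConcurrentReaderBufferSizePerConc),
	// used as the concurrent reader's read buffers.
	largePool := membuf.NewPool(
		membuf.WithBlockNum(0),
		membuf.WithPoolMemoryLimiter(limiter),
		membuf.WithBlockSize(8*units.MiB),
	)
	defer largePool.Destroy()
}
```

Because both pools share one limiter, large-block buffers for concurrent reads and small-block buffers for copied KVs are capped by the same 12 GiB budget.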

// NewExternalEngine creates an (external) engine.
func NewExternalEngine(
@@ -156,7 +162,12 @@ func NewExternalEngine(
endKey: endKey,
splitKeys: splitKeys,
regionSplitSize: regionSplitSize,
bufPool: membuf.NewPool(
smallBlockBufPool: membuf.NewPool(
membuf.WithBlockNum(0),
membuf.WithPoolMemoryLimiter(memLimiter),
membuf.WithBlockSize(smallBlockSize),
),
largeBlockBufPool: membuf.NewPool(
membuf.WithBlockNum(0),
membuf.WithPoolMemoryLimiter(memLimiter),
membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
@@ -222,16 +233,28 @@ func getFilesReadConcurrency(
}
startOffs, endOffs := offsets[0], offsets[1]
for i := range statsFiles {
result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
expectedConc := (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
// let the stat internals cover the [startKey, endKey) since seekPropsOffsets
// always return an offset that is less than or equal to the key.
result[i] += 1
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
zap.String("filename", statsFiles[i]),
zap.Uint64("startOffset", startOffs[i]),
zap.Uint64("endOffset", endOffs[i]),
zap.Uint64("expected concurrency", result[i]),
)
expectedConc += 1
// readAllData will enable concurrent read and use a large buffer if result[i] > 1.
// When expectedConc < readAllDataConcThreshold, we don't use concurrent read, to
// reduce overhead.
if expectedConc >= readAllDataConcThreshold {
result[i] = expectedConc
} else {
result[i] = 1
}
// only log for files with expected concurrency > 1, to avoid too many logs
if expectedConc > 1 {
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
zap.String("filename", statsFiles[i]),
zap.Uint64("startOffset", startOffs[i]),
zap.Uint64("endOffset", endOffs[i]),
zap.Uint64("expectedConc", expectedConc),
zap.Uint64("concurrency", result[i]),
)
}
}
return result, startOffs, nil
}
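
A self-contained sketch of the decision above, assuming the constants keep the values set in byte_reader.go (an 8 MiB buffer per concurrency, threshold 4): a 40 MiB stats range yields an expected concurrency of 6 and gets the concurrent reader, while a 16 MiB range yields 3 and falls back to a single sequential read.

```go
package main

import "fmt"

func main() {
	const bufPerConc = uint64(8 << 20) // ConcurrentReaderBufferSizePerConc
	const threshold = uint64(4)        // readAllDataConcThreshold

	for _, span := range []uint64{16 << 20, 40 << 20} { // endOffs[i] - startOffs[i]
		// Mirrors getFilesReadConcurrency: +1 so the stat range fully covers
		// [startKey, endKey).
		expectedConc := span/bufPerConc + 1
		conc := uint64(1)
		if expectedConc >= threshold {
			conc = expectedConc
		}
		fmt.Printf("range %d MiB -> expectedConc %d, readers used %d\n",
			span>>20, expectedConc, conc)
	}
}
```
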
@@ -252,7 +275,8 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, startKey, endKey []byt
e.statsFiles,
startKey,
endKey,
e.bufPool,
e.smallBlockBufPool,
e.largeBlockBufPool,
&e.memKVsAndBuffers,
)
if err != nil {
@@ -443,21 +467,35 @@ func (e *Engine) SplitRanges(

// Close implements common.Engine.
func (e *Engine) Close() error {
if e.bufPool != nil {
e.bufPool.Destroy()
e.bufPool = nil
if e.smallBlockBufPool != nil {
e.smallBlockBufPool.Destroy()
e.smallBlockBufPool = nil
}
if e.largeBlockBufPool != nil {
e.largeBlockBufPool.Destroy()
e.largeBlockBufPool = nil
}
e.storage.Close()
return nil
}

// Reset resets the memory buffer pool.
func (e *Engine) Reset() error {
if e.bufPool != nil {
e.bufPool.Destroy()
memLimiter := membuf.NewLimiter(memLimit)
e.bufPool = membuf.NewPool(
memLimiter := membuf.NewLimiter(memLimit)
if e.smallBlockBufPool != nil {
e.smallBlockBufPool.Destroy()
e.smallBlockBufPool = membuf.NewPool(
membuf.WithBlockNum(0),
membuf.WithPoolMemoryLimiter(memLimiter),
membuf.WithBlockSize(smallBlockSize),
)
}
if e.largeBlockBufPool != nil {
e.largeBlockBufPool.Destroy()
e.largeBlockBufPool = membuf.NewPool(
membuf.WithBlockNum(0),
membuf.WithPoolMemoryLimiter(memLimiter),
membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
)
}
return nil
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/merge_v2.go
@@ -119,6 +119,7 @@ func MergeOverlappingFilesV2(
curStart,
curEnd,
bufPool,
bufPool,
loaded,
)
if err1 != nil {
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/external/misc_bench_test.go
@@ -27,6 +27,7 @@ import (
"testing"
"time"

"github.com/felixge/fgprof"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
@@ -39,6 +40,7 @@ func openTestingStorage(t *testing.T) storage.ExternalStorage {
}
s, err := storage.NewFromURL(context.Background(), *testingStorageURI)
require.NoError(t, err)
t.Cleanup(s.Close)
return s
}

@@ -347,6 +349,48 @@ func createAscendingFiles(
return kvCnt, minKey, maxKey
}

type profiler struct {
onAndOffCPUProf bool
peakMemProf bool

caseIdx int

onAndOffCPUProfCloser func() error
heapProfDoneCh chan struct{}
heapProfWg *sync.WaitGroup
}

func newProfiler(onAndOffCPUProf, peakMemProf bool) *profiler {
return &profiler{
onAndOffCPUProf: onAndOffCPUProf,
peakMemProf: peakMemProf,
}
}

func (p *profiler) beforeTest() {
p.caseIdx++
if p.onAndOffCPUProf {
fileCPU, err := os.Create(fmt.Sprintf("on-and-off-cpu-%d.prof", p.caseIdx))
intest.AssertNoError(err)
p.onAndOffCPUProfCloser = fgprof.Start(fileCPU, fgprof.FormatPprof)
}
if p.peakMemProf {
fileHeap := fmt.Sprintf("heap-%d.prof", p.caseIdx)
p.heapProfDoneCh, p.heapProfWg = recordHeapForMaxInUse(fileHeap)
}
}

func (p *profiler) afterTest() {
if p.onAndOffCPUProf {
err := p.onAndOffCPUProfCloser()
intest.AssertNoError(err)
}
if p.peakMemProf {
close(p.heapProfDoneCh)
p.heapProfWg.Wait()
}
}
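
A hypothetical usage sketch of the profiler helper above, written as if it lived in the same test package; the function name, loop count, and benchmark body are illustrative assumptions, not part of this patch.

```go
// Illustrative only: drive the profiler around each benchmark case.
func testProfilerUsage(t *testing.T) {
	p := newProfiler(true /* onAndOffCPUProf */, true /* peakMemProf */)
	for i := 0; i < 2; i++ {
		// Creates on-and-off-cpu-<caseIdx>.prof, starts fgprof, and begins
		// sampling for heap-<caseIdx>.prof.
		p.beforeTest()

		// ... run one read/merge benchmark case here ...

		// Stops fgprof and waits until the peak-in-use heap profile is written.
		p.afterTest()
	}
}
```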

func recordHeapForMaxInUse(filename string) (chan struct{}, *sync.WaitGroup) {
doneCh := make(chan struct{})
var wg sync.WaitGroup
84 changes: 53 additions & 31 deletions br/pkg/lightning/backend/external/reader.go
@@ -35,7 +35,8 @@ func readAllData(
store storage.ExternalStorage,
dataFiles, statsFiles []string,
startKey, endKey []byte,
bufPool *membuf.Pool,
smallBlockBufPool *membuf.Pool,
largeBlockBufPool *membuf.Pool,
output *memKVsAndBuffers,
) (err error) {
task := log.BeginTask(logutil.Logger(ctx), "read all data")
@@ -69,36 +70,59 @@ func readAllData(
startKey,
endKey,
)
// TODO(lance6716): refine adjust concurrency
for i, c := range concurrences {
if c < readAllDataConcThreshold {
concurrences[i] = 1
}
}

if err != nil {
return err
}

eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
// limit the concurrency to avoid open too many connections at the same time
eg.SetLimit(1000)
for i := range dataFiles {
i := i
readConn := 1000
readConn = min(readConn, len(dataFiles))
taskCh := make(chan int)
output.memKVBuffers = make([]*membuf.Buffer, readConn*2)
for readIdx := 0; readIdx < readConn; readIdx++ {
readIdx := readIdx
eg.Go(func() error {
err2 := readOneFile(
egCtx,
store,
dataFiles[i],
startKey,
endKey,
startOffsets[i],
concurrences[i],
bufPool,
output,
)
return errors.Annotatef(err2, "failed to read file %s", dataFiles[i])
output.memKVBuffers[readIdx] = smallBlockBufPool.NewBuffer()
output.memKVBuffers[readIdx+readConn] = largeBlockBufPool.NewBuffer()
smallBlockBuf := output.memKVBuffers[readIdx]
largeBlockBuf := output.memKVBuffers[readIdx+readConn]

for {
select {
case <-egCtx.Done():
return egCtx.Err()
case fileIdx, ok := <-taskCh:
if !ok {
return nil
}
err2 := readOneFile(
egCtx,
store,
dataFiles[fileIdx],
startKey,
endKey,
startOffsets[fileIdx],
concurrences[fileIdx],
smallBlockBuf,
largeBlockBuf,
output,
)
if err2 != nil {
return errors.Annotatef(err2, "failed to read file %s", dataFiles[fileIdx])
}
}
}
})
}

for fileIdx := range dataFiles {
select {
case <-egCtx.Done():
return eg.Wait()
case taskCh <- fileIdx:
}
}
close(taskCh)
return eg.Wait()
}
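
readAllData now runs a fixed pool of readers that pull file indices from a channel, each reusing one small-block and one large-block buffer for its whole lifetime, instead of spawning a goroutine (and fresh buffers) per file. Below is a generic, self-contained sketch of that pattern; the names, sizes, and the plain WaitGroup (the real code routes errors and cancellation through an error group) are illustrative assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	files := []string{"f0", "f1", "f2", "f3", "f4"}
	readers := min(2, len(files)) // readAllData caps this at 1000 connections
	taskCh := make(chan int)

	var wg sync.WaitGroup
	for r := 0; r < readers; r++ {
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			// Each reader owns one buffer pair for its whole lifetime, standing
			// in for smallBlockBufPool.NewBuffer() / largeBlockBufPool.NewBuffer().
			smallBuf := make([]byte, 0, 1<<20)
			largeBuf := make([]byte, 0, 8<<20)
			for idx := range taskCh {
				fmt.Printf("reader %d (buffers %d MiB / %d MiB) reads %s\n",
					r, cap(smallBuf)>>20, cap(largeBuf)>>20, files[idx])
			}
		}(r)
	}
	for i := range files {
		taskCh <- i
	}
	close(taskCh)
	wg.Wait()
}
```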

@@ -109,7 +133,8 @@ func readOneFile(
startKey, endKey []byte,
startOffset uint64,
concurrency uint64,
bufPool *membuf.Pool,
smallBlockBuf *membuf.Buffer,
largeBlockBuf *membuf.Buffer,
output *memKVsAndBuffers,
) error {
readAndSortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_one_file")
@@ -127,16 +152,14 @@ func readOneFile(
dataFile,
int(concurrency),
ConcurrentReaderBufferSizePerConc,
bufPool.NewBuffer(),
largeBlockBuf,
)
err = rd.byteReader.switchConcurrentMode(true)
if err != nil {
return err
}
}

// this buffer is associated with data slices and will return to caller
memBuf := bufPool.NewBuffer()
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
size := 0
@@ -159,15 +182,14 @@ func readOneFile(
}
// TODO(lance6716): we are copying every KV from rd's buffer to memBuf, can we
// directly read into memBuf?
keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
keys = append(keys, smallBlockBuf.AddBytes(k))
values = append(values, smallBlockBuf.AddBytes(v))
size += len(k) + len(v)
}
readAndSortDurHist.Observe(time.Since(ts).Seconds())
output.mu.Lock()
output.keysPerFile = append(output.keysPerFile, keys)
output.valuesPerFile = append(output.valuesPerFile, values)
output.memKVBuffers = append(output.memKVBuffers, memBuf)
output.size += size
output.droppedSizePerFile = append(output.droppedSizePerFile, droppedSize)
output.mu.Unlock()
(Diffs for the remaining 2 changed files are not shown.)
