ddl: reuse chunk for copr-read and check context done (#39473)
close #39468, close #39470
tangenta authored Dec 1, 2022
1 parent 6a4b909 commit e307642
Showing 3 changed files with 62 additions and 45 deletions.
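
The core of the change is replacing the sync.Pool buffers (and the binding map that tied each index-record slice to its chunk) with bounded channels, so every record slice and chunk is pre-allocated once and handed back explicitly after use. As a rough illustration of that channel-as-free-list pattern — not the TiDB code itself, all names below (bufPool, newBufPool, get, put) are made up — a minimal sketch:

package main

import "fmt"

// bufPool is a bounded free list: a buffered channel holding pre-allocated
// buffers. get blocks when the pool is empty, which caps how many buffers
// can be in flight at once; put hands a buffer back for reuse.
type bufPool struct {
	ch chan []byte
}

func newBufPool(size, bufCap int) *bufPool {
	p := &bufPool{ch: make(chan []byte, size)}
	for i := 0; i < size; i++ {
		p.ch <- make([]byte, 0, bufCap)
	}
	return p
}

func (p *bufPool) get() []byte  { return (<-p.ch)[:0] } // keep the backing array, reset the length
func (p *bufPool) put(b []byte) { p.ch <- b }

func main() {
	pool := newBufPool(4, 1024)
	buf := pool.get()
	buf = append(buf, "reused backing array"...)
	fmt.Println(len(buf), cap(buf)) // 20 1024
	pool.put(buf)
}

Compared with sync.Pool, a buffered channel bounds the number of buffers in flight and never discards them between GCs, which keeps chunk reuse deterministic.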
2 changes: 1 addition & 1 deletion ddl/export_test.go
@@ -39,7 +39,7 @@ func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS
pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
pool.adjustSize(1)
pool.tasksCh <- task
idxRec, _, done, err := pool.fetchRowColValsFromCop(*task)
idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task)
pool.close()
return idxRec, done, err
}
9 changes: 4 additions & 5 deletions ddl/index.go
@@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
@@ -1498,11 +1499,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC

var (
idxRecords []*indexRecord
copChunk *chunk.Chunk // only used by the coprocessor request sender.
nextKey kv.Key
taskDone bool
)
if w.copReqSenderPool != nil {
idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
idxRecords, copChunk, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
defer w.copReqSenderPool.recycleIdxRecordsAndChunk(idxRecords, copChunk)
} else {
idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
}
@@ -1567,10 +1570,6 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
taskCtx.addedCount++
}

if w.copReqSenderPool != nil {
w.copReqSenderPool.recycleIdxRecords(idxRecords)
}

return nil
})
logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000)
96 changes: 57 additions & 39 deletions ddl/index_cop.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
@@ -46,7 +47,10 @@ import (
// It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range.
const copReadBatchFactor = 10

func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) {
// copReadConcurrencyFactor is the factor of concurrency of coprocessor read.
const copReadConcurrencyFactor = 10

func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
@@ -55,10 +59,10 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
if !ok {
logutil.BgLogger().Info("[ddl-ingest] cop-response channel is closed",
zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
return nil, handleRange.endKey, true, nil
return nil, nil, handleRange.endKey, true, nil
}
if rs.err != nil {
return nil, handleRange.startKey, false, rs.err
return nil, nil, handleRange.startKey, false, rs.err
}
if rs.done {
logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task",
@@ -69,15 +73,15 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
logutil.BgLogger().Info("[ddl-ingest] task is found in results",
zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
c.results.Delete(handleRange.id)
return rs.records, handleRange.endKey, true, nil
return rs.records, rs.chunk, handleRange.endKey, true, nil
}
return rs.records, handleRange.startKey, false, nil
return rs.records, rs.chunk, handleRange.startKey, false, nil
case <-ticker.C:
logutil.BgLogger().Info("[ddl-ingest] cop-request result channel is empty",
zap.Int("id", handleRange.id))
if _, found := c.results.Load(handleRange.id); found {
c.results.Delete(handleRange.id)
return nil, handleRange.endKey, true, nil
return nil, nil, handleRange.endKey, true, nil
}
}
}
@@ -95,9 +99,8 @@ type copReqSenderPool struct {
senders []*copReqSender
wg sync.WaitGroup

idxBufPool sync.Pool // []*indexRecord
srcChkPool sync.Pool // *chunk.Chunk
binding generic.SyncMap[*[]*indexRecord, *chunk.Chunk]
idxBufPool chan []*indexRecord
srcChkPool chan *chunk.Chunk
}

type copReqSender struct {
@@ -132,35 +135,37 @@ func (c *copReqSender) run() {
idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
p.recycleIdxRecordsAndChunk(idxRec, srcChk)
terror.Call(rs.Close)
_ = rs.Close()
return
}
total += len(idxRec)
p.resultsCh <- idxRecResult{id: task.id, records: idxRec, done: done, total: total}
p.resultsCh <- idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total}
}
terror.Call(rs.Close)
}
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor)
idxBufPool := make(chan []*indexRecord, poolSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize())
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize()))
}
return &copReqSenderPool{
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultsCh: make(chan idxRecResult, backfillTaskChanSize),
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: sync.Pool{
New: func() any {
return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
srcChkPool: sync.Pool{
New: func() any {
return chunk.NewChunkWithCapacity(copCtx.fieldTps, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
binding: generic.NewSyncMap[*[]*indexRecord, *chunk.Chunk](4),
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultsCh: make(chan idxRecResult, backfillTaskChanSize),
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: idxBufPool,
srcChkPool: srcChkPool,
}
}

@@ -195,30 +200,41 @@ func (c *copReqSenderPool) close() {
for _, w := range c.senders {
w.cancel()
}
cleanupWg := util.WaitGroupWrapper{}
cleanupWg.Run(c.drainResults)
// Wait for all cop-req senders to exit.
c.wg.Wait()
close(c.resultsCh)
cleanupWg.Wait()
close(c.idxBufPool)
close(c.srcChkPool)
}

func (c *copReqSenderPool) drainResults() {
// Consume the remaining results because the writers are no longer active.
for rs := range c.resultsCh {
c.recycleIdxRecordsAndChunk(rs.records, rs.chunk)
}
}

func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
ir, chk := c.idxBufPool.Get().([]*indexRecord), c.srcChkPool.Get().(*chunk.Chunk)
ir := <-c.idxBufPool
chk := <-c.srcChkPool
newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
}
chk.Reset()
c.binding.Store(&ir, chk)
return ir[:0], chk
}

// recycleIdxRecords puts the index record slice back to the pool for reuse.
func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) {
if idxRecs == nil {
// recycleIdxRecordsAndChunk puts the index record slice and the chunk back to the pool for reuse.
func (c *copReqSenderPool) recycleIdxRecordsAndChunk(idxRecs []*indexRecord, chk *chunk.Chunk) {
if idxRecs == nil || chk == nil {
return
}
c.idxBufPool.Put(idxRecs[:0])
if bindingChunk, ok := c.binding.Load(&idxRecs); ok {
c.srcChkPool.Put(bindingChunk)
}
c.idxBufPool <- idxRecs
c.srcChkPool <- chk
}

// copContext contains the information that is needed when building a coprocessor request.
@@ -449,7 +465,8 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
datumBuf := make([]types.Datum, 0, len(offsets))
for _, offset := range offsets {
c := expCols[offset]
datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType()))
rowDt := row.GetDatum(offset, c.GetType())
datumBuf = append(datumBuf, rowDt)
}
return datumBuf
}
@@ -470,6 +487,7 @@ func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
type idxRecResult struct {
id int
records []*indexRecord
chunk *chunk.Chunk
err error
done bool
total int
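
The reworked close()/drainResults above follows a drain-before-close order: start a drainer, wait for the senders to exit, close the results channel so the drainer can finish, and only then close the reuse channels, so no sender blocks on resultsCh and every outstanding buffer returns to the pools. A self-contained sketch of that ordering — illustrative names only, with a fixed-count sender standing in for the context cancellation used in the commit:

package main

import (
	"fmt"
	"sync"
)

func main() {
	pool := make(chan []int, 4) // stands in for idxBufPool / srcChkPool
	results := make(chan []int, 4)
	for i := 0; i < cap(pool); i++ {
		pool <- make([]int, 0, 16)
	}

	var senders sync.WaitGroup
	senders.Add(1)
	go func() { // a sender: borrow a buffer, publish it as a result
		defer senders.Done()
		for i := 0; i < 3; i++ {
			buf := <-pool
			results <- append(buf[:0], i)
		}
	}()

	var drain sync.WaitGroup
	drain.Add(1)
	go func() { // drainer: recycle any results nobody consumed
		defer drain.Done()
		for buf := range results {
			pool <- buf
		}
	}()

	senders.Wait() // no more sends on results after this point
	close(results) // lets the drainer's range loop finish
	drain.Wait()   // every borrowed buffer is back in the pool
	close(pool)
	fmt.Println("buffers back in pool:", len(pool)) // 4
}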
