From e307642d9fa5c9ca09bd418d21f8868cb02afccc Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 1 Dec 2022 19:06:02 +0800 Subject: [PATCH] ddl: reuse chunk for copr-read and check context done (#39473) close pingcap/tidb#39468, close pingcap/tidb#39470 --- ddl/export_test.go | 2 +- ddl/index.go | 9 ++--- ddl/index_cop.go | 96 +++++++++++++++++++++++++++------------------- 3 files changed, 62 insertions(+), 45 deletions(-) diff --git a/ddl/export_test.go b/ddl/export_test.go index 641d7ce72fc8c..486390f9a6810 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -39,7 +39,7 @@ func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS pool := newCopReqSenderPool(context.Background(), copCtx, startTS) pool.adjustSize(1) pool.tasksCh <- task - idxRec, _, done, err := pool.fetchRowColValsFromCop(*task) + idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task) pool.close() return idxRec, done, err } diff --git a/ddl/index.go b/ddl/index.go index 818dc1eac7738..163e9c407a808 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" decoder "github.com/pingcap/tidb/util/rowDecoder" @@ -1498,11 +1499,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC var ( idxRecords []*indexRecord + copChunk *chunk.Chunk // only used by the coprocessor request sender. nextKey kv.Key taskDone bool ) if w.copReqSenderPool != nil { - idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange) + idxRecords, copChunk, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange) + defer w.copReqSenderPool.recycleIdxRecordsAndChunk(idxRecords, copChunk) } else { idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange) } @@ -1567,10 +1570,6 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC taskCtx.addedCount++ } - if w.copReqSenderPool != nil { - w.copReqSenderPool.recycleIdxRecords(idxRecords) - } - return nil }) logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 6eecd9c0d685e..950cd91404bc6 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -46,7 +47,10 @@ import ( // It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range. const copReadBatchFactor = 10 -func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) { +// copReadConcurrencyFactor is the factor of concurrency of coprocessor read. 
+const copReadConcurrencyFactor = 10
+
+func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) {
 	ticker := time.NewTicker(500 * time.Millisecond)
 	defer ticker.Stop()
 	for {
@@ -55,10 +59,10 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
 			if !ok {
 				logutil.BgLogger().Info("[ddl-ingest] cop-response channel is closed",
 					zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
-				return nil, handleRange.endKey, true, nil
+				return nil, nil, handleRange.endKey, true, nil
 			}
 			if rs.err != nil {
-				return nil, handleRange.startKey, false, rs.err
+				return nil, nil, handleRange.startKey, false, rs.err
 			}
 			if rs.done {
 				logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task",
@@ -69,15 +73,15 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
 				logutil.BgLogger().Info("[ddl-ingest] task is found in results",
 					zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
 				c.results.Delete(handleRange.id)
-				return rs.records, handleRange.endKey, true, nil
+				return rs.records, rs.chunk, handleRange.endKey, true, nil
 			}
-			return rs.records, handleRange.startKey, false, nil
+			return rs.records, rs.chunk, handleRange.startKey, false, nil
 		case <-ticker.C:
 			logutil.BgLogger().Info("[ddl-ingest] cop-request result channel is empty",
 				zap.Int("id", handleRange.id))
 			if _, found := c.results.Load(handleRange.id); found {
 				c.results.Delete(handleRange.id)
-				return nil, handleRange.endKey, true, nil
+				return nil, nil, handleRange.endKey, true, nil
 			}
 		}
 	}
@@ -95,9 +99,8 @@ type copReqSenderPool struct {
 	senders []*copReqSender
 	wg      sync.WaitGroup

-	idxBufPool sync.Pool // []*indexRecord
-	srcChkPool sync.Pool // *chunk.Chunk
-	binding    generic.SyncMap[*[]*indexRecord, *chunk.Chunk]
+	idxBufPool chan []*indexRecord
+	srcChkPool chan *chunk.Chunk
 }

 type copReqSender struct {
@@ -132,35 +135,36 @@ func (c *copReqSender) run() {
 		idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec)
 		if err != nil {
 			p.resultsCh <- idxRecResult{id: task.id, err: err}
+			p.recycleIdxRecordsAndChunk(idxRec, srcChk)
+			terror.Call(rs.Close)
 			return
 		}
 		total += len(idxRec)
-		p.resultsCh <- idxRecResult{id: task.id, records: idxRec, done: done, total: total}
+		p.resultsCh <- idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total}
 	}
+	terror.Call(rs.Close)
 }
 }

 func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
+	poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor)
+	idxBufPool := make(chan []*indexRecord, poolSize)
+	srcChkPool := make(chan *chunk.Chunk, poolSize)
+	for i := 0; i < poolSize; i++ {
+		idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize())
+		srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize()))
+	}
 	return &copReqSenderPool{
-		tasksCh:    make(chan *reorgBackfillTask, backfillTaskChanSize),
-		resultsCh:  make(chan idxRecResult, backfillTaskChanSize),
-		results:    generic.NewSyncMap[int, struct{}](10),
-		ctx:        ctx,
-		copCtx:     copCtx,
-		startTS:    startTS,
-		senders:    make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
-		wg:         sync.WaitGroup{},
-		idxBufPool: sync.Pool{
-			New: func() any {
-				return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
-			},
-		},
-		srcChkPool: sync.Pool{
-			New: func() any {
-				return chunk.NewChunkWithCapacity(copCtx.fieldTps, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
-			},
-		},
-		binding:    generic.NewSyncMap[*[]*indexRecord, *chunk.Chunk](4),
+		tasksCh:    make(chan *reorgBackfillTask, backfillTaskChanSize),
+		resultsCh:  make(chan idxRecResult, backfillTaskChanSize),
+		results:    generic.NewSyncMap[int, struct{}](10),
+		ctx:        ctx,
+		copCtx:     copCtx,
+		startTS:    startTS,
+		senders:    make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
+		wg:         sync.WaitGroup{},
+		idxBufPool: idxBufPool,
+		srcChkPool: srcChkPool,
 	}
 }

@@ -195,30 +199,41 @@ func (c *copReqSenderPool) close() {
 	for _, w := range c.senders {
 		w.cancel()
 	}
+	cleanupWg := util.WaitGroupWrapper{}
+	cleanupWg.Run(c.drainResults)
+	// Wait for all cop-req senders to exit.
 	c.wg.Wait()
 	close(c.resultsCh)
+	cleanupWg.Wait()
+	close(c.idxBufPool)
+	close(c.srcChkPool)
+}
+
+func (c *copReqSenderPool) drainResults() {
+	// Consume the remaining results because the writers are no longer active.
+	for rs := range c.resultsCh {
+		c.recycleIdxRecordsAndChunk(rs.records, rs.chunk)
+	}
 }

 func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
-	ir, chk := c.idxBufPool.Get().([]*indexRecord), c.srcChkPool.Get().(*chunk.Chunk)
+	ir := <-c.idxBufPool
+	chk := <-c.srcChkPool
 	newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
 	if chk.Capacity() != newCap {
 		chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
 	}
 	chk.Reset()
-	c.binding.Store(&ir, chk)
 	return ir[:0], chk
 }

-// recycleIdxRecords puts the index record slice back to the pool for reuse.
-func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) {
-	if idxRecs == nil {
+// recycleIdxRecordsAndChunk puts the index record slice and the chunk back to the pool for reuse.
+func (c *copReqSenderPool) recycleIdxRecordsAndChunk(idxRecs []*indexRecord, chk *chunk.Chunk) {
+	if idxRecs == nil || chk == nil {
 		return
 	}
-	c.idxBufPool.Put(idxRecs[:0])
-	if bindingChunk, ok := c.binding.Load(&idxRecs); ok {
-		c.srcChkPool.Put(bindingChunk)
-	}
+	c.idxBufPool <- idxRecs
+	c.srcChkPool <- chk
 }

 // copContext contains the information that is needed when building a coprocessor request.
@@ -449,7 +464,8 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
 	datumBuf := make([]types.Datum, 0, len(offsets))
 	for _, offset := range offsets {
 		c := expCols[offset]
-		datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType()))
+		rowDt := row.GetDatum(offset, c.GetType())
+		datumBuf = append(datumBuf, rowDt)
 	}
 	return datumBuf
 }
@@ -470,6 +486,7 @@ func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
 type idxRecResult struct {
 	id      int
 	records []*indexRecord
+	chunk   *chunk.Chunk
 	err     error
 	done    bool
 	total   int
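
The core change in ddl/index_cop.go replaces sync.Pool plus a binding map with two bounded channels that act as fixed-size free lists: getIndexRecordsAndChunks blocks until a record slice and a chunk are available, and recycleIdxRecordsAndChunk hands both back in one call, including on error paths. The standalone sketch below illustrates the same get/recycle pattern in isolation; bufPool, rowBuf, and their methods are hypothetical names, not TiDB code.

// Minimal sketch of a channel-backed buffer pool (assumed names, not part of the patch).
package main

import "fmt"

// rowBuf stands in for the pooled objects ([]*indexRecord / *chunk.Chunk above).
type rowBuf struct {
	rows []int
}

// bufPool hands out pre-allocated buffers and takes them back for reuse.
// A buffered channel bounds how many buffers exist, unlike sync.Pool,
// which may drop objects or allocate new ones at any time.
type bufPool struct {
	ch chan *rowBuf
}

func newBufPool(size, capacity int) *bufPool {
	p := &bufPool{ch: make(chan *rowBuf, size)}
	for i := 0; i < size; i++ {
		p.ch <- &rowBuf{rows: make([]int, 0, capacity)}
	}
	return p
}

// get blocks until a buffer is free, which naturally back-pressures producers.
func (p *bufPool) get() *rowBuf {
	b := <-p.ch
	b.rows = b.rows[:0] // reset length, keep the backing array
	return b
}

// recycle returns a buffer to the pool; nil is ignored so callers can
// recycle unconditionally (e.g. in a defer) even on error paths.
func (p *bufPool) recycle(b *rowBuf) {
	if b == nil {
		return
	}
	p.ch <- b
}

func main() {
	pool := newBufPool(2, 8)
	b := pool.get()
	b.rows = append(b.rows, 1, 2, 3)
	fmt.Println("used buffer with", len(b.rows), "rows, cap", cap(b.rows))
	pool.recycle(b) // the same backing array is reused by the next reader
}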