Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: reuse chunk for copr-read and check context done #39473

Merged
merged 23 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7e32f5b
ddl: clone datum for copr-read and check context done
tangenta Nov 29, 2022
f483811
use sized channel instead of sync.pool to control the memory
tangenta Nov 30, 2022
8bda001
Merge branch 'master' into add-index-clone-dt
tangenta Nov 30, 2022
3dc7d19
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Nov 30, 2022
360f4ad
close the pools
tangenta Nov 30, 2022
e583a9c
Merge branch 'master' into add-index-clone-dt
hawkingrei Nov 30, 2022
0b9f104
add cancel to copReqSender pool and close result properly
tangenta Nov 30, 2022
2ee5db5
Merge branch 'master' into add-index-clone-dt
hawkingrei Nov 30, 2022
346a34d
recycleIdxRecordsAndChunk if an error occurred
tangenta Nov 30, 2022
47ad59a
drain the result when closeing the copReqSenderPool
tangenta Nov 30, 2022
cc2d0bf
Merge branch 'master' into add-index-clone-dt
tangenta Nov 30, 2022
66e2bd4
close result set
tangenta Dec 1, 2022
fd2af9b
use terror.Call to handle error
tangenta Dec 1, 2022
9a4d264
Merge branch 'master' into add-index-clone-dt
tangenta Dec 1, 2022
c6f6f4e
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
637c9d1
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
2a5620d
Merge branch 'master' into add-index-clone-dt
tangenta Dec 1, 2022
434d568
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
5b1a550
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
0cae781
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
11f4f6a
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
bafe0cc
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
bcf7f34
Merge branch 'master' into add-index-clone-dt
hawkingrei Dec 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS
pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
pool.adjustSize(1)
pool.tasksCh <- task
idxRec, _, done, err := pool.fetchRowColValsFromCop(*task)
idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task)
pool.close()
return idxRec, done, err
}
Expand Down
9 changes: 4 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
Expand Down Expand Up @@ -1498,11 +1499,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC

var (
idxRecords []*indexRecord
copChunk *chunk.Chunk // only used by the coprocessor request sender.
nextKey kv.Key
taskDone bool
)
if w.copReqSenderPool != nil {
idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
idxRecords, copChunk, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
defer w.copReqSenderPool.recycleIdxRecordsAndChunk(idxRecords, copChunk)
} else {
idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
}
Expand Down Expand Up @@ -1567,10 +1570,6 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
taskCtx.addedCount++
}

if w.copReqSenderPool != nil {
w.copReqSenderPool.recycleIdxRecords(idxRecords)
}

return nil
})
logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000)
Expand Down
96 changes: 57 additions & 39 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -46,7 +47,10 @@ import (
// It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range.
const copReadBatchFactor = 10

func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) {
// copReadConcurrencyFactor is the factor of concurrency of coprocessor read.
const copReadConcurrencyFactor = 10

func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
Expand All @@ -55,10 +59,10 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
if !ok {
logutil.BgLogger().Info("[ddl-ingest] cop-response channel is closed",
zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
return nil, handleRange.endKey, true, nil
return nil, nil, handleRange.endKey, true, nil
}
if rs.err != nil {
return nil, handleRange.startKey, false, rs.err
return nil, nil, handleRange.startKey, false, rs.err
}
if rs.done {
logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task",
Expand All @@ -69,15 +73,15 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
logutil.BgLogger().Info("[ddl-ingest] task is found in results",
zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
c.results.Delete(handleRange.id)
return rs.records, handleRange.endKey, true, nil
return rs.records, rs.chunk, handleRange.endKey, true, nil
}
return rs.records, handleRange.startKey, false, nil
return rs.records, rs.chunk, handleRange.startKey, false, nil
case <-ticker.C:
logutil.BgLogger().Info("[ddl-ingest] cop-request result channel is empty",
zap.Int("id", handleRange.id))
if _, found := c.results.Load(handleRange.id); found {
c.results.Delete(handleRange.id)
return nil, handleRange.endKey, true, nil
return nil, nil, handleRange.endKey, true, nil
}
}
}
Expand All @@ -95,9 +99,8 @@ type copReqSenderPool struct {
senders []*copReqSender
wg sync.WaitGroup

idxBufPool sync.Pool // []*indexRecord
srcChkPool sync.Pool // *chunk.Chunk
binding generic.SyncMap[*[]*indexRecord, *chunk.Chunk]
idxBufPool chan []*indexRecord
srcChkPool chan *chunk.Chunk
}

type copReqSender struct {
Expand Down Expand Up @@ -132,35 +135,37 @@ func (c *copReqSender) run() {
idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
p.recycleIdxRecordsAndChunk(idxRec, srcChk)
terror.Call(rs.Close)
_ = rs.Close()
return
}
total += len(idxRec)
p.resultsCh <- idxRecResult{id: task.id, records: idxRec, done: done, total: total}
p.resultsCh <- idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total}
}
terror.Call(rs.Close)
}
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor)
idxBufPool := make(chan []*indexRecord, poolSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize())
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize()))
}
return &copReqSenderPool{
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultsCh: make(chan idxRecResult, backfillTaskChanSize),
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: sync.Pool{
New: func() any {
return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
srcChkPool: sync.Pool{
New: func() any {
return chunk.NewChunkWithCapacity(copCtx.fieldTps, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
binding: generic.NewSyncMap[*[]*indexRecord, *chunk.Chunk](4),
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultsCh: make(chan idxRecResult, backfillTaskChanSize),
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: idxBufPool,
srcChkPool: srcChkPool,
}
}

Expand Down Expand Up @@ -195,30 +200,41 @@ func (c *copReqSenderPool) close() {
for _, w := range c.senders {
w.cancel()
}
cleanupWg := util.WaitGroupWrapper{}
cleanupWg.Run(c.drainResults)
// Wait for all cop-req senders to exit.
c.wg.Wait()
close(c.resultsCh)
cleanupWg.Wait()
close(c.idxBufPool)
close(c.srcChkPool)
}

func (c *copReqSenderPool) drainResults() {
// Consume the rest results because the writers are inactive anymore.
for rs := range c.resultsCh {
c.recycleIdxRecordsAndChunk(rs.records, rs.chunk)
}
}

func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
ir, chk := c.idxBufPool.Get().([]*indexRecord), c.srcChkPool.Get().(*chunk.Chunk)
ir := <-c.idxBufPool
chk := <-c.srcChkPool
newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
}
chk.Reset()
c.binding.Store(&ir, chk)
return ir[:0], chk
}

// recycleIdxRecords puts the index record slice back to the pool for reuse.
func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) {
if idxRecs == nil {
// recycleIdxRecordsAndChunk puts the index record slice and the chunk back to the pool for reuse.
func (c *copReqSenderPool) recycleIdxRecordsAndChunk(idxRecs []*indexRecord, chk *chunk.Chunk) {
if idxRecs == nil || chk == nil {
return
}
c.idxBufPool.Put(idxRecs[:0])
if bindingChunk, ok := c.binding.Load(&idxRecs); ok {
c.srcChkPool.Put(bindingChunk)
}
c.idxBufPool <- idxRecs
c.srcChkPool <- chk
}

// copContext contains the information that is needed when building a coprocessor request.
Expand Down Expand Up @@ -449,7 +465,8 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
datumBuf := make([]types.Datum, 0, len(offsets))
for _, offset := range offsets {
c := expCols[offset]
datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType()))
rowDt := row.GetDatum(offset, c.GetType())
datumBuf = append(datumBuf, rowDt)
}
return datumBuf
}
Expand All @@ -470,6 +487,7 @@ func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
type idxRecResult struct {
id int
records []*indexRecord
chunk *chunk.Chunk
err error
done bool
total int
Expand Down