ddl: make read and write async during adding index (#39249)
ref #35983
tangenta authored Nov 23, 2022
1 parent 8fc4535 commit c9531d4
Showing 8 changed files with 345 additions and 67 deletions.
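The gist of the change, before the per-file diffs: each backfill worker used to scan rows and write index KVs in the same loop; this commit moves the reads into a separate pool of coprocessor-request senders so that scanning and writing overlap. Below is a minimal, self-contained Go sketch of that producer/consumer shape; all names are hypothetical and none of this is code from the commit.

package main

import (
	"fmt"
	"sync"
)

// rowBatch stands in for a batch of decoded index records.
type rowBatch struct{ rows []string }

func main() {
	batches := make(chan rowBatch, 4) // buffering lets reads run ahead of writes

	var readers sync.WaitGroup
	for r := 0; r < 2; r++ { // reader pool, analogous to the cop-request senders
		readers.Add(1)
		go func(id int) {
			defer readers.Done()
			for i := 0; i < 3; i++ {
				// A real sender would issue a coprocessor scan here.
				batches <- rowBatch{rows: []string{fmt.Sprintf("reader%d-batch%d", id, i)}}
			}
		}(r)
	}
	go func() { readers.Wait(); close(batches) }()

	// Writer side, analogous to the backfill workers ingesting index KVs.
	for b := range batches {
		fmt.Println("write index KVs for", b.rows)
	}
}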
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -100,6 +100,7 @@ go_library(
         "//util/domainutil",
         "//util/filter",
         "//util/gcutil",
+        "//util/generic",
         "//util/hack",
         "//util/logutil",
         "//util/mathutil",
66 changes: 58 additions & 8 deletions ddl/backfilling.go
@@ -168,7 +168,7 @@ func (r *reorgBackfillTask) String() string {
 	physicalID := strconv.FormatInt(r.physicalTableID, 10)
 	startKey := hex.EncodeToString(r.startKey)
 	endKey := hex.EncodeToString(r.endKey)
-	rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
+	rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
 	if r.endInclude {
 		return rangeStr + "]"
 	}
@@ -273,7 +273,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 	rc.increaseRowCount(int64(taskCtx.addedCount))
 	rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)

-	if num := result.scanCount - lastLogCount; num >= 30000 {
+	if num := result.scanCount - lastLogCount; num >= 90000 {
 		lastLogCount = result.scanCount
 		logutil.BgLogger().Info("[ddl] backfill worker back fill index",
 			zap.Int("worker ID", w.id),
@@ -439,6 +439,9 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount *int64,
 	batchTasks []*reorgBackfillTask) error {
 	reorgInfo := scheduler.reorgInfo
 	for _, task := range batchTasks {
+		if scheduler.copReqSenderPool != nil {
+			scheduler.copReqSenderPool.sendTask(task)
+		}
 		scheduler.taskCh <- task
 	}

@@ -605,6 +608,8 @@ type backfillScheduler struct {

 	taskCh   chan *reorgBackfillTask
 	resultCh chan *backfillResult
+
+	copReqSenderPool *copReqSenderPool // for add index in ingest way.
 }

 const backfillTaskChanSize = 1024
@@ -658,14 +663,19 @@ func (b *backfillScheduler) workerSize() int {
 }

 func (b *backfillScheduler) adjustWorkerSize() error {
+	b.initCopReqSenderPool()
 	reorgInfo := b.reorgInfo
 	job := reorgInfo.Job
 	jc := b.jobCtx
 	if err := loadDDLReorgVars(b.ctx, b.sessPool); err != nil {
 		logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err))
 	}
 	workerCnt := int(variable.GetDDLReorgWorkerCounter())
-	workerCnt = mathutil.Min(workerCnt, b.maxSize)
+	if b.copReqSenderPool != nil {
+		workerCnt = mathutil.Min(workerCnt/2+1, b.maxSize)
+	} else {
+		workerCnt = mathutil.Min(workerCnt, b.maxSize)
+	}
 	// Increase the worker.
 	for i := len(b.workers); i < workerCnt; i++ {
 		sessCtx, err := b.newSessCtx()
@@ -680,8 +690,12 @@ func (b *backfillScheduler) adjustWorkerSize() error {
 		case typeAddIndexWorker:
 			idxWorker, err := newAddIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc, job)
 			if err != nil {
-				return b.handleCreateBackfillWorkerErr(err)
+				if b.canSkipError(err) {
+					continue
+				}
+				return err
 			}
+			idxWorker.copReqSenderPool = b.copReqSenderPool
 			worker, runner = idxWorker, idxWorker.backfillWorker
 		case typeAddIndexMergeTmpWorker:
 			tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, b.tbl, reorgInfo, jc)
@@ -708,20 +722,56 @@ func (b *backfillScheduler) adjustWorkerSize() error {
 		b.workers = b.workers[:workerCnt]
 		closeBackfillWorkers(workers)
 	}
+	if b.copReqSenderPool != nil {
+		b.copReqSenderPool.adjustSize(len(b.workers))
+	}
 	return injectCheckBackfillWorkerNum(len(b.workers))
 }

-func (b *backfillScheduler) handleCreateBackfillWorkerErr(err error) error {
-	if len(b.workers) == 0 {
-		return errors.Trace(err)
+func (b *backfillScheduler) initCopReqSenderPool() {
+	if b.tp != typeAddIndexWorker || b.reorgInfo.Job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge ||
+		b.copReqSenderPool != nil || len(b.workers) > 0 {
+		return
 	}
+	indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
+	if indexInfo == nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
+			zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
+		return
+	}
+	sessCtx, err := b.newSessCtx()
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
+		return
+	}
+	copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
+	if copCtx == nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender")
+		return
+	}
+	ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
+		return
+	}
+	b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver)
+}
+
+func (b *backfillScheduler) canSkipError(err error) bool {
+	if len(b.workers) > 0 {
+		// The error can be skipped because the rest workers can handle the tasks.
+		return true
+	}
 	logutil.BgLogger().Warn("[ddl] create add index backfill worker failed",
 		zap.Int("current worker count", len(b.workers)),
 		zap.Int64("job ID", b.reorgInfo.ID), zap.Error(err))
-	return nil
+	return false
 }

 func (b *backfillScheduler) Close() {
+	if b.copReqSenderPool != nil {
+		b.copReqSenderPool.close()
+	}
 	closeBackfillWorkers(b.workers)
 	close(b.taskCh)
 	close(b.resultCh)
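The copReqSenderPool used throughout this file is implemented in a part of the commit that is not shown in this excerpt. Judging only from the call sites visible here (newCopReqSenderPool(ctx, copCtx, startTS), sendTask, adjustSize, fetchRowColValsFromCop, recycleIdxRecords, close), it is a pool of goroutines that consume backfill tasks from a channel, run the coprocessor scans, and hand row batches back to the writers. The following self-contained sketch is an assumption about that shape, with stand-in types; it is not the commit's code.

package main

import (
	"fmt"
	"sync"
)

type scanTask struct{ id int }   // stand-in for reorgBackfillTask
type recordBatch struct{ n int } // stand-in for a batch of indexRecord

type senderPool struct {
	tasksCh   chan *scanTask
	resultsCh chan recordBatch
	wg        sync.WaitGroup
	size      int
}

func newSenderPool() *senderPool {
	return &senderPool{
		tasksCh:   make(chan *scanTask, 1024),
		resultsCh: make(chan recordBatch, 8),
	}
}

func (p *senderPool) sendTask(t *scanTask) { p.tasksCh <- t }

// adjustSize grows the sender goroutines to n; shrinking is elided for brevity.
func (p *senderPool) adjustSize(n int) {
	for ; p.size < n; p.size++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for t := range p.tasksCh {
				// In TiDB this would be a coprocessor scan over the task's key range.
				p.resultsCh <- recordBatch{n: t.id * 100}
			}
		}()
	}
}

// fetchRowColValsFromCop blocks until the next prefetched batch is ready.
func (p *senderPool) fetchRowColValsFromCop() recordBatch { return <-p.resultsCh }

func (p *senderPool) close() {
	close(p.tasksCh)
	p.wg.Wait()
	close(p.resultsCh)
}

func main() {
	p := newSenderPool()
	p.adjustSize(2)
	p.sendTask(&scanTask{id: 1})
	fmt.Println("got batch of", p.fetchRowColValsFromCop().n, "records")
	p.close()
}

Note the related sizing change in adjustWorkerSize above: when the pool is active, the worker budget from tidb_ddl_reorg_worker_cnt is split between readers and writers. With the counter at 8, for example, the write side gets 8/2+1 = 5 workers, and adjustSize then matches the sender pool to that count.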
28 changes: 21 additions & 7 deletions ddl/export_test.go
@@ -15,22 +15,36 @@
 package ddl

 import (
+	"context"
+
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 )

 func SetBatchInsertDeleteRangeSize(i int) {
 	batchInsertDeleteRangeSize = i
 }

-var (
-	FetchRowsFromCop4Test = fetchRowsFromCop
-	NewCopContext4Test    = newCopContext
-)
+var NewCopContext4Test = newCopContext

-type (
-	IndexRecord4Test = *indexRecord
-)
+func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
+	batchSize int) ([]*indexRecord, bool, error) {
+	variable.SetDDLReorgBatchSize(int32(batchSize))
+	task := &reorgBackfillTask{
+		id:       1,
+		startKey: startKey,
+		endKey:   endKey,
+	}
+	pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
+	pool.adjustSize(1)
+	pool.tasksCh <- task
+	idxRec, _, done, err := pool.fetchRowColValsFromCop(*task)
+	pool.close()
+	return idxRec, done, err
+}
+
+type IndexRecord4Test = *indexRecord

 func (i IndexRecord4Test) GetHandle() kv.Handle {
 	return i.handle
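The rewritten FetchRowsFromCop4Test also shows how an asynchronous pool can be driven deterministically in tests: size it to a single sender, enqueue exactly one task, read exactly one result, then close. Below is a stripped-down, self-contained version of that pattern; the names are hypothetical, not TiDB code.

package main

import "fmt"

type pipe struct {
	tasks   chan int
	results chan string
}

func newPipe() *pipe {
	p := &pipe{tasks: make(chan int, 1), results: make(chan string, 1)}
	go func() { // a single sender keeps result ordering deterministic
		for t := range p.tasks {
			p.results <- fmt.Sprintf("rows for task %d", t)
		}
		close(p.results)
	}()
	return p
}

func main() {
	p := newPipe()
	p.tasks <- 1             // enqueue exactly one task
	fmt.Println(<-p.results) // read exactly one result
	close(p.tasks)
}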
18 changes: 10 additions & 8 deletions ddl/index.go
@@ -796,6 +796,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 	if !ok && job.SnapshotVer != 0 {
 		// The owner is crashed or changed, we need to restart the backfill.
 		job.SnapshotVer = 0
+		job.RowCount = 0
 		return false, ver, nil
 	}
 	bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
@@ -1179,9 +1180,9 @@ type baseIndexWorker struct {

 type addIndexWorker struct {
 	baseIndexWorker
-	index     table.Index
-	copCtx    *copContext
-	writerCtx *ingest.WriterContext
+	index            table.Index
+	writerCtx        *ingest.WriterContext
+	copReqSenderPool *copReqSenderPool

 	// The following attributes are used to reduce memory allocation.
 	idxKeyBufs [][]byte
@@ -1201,7 +1202,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable,
 	rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)

 	var lwCtx *ingest.WriterContext
-	var copCtx *copContext
 	if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
 		bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
 		if !ok {
@@ -1215,7 +1215,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable,
 		if err != nil {
 			return nil, err
 		}
-		copCtx = newCopContext(t.Meta(), indexInfo, sessCtx)
 	}

 	return &addIndexWorker{
@@ -1230,7 +1229,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable,
 			jobContext: jc,
 		},
 		index:     index,
-		copCtx:    copCtx,
 		writerCtx: lwCtx,
 	}, nil
 }
@@ -1501,8 +1499,8 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
 		nextKey  kv.Key
 		taskDone bool
 	)
-	if w.copCtx != nil {
-		idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange)
+	if w.copReqSenderPool != nil {
+		idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
 	} else {
 		idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
 	}
@@ -1567,6 +1565,10 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
 			taskCtx.addedCount++
 		}

+		if w.copReqSenderPool != nil {
+			w.copReqSenderPool.recycleIdxRecords(idxRecords)
+		}
+
 		return nil
 	})
 	logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000)
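The recycleIdxRecords call added at the end of BackfillDataInTxn returns each consumed batch to the pool, so record slices are reused across tasks instead of being reallocated. A common way to build such a free list in Go is sync.Pool; the sketch below assumes that mechanism, which may differ from the commit's actual implementation.

package main

import (
	"fmt"
	"sync"
)

type record struct{ key, val []byte }

// recordPool reuses record slices between the producer (cop senders) and
// the consumer (backfill workers) to cut per-batch allocations.
var recordPool = sync.Pool{
	New: func() any { return make([]record, 0, 256) },
}

func produceBatch() []record {
	recs := recordPool.Get().([]record)[:0] // reuse the backing array
	return append(recs, record{key: []byte("k"), val: []byte("v")})
}

// recycle plays the role of recycleIdxRecords: hand the slice back for reuse.
func recycle(recs []record) { recordPool.Put(recs) }

func main() {
	b := produceBatch()
	fmt.Println("wrote", len(b), "index records")
	recycle(b)
}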
(Diffs for the remaining 4 changed files are not shown here.)
