
ddl: make read and write async during adding index #39249

Merged (14 commits) on Nov 23, 2022
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -100,6 +100,7 @@ go_library(
"//util/domainutil",
"//util/filter",
"//util/gcutil",
"//util/generic",
"//util/hack",
"//util/logutil",
"//util/mathutil",
66 changes: 58 additions & 8 deletions ddl/backfilling.go
@@ -168,7 +168,7 @@ func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := hex.EncodeToString(r.startKey)
endKey := hex.EncodeToString(r.endKey)
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
if r.endInclude {
return rangeStr + "]"
}
@@ -273,7 +273,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
rc.increaseRowCount(int64(taskCtx.addedCount))
rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)

if num := result.scanCount - lastLogCount; num >= 30000 {
if num := result.scanCount - lastLogCount; num >= 90000 {
Collaborator

If we print a log every 90000 rows, roughly how long is the interval between logs?

Contributor Author

Normally, the throughput of adding an index using ingest is about 200k rows per second.

Collaborator

How about printing one log every 200k rows?

Contributor Author

Maybe 200k is too large for the old implementation, whose average throughput is about 20k rows per second.

Member @hawkingrei Nov 21, 2022

I think we can create a sliding window to track the throughput, so we can adapt the row count to the machine's performance.

Luckily, I have created a sliding window at util/window in this PR.
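
A minimal sketch of what such a sliding-window throughput counter could look like, assuming fixed one-second buckets. This is illustrative only; it is not the util/window implementation mentioned above, and the `Window` type and its methods are assumed names.

```go
package window

import (
	"sync"
	"time"
)

// Window counts events (for example, backfilled rows) in one-second buckets
// and reports the average rate over the most recent len(buckets) seconds.
type Window struct {
	mu       sync.Mutex
	buckets  []int64
	pos      int
	lastTick time.Time
}

// New returns a Window covering the last bucketCount seconds.
func New(bucketCount int) *Window {
	return &Window{
		buckets:  make([]int64, bucketCount),
		lastTick: time.Now(),
	}
}

// Add records n rows in the current bucket.
func (w *Window) Add(n int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.rotate()
	w.buckets[w.pos] += n
}

// RowsPerSecond returns the average throughput over the whole window.
func (w *Window) RowsPerSecond() float64 {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.rotate()
	var total int64
	for _, b := range w.buckets {
		total += b
	}
	return float64(total) / float64(len(w.buckets))
}

// rotate advances to a new bucket for every elapsed second, clearing the
// buckets it passes over.
func (w *Window) rotate() {
	now := time.Now()
	elapsed := int(now.Sub(w.lastTick).Seconds())
	if elapsed <= 0 {
		return
	}
	if elapsed >= len(w.buckets) {
		// The whole window is stale; reset it.
		for i := range w.buckets {
			w.buckets[i] = 0
		}
		w.pos = 0
		w.lastTick = now
		return
	}
	for i := 0; i < elapsed; i++ {
		w.pos = (w.pos + 1) % len(w.buckets)
		w.buckets[w.pos] = 0
	}
	w.lastTick = w.lastTick.Add(time.Duration(elapsed) * time.Second)
}
```

A backfill worker could call Add with the added row count after each batch and use RowsPerSecond to decide how many rows to process between log lines, instead of a fixed threshold such as 90000.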

lastLogCount = result.scanCount
logutil.BgLogger().Info("[ddl] backfill worker back fill index",
zap.Int("worker ID", w.id),
@@ -439,6 +439,9 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
batchTasks []*reorgBackfillTask) error {
reorgInfo := scheduler.reorgInfo
for _, task := range batchTasks {
if scheduler.copReqSenderPool != nil {
scheduler.copReqSenderPool.sendTask(task)
}
scheduler.taskCh <- task
Comment on lines +442 to 445
Contributor

Should the task be sent to two different channels?

Contributor Author @tangenta Nov 22, 2022

I think so. This is because I want to keep the old backfill implementation and introduce as few changes as possible:

Old backfill:

       scheduler.taskCh -> backfill worker -> read rows -> write indexes -> resultCh

New backfill in this PR:

       scheduler.taskCh -> backfill worker -> write indexes -> resultCh
                                             ^
                                             |
copReqSenderPool.taskCh -> cop-req sender -> read rows

If we only use one channel, then it will be

       scheduler.taskCh -> backfill worker -> write indexes -> resultCh
                        |                    ^
                        v                    |
                        -> cop-req sender -> read rows

The backfill worker cannot process the next task until the current task is complete. However, this is not the behavior we want in the cop-request sender pool. To gain better performance, the sender pool should always be filled with tasks.

Besides, with the one-channel solution, it is not easy to set the concurrency of the backfill workers and the cop-request senders separately.
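
A minimal sketch of the decoupled two-channel pattern described above. It is illustrative only: the channel and type names are assumptions rather than the PR's identifiers, and unlike the real implementation it does not match each row batch back to its originating task.

```go
package main

import (
	"fmt"
	"sync"
)

type task struct{ id int }
type rowBatch struct{ taskID int }

func main() {
	taskCh := make(chan task, 8)    // consumed by backfill (writer) workers
	copTaskCh := make(chan task, 8) // consumed by cop-request senders (readers)
	rowBatchCh := make(chan rowBatch, 8)

	// Readers: pull tasks and produce row batches, independently of how fast
	// the writers drain them.
	var readers sync.WaitGroup
	for i := 0; i < 2; i++ {
		readers.Add(1)
		go func() {
			defer readers.Done()
			for t := range copTaskCh {
				rowBatchCh <- rowBatch{taskID: t.id} // "read rows"
			}
		}()
	}

	// Writers: pull tasks and consume row batches to build index KVs. (The
	// real implementation matches each batch to its task; this sketch just
	// takes the next available batch.)
	var writers sync.WaitGroup
	for i := 0; i < 2; i++ {
		writers.Add(1)
		go func() {
			defer writers.Done()
			for t := range taskCh {
				b := <-rowBatchCh
				fmt.Printf("task %d: write indexes using rows from task %d\n", t.id, b.taskID)
			}
		}()
	}

	// Scheduler: send each task to both channels, as sendTasksAndWait does.
	for id := 1; id <= 4; id++ {
		copTaskCh <- task{id: id}
		taskCh <- task{id: id}
	}
	close(copTaskCh)
	close(taskCh)
	writers.Wait()
	readers.Wait()
}
```

Because the readers consume their own channel, they can stay ahead of the writers, and the concurrency of the two pools can be tuned independently.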

}

@@ -605,6 +608,8 @@ type backfillScheduler struct {

taskCh chan *reorgBackfillTask
resultCh chan *backfillResult

copReqSenderPool *copReqSenderPool // for add index in ingest way.
}

const backfillTaskChanSize = 1024
@@ -658,14 +663,19 @@ func (b *backfillScheduler) workerSize() int {
}

func (b *backfillScheduler) adjustWorkerSize() error {
b.initCopReqSenderPool()
reorgInfo := b.reorgInfo
job := reorgInfo.Job
jc := b.jobCtx
if err := loadDDLReorgVars(b.ctx, b.sessPool); err != nil {
logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err))
}
workerCnt := int(variable.GetDDLReorgWorkerCounter())
workerCnt = mathutil.Min(workerCnt, b.maxSize)
if b.copReqSenderPool != nil {
workerCnt = mathutil.Min(workerCnt/2+1, b.maxSize)
} else {
workerCnt = mathutil.Min(workerCnt, b.maxSize)
}
// Increase the worker.
for i := len(b.workers); i < workerCnt; i++ {
sessCtx, err := b.newSessCtx()
@@ -680,8 +690,12 @@ func (b *backfillScheduler) adjustWorkerSize() error {
case typeAddIndexWorker:
idxWorker, err := newAddIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc, job)
if err != nil {
return b.handleCreateBackfillWorkerErr(err)
if b.canSkipError(err) {
continue
}
return err
}
idxWorker.copReqSenderPool = b.copReqSenderPool
worker, runner = idxWorker, idxWorker.backfillWorker
case typeAddIndexMergeTmpWorker:
tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, b.tbl, reorgInfo, jc)
@@ -708,20 +722,56 @@ func (b *backfillScheduler) adjustWorkerSize() error {
b.workers = b.workers[:workerCnt]
closeBackfillWorkers(workers)
}
if b.copReqSenderPool != nil {
b.copReqSenderPool.adjustSize(len(b.workers))
}
return injectCheckBackfillWorkerNum(len(b.workers))
}

func (b *backfillScheduler) handleCreateBackfillWorkerErr(err error) error {
if len(b.workers) == 0 {
return errors.Trace(err)
func (b *backfillScheduler) initCopReqSenderPool() {
if b.tp != typeAddIndexWorker || b.reorgInfo.Job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge ||
b.copReqSenderPool != nil || len(b.workers) > 0 {
return
}
indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
if indexInfo == nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
return
}
sessCtx, err := b.newSessCtx()
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
if copCtx == nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender")
return
}
ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver)
}

func (b *backfillScheduler) canSkipError(err error) bool {
if len(b.workers) > 0 {
// The error can be skipped because the rest workers can handle the tasks.
return true
}
logutil.BgLogger().Warn("[ddl] create add index backfill worker failed",
zap.Int("current worker count", len(b.workers)),
zap.Int64("job ID", b.reorgInfo.ID), zap.Error(err))
return nil
return false
}

func (b *backfillScheduler) Close() {
if b.copReqSenderPool != nil {
b.copReqSenderPool.close()
}
closeBackfillWorkers(b.workers)
close(b.taskCh)
close(b.resultCh)
28 changes: 21 additions & 7 deletions ddl/export_test.go
@@ -15,22 +15,36 @@
package ddl

import (
"context"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
)

func SetBatchInsertDeleteRangeSize(i int) {
batchInsertDeleteRangeSize = i
}

var (
FetchRowsFromCop4Test = fetchRowsFromCop
NewCopContext4Test = newCopContext
)
var NewCopContext4Test = newCopContext

type (
IndexRecord4Test = *indexRecord
)
func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
batchSize int) ([]*indexRecord, bool, error) {
variable.SetDDLReorgBatchSize(int32(batchSize))
task := &reorgBackfillTask{
id: 1,
startKey: startKey,
endKey: endKey,
}
pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
pool.adjustSize(1)
pool.tasksCh <- task
idxRec, _, done, err := pool.fetchRowColValsFromCop(*task)
pool.close()
return idxRec, done, err
}

type IndexRecord4Test = *indexRecord

func (i IndexRecord4Test) GetHandle() kv.Handle {
return i.handle
18 changes: 10 additions & 8 deletions ddl/index.go
@@ -796,6 +796,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
if !ok && job.SnapshotVer != 0 {
// The owner is crashed or changed, we need to restart the backfill.
job.SnapshotVer = 0
job.RowCount = 0
return false, ver, nil
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
Expand Down Expand Up @@ -1179,9 +1180,9 @@ type baseIndexWorker struct {

type addIndexWorker struct {
baseIndexWorker
index table.Index
copCtx *copContext
writerCtx *ingest.WriterContext
index table.Index
writerCtx *ingest.WriterContext
copReqSenderPool *copReqSenderPool

// The following attributes are used to reduce memory allocation.
idxKeyBufs [][]byte
@@ -1201,7 +1202,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)

var lwCtx *ingest.WriterContext
var copCtx *copContext
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if !ok {
@@ -1215,7 +1215,6 @@
if err != nil {
return nil, err
}
copCtx = newCopContext(t.Meta(), indexInfo, sessCtx)
}

return &addIndexWorker{
@@ -1230,7 +1229,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
jobContext: jc,
},
index: index,
copCtx: copCtx,
writerCtx: lwCtx,
}, nil
}
@@ -1501,8 +1499,8 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
nextKey kv.Key
taskDone bool
)
if w.copCtx != nil {
idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange)
if w.copReqSenderPool != nil {
idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
} else {
idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
}
@@ -1567,6 +1565,10 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
taskCtx.addedCount++
}

if w.copReqSenderPool != nil {
w.copReqSenderPool.recycleIdxRecords(idxRecords)
}

return nil
})
logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000)