From 0b7ed6231d510be5887ed7530bfbb2a218a7e316 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 11 Dec 2024 15:59:54 +0800 Subject: [PATCH] ddl: skip getting actual end key for each range in ingest mode (#54143) (#56748) close pingcap/tidb#45847, close pingcap/tidb#54147 --- pkg/ddl/backfilling.go | 443 ++++++++++-------- pkg/ddl/backfilling_operators.go | 7 +- pkg/ddl/backfilling_scheduler.go | 101 ++-- pkg/ddl/column.go | 6 +- pkg/ddl/ddl.go | 2 + pkg/ddl/index_cop.go | 12 +- pkg/ddl/reorg.go | 7 +- .../pool/workerpool/workerpool.go | 5 +- 8 files changed, 315 insertions(+), 268 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 33155f01e07a7..9367ac8c7e0d7 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -330,31 +330,44 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, return result } +func (w *backfillWorker) sendResult(result *backfillResult) { + select { + case <-w.ctx.Done(): + case w.resultCh <- result: + } +} + func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { - logutil.BgLogger().Info("backfill worker start", zap.String("category", "ddl"), zap.Stringer("worker", w)) - var curTaskID int + logger := ddlLogger.With(zap.Stringer("worker", w), zap.Int64("jobID", job.ID)) + var ( + curTaskID int + task *reorgBackfillTask + ok bool + ) + defer w.wg.Done() defer util.Recover(metrics.LabelDDL, "backfillWorker.run", func() { - w.resultCh <- &backfillResult{taskID: curTaskID, err: dbterror.ErrReorgPanic} + w.sendResult(&backfillResult{taskID: curTaskID, err: dbterror.ErrReorgPanic}) }, false) for { - if util.HasCancelled(w.ctx) { - logutil.BgLogger().Info("backfill worker exit on context done", zap.String("category", "ddl"), zap.Stringer("worker", w)) + select { + case <-w.ctx.Done(): + logger.Info("backfill worker exit on context done") return + case task, ok = <-w.taskCh: } - task, more := <-w.taskCh - if !more { - logutil.BgLogger().Info("backfill worker exit", zap.String("category", "ddl"), zap.Stringer("worker", w)) + if !ok { + logger.Info("backfill worker exit") return } curTaskID = task.id d.setDDLLabelForTopSQL(job.ID, job.Query) - logutil.BgLogger().Debug("backfill worker got task", zap.String("category", "ddl"), zap.Int("workerID", w.GetCtx().id), zap.String("task", task.String())) + logger.Debug("backfill worker got task", zap.Int("workerID", w.GetCtx().id), zap.Stringer("task", task)) failpoint.Inject("mockBackfillRunErr", func() { if w.GetCtx().id == 0 { result := &backfillResult{taskID: task.id, addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")} - w.resultCh <- result + w.sendResult(result) failpoint.Continue() } }) @@ -371,19 +384,58 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { // Change the batch size dynamically. 
 		w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
 		result := w.handleBackfillTask(d, task, bf)
-		w.resultCh <- result
+		w.sendResult(result)
+
 		if result.err != nil {
-			logutil.BgLogger().Info("backfill worker exit on error", zap.String("category", "ddl"),
-				zap.Stringer("worker", w), zap.Error(result.err))
+			logger.Info("backfill worker exit on error",
+				zap.Error(result.err))
 			return
 		}
 	}
 }
 
+// splitAndValidateTableRanges splits the table key range [startKey, endKey)
+// by region and fills in any empty boundary keys of the returned ranges.
+func splitAndValidateTableRanges(
+	ctx context.Context,
+	t table.PhysicalTable,
+	store kv.Storage,
+	startKey, endKey kv.Key,
+	limit int,
+) ([]kv.KeyRange, error) {
+	ranges, err := splitTableRanges(ctx, t, store, startKey, endKey, limit)
+	if err != nil {
+		return nil, err
+	}
+	return validateTableRanges(ranges, startKey, endKey)
+}
+
+// validateTableRanges checks that only the first range may lack a start key
+// and only the last range may lack an end key, and fills the missing keys
+// with the given bounds.
+func validateTableRanges(ranges []kv.KeyRange, start, end kv.Key) ([]kv.KeyRange, error) {
+	for i, r := range ranges {
+		if len(r.StartKey) == 0 {
+			if i != 0 {
+				return nil, errors.Errorf("get empty start key in the middle of ranges")
+			}
+			ranges[i].StartKey = start
+		}
+		if len(r.EndKey) == 0 {
+			if i != len(ranges)-1 {
+				return nil, errors.Errorf("get empty end key in the middle of ranges")
+			}
+			ranges[i].EndKey = end
+		}
+	}
+	return ranges, nil
+}
+
 // splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
 // to speed up backfilling data in table with disperse handle.
 // The `t` should be a non-partitioned table or a partition.
-func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key, limit int) ([]kv.KeyRange, error) {
+func splitTableRanges(
+	ctx context.Context,
+	t table.PhysicalTable,
+	store kv.Storage,
+	startKey, endKey kv.Key,
+	limit int,
+) ([]kv.KeyRange, error) {
 	logutil.BgLogger().Info("split table range from PD", zap.String("category", "ddl"),
 		zap.Int64("physicalTableID", t.GetPhysicalID()),
 		zap.String("start key", hex.EncodeToString(startKey)),
@@ -401,7 +453,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
 	}
 
 	maxSleep := 10000 // ms
-	bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil)
+	bo := backoff.NewBackofferWithVars(ctx, maxSleep, nil)
 	rc := copr.NewRegionCache(s.GetRegionCache())
 	ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}, limit)
 	if err != nil {
@@ -414,161 +466,21 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
 	return ranges, nil
 }
 
-type resultConsumer struct {
-	dc        *ddlCtx
-	wg        *sync.WaitGroup
-	err       error
-	hasError  *atomic.Bool
-	reorgInfo *reorgInfo // reorgInfo is used to update the reorg handle.
-	sessPool  *sess.Pool // sessPool is used to get the session to update the reorg handle.
- distribute bool -} - -func newResultConsumer(dc *ddlCtx, reorgInfo *reorgInfo, sessPool *sess.Pool, distribute bool) *resultConsumer { - return &resultConsumer{ - dc: dc, - wg: &sync.WaitGroup{}, - hasError: &atomic.Bool{}, - reorgInfo: reorgInfo, - sessPool: sessPool, - distribute: distribute, - } -} - -func (s *resultConsumer) run(scheduler backfillScheduler, start kv.Key, totalAddedCount *int64) { - s.wg.Add(1) - go func() { - reorgInfo := s.reorgInfo - err := consumeResults(scheduler, s, start, totalAddedCount) - if err != nil { - logutil.BgLogger().Warn("backfill worker handle tasks failed", zap.String("category", "ddl"), - zap.Int64("total added count", *totalAddedCount), - zap.String("start key", hex.EncodeToString(start)), - zap.String("task failed error", err.Error())) - s.err = err - } else { - logutil.BgLogger().Info("backfill workers successfully processed", zap.String("category", "ddl"), - zap.Stringer("element", reorgInfo.currElement), - zap.Int64("total added count", *totalAddedCount), - zap.String("start key", hex.EncodeToString(start))) - } - s.wg.Done() - }() -} - -func (s *resultConsumer) getResult() error { - s.wg.Wait() - return s.err -} - -func (s *resultConsumer) shouldAbort() bool { - return s.hasError.Load() -} - -func consumeResults(scheduler backfillScheduler, consumer *resultConsumer, start kv.Key, totalAddedCount *int64) error { - keeper := newDoneTaskKeeper(start) - handledTaskCnt := 0 - var firstErr error - for { - result, ok := scheduler.receiveResult() - if !ok { - return firstErr - } - err := handleOneResult(result, scheduler, consumer, keeper, totalAddedCount, handledTaskCnt) - handledTaskCnt++ - if err != nil && firstErr == nil { - consumer.hasError.Store(true) - firstErr = err - } - } -} - -func handleOneResult(result *backfillResult, scheduler backfillScheduler, consumer *resultConsumer, - keeper *doneTaskKeeper, totalAddedCount *int64, taskSeq int) error { - reorgInfo := consumer.reorgInfo - if result.err != nil { - logutil.BgLogger().Warn("backfill worker failed", zap.String("category", "ddl"), - zap.Int64("job ID", reorgInfo.ID), - zap.String("result next key", hex.EncodeToString(result.nextKey)), - zap.Error(result.err)) - scheduler.drainTasks() // Make it quit early. - return result.err - } - if result.totalCount > 0 { - *totalAddedCount = int64(result.totalCount) - } else { - *totalAddedCount += int64(result.addedCount) - } - if !consumer.distribute { - reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID) - reorgCtx.setRowCount(*totalAddedCount) - } - keeper.updateNextKey(result.taskID, result.nextKey) - if taskSeq%(scheduler.currentWorkerSize()*4) == 0 { - if !consumer.distribute { - err := consumer.dc.isReorgRunnable(reorgInfo.ID, consumer.distribute) - if err != nil { - logutil.BgLogger().Warn("backfill worker is not runnable", zap.String("category", "ddl"), zap.Error(err)) - scheduler.drainTasks() // Make it quit early. - return err - } - failpoint.Inject("MockGetIndexRecordErr", func() { - // Make sure this job didn't failed because by the "Write conflict" error. - if dbterror.ErrNotOwner.Equal(err) { - time.Sleep(50 * time.Millisecond) - } - }) - err = reorgInfo.UpdateReorgMeta(keeper.nextKey, consumer.sessPool) - if err != nil { - logutil.BgLogger().Warn("update reorg meta failed", zap.String("category", "ddl"), - zap.Int64("job ID", reorgInfo.ID), zap.Error(err)) - } - } - // We try to adjust the worker size regularly to reduce - // the overhead of loading the DDL related global variables. 
-		err := scheduler.adjustWorkerSize()
-		if err != nil {
-			logutil.BgLogger().Warn("cannot adjust backfill worker size", zap.String("category", "ddl"),
-				zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
-		}
-	}
-	return nil
-}
-
-func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
-	taskIDAlloc *taskIDAllocator) []*reorgBackfillTask {
+func getBatchTasks(
+	t table.Table,
+	reorgInfo *reorgInfo,
+	kvRanges []kv.KeyRange,
+	taskIDAlloc *taskIDAllocator,
+	bfWorkerTp backfillerType,
+) []*reorgBackfillTask {
 	batchTasks := make([]*reorgBackfillTask, 0, len(kvRanges))
-	var prefix kv.Key
-	if reorgInfo.mergingTmpIdx {
-		prefix = t.IndexPrefix()
-	} else {
-		prefix = t.RecordPrefix()
-	}
-	// Build reorg tasks.
-	job := reorgInfo.Job
 	//nolint:forcetypeassert
 	phyTbl := t.(table.PhysicalTable)
-	jobCtx := reorgInfo.NewJobContext()
-	for _, keyRange := range kvRanges {
+	for _, r := range kvRanges {
 		taskID := taskIDAlloc.alloc()
-		startKey := keyRange.StartKey
-		if len(startKey) == 0 {
-			startKey = prefix
-		}
-		endKey := keyRange.EndKey
-		if len(endKey) == 0 {
-			endKey = prefix.PrefixNext()
-		}
-		endK, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, startKey, endKey)
-		if err != nil {
-			logutil.BgLogger().Info("get backfill range task, get reverse key failed", zap.String("category", "ddl"), zap.Error(err))
-		} else {
-			logutil.BgLogger().Info("get backfill range task, change end key", zap.String("category", "ddl"),
-				zap.Int("id", taskID), zap.Int64("pTbl", phyTbl.GetPhysicalID()),
-				zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
-			endKey = endK
-		}
-
+		startKey := r.StartKey
+		endKey := r.EndKey
+		endKey = getActualEndKey(t, reorgInfo, bfWorkerTp, startKey, endKey, taskID)
 		task := &reorgBackfillTask{
 			id:    taskID,
 			jobID: reorgInfo.Job.ID,
@@ -582,16 +494,62 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
 	return batchTasks
 }
 
+func getActualEndKey(
+	t table.Table,
+	reorgInfo *reorgInfo,
+	bfTp backfillerType,
+	rangeStart, rangeEnd kv.Key,
+	taskID int,
+) kv.Key {
+	job := reorgInfo.Job
+	//nolint:forcetypeassert
+	phyTbl := t.(table.PhysicalTable)
+
+	if bfTp == typeAddIndexMergeTmpWorker {
+		// Temp index data does not grow infinitely, so we can return the whole
+		// range; an IndexMergeTmpWorker will still finish in a bounded time.
+		return rangeEnd
+	}
+	if bfTp == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
+		// The ingest worker reads table data through the coprocessor. That is
+		// fast enough that we don't need to get the actual end key of this range.
+		return rangeEnd
+	}
+
+	// Otherwise, shrink the end key to the last actually written key for now,
+	// so that rows written into [backfillChunkEndKey, rangeEnd) in the future
+	// cannot keep the backfill worker from catching up.
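+	// Note: GetRangeEndKey below does a reverse scan over a snapshot to find
+	// the last record key that actually exists in [rangeStart, rangeEnd).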
+	jobCtx := reorgInfo.NewJobContext()
+
+	actualEndKey, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, t.RecordPrefix(), rangeStart, rangeEnd)
+	if err != nil {
+		logutil.BgLogger().Info("get backfill range task, get reverse key failed", zap.String("category", "ddl"), zap.Error(err))
+		return rangeEnd
+	}
+	logutil.BgLogger().Info("get backfill range task, change end key",
+		zap.String("category", "ddl"),
+		zap.Int("id", taskID),
+		zap.Int64("pTbl", phyTbl.GetPhysicalID()),
+		zap.String("end key", hex.EncodeToString(rangeEnd)),
+		zap.String("current end key", hex.EncodeToString(actualEndKey)))
+	return actualEndKey
+}
+
-// sendTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
-func sendTasks(scheduler backfillScheduler, consumer *resultConsumer,
-	t table.PhysicalTable, kvRanges []kv.KeyRange, reorgInfo *reorgInfo, taskIDAlloc *taskIDAllocator) {
-	batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc)
+// sendTasks builds backfill tasks from the given kvRanges and sends them to
+// the scheduler. It returns an error if sending is interrupted, e.g. because
+// the job context has been canceled.
+func sendTasks(
+	scheduler backfillScheduler,
+	t table.PhysicalTable,
+	kvRanges []kv.KeyRange,
+	reorgInfo *reorgInfo,
+	taskIDAlloc *taskIDAllocator,
+	bfWorkerTp backfillerType,
+) error {
+	batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc, bfWorkerTp)
 	for _, task := range batchTasks {
-		if consumer.shouldAbort() {
-			return
+		if err := scheduler.sendTask(task); err != nil {
+			return errors.Trace(err)
 		}
-		scheduler.sendTask(task)
 	}
+	return nil
 }
 
 var (
@@ -669,10 +627,12 @@ func SetBackfillTaskChanSizeForTest(n int) {
 //
 // The above operations are completed in a transaction.
 // Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
-func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
-	job := reorgInfo.Job
-	totalAddedCount := job.GetRowCount()
-
+func (dc *ddlCtx) writePhysicalTableRecord(
+	sessPool *sess.Pool,
+	t table.PhysicalTable,
+	bfWorkerType backfillerType,
+	reorgInfo *reorgInfo,
+) error {
 	startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
 
 	if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
@@ -682,53 +642,120 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
 	failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) {
 		//nolint:forcetypeassert
 		if val.(bool) {
-			failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
+			failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(reorgInfo.Job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
 		}
 	})
 	jc := reorgInfo.NewJobContext()
-	sessCtx := newContext(reorgInfo.d.store)
-	scheduler, err := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc)
+	sessCtx := newReorgSessCtx(reorgInfo.d.store)
+
+	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(dc.ctx)
+
+	scheduler, err := newBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	defer scheduler.close(true)
-
-	consumer := newResultConsumer(dc, reorgInfo, sessPool, false)
-	consumer.run(scheduler, startKey, &totalAddedCount)
-
 	err = scheduler.setupWorkers()
 	if err != nil {
 		return errors.Trace(err)
 	}
 
-	taskIDAlloc := newTaskIDAllocator()
-	for {
-		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if len(kvRanges) == 0 {
-			break
-		}
-
logutil.BgLogger().Info("start backfill workers to reorg record", zap.String("category", "ddl"), - zap.Stringer("type", bfWorkerType), - zap.Int("workerCnt", scheduler.currentWorkerSize()), - zap.Int("regionCnt", len(kvRanges)), - zap.String("startKey", hex.EncodeToString(startKey)), - zap.String("endKey", hex.EncodeToString(endKey))) - - sendTasks(scheduler, consumer, t, kvRanges, reorgInfo, taskIDAlloc) - if consumer.shouldAbort() { - break + // process result goroutine + eg.Go(func() error { + totalAddedCount := reorgInfo.Job.GetRowCount() + keeper := newDoneTaskKeeper(startKey) + cnt := 0 + + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case result, ok := <-scheduler.resultChan(): + if !ok { + ddlLogger.Info("backfill workers successfully processed", + zap.Stringer("element", reorgInfo.currElement), + zap.Int64("total added count", totalAddedCount), + zap.String("start key", hex.EncodeToString(startKey))) + return nil + } + cnt++ + + if result.err != nil { + ddlLogger.Warn("backfill worker failed", + zap.Int64("job ID", reorgInfo.ID), + zap.Int64("total added count", totalAddedCount), + zap.String("start key", hex.EncodeToString(startKey)), + zap.String("result next key", hex.EncodeToString(result.nextKey)), + zap.Error(result.err)) + return result.err + } + + if result.totalCount > 0 { + totalAddedCount = int64(result.totalCount) + } else { + totalAddedCount += int64(result.addedCount) + } + dc.getReorgCtx(reorgInfo.Job.ID).setRowCount(totalAddedCount) + + keeper.updateNextKey(result.taskID, result.nextKey) + + if cnt%(scheduler.currentWorkerSize()*4) == 0 { + err2 := reorgInfo.UpdateReorgMeta(keeper.nextKey, sessPool) + if err2 != nil { + ddlLogger.Warn("update reorg meta failed", + zap.Int64("job ID", reorgInfo.ID), + zap.Error(err2)) + } + // We try to adjust the worker size regularly to reduce + // the overhead of loading the DDL related global variables. + err2 = scheduler.adjustWorkerSize() + if err2 != nil { + ddlLogger.Warn("cannot adjust backfill worker size", + zap.Int64("job ID", reorgInfo.ID), + zap.Error(err2)) + } + } + } } - startKey = kvRanges[len(kvRanges)-1].EndKey - if startKey.Cmp(endKey) >= 0 { - break + }) + + // generate task goroutine + eg.Go(func() error { + // we will modify the startKey in this goroutine, so copy them to avoid race. 
+ start, end := startKey, endKey + taskIDAlloc := newTaskIDAllocator() + for { + kvRanges, err2 := splitAndValidateTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize) + if err2 != nil { + return errors.Trace(err2) + } + if len(kvRanges) == 0 { + break + } + ddlLogger.Info("start backfill workers to reorg record", + zap.Stringer("type", bfWorkerType), + zap.Int("workerCnt", scheduler.currentWorkerSize()), + zap.Int("regionCnt", len(kvRanges)), + zap.String("startKey", hex.EncodeToString(start)), + zap.String("endKey", hex.EncodeToString(end))) + + err2 = sendTasks(scheduler, t, kvRanges, reorgInfo, taskIDAlloc, bfWorkerType) + if err2 != nil { + return errors.Trace(err2) + } + + start = kvRanges[len(kvRanges)-1].EndKey + if start.Cmp(end) >= 0 { + break + } } - } - scheduler.close(false) - return consumer.getResult() + + scheduler.close(false) + return nil + }) + + return eg.Wait() } func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 1cb979455e140..f6e5c174574d1 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -234,7 +234,7 @@ type TableScanTask struct { } // String implement fmt.Stringer interface. -func (t *TableScanTask) String() string { +func (t TableScanTask) String() string { return fmt.Sprintf("TableScanTask: id=%d, startKey=%s, endKey=%s", t.ID, hex.EncodeToString(t.Start), hex.EncodeToString(t.End)) } @@ -295,7 +295,8 @@ func (src *TableScanTaskSource) generateTasks() error { startKey := src.startKey endKey := src.endKey for { - kvRanges, err := splitTableRanges( + kvRanges, err := splitAndValidateTableRanges( + src.ctx, src.tbl, src.store, startKey, @@ -433,7 +434,7 @@ var OperatorCallBackForTest func() func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecordChunk)) { logutil.Logger(w.ctx).Info("start a table scan task", - zap.Int("id", task.ID), zap.String("task", task.String())) + zap.Int("id", task.ID), zap.Stringer("task", task)) var idxResult IndexRecordChunk err := wrapInBeginRollback(w.se, func(startTS uint64) error { diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 6ad4faebae43d..a17b0bb917031 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -48,9 +48,8 @@ type backfillScheduler interface { setupWorkers() error close(force bool) - sendTask(task *reorgBackfillTask) - drainTasks() - receiveResult() (*backfillResult, bool) + sendTask(*reorgBackfillTask) error + resultChan() <-chan *backfillResult currentWorkerSize() int adjustWorkerSize() error @@ -80,9 +79,15 @@ type txnBackfillScheduler struct { closed bool } -func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.Pool, - tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context, - jobCtx *JobContext) (backfillScheduler, error) { +func newBackfillScheduler( + ctx context.Context, + info *reorgInfo, + sessPool *sess.Pool, + tp backfillerType, + tbl table.PhysicalTable, + sessCtx sessionctx.Context, + jobCtx *JobContext, +) (backfillScheduler, error) { if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { ctx = logutil.WithCategory(ctx, "ddl-ingest") return newIngestBackfillScheduler(ctx, info, sessPool, tbl, false), nil @@ -115,19 +120,17 @@ func (b *txnBackfillScheduler) setupWorkers() error { return b.adjustWorkerSize() } -func (b *txnBackfillScheduler) sendTask(task 
*reorgBackfillTask) { - b.taskCh <- task -} - -func (b *txnBackfillScheduler) drainTasks() { - for len(b.taskCh) > 0 { - <-b.taskCh +func (b *txnBackfillScheduler) sendTask(task *reorgBackfillTask) error { + select { + case <-b.ctx.Done(): + return b.ctx.Err() + case b.taskCh <- task: + return nil } } -func (b *txnBackfillScheduler) receiveResult() (*backfillResult, bool) { - ret, ok := <-b.resultCh - return ret, ok +func (b *txnBackfillScheduler) resultChan() <-chan *backfillResult { + return b.resultCh } func newSessCtx( @@ -136,7 +139,7 @@ func newSessCtx( tzLocation *model.TimeZoneLocation, resourceGroupName string, ) (sessionctx.Context, error) { - sessCtx := newContext(store) + sessCtx := newReorgSessCtx(store) if err := initSessCtx(sessCtx, sqlMode, tzLocation, resourceGroupName); err != nil { return nil, errors.Trace(err) } @@ -245,29 +248,29 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { if err != nil { return err } - runner = newBackfillWorker(jc.ddlJobCtx, idxWorker) + runner = newBackfillWorker(b.ctx, idxWorker) worker = idxWorker case typeAddIndexMergeTmpWorker: backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false) tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.elements) - runner = newBackfillWorker(jc.ddlJobCtx, tmpIdxWorker) + runner = newBackfillWorker(b.ctx, tmpIdxWorker) worker = tmpIdxWorker case typeUpdateColumnWorker: // Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting. sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true updateWorker := newUpdateColumnWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) - runner = newBackfillWorker(jc.ddlJobCtx, updateWorker) + runner = newBackfillWorker(b.ctx, updateWorker) worker = updateWorker case typeCleanUpIndexWorker: idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) - runner = newBackfillWorker(jc.ddlJobCtx, idxWorker) + runner = newBackfillWorker(b.ctx, idxWorker) worker = idxWorker case typeReorgPartitionWorker: partWorker, err := newReorgPartitionWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) if err != nil { return err } - runner = newBackfillWorker(jc.ddlJobCtx, partWorker) + runner = newBackfillWorker(b.ctx, partWorker) worker = partWorker default: return errors.New("unknown backfill type") @@ -317,7 +320,6 @@ type ingestBackfillScheduler struct { writerPool *workerpool.WorkerPool[IndexRecordChunk, workerpool.None] writerMaxID int - poolErr chan error backendCtx ingest.BackendCtx checkpointMgr *ingest.CheckpointManager } @@ -331,7 +333,6 @@ func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl: tbl, taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize), resultCh: make(chan *backfillResult, backfillTaskChanSize), - poolErr: make(chan error), distribute: distribute, } } @@ -380,10 +381,10 @@ func (b *ingestBackfillScheduler) close(force bool) { b.checkpointMgr.Flush() // Get the latest status after all workers are closed so that the result is more accurate. 
cnt, nextKey := b.checkpointMgr.Status() - b.resultCh <- &backfillResult{ + b.sendResult(&backfillResult{ totalCount: cnt, nextKey: nextKey, - } + }) } close(b.resultCh) if intest.InTest && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() { @@ -395,25 +396,26 @@ func (b *ingestBackfillScheduler) close(force bool) { } } -func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) { - b.taskCh <- task -} - -func (b *ingestBackfillScheduler) drainTasks() { - for len(b.taskCh) > 0 { - <-b.taskCh +func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) error { + select { + case <-b.ctx.Done(): + return b.ctx.Err() + case b.taskCh <- task: + return nil } } -func (b *ingestBackfillScheduler) receiveResult() (*backfillResult, bool) { +func (b *ingestBackfillScheduler) sendResult(res *backfillResult) { select { - case err := <-b.poolErr: - return &backfillResult{err: err}, true - case rs, ok := <-b.resultCh: - return rs, ok + case <-b.ctx.Done(): + case b.resultCh <- res: } } +func (b *ingestBackfillScheduler) resultChan() <-chan *backfillResult { + return b.resultCh +} + func (b *ingestBackfillScheduler) currentWorkerSize() int { return int(b.writerPool.Cap()) } @@ -430,7 +432,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordCh job := reorgInfo.Job sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName) if err != nil { - b.poolErr <- err + b.sendResult(&backfillResult{err: err}) return nil } bcCtx := b.backendCtx @@ -441,7 +443,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordCh if err != nil { // Return an error only if it is the first worker. if b.writerMaxID == 0 { - b.poolErr <- err + b.sendResult(&backfillResult{err: err}) return nil } logutil.Logger(b.ctx).Warn("cannot create new writer", zap.Error(err), @@ -459,7 +461,7 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordCh if err != nil { // Return an error only if it is the first worker. 
if b.writerMaxID == 0 { - b.poolErr <- err + b.sendResult(&backfillResult{err: err}) return nil } logutil.Logger(b.ctx).Warn("cannot create new writer", zap.Error(err), @@ -508,9 +510,16 @@ func expectedIngestWorkerCnt() (readerCnt, writerCnt int) { return readerCnt, writerCnt } +func (w *addIndexIngestWorker) sendResult(res *backfillResult) { + select { + case <-w.ctx.Done(): + case w.resultCh <- res: + } +} + func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool.None)) { defer util.Recover(metrics.LabelDDL, "ingestWorker.HandleTask", func() { - w.resultCh <- &backfillResult{taskID: rs.ID, err: dbterror.ErrReorgPanic} + w.sendResult(&backfillResult{taskID: rs.ID, err: dbterror.ErrReorgPanic}) }, false) defer w.copReqSenderPool.recycleChunk(rs.Chunk) result := &backfillResult{ @@ -520,21 +529,21 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool if result.err != nil { logutil.Logger(w.ctx).Error("encounter error when handle index chunk", zap.Int("id", rs.ID), zap.Error(rs.Err)) - w.resultCh <- result + w.sendResult(result) return } if !w.distribute { err := w.d.isReorgRunnable(w.jobID, false) if err != nil { result.err = err - w.resultCh <- result + w.sendResult(result) return } } count, nextKey, err := w.WriteLocal(&rs) if err != nil { result.err = err - w.resultCh <- result + w.sendResult(result) return } if count == 0 { @@ -554,7 +563,7 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool if ResultCounterForTest != nil && result.err == nil { ResultCounterForTest.Add(1) } - w.resultCh <- result + w.sendResult(result) } func (*addIndexIngestWorker) Close() {} diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go index c43e3821e48a0..3a6c4d30b4ab8 100644 --- a/pkg/ddl/column.go +++ b/pkg/ddl/column.go @@ -588,7 +588,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in changingCol.Name = newColName changingCol.ChangeStateInfo = &model.ChangeStateInfo{DependencyColumnOffset: oldCol.Offset} - originDefVal, err := GetOriginDefaultValueForModifyColumn(newContext(d.store), changingCol, oldCol) + originDefVal, err := GetOriginDefaultValueForModifyColumn(newReorgSessCtx(d.store), changingCol, oldCol) if err != nil { return ver, errors.Trace(err) } @@ -1801,7 +1801,7 @@ func updateColumnDefaultValue(d *ddlCtx, t *meta.Meta, job *model.Job, newCol *m return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(newCol.Name, tblInfo.Name) } - if hasDefaultValue, _, err := checkColumnDefaultValue(newContext(d.store), table.ToColumn(oldCol.Clone()), newCol.DefaultValue); err != nil { + if hasDefaultValue, _, err := checkColumnDefaultValue(newReorgSessCtx(d.store), table.ToColumn(oldCol.Clone()), newCol.DefaultValue); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } else if !hasDefaultValue { @@ -1817,7 +1817,7 @@ func updateColumnDefaultValue(d *ddlCtx, t *meta.Meta, job *model.Job, newCol *m oldCol.AddFlag(mysql.NoDefaultValueFlag) } else { oldCol.DelFlag(mysql.NoDefaultValueFlag) - sctx := newContext(d.store) + sctx := newReorgSessCtx(d.store) err = checkDefaultValue(sctx, table.ToColumn(oldCol), true) if err != nil { job.State = model.JobStateCancelled diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 7129fad2c8cde..dd264f7ee3230 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -98,6 +98,8 @@ const ( recoverCheckFlagDisableGC ) +var ddlLogger = logutil.BgLogger().With(zap.String("category", "ddl")) + // OnExist specifies what to do when a new 
object has a name collision. type OnExist uint8 diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index fec80746232cd..929d634d84117 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -107,11 +107,17 @@ func (c *copReqSender) run() { } se := sess.NewSession(sessCtx) defer p.sessPool.Put(sessCtx) + var ( + task *reorgBackfillTask + ok bool + ) + for { - if util.HasCancelled(c.ctx) { + select { + case <-c.ctx.Done(): return + case task, ok = <-p.tasksCh: } - task, ok := <-p.tasksCh if !ok { return } @@ -131,7 +137,7 @@ func (c *copReqSender) run() { func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session) error { logutil.Logger(p.ctx).Info("start a cop-request task", - zap.Int("id", task.id), zap.String("task", task.String())) + zap.Int("id", task.id), zap.Stringer("task", task)) return wrapInBeginRollback(se, func(startTS uint64) error { rs, err := buildTableScan(p.ctx, p.copCtx.GetBase(), startTS, task.startKey, task.endKey) diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 1c8573d8e9d08..bc0db87764d2e 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -81,8 +81,7 @@ type reorgFnResult struct { err error } -// newContext gets a context. It is only used for adding column in reorganization state. -func newContext(store kv.Storage) sessionctx.Context { +func newReorgSessCtx(store kv.Storage) sessionctx.Context { c := mock.NewContext() c.Store = store c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false) @@ -540,7 +539,7 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType { // buildDescTableScan builds a desc table scan upon tblInfo. func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) { - sctx := newContext(dc.store) + sctx := newReorgSessCtx(dc.store) dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit) if err != nil { return nil, errors.Trace(err) @@ -619,7 +618,7 @@ func (dc *ddlCtx) GetTableMaxHandle(ctx *JobContext, startTS uint64, tbl table.P // empty table return nil, true, nil } - sessCtx := newContext(dc.store) + sessCtx := newReorgSessCtx(dc.store) row := chk.GetRow(0) if tblInfo.IsCommonHandle { maxHandle, err = buildCommonHandleFromChunkRow(sessCtx.GetSessionVars().StmtCtx, tblInfo, pkIdx, handleCols, row) diff --git a/pkg/resourcemanager/pool/workerpool/workerpool.go b/pkg/resourcemanager/pool/workerpool/workerpool.go index caee5704a671c..7128c74bf2def 100644 --- a/pkg/resourcemanager/pool/workerpool/workerpool.go +++ b/pkg/resourcemanager/pool/workerpool/workerpool.go @@ -158,7 +158,10 @@ func (p *WorkerPool[T, R]) runAWorker() { // AddTask adds a task to the pool. func (p *WorkerPool[T, R]) AddTask(task T) { - p.taskChan <- task + select { + case <-p.ctx.Done(): + case p.taskChan <- task: + } } // GetResultChan gets the result channel from the pool.
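
---

Background for reviewers: this change repeatedly applies one pattern — every
channel send (backfillWorker.sendResult, the schedulers' sendTask,
workerpool.AddTask) is wrapped in a select on the context's Done channel. A
bare `ch <- v` blocks forever once the receiver has exited; pairing it with
ctx.Done() lets the sender return promptly when the job is canceled or another
goroutine fails (AddTask drops the task silently instead of returning an
error). A minimal, self-contained sketch of the idea — the names here are
illustrative, not code from this patch:

package main

import (
	"context"
	"fmt"
	"time"
)

// sendCtx sends v on ch unless ctx is canceled first.
func sendCtx[T any](ctx context.Context, ch chan<- T, v T) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // receiver is gone or job canceled: give up promptly
	case ch <- v:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int) // unbuffered: a plain send would block forever

	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel() // simulate the consumer goroutine exiting with an error
	}()

	// Prints "context canceled" instead of deadlocking.
	fmt.Println(sendCtx(ctx, ch, 42))
}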
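The rewritten writePhysicalTableRecord replaces the old resultConsumer with
two goroutines in an error group — one generates and sends tasks, one consumes
results — where the first error cancels the shared context and unblocks
everyone. The patch uses TiDB's util.NewErrorGroupWithRecoverWithCtx, which,
as the name suggests, adds panic recovery on top of the errgroup idea. The
same shape, sketched with the standard golang.org/x/sync/errgroup (task,
result, and run are illustrative names, not from this patch):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type task struct{ id int }
type result struct {
	id  int
	err error
}

func run(ctx context.Context) error {
	eg, egCtx := errgroup.WithContext(ctx)
	taskCh := make(chan task, 4)
	resultCh := make(chan result, 4)

	// Task-generation goroutine (the analogue of sendTasks).
	eg.Go(func() error {
		defer close(taskCh)
		for i := 0; i < 8; i++ {
			select {
			case <-egCtx.Done():
				return egCtx.Err()
			case taskCh <- task{id: i}:
			}
		}
		return nil
	})

	// Worker goroutine (the scheduler would run a pool of these).
	eg.Go(func() error {
		defer close(resultCh)
		for t := range taskCh {
			select {
			case <-egCtx.Done():
				return egCtx.Err()
			case resultCh <- result{id: t.id}:
			}
		}
		return nil
	})

	// Result-processing goroutine.
	eg.Go(func() error {
		for {
			select {
			case <-egCtx.Done():
				return egCtx.Err()
			case r, ok := <-resultCh:
				if !ok {
					return nil // channel closed: all tasks finished
				}
				if r.err != nil {
					return r.err // cancels egCtx, stopping the other goroutines
				}
				fmt.Println("task done:", r.id)
			}
		}
	})

	return eg.Wait() // first non-nil error, after all goroutines return
}

func main() {
	if err := run(context.Background()); err != nil {
		fmt.Println("backfill failed:", err)
	}
}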