From b7f0742f17add942b1d2a5cddc7204842ad4e2ce Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 12 Sep 2023 13:42:09 +0800 Subject: [PATCH] rc: add resource group name to ddl (#46756) ref pingcap/tidb#46656 --- ddl/backfilling.go | 6 +- ddl/backfilling_dispatcher.go | 2 +- ddl/backfilling_scheduler.go | 20 ++++-- ddl/column.go | 6 +- ddl/ddl.go | 26 +++----- ddl/ddl_api.go | 88 ++++++++----------------- ddl/ddl_worker.go | 6 +- ddl/index.go | 8 ++- ddl/index_merge_tmp.go | 1 + ddl/multi_schema_change.go | 10 +-- ddl/partition.go | 9 +-- ddl/reorg.go | 10 ++- ddl/stage_read_index.go | 2 +- ddl/stage_scheduler.go | 2 +- disttask/importinto/subtask_executor.go | 2 + parser/model/reorg.go | 13 ++-- 16 files changed, 98 insertions(+), 113 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index d880658a7fe25..f5dda05b74301 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -543,7 +543,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, job := reorgInfo.Job //nolint:forcetypeassert phyTbl := t.(table.PhysicalTable) - jobCtx := reorgInfo.d.jobContext(job.ID) + jobCtx := reorgInfo.NewJobContext() for _, keyRange := range kvRanges { taskID := taskIDAlloc.alloc() startKey := keyRange.StartKey @@ -684,7 +684,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical } }) - jc := dc.jobContext(job.ID) + jc := reorgInfo.NewJobContext() sessCtx := newContext(reorgInfo.d.store) scheduler, err := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc) if err != nil { @@ -780,6 +780,7 @@ func iterateSnapshotKeys(ctx *JobContext, store kv.Storage, priority int, keyPre if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil { snap.SetOption(kv.ResourceGroupTagger, tagger) } + snap.SetOption(kv.ResourceGroupName, ctx.resourceGroupName) it, err := snap.Iter(firstKey, upperBound) if err != nil { @@ -824,6 +825,7 @@ func GetRangeEndKey(ctx *JobContext, store kv.Storage, priority int, keyPrefix k if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil { snap.SetOption(kv.ResourceGroupTagger, tagger) } + snap.SetOption(kv.ResourceGroupName, ctx.resourceGroupName) snap.SetOption(kv.RequestSourceInternal, true) snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType()) snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL) diff --git a/ddl/backfilling_dispatcher.go b/ddl/backfilling_dispatcher.go index 194a6b17cea48..242444ecd4845 100644 --- a/ddl/backfilling_dispatcher.go +++ b/ddl/backfilling_dispatcher.go @@ -193,7 +193,7 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job) if err != nil { return nil, errors.Trace(err) } - startKey, endKey, err := getTableRange(d.jobContext(job.ID), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority) + startKey, endKey, err := getTableRange(d.jobContext(job.ID, job.ReorgMeta), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority) if startKey == nil && endKey == nil { // Empty table. 
return nil, nil diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index 6b375574b2a1f..9b58ef2a755d6 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -128,15 +128,25 @@ func (b *txnBackfillScheduler) receiveResult() (*backfillResult, bool) { return ret, ok } -func newSessCtx(store kv.Storage, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) (sessionctx.Context, error) { +func newSessCtx( + store kv.Storage, + sqlMode mysql.SQLMode, + tzLocation *model.TimeZoneLocation, + resourceGroupName string, +) (sessionctx.Context, error) { sessCtx := newContext(store) if err := initSessCtx(sessCtx, sqlMode, tzLocation); err != nil { return nil, errors.Trace(err) } + sessCtx.GetSessionVars().ResourceGroupName = resourceGroupName return sessCtx, nil } -func initSessCtx(sessCtx sessionctx.Context, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) error { +func initSessCtx( + sessCtx sessionctx.Context, + sqlMode mysql.SQLMode, + tzLocation *model.TimeZoneLocation, +) error { // Unify the TimeZone settings in newContext. if sessCtx.GetSessionVars().StmtCtx.TimeZone == nil { tz := *time.UTC @@ -183,7 +193,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { workerCnt := b.expectedWorkerSize() // Increase the worker. for i := len(b.workers); i < workerCnt; i++ { - sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location) + sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName) if err != nil { return err } @@ -383,7 +393,7 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error { func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordChunk, workerpool.None] { reorgInfo := b.reorgInfo job := reorgInfo.Job - sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location) + sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName) if err != nil { b.poolErr <- err return nil @@ -425,7 +435,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e return nil, errors.New("cannot find index info") } ri := b.reorgInfo - sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location) + sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location, ri.ReorgMeta.ResourceGroupName) if err != nil { logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err)) return nil, err diff --git a/ddl/column.go b/ddl/column.go index 03dd5cda82733..5065925fb34db 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -825,7 +825,8 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) + reorgInfo, err := getReorgInfo(d.jobContext(job.ID, job.ReorgMeta), + d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) if err != nil || reorgInfo == nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. 
@@ -1137,7 +1138,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error return errors.Trace(err) } //nolint:forcetypeassert - originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job.ID), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) + originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) } @@ -1467,6 +1468,7 @@ func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCt if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } + txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName) rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { diff --git a/ddl/ddl.go b/ddl/ddl.go index 19a87a706e099..39c22c8f3c191 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -465,13 +465,19 @@ func (dc *ddlCtx) removeJobCtx(job *model.Job) { delete(dc.jobCtx.jobCtxMap, job.ID) } -func (dc *ddlCtx) jobContext(jobID int64) *JobContext { +func (dc *ddlCtx) jobContext(jobID int64, reorgMeta *model.DDLReorgMeta) *JobContext { dc.jobCtx.RLock() defer dc.jobCtx.RUnlock() + var ctx *JobContext if jobContext, exists := dc.jobCtx.jobCtxMap[jobID]; exists { - return jobContext + ctx = jobContext + } else { + ctx = NewJobContext() + } + if reorgMeta != nil && len(ctx.resourceGroupName) == 0 { + ctx.resourceGroupName = reorgMeta.ResourceGroupName } - return NewJobContext() + return ctx } func (dc *ddlCtx) removeBackfillCtxJobCtx(jobID int64) { @@ -491,20 +497,6 @@ func (dc *ddlCtx) backfillCtxJobIDs() []int64 { return runningJobIDs } -func (dc *ddlCtx) setBackfillCtxJobContext(jobID int64, jobQuery string, jobType model.ActionType) (*JobContext, bool) { - dc.backfillCtx.Lock() - defer dc.backfillCtx.Unlock() - - jobCtx, existent := dc.backfillCtx.jobCtxMap[jobID] - if !existent { - dc.setDDLLabelForTopSQL(jobID, jobQuery) - dc.setDDLSourceForDiagnosis(jobID, jobType) - jobCtx = dc.jobContext(jobID) - dc.backfillCtx.jobCtxMap[jobID] = jobCtx - } - return jobCtx, existent -} - type reorgContexts struct { sync.RWMutex // reorgCtxMap maps job ID to reorg context. diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 9f502cf05a366..efb41f6285751 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4385,7 +4385,6 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp newPartInfo.NewTableID = newID[0] newPartInfo.DDLType = piOld.Type - tzName, tzOffset := ddlutil.GetTimeZone(ctx) job := &model.Job{ SchemaID: schema.ID, TableID: meta.ID, @@ -4394,12 +4393,7 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp Type: model.ActionAlterTablePartitioning, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{partNames, newPartInfo}, - ReorgMeta: &model.DDLReorgMeta{ - SQLMode: ctx.GetSessionVars().SQLMode, - Warnings: make(map[errors.ErrorID]*terror.Error), - WarningsCount: make(map[errors.ErrorID]int64), - Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, - }, + ReorgMeta: NewDDLReorgMeta(ctx), } // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. 
@@ -4455,7 +4449,6 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec return errors.Trace(err) } - tzName, tzOffset := ddlutil.GetTimeZone(ctx) job := &model.Job{ SchemaID: schema.ID, TableID: meta.ID, @@ -4464,12 +4457,7 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec Type: model.ActionReorganizePartition, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{partNames, partInfo}, - ReorgMeta: &model.DDLReorgMeta{ - SQLMode: ctx.GetSessionVars().SQLMode, - Warnings: make(map[errors.ErrorID]*terror.Error), - WarningsCount: make(map[errors.ErrorID]int64), - Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, - }, + ReorgMeta: NewDDLReorgMeta(ctx), } // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. @@ -4525,7 +4513,6 @@ func (d *ddl) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, spec * } partInfo.NewTableID = partInfo.Definitions[0].ID - tzName, tzOffset := ddlutil.GetTimeZone(ctx) job := &model.Job{ SchemaID: schema.ID, TableID: meta.ID, @@ -4534,12 +4521,7 @@ func (d *ddl) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, spec * Type: model.ActionRemovePartitioning, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{partNames, partInfo}, - ReorgMeta: &model.DDLReorgMeta{ - SQLMode: ctx.GetSessionVars().SQLMode, - Warnings: make(map[errors.ErrorID]*terror.Error), - WarningsCount: make(map[errors.ErrorID]int64), - Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, - }, + ReorgMeta: NewDDLReorgMeta(ctx), } // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. @@ -5687,7 +5669,6 @@ func GetModifiableColumnJob( return nil, errors.Trace(err) } - tzName, tzOffset := ddlutil.GetTimeZone(sctx) job := &model.Job{ SchemaID: schema.ID, TableID: t.Meta().ID, @@ -5695,14 +5676,9 @@ func GetModifiableColumnJob( TableName: t.Meta().Name.L, Type: model.ActionModifyColumn, BinlogInfo: &model.HistoryInfo{}, - ReorgMeta: &model.DDLReorgMeta{ - SQLMode: sctx.GetSessionVars().SQLMode, - Warnings: make(map[errors.ErrorID]*terror.Error), - WarningsCount: make(map[errors.ErrorID]int64), - Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, - }, - CtxVars: []interface{}{needChangeColData}, - Args: []interface{}{&newCol.ColumnInfo, originalColName, spec.Position, modifyColumnTp, newAutoRandBits}, + ReorgMeta: NewDDLReorgMeta(sctx), + CtxVars: []interface{}{needChangeColData}, + Args: []interface{}{&newCol.ColumnInfo, originalColName, spec.Position, modifyColumnTp, newAutoRandBits}, } return job, nil } @@ -5953,8 +5929,6 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al return errors.Trace(err) } - tzName, tzOffset := ddlutil.GetTimeZone(ctx) - newCol := oldCol.Clone() newCol.Name = newColName job := &model.Job{ @@ -5964,13 +5938,8 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al TableName: tbl.Meta().Name.L, Type: model.ActionModifyColumn, BinlogInfo: &model.HistoryInfo{}, - ReorgMeta: &model.DDLReorgMeta{ - SQLMode: ctx.GetSessionVars().SQLMode, - Warnings: make(map[errors.ErrorID]*terror.Error), - WarningsCount: make(map[errors.ErrorID]int64), - Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, - }, - Args: []interface{}{&newCol, oldColName, spec.Position, 0, 0}, + ReorgMeta: NewDDLReorgMeta(ctx), + Args: []interface{}{&newCol, oldColName, spec.Position, 0, 0}, } err = d.DoDDLJob(ctx, job) err = 
d.callHookOnChanged(job, err)
@@ -7090,8 +7059,6 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
 		}
 	}
 
-	tzName, tzOffset := ddlutil.GetTimeZone(ctx)
-
 	unique := true
 	sqlMode := ctx.GetSessionVars().SQLMode
 	job := &model.Job{
@@ -7101,14 +7068,9 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
 		TableName:  t.Meta().Name.L,
 		Type:       model.ActionAddPrimaryKey,
 		BinlogInfo: &model.HistoryInfo{},
-		ReorgMeta: &model.DDLReorgMeta{
-			SQLMode:       ctx.GetSessionVars().SQLMode,
-			Warnings:      make(map[errors.ErrorID]*terror.Error),
-			WarningsCount: make(map[errors.ErrorID]int64),
-			Location:      &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
-		},
-		Args:     []interface{}{unique, indexName, indexPartSpecifications, indexOption, sqlMode, nil, global},
-		Priority: ctx.GetSessionVars().DDLReorgPriority,
+		ReorgMeta:  NewDDLReorgMeta(ctx),
+		Args:       []interface{}{unique, indexName, indexPartSpecifications, indexOption, sqlMode, nil, global},
+		Priority:   ctx.GetSessionVars().DDLReorgPriority,
 	}
 
 	err = d.DoDDLJob(ctx, job)
@@ -7342,7 +7304,6 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
 		return d.addHypoIndexIntoCtx(ctx, ti.Schema, ti.Name, indexInfo)
 	}
 
-	tzName, tzOffset := ddlutil.GetTimeZone(ctx)
 	chs, coll := ctx.GetSessionVars().GetCharsetInfo()
 	job := &model.Job{
 		SchemaID:   schema.ID,
@@ -7351,16 +7312,11 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
 		TableName:  t.Meta().Name.L,
 		Type:       model.ActionAddIndex,
 		BinlogInfo: &model.HistoryInfo{},
-		ReorgMeta: &model.DDLReorgMeta{
-			SQLMode:       ctx.GetSessionVars().SQLMode,
-			Warnings:      make(map[errors.ErrorID]*terror.Error),
-			WarningsCount: make(map[errors.ErrorID]int64),
-			Location:      &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
-		},
-		Args:     []interface{}{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global},
-		Priority: ctx.GetSessionVars().DDLReorgPriority,
-		Charset:  chs,
-		Collate:  coll,
+		ReorgMeta:  NewDDLReorgMeta(ctx),
+		Args:       []interface{}{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global},
+		Priority:   ctx.GetSessionVars().DDLReorgPriority,
+		Charset:    chs,
+		Collate:    coll,
 	}
 
 	err = d.DoDDLJob(ctx, job)
@@ -9012,3 +8968,15 @@ func (d *ddl) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrN
 	err = d.callHookOnChanged(job, err)
 	return errors.Trace(err)
 }
+
+// NewDDLReorgMeta creates a DDL ReorgMeta.
+func NewDDLReorgMeta(ctx sessionctx.Context) *model.DDLReorgMeta {
+	tzName, tzOffset := ddlutil.GetTimeZone(ctx)
+	return &model.DDLReorgMeta{
+		SQLMode:           ctx.GetSessionVars().SQLMode,
+		Warnings:          make(map[errors.ErrorID]*terror.Error),
+		WarningsCount:     make(map[errors.ErrorID]int64),
+		Location:          &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
+		ResourceGroupName: ctx.GetSessionVars().ResourceGroupName,
+	}
+}
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 0775ebb0ed192..e99601a6d81d7 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -116,6 +116,8 @@ type JobContext struct {
 	cacheNormalizedSQL string
 	cacheDigest        *parser.Digest
 	tp                 string
+
+	resourceGroupName string
 }
 
 // NewJobContext returns a new ddl job context.
@@ -762,10 +764,12 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { } w.setDDLLabelForTopSQL(job.ID, job.Query) w.setDDLSourceForDiagnosis(job.ID, job.Type) - jobContext := w.jobContext(job.ID) + jobContext := w.jobContext(job.ID, job.ReorgMeta) if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } + txn.SetOption(kv.ResourceGroupName, jobContext.resourceGroupName) + t := meta.NewMeta(txn) if job.IsDone() || job.IsRollbackDone() { if job.IsDone() { diff --git a/ddl/index.go b/ddl/index.go index d8d38980245a2..d664472f7539f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1011,7 +1011,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) + reorgInfo, err := getReorgInfo(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) if err != nil || reorgInfo == nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1743,6 +1743,7 @@ func (w *addIndexTxnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(jobID); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } + txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName) idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { @@ -1996,7 +1997,7 @@ func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysic if err != nil { return 0, nil, nil, errors.Trace(err) } - startKey, endKey, err = getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + startKey, endKey, err = getTableRange(reorg.NewJobContext(), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { return 0, nil, nil, errors.Trace(err) } @@ -2116,6 +2117,7 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } + txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName) idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { @@ -2196,7 +2198,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r if err != nil { return false, errors.Trace(err) } - start, end, err := getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + start, end, err := getTableRange(reorg.NewJobContext(), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { return false, errors.Trace(err) } diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index b7db83c7db437..3bf2e8a70c8d4 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -151,6 +151,7 @@ func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx ba if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } + txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName) tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange) if err != nil { diff --git 
a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 19c355ebfa8da..3bb5eaa3b8968 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -16,7 +16,6 @@ package ddl import ( "github.com/pingcap/errors" - ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" @@ -34,7 +33,7 @@ func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident) error { if err != nil { return errors.Trace(err) } - tzName, tzOffset := ddlutil.GetTimeZone(ctx) + job := &model.Job{ SchemaID: schema.ID, TableID: t.Meta().ID, @@ -44,12 +43,7 @@ func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident) error { BinlogInfo: &model.HistoryInfo{}, Args: nil, MultiSchemaInfo: ctx.GetSessionVars().StmtCtx.MultiSchemaInfo, - ReorgMeta: &model.DDLReorgMeta{ - SQLMode: ctx.GetSessionVars().SQLMode, - Warnings: make(map[errors.ErrorID]*terror.Error), - WarningsCount: make(map[errors.ErrorID]int64), - Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, - }, + ReorgMeta: NewDDLReorgMeta(ctx), } err = checkMultiSchemaInfo(ctx.GetSessionVars().StmtCtx.MultiSchemaInfo, t) if err != nil { diff --git a/ddl/partition.go b/ddl/partition.go index e3ded515ec5c9..b6d39772de7a9 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2033,7 +2033,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( } defer w.sessPool.Put(sctx) rh := newReorgHandler(sess.NewSession(sctx)) - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID), d, rh, job, dbInfo, pt, physicalTableIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, pt, physicalTableIDs, elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version @@ -2220,7 +2220,7 @@ func (w *worker) onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } defer w.sessPool.Put(sctx) rh := newReorgHandler(sess.NewSession(sctx)) - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID), d, rh, job, dbInfo, pt, physicalTableIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, pt, physicalTableIDs, elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version @@ -3013,7 +3013,7 @@ func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tb if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID), d, rh, job, dbInfo, partTbl, physTblIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID, job.ReorgMeta), d, rh, job, dbInfo, partTbl, physTblIDs, elements) err = w.runReorgJob(reorgInfo, tbl.Meta(), d.lease, func() (reorgErr error) { defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork", func() { @@ -3098,6 +3098,7 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } + txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName) rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { @@ -3290,7 +3291,7 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) // like where the regInfo PhysicalTableID 
and element is the same, // and the tableid in the key-prefix regInfo.StartKey and regInfo.EndKey matches with PhysicalTableID // do not change the reorgInfo start/end key - startHandle, endHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job.ID), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority) + startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index 2c83a8d6df8b2..997dcea0384e6 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -447,6 +447,10 @@ type reorgInfo struct { currElement *meta.Element } +func (r *reorgInfo) NewJobContext() *JobContext { + return r.d.jobContext(r.Job.ID, r.Job.ReorgMeta) +} + func (r *reorgInfo) String() string { var isEnabled bool if ingest.LitInitialized { @@ -519,9 +523,11 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table. builder.SetDAGRequest(dagPB). SetStartTS(startTS). SetKeepOrder(true). - SetConcurrency(1).SetDesc(true) + SetConcurrency(1). + SetDesc(true). + SetResourceGroupTagger(ctx.getResourceGroupTaggerForTopSQL()). + SetResourceGroupName(ctx.resourceGroupName) - builder.Request.ResourceGroupTagger = ctx.getResourceGroupTaggerForTopSQL() builder.Request.NotFillCache = true builder.Request.Priority = kv.PriorityLow builder.RequestSource.RequestSourceInternal = true diff --git a/ddl/stage_read_index.go b/ddl/stage_read_index.go index 02787a6fa89ac..c0019caecedfc 100644 --- a/ddl/stage_read_index.go +++ b/ddl/stage_read_index.go @@ -113,7 +113,7 @@ func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask *proto return nil, err } - sessCtx, err := newSessCtx(d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location) + sessCtx, err := newSessCtx(d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName) if err != nil { return nil, err } diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index 64f521fbbe03c..852052f12f9ca 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -71,7 +71,7 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, } if !stepForImport { - jc := d.jobContext(jobMeta.ID) + jc := d.jobContext(jobMeta.ID, jobMeta.ReorgMeta) d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query) d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type) return newReadIndexToLocalStage(d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc, summary), nil diff --git a/disttask/importinto/subtask_executor.go b/disttask/importinto/subtask_executor.go index 18a9e78d75791..a09ccc05af82f 100644 --- a/disttask/importinto/subtask_executor.go +++ b/disttask/importinto/subtask_executor.go @@ -184,6 +184,8 @@ func checksumTable(ctx context.Context, executor storage.SessionExecutor, taskMe se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency) }() + // TODO: add resource group name + rs, err := storage.ExecSQL(ctx, se, sql) if err != nil { return err diff --git a/parser/model/reorg.go b/parser/model/reorg.go index 55c45da007b1b..f2c3cb7a7367b 100644 --- a/parser/model/reorg.go +++ b/parser/model/reorg.go @@ -23,12 +23,13 @@ import ( // DDLReorgMeta is meta info of DDL reorganization. 
type DDLReorgMeta struct { - SQLMode mysql.SQLMode `json:"sql_mode"` - Warnings map[errors.ErrorID]*terror.Error `json:"warnings"` - WarningsCount map[errors.ErrorID]int64 `json:"warnings_count"` - Location *TimeZoneLocation `json:"location"` - ReorgTp ReorgType `json:"reorg_tp"` - IsDistReorg bool `json:"is_dist_reorg"` + SQLMode mysql.SQLMode `json:"sql_mode"` + Warnings map[errors.ErrorID]*terror.Error `json:"warnings"` + WarningsCount map[errors.ErrorID]int64 `json:"warnings_count"` + Location *TimeZoneLocation `json:"location"` + ReorgTp ReorgType `json:"reorg_tp"` + IsDistReorg bool `json:"is_dist_reorg"` + ResourceGroupName string `json:"resource_group_name"` } // ReorgType indicates which process is used for the data reorganization.
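
Note on the propagation pattern this patch implements, as a minimal self-contained sketch. The types and the jobContext helper below are simplified stand-ins for the TiDB definitions (DDLReorgMeta, JobContext, ddlCtx.jobContext), not the real ones; only the control flow mirrors the patch.

package main

import "fmt"

// DDLReorgMeta carries the submitter's resource group name inside the job,
// standing in for the ResourceGroupName field added in parser/model/reorg.go.
type DDLReorgMeta struct {
	ResourceGroupName string
}

// JobContext holds per-job state, standing in for ddl.JobContext.
type JobContext struct {
	resourceGroupName string
}

// jobContext mirrors the fallback added to ddlCtx.jobContext in ddl/ddl.go:
// reuse a cached context when one exists, otherwise build a fresh one, and
// fill in the resource group name from the reorg meta only when it is unset.
func jobContext(cache map[int64]*JobContext, jobID int64, meta *DDLReorgMeta) *JobContext {
	ctx, ok := cache[jobID]
	if !ok {
		ctx = &JobContext{}
	}
	if meta != nil && ctx.resourceGroupName == "" {
		ctx.resourceGroupName = meta.ResourceGroupName
	}
	return ctx
}

func main() {
	cache := make(map[int64]*JobContext)
	meta := &DDLReorgMeta{ResourceGroupName: "rg1"}
	ctx := jobContext(cache, 1, meta)
	// In the patch, the worker then tags every KV request with this name,
	// e.g. txn.SetOption(kv.ResourceGroupName, jobContext.resourceGroupName)
	// and snap.SetOption(kv.ResourceGroupName, ctx.resourceGroupName).
	fmt.Println(ctx.resourceGroupName) // prints "rg1"
}

The name is captured once at job-submission time (NewDDLReorgMeta reads it from the session variables) and persisted in the job's ReorgMeta, so backfill and reorg traffic executed later by the DDL owner can still be attributed to the resource group of the session that issued the statement. The len(ctx.resourceGroupName) == 0 guard keeps an already-populated cached context from being overwritten on repeated lookups.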