Skip to content

Commit

Permalink
rc: add resource group name to ddl (#46756)
Browse files Browse the repository at this point in the history
ref #46656
  • Loading branch information
okJiang authored Sep 12, 2023
1 parent f85c9be commit b7f0742
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 113 deletions.
6 changes: 4 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
jobCtx := reorgInfo.d.jobContext(job.ID)
jobCtx := reorgInfo.NewJobContext()
for _, keyRange := range kvRanges {
taskID := taskIDAlloc.alloc()
startKey := keyRange.StartKey
Expand Down Expand Up @@ -684,7 +684,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
}
})

jc := dc.jobContext(job.ID)
jc := reorgInfo.NewJobContext()
sessCtx := newContext(reorgInfo.d.store)
scheduler, err := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc)
if err != nil {
Expand Down Expand Up @@ -780,6 +780,7 @@ func iterateSnapshotKeys(ctx *JobContext, store kv.Storage, priority int, keyPre
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
snap.SetOption(kv.ResourceGroupName, ctx.resourceGroupName)

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
Expand Down Expand Up @@ -824,6 +825,7 @@ func GetRangeEndKey(ctx *JobContext, store kv.Storage, priority int, keyPrefix k
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
snap.SetOption(kv.ResourceGroupName, ctx.resourceGroupName)
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
Expand Down
2 changes: 1 addition & 1 deletion ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
if err != nil {
return nil, errors.Trace(err)
}
startKey, endKey, err := getTableRange(d.jobContext(job.ID), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority)
startKey, endKey, err := getTableRange(d.jobContext(job.ID, job.ReorgMeta), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority)
if startKey == nil && endKey == nil {
// Empty table.
return nil, nil
Expand Down
20 changes: 15 additions & 5 deletions ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,25 @@ func (b *txnBackfillScheduler) receiveResult() (*backfillResult, bool) {
return ret, ok
}

func newSessCtx(store kv.Storage, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) (sessionctx.Context, error) {
func newSessCtx(
store kv.Storage,
sqlMode mysql.SQLMode,
tzLocation *model.TimeZoneLocation,
resourceGroupName string,
) (sessionctx.Context, error) {
sessCtx := newContext(store)
if err := initSessCtx(sessCtx, sqlMode, tzLocation); err != nil {
return nil, errors.Trace(err)
}
sessCtx.GetSessionVars().ResourceGroupName = resourceGroupName
return sessCtx, nil
}

func initSessCtx(sessCtx sessionctx.Context, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) error {
func initSessCtx(
sessCtx sessionctx.Context,
sqlMode mysql.SQLMode,
tzLocation *model.TimeZoneLocation,
) error {
// Unify the TimeZone settings in newContext.
if sessCtx.GetSessionVars().StmtCtx.TimeZone == nil {
tz := *time.UTC
Expand Down Expand Up @@ -183,7 +193,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
workerCnt := b.expectedWorkerSize()
// Increase the worker.
for i := len(b.workers); i < workerCnt; i++ {
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location)
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
return err
}
Expand Down Expand Up @@ -383,7 +393,7 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error {
func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordChunk, workerpool.None] {
reorgInfo := b.reorgInfo
job := reorgInfo.Job
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location)
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
b.poolErr <- err
return nil
Expand Down Expand Up @@ -425,7 +435,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
return nil, errors.New("cannot find index info")
}
ri := b.reorgInfo
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location)
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location, ri.ReorgMeta.ResourceGroupName)
if err != nil {
logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err))
return nil, err
Expand Down
6 changes: 4 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,8 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
reorgInfo, err := getReorgInfo(d.jobContext(job.ID, job.ReorgMeta),
d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo == nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down Expand Up @@ -1137,7 +1138,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
return errors.Trace(err)
}
//nolint:forcetypeassert
originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job.ID), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1467,6 +1468,7 @@ func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCt
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
Expand Down
26 changes: 9 additions & 17 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,13 +465,19 @@ func (dc *ddlCtx) removeJobCtx(job *model.Job) {
delete(dc.jobCtx.jobCtxMap, job.ID)
}

func (dc *ddlCtx) jobContext(jobID int64) *JobContext {
func (dc *ddlCtx) jobContext(jobID int64, reorgMeta *model.DDLReorgMeta) *JobContext {
dc.jobCtx.RLock()
defer dc.jobCtx.RUnlock()
var ctx *JobContext
if jobContext, exists := dc.jobCtx.jobCtxMap[jobID]; exists {
return jobContext
ctx = jobContext
} else {
ctx = NewJobContext()
}
if reorgMeta != nil && len(ctx.resourceGroupName) == 0 {
ctx.resourceGroupName = reorgMeta.ResourceGroupName
}
return NewJobContext()
return ctx
}

func (dc *ddlCtx) removeBackfillCtxJobCtx(jobID int64) {
Expand All @@ -491,20 +497,6 @@ func (dc *ddlCtx) backfillCtxJobIDs() []int64 {
return runningJobIDs
}

// setBackfillCtxJobContext returns the backfill JobContext for the given job
// ID, creating and registering one if it does not exist yet. The boolean
// reports whether the context already existed. On first creation it also
// initializes the Top SQL label and the diagnosis source for the job.
func (dc *ddlCtx) setBackfillCtxJobContext(jobID int64, jobQuery string, jobType model.ActionType) (*JobContext, bool) {
	dc.backfillCtx.Lock()
	defer dc.backfillCtx.Unlock()

	jobCtx, existent := dc.backfillCtx.jobCtxMap[jobID]
	if !existent {
		dc.setDDLLabelForTopSQL(jobID, jobQuery)
		dc.setDDLSourceForDiagnosis(jobID, jobType)
		// No reorg meta is available here; jobContext tolerates nil and
		// simply leaves the resource group name unset.
		jobCtx = dc.jobContext(jobID, nil)
		dc.backfillCtx.jobCtxMap[jobID] = jobCtx
	}
	return jobCtx, existent
}

type reorgContexts struct {
sync.RWMutex
// reorgCtxMap maps job ID to reorg context.
Expand Down
88 changes: 28 additions & 60 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4385,7 +4385,6 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp
newPartInfo.NewTableID = newID[0]
newPartInfo.DDLType = piOld.Type

tzName, tzOffset := ddlutil.GetTimeZone(ctx)
job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
Expand All @@ -4394,12 +4393,7 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp
Type: model.ActionAlterTablePartitioning,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames, newPartInfo},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
ReorgMeta: NewDDLReorgMeta(ctx),
}

// No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead.
Expand Down Expand Up @@ -4455,7 +4449,6 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec
return errors.Trace(err)
}

tzName, tzOffset := ddlutil.GetTimeZone(ctx)
job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
Expand All @@ -4464,12 +4457,7 @@ func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec
Type: model.ActionReorganizePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames, partInfo},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
ReorgMeta: NewDDLReorgMeta(ctx),
}

// No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead.
Expand Down Expand Up @@ -4525,7 +4513,6 @@ func (d *ddl) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, spec *
}
partInfo.NewTableID = partInfo.Definitions[0].ID

tzName, tzOffset := ddlutil.GetTimeZone(ctx)
job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
Expand All @@ -4534,12 +4521,7 @@ func (d *ddl) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, spec *
Type: model.ActionRemovePartitioning,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames, partInfo},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
ReorgMeta: NewDDLReorgMeta(ctx),
}

// No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead.
Expand Down Expand Up @@ -5687,22 +5669,16 @@ func GetModifiableColumnJob(
return nil, errors.Trace(err)
}

tzName, tzOffset := ddlutil.GetTimeZone(sctx)
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: sctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
CtxVars: []interface{}{needChangeColData},
Args: []interface{}{&newCol.ColumnInfo, originalColName, spec.Position, modifyColumnTp, newAutoRandBits},
ReorgMeta: NewDDLReorgMeta(sctx),
CtxVars: []interface{}{needChangeColData},
Args: []interface{}{&newCol.ColumnInfo, originalColName, spec.Position, modifyColumnTp, newAutoRandBits},
}
return job, nil
}
Expand Down Expand Up @@ -5953,8 +5929,6 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
return errors.Trace(err)
}

tzName, tzOffset := ddlutil.GetTimeZone(ctx)

newCol := oldCol.Clone()
newCol.Name = newColName
job := &model.Job{
Expand All @@ -5964,13 +5938,8 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
TableName: tbl.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Args: []interface{}{&newCol, oldColName, spec.Position, 0, 0},
ReorgMeta: NewDDLReorgMeta(ctx),
Args: []interface{}{&newCol, oldColName, spec.Position, 0, 0},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -7090,8 +7059,6 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
}
}

tzName, tzOffset := ddlutil.GetTimeZone(ctx)

unique := true
sqlMode := ctx.GetSessionVars().SQLMode
job := &model.Job{
Expand All @@ -7101,14 +7068,9 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
TableName: t.Meta().Name.L,
Type: model.ActionAddPrimaryKey,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, sqlMode, nil, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
ReorgMeta: NewDDLReorgMeta(ctx),
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, sqlMode, nil, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -7342,7 +7304,6 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
return d.addHypoIndexIntoCtx(ctx, ti.Schema, ti.Name, indexInfo)
}

tzName, tzOffset := ddlutil.GetTimeZone(ctx)
chs, coll := ctx.GetSessionVars().GetCharsetInfo()
job := &model.Job{
SchemaID: schema.ID,
Expand All @@ -7351,16 +7312,11 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
TableName: t.Meta().Name.L,
Type: model.ActionAddIndex,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: chs,
Collate: coll,
ReorgMeta: NewDDLReorgMeta(ctx),
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: chs,
Collate: coll,
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -9012,3 +8968,15 @@ func (d *ddl) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrN
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

// NewDDLReorgMeta creates a DDLReorgMeta from the session context, capturing
// the session's SQL mode, time zone, and resource group name so the reorg
// workers run under the same settings as the issuing session.
func NewDDLReorgMeta(ctx sessionctx.Context) *model.DDLReorgMeta {
tzName, tzOffset := ddlutil.GetTimeZone(ctx)
return &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
ResourceGroupName: ctx.GetSessionVars().ResourceGroupName,
}
}
6 changes: 5 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type JobContext struct {
cacheNormalizedSQL string
cacheDigest *parser.Digest
tp string

resourceGroupName string
}

// NewJobContext returns a new ddl job context.
Expand Down Expand Up @@ -762,10 +764,12 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
}
w.setDDLLabelForTopSQL(job.ID, job.Query)
w.setDDLSourceForDiagnosis(job.ID, job.Type)
jobContext := w.jobContext(job.ID)
jobContext := w.jobContext(job.ID, job.ReorgMeta)
if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
txn.SetOption(kv.ResourceGroupName, jobContext.resourceGroupName)

t := meta.NewMeta(txn)
if job.IsDone() || job.IsRollbackDone() {
if job.IsDone() {
Expand Down
Loading

0 comments on commit b7f0742

Please sign in to comment.