Skip to content

Commit

Permalink
ddl: separate sessionctx.Context in backfillCtx to sub-contexts (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored May 31, 2024
1 parent 3fdb963 commit 4670bf5
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 73 deletions.
25 changes: 21 additions & 4 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/logutil"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/topsql"
Expand Down Expand Up @@ -145,29 +147,44 @@ type backfillCtx struct {
id int
*ddlCtx
sessCtx sessionctx.Context
warnings contextutil.WarnHandlerExt
loc *time.Location
exprCtx exprctx.BuildContext
tblCtx table.MutateContext
schemaName string
table table.Table
batchCnt int
jobContext *JobContext
metricCounter prometheus.Counter
}

func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context,
schemaName string, tbl table.Table, jobCtx *JobContext, label string, isDistributed bool) *backfillCtx {
func newBackfillCtx(id int, rInfo *reorgInfo,
schemaName string, tbl table.Table, jobCtx *JobContext, label string, isDistributed bool) (*backfillCtx, error) {
sessCtx, err := newSessCtx(rInfo.d.store, rInfo.ReorgMeta)
if err != nil {
return nil, err
}

if isDistributed {
id = int(backfillContextID.Add(1))
}

exprCtx := sessCtx.GetExprCtx()
return &backfillCtx{
id: id,
ddlCtx: ctx,
ddlCtx: rInfo.d,
sessCtx: sessCtx,
warnings: sessCtx.GetSessionVars().StmtCtx.WarnHandler,
exprCtx: exprCtx,
tblCtx: sessCtx.GetTableCtx(),
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
batchCnt: int(variable.GetDDLReorgBatchSize()),
jobContext: jobCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())),
}
}, nil
}

func updateTxnEntrySizeLimitIfNeeded(txn kv.Transaction) {
Expand Down
30 changes: 18 additions & 12 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,17 +274,17 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
workerCnt := b.expectedWorkerSize()
// Increase the worker.
for i := len(b.workers); i < workerCnt; i++ {
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta)
if err != nil {
return err
}
var (
runner *backfillWorker
worker backfiller
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
if err != nil {
return err
}

idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
job.ID, reorgInfo.elements, reorgInfo.currElement.TypeKey)
if err != nil {
Expand All @@ -293,23 +293,29 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
runner = newBackfillWorker(b.ctx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
if err != nil {
return err
}
tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.elements)
runner = newBackfillWorker(b.ctx, tmpIdxWorker)
worker = tmpIdxWorker
case typeUpdateColumnWorker:
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(
sessCtx.GetSessionVars().StmtCtx.TypeFlags().
WithIgnoreZeroDateErr(!reorgInfo.ReorgMeta.SQLMode.HasStrictMode()))
updateWorker := newUpdateColumnWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
updateWorker, err := newUpdateColumnWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
if err != nil {
return err
}
runner = newBackfillWorker(b.ctx, updateWorker)
worker = updateWorker
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
idxWorker, err := newCleanUpIndexWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
if err != nil {
return err
}
runner = newBackfillWorker(b.ctx, idxWorker)
worker = idxWorker
case typeReorgPartitionWorker:
partWorker, err := newReorgPartitionWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
partWorker, err := newReorgPartitionWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
if err != nil {
return err
}
Expand Down
49 changes: 31 additions & 18 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,11 +1212,23 @@ type updateColumnWorker struct {
checksumNeeded bool
}

func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) (*updateColumnWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false)
if err != nil {
return nil, err
}

sessCtx := bCtx.sessCtx
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(
sessCtx.GetSessionVars().StmtCtx.TypeFlags().
WithIgnoreZeroDateErr(!reorgInfo.ReorgMeta.SQLMode.HasStrictMode()))
bCtx.exprCtx = bCtx.sessCtx.GetExprCtx()
bCtx.tblCtx = bCtx.sessCtx.GetTableCtx()

if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
logutil.DDLLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.Stringer("reorgInfo", reorgInfo))
return nil
return nil, nil
}
var oldCol, newCol *model.ColumnInfo
for _, col := range t.WritableCols() {
Expand Down Expand Up @@ -1248,13 +1260,13 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
}
return &updateColumnWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false),
backfillCtx: bCtx,
oldColInfo: oldCol,
newColInfo: newCol,
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
checksumNeeded: checksumNeeded,
}
}, nil
}

func (w *updateColumnWorker) AddMetricInfo(cnt float64) {
Expand Down Expand Up @@ -1294,7 +1306,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
taskDone := false
var lastAccessedHandle kv.Key
oprStartTime := startTime
err := iterateSnapshotKeys(w.jobContext, w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(),
err := iterateSnapshotKeys(w.jobContext, w.ddlCtx.store, taskRange.priority, taskRange.physicalTable.RecordPrefix(),
txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in updateColumnWorker fetchRowColVals", 0)
Expand Down Expand Up @@ -1329,8 +1341,8 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
}

func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error {
sysTZ := w.sessCtx.GetSessionVars().StmtCtx.TimeZone()
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, sysTZ, w.rowMap)
sysTZ := w.loc
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.exprCtx, handle, rawRow, sysTZ, w.rowMap)
if err != nil {
return errors.Trace(dbterror.ErrCantDecodeRecord.GenWithStackByArgs("column", err))
}
Expand All @@ -1343,26 +1355,26 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra

var recordWarning *terror.Error
// Since every updateColumnWorker handle their own work individually, we can cache warning in statement context when casting datum.
oldWarn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
oldWarn := w.warnings.GetWarnings()
if oldWarn == nil {
oldWarn = []contextutil.SQLWarn{}
} else {
oldWarn = oldWarn[:0]
}
w.sessCtx.GetSessionVars().StmtCtx.SetWarnings(oldWarn)
w.warnings.SetWarnings(oldWarn)
val := w.rowMap[w.oldColInfo.ID]
col := w.newColInfo
if val.Kind() == types.KindNull && col.FieldType.GetType() == mysql.TypeTimestamp && mysql.HasNotNullFlag(col.GetFlag()) {
if v, err := expression.GetTimeCurrentTimestamp(w.sessCtx.GetExprCtx().GetEvalCtx(), col.GetType(), col.GetDecimal()); err == nil {
if v, err := expression.GetTimeCurrentTimestamp(w.exprCtx.GetEvalCtx(), col.GetType(), col.GetDecimal()); err == nil {
// convert null value to timestamp should be substituted with current timestamp if NOT_NULL flag is set.
w.rowMap[w.oldColInfo.ID] = v
}
}
newColVal, err := table.CastValue(w.sessCtx, w.rowMap[w.oldColInfo.ID], w.newColInfo, false, false)
newColVal, err := table.CastColumnValue(w.exprCtx, w.rowMap[w.oldColInfo.ID], w.newColInfo, false, false)
if err != nil {
return w.reformatErrors(err)
}
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
warn := w.warnings.GetWarnings()
if len(warn) != 0 {
//nolint:forcetypeassert
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
Expand All @@ -1378,7 +1390,7 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
})

w.rowMap[w.newColInfo.ID] = newColVal
_, err = w.rowDecoder.EvalRemainedExprColumnMap(w.sessCtx, w.rowMap)
_, err = w.rowDecoder.EvalRemainedExprColumnMap(w.exprCtx, w.rowMap)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1389,9 +1401,10 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
newRow = append(newRow, val)
}
checksums := w.calcChecksums()
sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder
newRowVal, err := tablecodec.EncodeRow(sctx.TimeZone(), newRow, newColumnIDs, nil, nil, rd, checksums...)
err = sctx.HandleError(err)
rd := &w.tblCtx.GetSessionVars().RowEncoder
ec := w.exprCtx.GetEvalCtx().ErrCtx()
newRowVal, err := tablecodec.EncodeRow(w.loc, newRow, newColumnIDs, nil, nil, rd, checksums...)
err = ec.HandleError(err)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1423,7 +1436,7 @@ func (w *updateColumnWorker) calcChecksums() []uint32 {
if !sort.IsSorted(w.checksumBuffer) {
sort.Sort(w.checksumBuffer)
}
checksum, err := w.checksumBuffer.Checksum(w.sessCtx.GetSessionVars().StmtCtx.TimeZone())
checksum, err := w.checksumBuffer.Checksum(w.loc)
if err != nil {
logutil.DDLLogger().Warn("skip checksum in update-column backfill due to encode error", zap.Error(err))
return nil
Expand Down Expand Up @@ -1465,7 +1478,7 @@ func (w *updateColumnWorker) cleanRowMap() {
func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
errInTxn = kv.RunInNewTxn(ctx, w.ddlCtx.store, true, func(_ context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
updateTxnEntrySizeLimitIfNeeded(txn)
Expand Down
34 changes: 20 additions & 14 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,7 +1438,7 @@ func (w *baseIndexWorker) getIndexRecord(idxInfo *model.IndexInfo, handle kv.Han
idxVal[j] = idxColumnVal
continue
}
idxColumnVal, err = tables.GetColDefaultValue(w.sessCtx, col, w.defaultVals)
idxColumnVal, err = tables.GetColDefaultValue(w.exprCtx, col, w.defaultVals)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1469,8 +1469,8 @@ func (w *baseIndexWorker) getNextKey(taskRange reorgBackfillTask, taskDone bool)
}

func (w *baseIndexWorker) updateRowDecoder(handle kv.Handle, rawRecord []byte) error {
sysZone := w.sessCtx.GetSessionVars().StmtCtx.TimeZone()
_, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRecord, sysZone, w.rowMap)
sysZone := w.loc
_, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.exprCtx, handle, rawRecord, sysZone, w.rowMap)
return errors.Trace(err)
}

Expand All @@ -1488,7 +1488,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
// taskDone means that the reorged handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotKeys(w.jobContext, w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(), txn.StartTS(),
err := iterateSnapshotKeys(w.jobContext, w.ddlCtx.store, taskRange.priority, taskRange.physicalTable.RecordPrefix(), txn.StartTS(),
taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in baseIndexWorker fetchRowColVals", 0)
Expand Down Expand Up @@ -1572,7 +1572,8 @@ func genKeyExistsErr(key, value []byte, idxInfo *model.IndexInfo, tblInfo *model
// Note that `idxRecords` may belong to multiple indexes.
func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*indexRecord) error {
w.initBatchCheckBufs(len(idxRecords))
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
evalCtx := w.exprCtx.GetEvalCtx()
ec := evalCtx.ErrCtx()
uniqueBatchKeys := make([]kv.Key, 0, len(idxRecords))
cnt := 0
for i, record := range idxRecords {
Expand All @@ -1588,7 +1589,7 @@ func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords [
}
// skip by default.
idxRecords[i].skip = true
iter := idx.GenIndexKVIter(stmtCtx.ErrCtx(), stmtCtx.TimeZone(), record.vals, record.handle, idxRecords[i].rsData)
iter := idx.GenIndexKVIter(ec, w.loc, record.vals, record.handle, idxRecords[i].rsData)
for iter.Valid() {
var buf []byte
if cnt < len(w.idxKeyBufs) {
Expand Down Expand Up @@ -1644,7 +1645,7 @@ func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords [
idxRecords[w.recordIdx[i]].skip = found && idxRecords[w.recordIdx[i]].skip
}
// Constrains is already checked.
stmtCtx.BatchCheck = true
w.tblCtx.GetSessionVars().StmtCtx.BatchCheck = true
return nil
}

Expand Down Expand Up @@ -1868,7 +1869,7 @@ func (w *addIndexTxnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx
oprStartTime := time.Now()
jobID := handleRange.getJobID()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) (err error) {
errInTxn = kv.RunInNewTxn(ctx, w.ddlCtx.store, true, func(_ context.Context, txn kv.Transaction) (err error) {
taskCtx.finishTS = txn.StartTS()
taskCtx.addedCount = 0
taskCtx.scanCount = 0
Expand Down Expand Up @@ -1907,7 +1908,7 @@ func (w *addIndexTxnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx
}

handle, err := w.indexes[i%len(w.indexes)].Create(
w.sessCtx.GetTableCtx(), txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill)
w.tblCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) {
// Index already exists, skip it.
Expand Down Expand Up @@ -2417,7 +2418,12 @@ type cleanUpIndexWorker struct {
baseIndexWorker
}

func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *cleanUpIndexWorker {
func newCleanUpIndexWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) (*cleanUpIndexWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false)
if err != nil {
return nil, err
}

indexes := make([]table.Index, 0, len(t.Indices()))
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
for _, index := range t.Indices() {
Expand All @@ -2427,13 +2433,13 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
return &cleanUpIndexWorker{
baseIndexWorker: baseIndexWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false),
backfillCtx: bCtx,
indexes: indexes,
rowDecoder: rowDecoder,
defaultVals: make([]types.Datum, len(t.WritableCols())),
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
},
}
}, nil
}

func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
Expand All @@ -2446,7 +2452,7 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt

oprStartTime := time.Now()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
errInTxn = kv.RunInNewTxn(ctx, w.ddlCtx.store, true, func(_ context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
updateTxnEntrySizeLimitIfNeeded(txn)
Expand All @@ -2471,7 +2477,7 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt
// we fetch records row by row, so records will belong to
// index[0], index[1] ... index[n-1], index[0], index[1] ...
// respectively. So indexes[i%n] is the index of idxRecords[i].
err := w.indexes[i%n].Delete(w.sessCtx.GetTableCtx(), txn, idxRecord.vals, idxRecord.handle)
err := w.indexes[i%n].Delete(w.tblCtx, txn, idxRecord.vals, idxRecord.handle)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 4670bf5

Please sign in to comment.