ddl: remove mock.Context usage in addIndexIngestWorker
lcwangchao committed May 22, 2024
1 parent 6928519 commit 3b92ef4
Showing 9 changed files with 83 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/BUILD.bazel
@@ -276,6 +276,7 @@ go_test(
"//pkg/disttask/framework/storage",
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/errctx",
"//pkg/errno",
"//pkg/executor",
"//pkg/infoschema",
@@ -296,7 +297,6 @@
"//pkg/session",
"//pkg/session/types",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/store/gcworker",
19 changes: 0 additions & 19 deletions pkg/ddl/backfilling.go
@@ -42,7 +42,6 @@ import (
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tidb/pkg/util/topsql"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
@@ -543,24 +542,6 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co
return decodeColMap, nil
}

func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error {
// It is set to SystemLocation to be compatible with nil LocationInfo.
tz := *timeutil.SystemLocation()
if sctx.GetSessionVars().TimeZone == nil {
sctx.GetSessionVars().TimeZone = &tz
} else {
*sctx.GetSessionVars().TimeZone = tz
}
if tzLocation != nil {
loc, err := tzLocation.GetLocation()
if err != nil {
return errors.Trace(err)
}
*sctx.GetSessionVars().TimeZone = *loc
}
return nil
}

var backfillTaskChanSize = 128

// SetBackfillTaskChanSizeForTest is only used for test.
8 changes: 3 additions & 5 deletions pkg/ddl/backfilling_operators.go
@@ -714,10 +714,7 @@ func (w *indexIngestBaseWorker) initSessCtx() {
return
}
w.restore = restoreSessCtx(sessCtx)
if err := initSessCtx(sessCtx,
w.reorgMeta.SQLMode,
w.reorgMeta.Location,
w.reorgMeta.ResourceGroupName); err != nil {
if err := initSessCtx(sessCtx, w.reorgMeta); err != nil {
w.ctx.onError(err)
return
}
@@ -755,7 +752,8 @@ func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nex

oprStartTime := time.Now()
vars := w.se.GetSessionVars()
cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, vars, rs.Chunk)
sc := vars.StmtCtx
cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk)
if err != nil || cnt == 0 {
return 0, nil, err
}
3 changes: 1 addition & 2 deletions pkg/ddl/backfilling_read_index.go
@@ -105,8 +105,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return err
}

sessCtx, err := newSessCtx(
r.d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName)
sessCtx, err := newSessCtx(r.d.store, r.job.ReorgMeta)
if err != nil {
return err
}
55 changes: 16 additions & 39 deletions pkg/ddl/backfilling_scheduler.go
@@ -26,17 +26,14 @@ import (
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
poolutil "github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/intest"
@@ -135,26 +132,16 @@ func (b *txnBackfillScheduler) resultChan() <-chan *backfillResult {
return b.resultCh
}

func newSessCtx(
store kv.Storage,
sqlMode mysql.SQLMode,
tzLocation *model.TimeZoneLocation,
resourceGroupName string,
) (sessionctx.Context, error) {
func newSessCtx(store kv.Storage, reorgMeta *model.DDLReorgMeta) (sessionctx.Context, error) {
sessCtx := newReorgSessCtx(store)
if err := initSessCtx(sessCtx, sqlMode, tzLocation, resourceGroupName); err != nil {
if err := initSessCtx(sessCtx, reorgMeta); err != nil {
return nil, errors.Trace(err)
}
return sessCtx, nil
}

// initSessCtx initializes the session context. Be careful to the timezone.
func initSessCtx(
sessCtx sessionctx.Context,
sqlMode mysql.SQLMode,
tzLocation *model.TimeZoneLocation,
resGroupName string,
) error {
func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) error {
// Correct the initial timezone.
tz := *time.UTC
sessCtx.GetSessionVars().TimeZone = &tz
@@ -164,25 +151,21 @@ func initSessCtx(
rowFormat := variable.GetDDLReorgRowFormat()
sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1
// Simulate the sql mode environment in the worker sessionCtx.
sqlMode := reorgMeta.SQLMode
sessCtx.GetSessionVars().SQLMode = sqlMode
if err := setSessCtxLocation(sessCtx, tzLocation); err != nil {
loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
if err != nil {
return errors.Trace(err)
}
sessCtx.GetSessionVars().StmtCtx.SetTimeZone(sessCtx.GetSessionVars().Location())
sessCtx.GetSessionVars().TimeZone = loc
sessCtx.GetSessionVars().StmtCtx.SetTimeZone(loc)

errLevels := sessCtx.GetSessionVars().StmtCtx.ErrLevels()
errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !sqlMode.HasStrictMode())
errLevels[errctx.ErrGroupDividedByZero] =
errctx.ResolveErrLevel(!sqlMode.HasErrorForDivisionByZeroMode(), !sqlMode.HasStrictMode())
errLevels := reorgErrLevelsWithSQLMode(sqlMode)
sessCtx.GetSessionVars().StmtCtx.SetErrLevels(errLevels)

typeFlags := types.StrictFlags.
WithTruncateAsWarning(!sqlMode.HasStrictMode()).
WithIgnoreInvalidDateErr(sqlMode.HasAllowInvalidDatesMode()).
WithIgnoreZeroInDate(!sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode()).
WithCastTimeToYearThroughConcat(true)
typeFlags := reorgTypeFlagsWithSQLMode(sqlMode)
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags)
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = resGroupName
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName

// Prevent initializing the mock context in the workers concurrently.
// For details, see https://github.com/pingcap/tidb/issues/40879.
@@ -235,7 +218,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
workerCnt := b.expectedWorkerSize()
// Increase the worker.
for i := len(b.workers); i < workerCnt; i++ {
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta)
if err != nil {
return err
}
@@ -472,16 +455,10 @@ func (b *ingestBackfillScheduler) createWorker(
) workerpool.Worker[IndexRecordChunk, workerpool.None] {
reorgInfo := b.reorgInfo
job := reorgInfo.Job
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
b.sendResult(&backfillResult{err: err})
return nil
}

worker, err := newAddIndexIngestWorker(
b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID,
reorgInfo.SchemaName, indexIDs, b.writerMaxID,
b.copReqSenderPool, sessCtx, b.checkpointMgr)
b.ctx, b.tbl, reorgInfo, engines, b.resultCh, job.ID,
indexIDs, b.writerMaxID,
b.copReqSenderPool, b.checkpointMgr)
if err != nil {
// Return an error only if it is the first worker.
if b.writerMaxID == 0 {
@@ -508,7 +485,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
}
allIndexInfos = append(allIndexInfos, indexInfo)
}
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location, ri.ReorgMeta.ResourceGroupName)
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta)
if err != nil {
logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err))
return nil, err
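Note on the helpers used above: the new initSessCtx delegates to reorgTimeZoneWithTzLoc, reorgErrLevelsWithSQLMode and reorgTypeFlagsWithSQLMode, whose definitions are not in the hunks loaded on this page (they presumably live in one of the other changed files). A rough sketch of what they would look like, reconstructed from the inline logic removed above — the names come from the diff, but the exact signatures and bodies are assumptions:

package ddl

import (
	"time"

	"github.com/pingcap/tidb/pkg/errctx"
	"github.com/pingcap/tidb/pkg/parser/model"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/timeutil"
)

// reorgTimeZoneWithTzLoc resolves the time zone for a reorg job, falling back
// to the system location when the job carries no location info. This mirrors
// the setSessCtxLocation logic removed from backfilling.go above.
func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, error) {
	if tzLoc == nil {
		// Use SystemLocation to stay compatible with a nil LocationInfo,
		// as the old code did.
		return timeutil.SystemLocation(), nil
	}
	return tzLoc.GetLocation()
}

// reorgErrLevelsWithSQLMode derives the bad-null and division-by-zero error
// levels from the job's SQL mode, as initSessCtx previously did inline.
func reorgErrLevelsWithSQLMode(mode mysql.SQLMode) errctx.LevelMap {
	var levels errctx.LevelMap
	levels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !mode.HasStrictMode())
	levels[errctx.ErrGroupDividedByZero] = errctx.ResolveErrLevel(
		!mode.HasErrorForDivisionByZeroMode(), !mode.HasStrictMode())
	// Other error groups are left at their zero value in this sketch.
	return levels
}

// reorgTypeFlagsWithSQLMode derives the type-conversion flags from the job's
// SQL mode, matching the flags previously built inline in initSessCtx.
func reorgTypeFlagsWithSQLMode(mode mysql.SQLMode) types.Flags {
	return types.StrictFlags.
		WithTruncateAsWarning(!mode.HasStrictMode()).
		WithIgnoreInvalidDateErr(mode.HasAllowInvalidDatesMode()).
		WithIgnoreZeroInDate(!mode.HasStrictMode() || mode.HasAllowInvalidDatesMode()).
		WithCastTimeToYearThroughConcat(true)
}
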
4 changes: 2 additions & 2 deletions pkg/ddl/export_test.go
@@ -21,8 +21,8 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
@@ -72,7 +72,7 @@ func ConvertRowToHandleAndIndexDatum(
c := copCtx.GetBase()
idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, stmtctx.NewStmtCtxWithTimeZone(time.Local))
handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, time.Local, errctx.StrictNoWarningContext)
return handle, idxData, err
}

47 changes: 30 additions & 17 deletions pkg/ddl/index.go
@@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/common"
Expand All @@ -50,7 +51,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
@@ -61,6 +61,7 @@
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
@@ -1651,7 +1652,9 @@ type addIndexIngestWorker struct {
ctx context.Context
d *ddlCtx
metricCounter prometheus.Counter
sessCtx sessionctx.Context
writeLoc *time.Location
writeErrCtx errctx.Context
writeStmtBufs variable.WriteStmtBufs

tbl table.PhysicalTable
indexes []table.Index
@@ -1666,15 +1669,13 @@ type addIndexIngestWorker struct {
func newAddIndexIngestWorker(
ctx context.Context,
t table.PhysicalTable,
d *ddlCtx,
info *reorgInfo,
engines []ingest.Engine,
resultCh chan *backfillResult,
jobID int64,
schemaName string,
indexIDs []int64,
writerID int,
copReqSenderPool *copReqSenderPool,
sessCtx sessionctx.Context,
checkpointMgr *ingest.CheckpointManager,
) (*addIndexIngestWorker, error) {
indexes := make([]table.Index, 0, len(indexIDs))
@@ -1690,12 +1691,23 @@ func newAddIndexIngestWorker(
writers = append(writers, lw)
}

writeLoc, err := reorgTimeZoneWithTzLoc(info.ReorgMeta.Location)
if err != nil {
return nil, err
}

writeErrCtx := errctx.NewContextWithLevels(
reorgErrLevelsWithSQLMode(info.ReorgMeta.SQLMode),
contextutil.IgnoreWarn,
)

return &addIndexIngestWorker{
ctx: ctx,
d: d,
sessCtx: sessCtx,
ctx: ctx,
d: info.d,
writeLoc: writeLoc,
writeErrCtx: writeErrCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)),
metrics.GenerateReorgLabel("add_idx_rate", info.SchemaName, t.Meta().Name.O)),
tbl: t,
indexes: indexes,
writers: writers,
Expand All @@ -1710,9 +1722,8 @@ func newAddIndexIngestWorker(
func (w *addIndexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
oprStartTime := time.Now()
copCtx := w.copReqSenderPool.copCtx
vars := w.sessCtx.GetSessionVars()
cnt, lastHandle, err := writeChunkToLocal(
w.ctx, w.writers, w.indexes, copCtx, vars, rs.Chunk)
w.ctx, w.writers, w.indexes, copCtx, w.writeLoc, w.writeErrCtx, &w.writeStmtBufs, rs.Chunk)
if err != nil || cnt == 0 {
return 0, nil, err
}
@@ -1727,10 +1738,11 @@ func writeChunkToLocal(
writers []ingest.Writer,
indexes []table.Index,
copCtx copr.CopContext,
vars *variable.SessionVars,
loc *time.Location,
errCtx errctx.Context,
writeStmtBufs *variable.WriteStmtBufs,
copChunk *chunk.Chunk,
) (int, kv.Handle, error) {
sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
iter := chunk.NewIterator4Chunk(copChunk)
c := copCtx.GetBase()

@@ -1772,7 +1784,7 @@ func writeChunkToLocal(
restoreDataBuf[i] = *datum.Clone()
}
}
h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx)
h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, loc, errCtx)
if err != nil {
return 0, nil, errors.Trace(err)
}
@@ -1785,7 +1797,7 @@
if needRestoreForIndexes[i] {
rsData = getRestoreData(c.TableInfo, copCtx.IndexInfo(idxID), c.PrimaryKeyInfo, restoreDataBuf)
}
err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxData, rsData, h)
err = writeOneKVToLocal(ctx, writers[i], index, loc, errCtx, writeStmtBufs, idxData, rsData, h)
if err != nil {
return 0, nil, errors.Trace(err)
}
@@ -1811,12 +1823,13 @@
ctx context.Context,
writer ingest.Writer,
index table.Index,
sCtx *stmtctx.StatementContext,
loc *time.Location,
errCtx errctx.Context,
writeBufs *variable.WriteStmtBufs,
idxDt, rsData []types.Datum,
handle kv.Handle,
) error {
iter := index.GenIndexKVIter(sCtx.ErrCtx(), sCtx.TimeZone(), idxDt, handle, rsData)
iter := index.GenIndexKVIter(errCtx, loc, idxDt, handle, rsData)
for iter.Valid() {
key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf, writeBufs.RowValBuf)
if err != nil {
8 changes: 4 additions & 4 deletions pkg/ddl/index_cop.go
@@ -17,6 +17,7 @@ package ddl
import (
"context"
"encoding/hex"
"github.com/pingcap/tidb/pkg/errctx"
"sync"
"time"

@@ -32,7 +33,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
@@ -379,11 +379,11 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
}

func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) {
pkInfo *model.IndexInfo, loc *time.Location, errCtx errctx.Context) (kv.Handle, error) {
if tblInfo.IsCommonHandle {
tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts)
handleBytes, err := codec.EncodeKey(stmtCtx.TimeZone(), nil, pkDts...)
err = stmtCtx.HandleError(err)
handleBytes, err := codec.EncodeKey(loc, nil, pkDts...)
err = errCtx.HandleError(err)
if err != nil {
return nil, err
}
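Taken together, callers now thread a plain *time.Location and errctx.Context through instead of a sessionctx.Context. A minimal caller sketch, assuming it runs inside a function returning error and that reorgMeta is the job's *model.DDLReorgMeta; handleData, tblInfo and pkInfo are placeholder variables, not names from the commit:

// Sketch: derive the write-time location and error context from the reorg
// meta, then build a handle without any sessionctx.Context or StatementContext.
loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
if err != nil {
	return err
}
errCtx := errctx.NewContextWithLevels(
	reorgErrLevelsWithSQLMode(reorgMeta.SQLMode),
	contextutil.IgnoreWarn, // warnings raised while encoding are discarded
)
handle, err := buildHandle(handleData, tblInfo, pkInfo, loc, errCtx)
if err != nil {
	return err
}
_ = handle // the handle is then used to generate index KV pairs for the row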