diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 50fde4c6d1055..ff8275ec448df 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -276,6 +276,7 @@ go_test( "//pkg/disttask/framework/storage", "//pkg/domain", "//pkg/domain/infosync", + "//pkg/errctx", "//pkg/errno", "//pkg/executor", "//pkg/infoschema", @@ -296,7 +297,6 @@ go_test( "//pkg/session", "//pkg/session/types", "//pkg/sessionctx", - "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/sessiontxn", "//pkg/store/gcworker", diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 36493c7fa61cd..47061010b8e26 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/dbterror" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" - "github.com/pingcap/tidb/pkg/util/timeutil" "github.com/pingcap/tidb/pkg/util/topsql" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/tikv" @@ -543,24 +542,6 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co return decodeColMap, nil } -func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error { - // It is set to SystemLocation to be compatible with nil LocationInfo. - tz := *timeutil.SystemLocation() - if sctx.GetSessionVars().TimeZone == nil { - sctx.GetSessionVars().TimeZone = &tz - } else { - *sctx.GetSessionVars().TimeZone = tz - } - if tzLocation != nil { - loc, err := tzLocation.GetLocation() - if err != nil { - return errors.Trace(err) - } - *sctx.GetSessionVars().TimeZone = *loc - } - return nil -} - var backfillTaskChanSize = 128 // SetBackfillTaskChanSizeForTest is only used for test. diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 1b29c1c4ae336..65e1e6fc11804 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -714,10 +714,7 @@ func (w *indexIngestBaseWorker) initSessCtx() { return } w.restore = restoreSessCtx(sessCtx) - if err := initSessCtx(sessCtx, - w.reorgMeta.SQLMode, - w.reorgMeta.Location, - w.reorgMeta.ResourceGroupName); err != nil { + if err := initSessCtx(sessCtx, w.reorgMeta); err != nil { w.ctx.onError(err) return } @@ -755,7 +752,8 @@ func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nex oprStartTime := time.Now() vars := w.se.GetSessionVars() - cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, vars, rs.Chunk) + sc := vars.StmtCtx + cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk) if err != nil || cnt == 0 { return 0, nil, err } diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index aee305cfaab8f..039ecb378e6ed 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -105,8 +105,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta return err } - sessCtx, err := newSessCtx( - r.d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName) + sessCtx, err := newSessCtx(r.d.store, r.job.ReorgMeta) if err != nil { return err } diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 69ab527f0cedc..90114013a4303 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -26,17 +26,14 @@ import ( "github.com/pingcap/tidb/pkg/ddl/ingest" sess "github.com/pingcap/tidb/pkg/ddl/internal/session" ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil" - "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" poolutil "github.com/pingcap/tidb/pkg/resourcemanager/util" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" - "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/intest" @@ -135,26 +132,16 @@ func (b *txnBackfillScheduler) resultChan() <-chan *backfillResult { return b.resultCh } -func newSessCtx( - store kv.Storage, - sqlMode mysql.SQLMode, - tzLocation *model.TimeZoneLocation, - resourceGroupName string, -) (sessionctx.Context, error) { +func newSessCtx(store kv.Storage, reorgMeta *model.DDLReorgMeta) (sessionctx.Context, error) { sessCtx := newReorgSessCtx(store) - if err := initSessCtx(sessCtx, sqlMode, tzLocation, resourceGroupName); err != nil { + if err := initSessCtx(sessCtx, reorgMeta); err != nil { return nil, errors.Trace(err) } return sessCtx, nil } // initSessCtx initializes the session context. Be careful to the timezone. -func initSessCtx( - sessCtx sessionctx.Context, - sqlMode mysql.SQLMode, - tzLocation *model.TimeZoneLocation, - resGroupName string, -) error { +func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) error { // Correct the initial timezone. tz := *time.UTC sessCtx.GetSessionVars().TimeZone = &tz @@ -164,25 +151,21 @@ func initSessCtx( rowFormat := variable.GetDDLReorgRowFormat() sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1 // Simulate the sql mode environment in the worker sessionCtx. + sqlMode := reorgMeta.SQLMode sessCtx.GetSessionVars().SQLMode = sqlMode - if err := setSessCtxLocation(sessCtx, tzLocation); err != nil { + loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location) + if err != nil { return errors.Trace(err) } - sessCtx.GetSessionVars().StmtCtx.SetTimeZone(sessCtx.GetSessionVars().Location()) + sessCtx.GetSessionVars().TimeZone = loc + sessCtx.GetSessionVars().StmtCtx.SetTimeZone(loc) - errLevels := sessCtx.GetSessionVars().StmtCtx.ErrLevels() - errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !sqlMode.HasStrictMode()) - errLevels[errctx.ErrGroupDividedByZero] = - errctx.ResolveErrLevel(!sqlMode.HasErrorForDivisionByZeroMode(), !sqlMode.HasStrictMode()) + errLevels := reorgErrLevelsWithSQLMode(sqlMode) sessCtx.GetSessionVars().StmtCtx.SetErrLevels(errLevels) - typeFlags := types.StrictFlags. - WithTruncateAsWarning(!sqlMode.HasStrictMode()). - WithIgnoreInvalidDateErr(sqlMode.HasAllowInvalidDatesMode()). - WithIgnoreZeroInDate(!sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode()). - WithCastTimeToYearThroughConcat(true) + typeFlags := reorgTypeFlagsWithSQLMode(sqlMode) sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags) - sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = resGroupName + sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName // Prevent initializing the mock context in the workers concurrently. // For details, see https://github.com/pingcap/tidb/issues/40879. @@ -235,7 +218,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { workerCnt := b.expectedWorkerSize() // Increase the worker. for i := len(b.workers); i < workerCnt; i++ { - sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName) + sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta) if err != nil { return err } @@ -472,16 +455,10 @@ func (b *ingestBackfillScheduler) createWorker( ) workerpool.Worker[IndexRecordChunk, workerpool.None] { reorgInfo := b.reorgInfo job := reorgInfo.Job - sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName) - if err != nil { - b.sendResult(&backfillResult{err: err}) - return nil - } - worker, err := newAddIndexIngestWorker( - b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID, - reorgInfo.SchemaName, indexIDs, b.writerMaxID, - b.copReqSenderPool, sessCtx, b.checkpointMgr) + b.ctx, b.tbl, reorgInfo, engines, b.resultCh, job.ID, + indexIDs, b.writerMaxID, + b.copReqSenderPool, b.checkpointMgr) if err != nil { // Return an error only if it is the first worker. if b.writerMaxID == 0 { @@ -508,7 +485,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e } allIndexInfos = append(allIndexInfos, indexInfo) } - sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location, ri.ReorgMeta.ResourceGroupName) + sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta) if err != nil { logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err)) return nil, err diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 6625aef25d8b4..ab1703bf22230 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -21,8 +21,8 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" @@ -72,7 +72,7 @@ func ConvertRowToHandleAndIndexDatum( c := copCtx.GetBase() idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf) handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf) - handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, stmtctx.NewStmtCtxWithTimeZone(time.Local)) + handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, time.Local, errctx.StrictNoWarningContext) return handle, idxData, err } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 94f6297ca96fa..0987cc11f624d 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/common" @@ -50,7 +51,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" @@ -61,6 +61,7 @@ import ( "github.com/pingcap/tidb/pkg/util/backoff" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" + contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/dbterror" tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" @@ -1651,7 +1652,9 @@ type addIndexIngestWorker struct { ctx context.Context d *ddlCtx metricCounter prometheus.Counter - sessCtx sessionctx.Context + writeLoc *time.Location + writeErrCtx errctx.Context + writeStmtBufs variable.WriteStmtBufs tbl table.PhysicalTable indexes []table.Index @@ -1666,15 +1669,13 @@ type addIndexIngestWorker struct { func newAddIndexIngestWorker( ctx context.Context, t table.PhysicalTable, - d *ddlCtx, + info *reorgInfo, engines []ingest.Engine, resultCh chan *backfillResult, jobID int64, - schemaName string, indexIDs []int64, writerID int, copReqSenderPool *copReqSenderPool, - sessCtx sessionctx.Context, checkpointMgr *ingest.CheckpointManager, ) (*addIndexIngestWorker, error) { indexes := make([]table.Index, 0, len(indexIDs)) @@ -1690,12 +1691,23 @@ func newAddIndexIngestWorker( writers = append(writers, lw) } + writeLoc, err := reorgTimeZoneWithTzLoc(info.ReorgMeta.Location) + if err != nil { + return nil, err + } + + writeErrCtx := errctx.NewContextWithLevels( + reorgErrLevelsWithSQLMode(info.ReorgMeta.SQLMode), + contextutil.IgnoreWarn, + ) + return &addIndexIngestWorker{ - ctx: ctx, - d: d, - sessCtx: sessCtx, + ctx: ctx, + d: info.d, + writeLoc: writeLoc, + writeErrCtx: writeErrCtx, metricCounter: metrics.BackfillTotalCounter.WithLabelValues( - metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)), + metrics.GenerateReorgLabel("add_idx_rate", info.SchemaName, t.Meta().Name.O)), tbl: t, indexes: indexes, writers: writers, @@ -1710,9 +1722,8 @@ func newAddIndexIngestWorker( func (w *addIndexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) { oprStartTime := time.Now() copCtx := w.copReqSenderPool.copCtx - vars := w.sessCtx.GetSessionVars() cnt, lastHandle, err := writeChunkToLocal( - w.ctx, w.writers, w.indexes, copCtx, vars, rs.Chunk) + w.ctx, w.writers, w.indexes, copCtx, w.writeLoc, w.writeErrCtx, &w.writeStmtBufs, rs.Chunk) if err != nil || cnt == 0 { return 0, nil, err } @@ -1727,10 +1738,11 @@ func writeChunkToLocal( writers []ingest.Writer, indexes []table.Index, copCtx copr.CopContext, - vars *variable.SessionVars, + loc *time.Location, + errCtx errctx.Context, + writeStmtBufs *variable.WriteStmtBufs, copChunk *chunk.Chunk, ) (int, kv.Handle, error) { - sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() iter := chunk.NewIterator4Chunk(copChunk) c := copCtx.GetBase() @@ -1772,7 +1784,7 @@ func writeChunkToLocal( restoreDataBuf[i] = *datum.Clone() } } - h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx) + h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, loc, errCtx) if err != nil { return 0, nil, errors.Trace(err) } @@ -1785,7 +1797,7 @@ func writeChunkToLocal( if needRestoreForIndexes[i] { rsData = getRestoreData(c.TableInfo, copCtx.IndexInfo(idxID), c.PrimaryKeyInfo, restoreDataBuf) } - err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxData, rsData, h) + err = writeOneKVToLocal(ctx, writers[i], index, loc, errCtx, writeStmtBufs, idxData, rsData, h) if err != nil { return 0, nil, errors.Trace(err) } @@ -1811,12 +1823,13 @@ func writeOneKVToLocal( ctx context.Context, writer ingest.Writer, index table.Index, - sCtx *stmtctx.StatementContext, + loc *time.Location, + errCtx errctx.Context, writeBufs *variable.WriteStmtBufs, idxDt, rsData []types.Datum, handle kv.Handle, ) error { - iter := index.GenIndexKVIter(sCtx.ErrCtx(), sCtx.TimeZone(), idxDt, handle, rsData) + iter := index.GenIndexKVIter(errCtx, loc, idxDt, handle, rsData) for iter.Valid() { key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf, writeBufs.RowValBuf) if err != nil { diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 557d72ed900f5..3782c8b24f821 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -17,6 +17,7 @@ package ddl import ( "context" "encoding/hex" + "github.com/pingcap/tidb/pkg/errctx" "sync" "time" @@ -32,7 +33,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" @@ -379,11 +379,11 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C } func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo, - pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) { + pkInfo *model.IndexInfo, loc *time.Location, errCtx errctx.Context) (kv.Handle, error) { if tblInfo.IsCommonHandle { tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts) - handleBytes, err := codec.EncodeKey(stmtCtx.TimeZone(), nil, pkDts...) - err = stmtCtx.HandleError(err) + handleBytes, err := codec.EncodeKey(loc, nil, pkDts...) + err = errCtx.HandleError(err) if err != nil { return nil, err } diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 7cf13f60fbb01..095be04fb0f2b 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -29,6 +29,7 @@ import ( sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/distsql" + "github.com/pingcap/tidb/pkg/errctx" exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/expression/contextstatic" "github.com/pingcap/tidb/pkg/kv" @@ -49,6 +50,7 @@ import ( "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/mock" "github.com/pingcap/tidb/pkg/util/ranger" + "github.com/pingcap/tidb/pkg/util/timeutil" "github.com/pingcap/tipb/go-tipb" atomicutil "go.uber.org/atomic" "go.uber.org/zap" @@ -88,6 +90,30 @@ func newReorgExprCtx() exprctx.ExprContext { ) } +func reorgTypeFlagsWithSQLMode(mode mysql.SQLMode) types.Flags { + return types.StrictFlags. + WithTruncateAsWarning(!mode.HasStrictMode()). + WithIgnoreInvalidDateErr(mode.HasAllowInvalidDatesMode()). + WithIgnoreZeroInDate(!mode.HasStrictMode() || mode.HasAllowInvalidDatesMode()). + WithCastTimeToYearThroughConcat(true) +} + +func reorgErrLevelsWithSQLMode(mode mysql.SQLMode) errctx.LevelMap { + levels := stmtctx.DefaultStmtErrLevels + levels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !mode.HasStrictMode()) + levels[errctx.ErrGroupDividedByZero] = + errctx.ResolveErrLevel(!mode.HasErrorForDivisionByZeroMode(), !mode.HasStrictMode()) + return levels +} + +func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, error) { + if tzLoc == nil { + // It is set to SystemLocation to be compatible with nil LocationInfo. + return timeutil.SystemLocation(), nil + } + return tzLoc.GetLocation() +} + func newReorgSessCtx(store kv.Storage) sessionctx.Context { c := mock.NewContext() c.Store = store