Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: remove mock.Context usage in addIndexIngestWorker #53479

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ go_test(
"//pkg/disttask/framework/storage",
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/errctx",
"//pkg/errno",
"//pkg/executor",
"//pkg/infoschema",
Expand All @@ -300,7 +301,6 @@ go_test(
"//pkg/session",
"//pkg/session/types",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/store/gcworker",
Expand Down
19 changes: 0 additions & 19 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tidb/pkg/util/topsql"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -543,24 +542,6 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co
return decodeColMap, nil
}

func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error {
// It is set to SystemLocation to be compatible with nil LocationInfo.
tz := *timeutil.SystemLocation()
if sctx.GetSessionVars().TimeZone == nil {
sctx.GetSessionVars().TimeZone = &tz
} else {
*sctx.GetSessionVars().TimeZone = tz
}
if tzLocation != nil {
loc, err := tzLocation.GetLocation()
if err != nil {
return errors.Trace(err)
}
*sctx.GetSessionVars().TimeZone = *loc
}
return nil
}

var backfillTaskChanSize = 128

// SetBackfillTaskChanSizeForTest is only used for test.
Expand Down
8 changes: 3 additions & 5 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,7 @@ func (w *indexIngestBaseWorker) initSessCtx() {
return
}
w.restore = restoreSessCtx(sessCtx)
if err := initSessCtx(sessCtx,
w.reorgMeta.SQLMode,
w.reorgMeta.Location,
w.reorgMeta.ResourceGroupName); err != nil {
if err := initSessCtx(sessCtx, w.reorgMeta); err != nil {
w.ctx.onError(err)
return
}
Expand Down Expand Up @@ -748,7 +745,8 @@ func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nex

oprStartTime := time.Now()
vars := w.se.GetSessionVars()
cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, vars, rs.Chunk)
sc := vars.StmtCtx
cnt, lastHandle, err := writeChunkToLocal(w.ctx, w.writers, w.indexes, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk)
if err != nil || cnt == 0 {
return 0, nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return err
}

sessCtx, err := newSessCtx(
r.d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName)
sessCtx, err := newSessCtx(r.d.store, r.job.ReorgMeta)
if err != nil {
return err
}
Expand Down
54 changes: 16 additions & 38 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
poolutil "github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
Expand Down Expand Up @@ -143,14 +141,9 @@ func (b *txnBackfillScheduler) resultChan() <-chan *backfillResult {
return b.resultCh
}

func newSessCtx(
store kv.Storage,
sqlMode mysql.SQLMode,
tzLocation *model.TimeZoneLocation,
resourceGroupName string,
) (sessionctx.Context, error) {
func newSessCtx(store kv.Storage, reorgMeta *model.DDLReorgMeta) (sessionctx.Context, error) {
sessCtx := newReorgSessCtx(store)
if err := initSessCtx(sessCtx, sqlMode, tzLocation, resourceGroupName); err != nil {
if err := initSessCtx(sessCtx, reorgMeta); err != nil {
return nil, errors.Trace(err)
}
return sessCtx, nil
Expand Down Expand Up @@ -182,12 +175,7 @@ func newDefaultReorgDistSQLCtx(kvClient kv.Client) *distsqlctx.DistSQLContext {
}

// initSessCtx initializes the session context. Be careful to the timezone.
func initSessCtx(
sessCtx sessionctx.Context,
sqlMode mysql.SQLMode,
tzLocation *model.TimeZoneLocation,
resGroupName string,
) error {
func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) error {
// Correct the initial timezone.
tz := *time.UTC
sessCtx.GetSessionVars().TimeZone = &tz
Expand All @@ -197,25 +185,21 @@ func initSessCtx(
rowFormat := variable.GetDDLReorgRowFormat()
sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1
// Simulate the sql mode environment in the worker sessionCtx.
sqlMode := reorgMeta.SQLMode
sessCtx.GetSessionVars().SQLMode = sqlMode
if err := setSessCtxLocation(sessCtx, tzLocation); err != nil {
loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
if err != nil {
return errors.Trace(err)
}
sessCtx.GetSessionVars().StmtCtx.SetTimeZone(sessCtx.GetSessionVars().Location())
sessCtx.GetSessionVars().TimeZone = loc
sessCtx.GetSessionVars().StmtCtx.SetTimeZone(loc)

errLevels := sessCtx.GetSessionVars().StmtCtx.ErrLevels()
errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !sqlMode.HasStrictMode())
errLevels[errctx.ErrGroupDividedByZero] =
errctx.ResolveErrLevel(!sqlMode.HasErrorForDivisionByZeroMode(), !sqlMode.HasStrictMode())
errLevels := reorgErrLevelsWithSQLMode(sqlMode)
sessCtx.GetSessionVars().StmtCtx.SetErrLevels(errLevels)

typeFlags := types.StrictFlags.
WithTruncateAsWarning(!sqlMode.HasStrictMode()).
WithIgnoreInvalidDateErr(sqlMode.HasAllowInvalidDatesMode()).
WithIgnoreZeroInDate(!sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode()).
WithCastTimeToYearThroughConcat(true)
typeFlags := reorgTypeFlagsWithSQLMode(sqlMode)
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags)
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = resGroupName
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName

// Prevent initializing the mock context in the workers concurrently.
// For details, see https://github.com/pingcap/tidb/issues/40879.
Expand Down Expand Up @@ -268,7 +252,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
workerCnt := b.expectedWorkerSize()
// Increase the worker.
for i := len(b.workers); i < workerCnt; i++ {
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta)
if err != nil {
return err
}
Expand Down Expand Up @@ -505,16 +489,10 @@ func (b *ingestBackfillScheduler) createWorker(
) workerpool.Worker[IndexRecordChunk, workerpool.None] {
reorgInfo := b.reorgInfo
job := reorgInfo.Job
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
b.sendResult(&backfillResult{err: err})
return nil
}
Comment on lines -508 to -512
Copy link
Collaborator Author

@lcwangchao lcwangchao May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newAddIndexIngestWorker is only called by createWorker, so we moved the context construction to newAddIndexIngestWorker.

One difference is that if it fails to construct the context, the previous code will always call b.sendResult but the current code will check writerMaxID:

if b.writerMaxID == 0 {
	b.sendResult(&backfillResult{err: err})
	return nil
}

Maybe the previous code lost it? PTAL @tangenta

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, LGTM


worker, err := newAddIndexIngestWorker(
b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID,
reorgInfo.SchemaName, indexIDs, b.writerMaxID,
b.copReqSenderPool, sessCtx, b.checkpointMgr)
b.ctx, b.tbl, reorgInfo, engines, b.resultCh, job.ID,
indexIDs, b.writerMaxID,
b.copReqSenderPool, b.checkpointMgr)
if err != nil {
// Return an error only if it is the first worker.
if b.writerMaxID == 0 {
Expand All @@ -541,7 +519,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
}
allIndexInfos = append(allIndexInfos, indexInfo)
}
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location, ri.ReorgMeta.ResourceGroupName)
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta)
if err != nil {
logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err))
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -72,7 +72,7 @@ func ConvertRowToHandleAndIndexDatum(
c := copCtx.GetBase()
idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, stmtctx.NewStmtCtxWithTimeZone(time.Local))
handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, time.Local, errctx.StrictNoWarningContext)
return handle, idxData, err
}

Expand Down
47 changes: 30 additions & 17 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/common"
Expand All @@ -50,7 +51,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
Expand All @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
Expand Down Expand Up @@ -1651,7 +1652,9 @@ type addIndexIngestWorker struct {
ctx context.Context
d *ddlCtx
metricCounter prometheus.Counter
sessCtx sessionctx.Context
writeLoc *time.Location
writeErrCtx errctx.Context
writeStmtBufs variable.WriteStmtBufs

tbl table.PhysicalTable
indexes []table.Index
Expand All @@ -1666,17 +1669,20 @@ type addIndexIngestWorker struct {
func newAddIndexIngestWorker(
ctx context.Context,
t table.PhysicalTable,
d *ddlCtx,
info *reorgInfo,
engines []ingest.Engine,
resultCh chan *backfillResult,
jobID int64,
schemaName string,
indexIDs []int64,
writerID int,
copReqSenderPool *copReqSenderPool,
sessCtx sessionctx.Context,
checkpointMgr *ingest.CheckpointManager,
) (*addIndexIngestWorker, error) {
writeLoc, err := reorgTimeZoneWithTzLoc(info.ReorgMeta.Location)
if err != nil {
return nil, err
}

indexes := make([]table.Index, 0, len(indexIDs))
writers := make([]ingest.Writer, 0, len(indexIDs))
for i, indexID := range indexIDs {
Expand All @@ -1690,12 +1696,18 @@ func newAddIndexIngestWorker(
writers = append(writers, lw)
}

writeErrCtx := errctx.NewContextWithLevels(
reorgErrLevelsWithSQLMode(info.ReorgMeta.SQLMode),
contextutil.IgnoreWarn,
)

return &addIndexIngestWorker{
ctx: ctx,
d: d,
sessCtx: sessCtx,
ctx: ctx,
d: info.d,
writeLoc: writeLoc,
writeErrCtx: writeErrCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)),
metrics.GenerateReorgLabel("add_idx_rate", info.SchemaName, t.Meta().Name.O)),
tbl: t,
indexes: indexes,
writers: writers,
Expand All @@ -1710,9 +1722,8 @@ func newAddIndexIngestWorker(
func (w *addIndexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
oprStartTime := time.Now()
copCtx := w.copReqSenderPool.copCtx
vars := w.sessCtx.GetSessionVars()
cnt, lastHandle, err := writeChunkToLocal(
w.ctx, w.writers, w.indexes, copCtx, vars, rs.Chunk)
w.ctx, w.writers, w.indexes, copCtx, w.writeLoc, w.writeErrCtx, &w.writeStmtBufs, rs.Chunk)
if err != nil || cnt == 0 {
return 0, nil, err
}
Expand All @@ -1727,10 +1738,11 @@ func writeChunkToLocal(
writers []ingest.Writer,
indexes []table.Index,
copCtx copr.CopContext,
vars *variable.SessionVars,
loc *time.Location,
errCtx errctx.Context,
writeStmtBufs *variable.WriteStmtBufs,
copChunk *chunk.Chunk,
) (int, kv.Handle, error) {
sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
iter := chunk.NewIterator4Chunk(copChunk)
c := copCtx.GetBase()

Expand Down Expand Up @@ -1772,7 +1784,7 @@ func writeChunkToLocal(
restoreDataBuf[i] = *datum.Clone()
}
}
h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx)
h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, loc, errCtx)
if err != nil {
return 0, nil, errors.Trace(err)
}
Expand All @@ -1785,7 +1797,7 @@ func writeChunkToLocal(
if needRestoreForIndexes[i] {
rsData = getRestoreData(c.TableInfo, copCtx.IndexInfo(idxID), c.PrimaryKeyInfo, restoreDataBuf)
}
err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxData, rsData, h)
err = writeOneKVToLocal(ctx, writers[i], index, loc, errCtx, writeStmtBufs, idxData, rsData, h)
if err != nil {
return 0, nil, errors.Trace(err)
}
Expand All @@ -1811,12 +1823,13 @@ func writeOneKVToLocal(
ctx context.Context,
writer ingest.Writer,
index table.Index,
sCtx *stmtctx.StatementContext,
loc *time.Location,
errCtx errctx.Context,
writeBufs *variable.WriteStmtBufs,
idxDt, rsData []types.Datum,
handle kv.Handle,
) error {
iter := index.GenIndexKVIter(sCtx.ErrCtx(), sCtx.TimeZone(), idxDt, handle, rsData)
iter := index.GenIndexKVIter(errCtx, loc, idxDt, handle, rsData)
for iter.Valid() {
key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf, writeBufs.RowValBuf)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/distsql"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
Expand Down Expand Up @@ -379,11 +379,11 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
}

func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) {
pkInfo *model.IndexInfo, loc *time.Location, errCtx errctx.Context) (kv.Handle, error) {
if tblInfo.IsCommonHandle {
tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts)
handleBytes, err := codec.EncodeKey(stmtCtx.TimeZone(), nil, pkDts...)
err = stmtCtx.HandleError(err)
handleBytes, err := codec.EncodeKey(loc, nil, pkDts...)
err = errCtx.HandleError(err)
if err != nil {
return nil, err
}
Expand Down
Loading