diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 59415e9f4bca0..786e0b66cccf0 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -133,6 +133,7 @@ go_library( "//pkg/store/driver/txn", "//pkg/store/helper", "//pkg/table", + "//pkg/table/context", "//pkg/table/tables", "//pkg/tablecodec", "//pkg/tidb-binlog/pump_client", diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 83c9047bb0fe2..54d1597e253dd 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -33,12 +33,12 @@ import ( "github.com/pingcap/tidb/pkg/disttask/operator" "github.com/pingcap/tidb/pkg/expression" exprctx "github.com/pingcap/tidb/pkg/expression/context" + "github.com/pingcap/tidb/pkg/expression/contextstatic" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/metrics" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" @@ -155,7 +155,6 @@ type backfillTaskContext struct { type backfillCtx struct { id int *ddlCtx - sessCtx sessionctx.Context warnings contextutil.WarnHandlerExt loc *time.Location exprCtx exprctx.BuildContext @@ -168,26 +167,40 @@ type backfillCtx struct { } func newBackfillCtx(id int, rInfo *reorgInfo, - schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isDistributed bool) (*backfillCtx, error) { - // TODO: remove newReorgSessCtx - sessCtx := newReorgSessCtx(rInfo.jobCtx.store) - if err := initSessCtx(sessCtx, rInfo.ReorgMeta); err != nil { + schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isDistributed bool, isUpdateColumn bool) (*backfillCtx, error) { + warnHandler := contextutil.NewStaticWarnHandler(0) + exprCtx, err := newReorgExprCtxWithReorgMeta(rInfo.ReorgMeta, warnHandler) + if err != nil { return nil, errors.Trace(err) } + if isUpdateColumn { + // The below case should be compatible with mysql behavior: + // > create table t (a int); + // > insert into t values (0); + // > alter table t modify column a date; + // The alter DDL should return an error in strict mode and success in non-strict mode. + // See: https://github.com/pingcap/tidb/pull/25728 for more details. + hasStrictMode := rInfo.ReorgMeta.SQLMode.HasStrictMode() + tc := exprCtx.GetStaticEvalCtx().TypeCtx() + evalCtx := exprCtx.GetStaticEvalCtx().Apply(contextstatic.WithTypeFlags( + tc.Flags().WithIgnoreZeroDateErr(!hasStrictMode), + )) + exprCtx = exprCtx.Apply(contextstatic.WithEvalCtx(evalCtx)) + } + + tblCtx := newReorgTableMutateContext(exprCtx) if isDistributed { id = int(backfillContextID.Add(1)) } - exprCtx := sessCtx.GetExprCtx() batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) return &backfillCtx{ id: id, ddlCtx: rInfo.jobCtx.oldDDLCtx, - sessCtx: sessCtx, - warnings: sessCtx.GetSessionVars().StmtCtx.WarnHandler, + warnings: warnHandler, exprCtx: exprCtx, - tblCtx: sessCtx.GetTableCtx(), + tblCtx: tblCtx, loc: exprCtx.GetEvalCtx().Location(), schemaName: schemaName, table: tbl, diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index b25f1502935ee..45f5ba1f99151 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/memory" - "github.com/pingcap/tidb/pkg/util/mock" "github.com/pingcap/tidb/pkg/util/ppcpuusage" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" "github.com/pingcap/tidb/pkg/util/sqlkiller" @@ -220,12 +219,6 @@ func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) erro typeFlags := reorgTypeFlagsWithSQLMode(sqlMode) sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags) sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName - - // Prevent initializing the mock context in the workers concurrently. - // For details, see https://github.com/pingcap/tidb/issues/40879. - if _, ok := sessCtx.(*mock.Context); ok { - _ = sessCtx.GetDomainInfoSchema() - } return nil } @@ -278,7 +271,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { ) switch b.tp { case typeAddIndexWorker: - backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false) + backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false, false) if err != nil { return err } @@ -291,7 +284,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { runner = newBackfillWorker(b.ctx, idxWorker) worker = idxWorker case typeAddIndexMergeTmpWorker: - backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false) + backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false, false) if err != nil { return err } diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index c9edab74f16d7..f4ed3e9859b29 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -17,6 +17,7 @@ package ddl import ( "bytes" "testing" + "time" "github.com/pingcap/tidb/pkg/ddl/ingest" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" @@ -27,6 +28,8 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/privilege" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/deeptest" @@ -224,13 +227,25 @@ func assertStaticExprContextEqual(t *testing.T, sctx sessionctx.Context, exprCtx ) } +// newMockReorgSessCtx creates a mock session context for reorg test. +// In old implementations, DDL is using `mock.Context` to construct the contexts used in DDL reorg. +// After refactoring, we just need it to do test the new implementation is compatible with the old one. +func newMockReorgSessCtx(store kv.Storage) sessionctx.Context { + c := mock.NewContext() + c.Store = store + c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false) + tz := *time.UTC + c.ResetSessionAndStmtTimeZone(&tz) + return c +} + // TestReorgExprContext is used in refactor stage to make sure the newReorgExprCtx() is -// compatible with newReorgSessCtx(nil).GetExprCtx() to make it safe to replace `mock.Context` usage. +// compatible with newMockReorgSessCtx(nil).GetExprCtx() to make it safe to replace `mock.Context` usage. // After refactor, the TestReorgExprContext can be removed. func TestReorgExprContext(t *testing.T) { // test default expr context store := &mockStorage{client: &mock.Client{}} - sctx := newReorgSessCtx(store) + sctx := newMockReorgSessCtx(store) defaultCtx := newReorgExprCtx() // should use an empty static warn handler by default evalCtx := defaultCtx.GetStaticEvalCtx() @@ -255,7 +270,7 @@ func TestReorgExprContext(t *testing.T) { ResourceGroupName: "rg2", }, } { - sctx = newReorgSessCtx(store) + sctx = newMockReorgSessCtx(store) require.NoError(t, initSessCtx(sctx, &reorg)) ctx, err := newReorgExprCtxWithReorgMeta(&reorg, sctx.GetSessionVars().StmtCtx.WarnHandler) require.NoError(t, err) @@ -278,6 +293,68 @@ func TestReorgExprContext(t *testing.T) { } } +func TestReorgTableMutateContext(t *testing.T) { + originalRowFmt := variable.GetDDLReorgRowFormat() + defer variable.SetDDLReorgRowFormat(originalRowFmt) + + exprCtx := contextstatic.NewStaticExprContext() + + assertTblCtxMatchSessionCtx := func(ctx table.MutateContext, sctx sessionctx.Context) { + sctxTblCtx := sctx.GetTableCtx() + require.Equal(t, uint64(0), ctx.ConnectionID()) + require.Equal(t, sctxTblCtx.ConnectionID(), ctx.ConnectionID()) + + require.False(t, ctx.InRestrictedSQL()) + require.Equal(t, sctxTblCtx.InRestrictedSQL(), ctx.InRestrictedSQL()) + + require.Equal(t, variable.AssertionLevelOff, ctx.TxnAssertionLevel()) + require.Equal(t, sctxTblCtx.TxnAssertionLevel(), ctx.TxnAssertionLevel()) + + require.Equal(t, variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1, ctx.GetRowEncodingConfig().IsRowLevelChecksumEnabled) + require.Equal(t, variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1, ctx.GetRowEncodingConfig().RowEncoder.Enable) + require.Equal(t, sctxTblCtx.GetRowEncodingConfig(), ctx.GetRowEncodingConfig()) + + require.NotNil(t, ctx.GetMutateBuffers()) + require.Equal(t, sctxTblCtx.GetMutateBuffers(), ctx.GetMutateBuffers()) + + require.Equal(t, variable.DefTiDBShardAllocateStep, ctx.GetRowIDShardGenerator().GetShardStep()) + sctx.GetSessionVars().TxnCtx.StartTS = 123 // make sure GetRowIDShardGenerator() pass assert + require.Equal(t, sctxTblCtx.GetRowIDShardGenerator().GetShardStep(), ctx.GetRowIDShardGenerator().GetShardStep()) + require.GreaterOrEqual(t, ctx.GetRowIDShardGenerator().GetCurrentShard(1), int64(0)) + + alloc1, ok := sctxTblCtx.GetReservedRowIDAlloc() + require.True(t, ok) + alloc2, ok := ctx.GetReservedRowIDAlloc() + require.True(t, ok) + require.Equal(t, alloc1, alloc2) + require.True(t, alloc2.Exhausted()) + + binlog, ok := ctx.GetBinlogSupport() + require.False(t, ok) + require.Nil(t, binlog) + statistics, ok := ctx.GetStatisticsSupport() + require.False(t, ok) + require.Nil(t, statistics) + cached, ok := ctx.GetCachedTableSupport() + require.False(t, ok) + require.Nil(t, cached) + temp, ok := ctx.GetTemporaryTableSupport() + require.False(t, ok) + require.Nil(t, temp) + dml, ok := ctx.GetExchangePartitionDMLSupport() + require.False(t, ok) + require.Nil(t, dml) + } + + // test when the row format is v1 + variable.SetDDLReorgRowFormat(variable.DefTiDBRowFormatV1) + sctx := newMockReorgSessCtx(&mockStorage{client: &mock.Client{}}) + require.NoError(t, initSessCtx(sctx, &model.DDLReorgMeta{})) + ctx := newReorgTableMutateContext(exprCtx) + require.Same(t, exprCtx, ctx.GetExprCtx()) + assertTblCtxMatchSessionCtx(ctx, sctx) +} + type mockStorage struct { kv.Storage client kv.Client @@ -313,13 +390,13 @@ func assertDistSQLCtxEqual(t *testing.T, expected *distsqlctx.DistSQLContext, ac } // TestReorgExprContext is used in refactor stage to make sure the newDefaultReorgDistSQLCtx() is -// compatible with newReorgSessCtx(nil).GetDistSQLCtx() to make it safe to replace `mock.Context` usage. +// compatible with newMockReorgSessCtx(nil).GetDistSQLCtx() to make it safe to replace `mock.Context` usage. // After refactor, the TestReorgExprContext can be removed. func TestReorgDistSQLCtx(t *testing.T) { store := &mockStorage{client: &mock.Client{}} // test default dist sql context - expected := newReorgSessCtx(store).GetDistSQLCtx() + expected := newMockReorgSessCtx(store).GetDistSQLCtx() defaultCtx := newDefaultReorgDistSQLCtx(store.client, expected.WarnHandler) assertDistSQLCtxEqual(t, expected, defaultCtx) @@ -339,7 +416,7 @@ func TestReorgDistSQLCtx(t *testing.T) { ResourceGroupName: "rg2", }, } { - sctx := newReorgSessCtx(store) + sctx := newMockReorgSessCtx(store) require.NoError(t, initSessCtx(sctx, &reorg)) expected = sctx.GetDistSQLCtx() ctx, err := newReorgDistSQLCtxWithReorgMeta(store.client, &reorg, expected.WarnHandler) diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go index 3c02b2e48464a..ee2e1304ebae5 100644 --- a/pkg/ddl/column.go +++ b/pkg/ddl/column.go @@ -606,18 +606,11 @@ type updateColumnWorker struct { } func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*updateColumnWorker, error) { - bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false) + bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false, true) if err != nil { return nil, err } - sessCtx := bCtx.sessCtx - sessCtx.GetSessionVars().StmtCtx.SetTypeFlags( - sessCtx.GetSessionVars().StmtCtx.TypeFlags(). - WithIgnoreZeroDateErr(!reorgInfo.ReorgMeta.SQLMode.HasStrictMode())) - bCtx.exprCtx = bCtx.sessCtx.GetExprCtx() - bCtx.tblCtx = bCtx.sessCtx.GetTableCtx() - if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { logutil.DDLLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query), zap.Stringer("reorgInfo", reorgInfo)) diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index b466905f3ea59..897f788c26542 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2398,7 +2398,7 @@ type cleanUpIndexWorker struct { } func newCleanUpIndexWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*cleanUpIndexWorker, error) { - bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false) + bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false, false) if err != nil { return nil, err } diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 3d769b6467e75..2fbf6d930a2e4 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -3602,7 +3602,7 @@ type reorgPartitionWorker struct { } func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) { - bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false) + bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false, false) if err != nil { return nil, err } diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 6bc23428d7cb6..affad43013ccd 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -18,6 +18,7 @@ import ( "context" "encoding/hex" "fmt" + "math/rand" "strconv" "strings" "sync" @@ -32,17 +33,21 @@ import ( "github.com/pingcap/tidb/pkg/distsql" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" + exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/expression/contextstatic" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/table" + tbctx "github.com/pingcap/tidb/pkg/table/context" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" @@ -53,6 +58,7 @@ import ( "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/mock" "github.com/pingcap/tidb/pkg/util/ranger" + "github.com/pingcap/tidb/pkg/util/rowcodec" "github.com/pingcap/tidb/pkg/util/timeutil" "github.com/pingcap/tipb/go-tipb" atomicutil "go.uber.org/atomic" @@ -121,6 +127,134 @@ func newReorgExprCtxWithReorgMeta(reorgMeta *model.DDLReorgMeta, warnHandler con return ctx.Apply(contextstatic.WithEvalCtx(evalCtx)), nil } +// reorgTableMutateContext implements table.MutateContext for reorganization. +type reorgTableMutateContext struct { + exprCtx exprctx.ExprContext + encodingConfig tbctx.RowEncodingConfig + mutateBuffers *tbctx.MutateBuffers + shardID *variable.RowIDShardGenerator + reservedRowIDAlloc stmtctx.ReservedRowIDAlloc +} + +// AlternativeAllocators implements table.MutateContext.AlternativeAllocators. +func (*reorgTableMutateContext) AlternativeAllocators(*model.TableInfo) (autoid.Allocators, bool) { + // No alternative allocators for all tables because temporary tables + // are not supported (temporary tables do not have any data in TiKV) in reorganization. + return autoid.Allocators{}, false +} + +// GetExprCtx implements table.MutateContext.GetExprCtx. +func (ctx *reorgTableMutateContext) GetExprCtx() exprctx.ExprContext { + return ctx.exprCtx +} + +// ConnectionID implements table.MutateContext.ConnectionID. +func (*reorgTableMutateContext) ConnectionID() uint64 { + return 0 +} + +// InRestrictedSQL implements table.MutateContext.InRestrictedSQL. +func (*reorgTableMutateContext) InRestrictedSQL() bool { + return false +} + +// TxnAssertionLevel implements table.MutateContext.TxnAssertionLevel. +func (*reorgTableMutateContext) TxnAssertionLevel() variable.AssertionLevel { + // Because only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method, + // we can just return `AssertionLevelOff`. + return variable.AssertionLevelOff +} + +// EnableMutationChecker implements table.MutateContext.EnableMutationChecker. +func (*reorgTableMutateContext) EnableMutationChecker() bool { + // Because only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method, + // we can just return false. + return false +} + +// GetRowEncodingConfig implements table.MutateContext.GetRowEncodingConfig. +func (ctx *reorgTableMutateContext) GetRowEncodingConfig() tbctx.RowEncodingConfig { + return ctx.encodingConfig +} + +// GetMutateBuffers implements table.MutateContext.GetMutateBuffers. +func (ctx *reorgTableMutateContext) GetMutateBuffers() *tbctx.MutateBuffers { + return ctx.mutateBuffers +} + +// GetRowIDShardGenerator implements table.MutateContext.GetRowIDShardGenerator. +func (ctx *reorgTableMutateContext) GetRowIDShardGenerator() *variable.RowIDShardGenerator { + return ctx.shardID +} + +// GetReservedRowIDAlloc implements table.MutateContext.GetReservedRowIDAlloc. +func (ctx *reorgTableMutateContext) GetReservedRowIDAlloc() (*stmtctx.ReservedRowIDAlloc, bool) { + return &ctx.reservedRowIDAlloc, true +} + +// GetBinlogSupport implements table.MutateContext.GetBinlogSupport. +func (*reorgTableMutateContext) GetBinlogSupport() (tbctx.BinlogSupport, bool) { + // We can just return `(nil, false)` because: + // - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method. + // - Data change in DDL reorganization should not write binlog. + return nil, false +} + +// GetStatisticsSupport implements table.MutateContext.GetStatisticsSupport. +func (*reorgTableMutateContext) GetStatisticsSupport() (tbctx.StatisticsSupport, bool) { + // We can just return `(nil, false)` because: + // - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method. + // - DDL reorg do need to collect statistics in this way. + return nil, false +} + +// GetCachedTableSupport implements table.MutateContext.GetCachedTableSupport. +func (*reorgTableMutateContext) GetCachedTableSupport() (tbctx.CachedTableSupport, bool) { + // We can just return `(nil, false)` because: + // - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method. + // - It is not allowed to execute DDL on a cached table. + return nil, false +} + +// GetTemporaryTableSupport implements table.MutateContext.GetTemporaryTableSupport. +func (*reorgTableMutateContext) GetTemporaryTableSupport() (tbctx.TemporaryTableSupport, bool) { + // We can just return `(nil, false)` because: + // - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method. + // - Temporary tables do not have any data in TiKV. + return nil, false +} + +// GetExchangePartitionDMLSupport implements table.MutateContext.GetExchangePartitionDMLSupport. +func (*reorgTableMutateContext) GetExchangePartitionDMLSupport() (tbctx.ExchangePartitionDMLSupport, bool) { + // We can just return `(nil, false)` because: + // - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method. + return nil, false +} + +// newReorgTableMutateContext creates a new table.MutateContext for reorganization. +func newReorgTableMutateContext(exprCtx exprctx.ExprContext) table.MutateContext { + rowEncoder := &rowcodec.Encoder{ + Enable: variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1, + } + + encodingConfig := tbctx.RowEncodingConfig{ + IsRowLevelChecksumEnabled: rowEncoder.Enable, + RowEncoder: rowEncoder, + } + + return &reorgTableMutateContext{ + exprCtx: exprCtx, + encodingConfig: encodingConfig, + mutateBuffers: tbctx.NewMutateBuffers(&variable.WriteStmtBufs{}), + // Though currently, `RowIDShardGenerator` is not required in DDL reorg, + // we still provide a valid one to keep the context complete and to avoid panic if it is used in the future. + shardID: variable.NewRowIDShardGenerator( + rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404 + variable.DefTiDBShardAllocateStep, + ), + } +} + func reorgTypeFlagsWithSQLMode(mode mysql.SQLMode) types.Flags { return types.StrictFlags. WithTruncateAsWarning(!mode.HasStrictMode()). @@ -148,17 +282,6 @@ func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, erro return tzLoc.GetLocation() } -func newReorgSessCtx(store kv.Storage) sessionctx.Context { - c := mock.NewContext() - c.Store = store - c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false) - - tz := *time.UTC - c.GetSessionVars().TimeZone = &tz - c.GetSessionVars().StmtCtx.SetTimeZone(&tz) - return c -} - // ReorgWaitTimeout is the timeout that wait ddl in write reorganization stage. // make it a var for testing. var ReorgWaitTimeout = 5 * time.Second diff --git a/pkg/table/context/BUILD.bazel b/pkg/table/context/BUILD.bazel index 21fe8ee0715ee..5eb49d4315d0d 100644 --- a/pkg/table/context/BUILD.bazel +++ b/pkg/table/context/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/tablecodec", "//pkg/types", "//pkg/util/chunk", + "//pkg/util/intest", "//pkg/util/rowcodec", "//pkg/util/tableutil", "@com_github_pingcap_tipb//go-binlog", diff --git a/pkg/table/context/buffers.go b/pkg/table/context/buffers.go index 12e6722947aa5..cc42e4bbcbec4 100644 --- a/pkg/table/context/buffers.go +++ b/pkg/table/context/buffers.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/rowcodec" ) @@ -156,6 +157,7 @@ type MutateBuffers struct { // NewMutateBuffers creates a new `MutateBuffers`. func NewMutateBuffers(stmtBufs *variable.WriteStmtBufs) *MutateBuffers { + intest.AssertNotNil(stmtBufs) return &MutateBuffers{ stmtBufs: stmtBufs, encodeRow: &EncodeRowBuffer{