Skip to content

Commit

Permalink
ddl: remove mock.Context from DDL reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Sep 5, 2024
1 parent fef43c5 commit 7c7458b
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 46 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ go_library(
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/table",
"//pkg/table/context",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/tidb-binlog/pump_client",
Expand Down
33 changes: 23 additions & 10 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ import (
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/expression/contextstatic"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -155,7 +155,6 @@ type backfillTaskContext struct {
type backfillCtx struct {
id int
*ddlCtx
sessCtx sessionctx.Context
warnings contextutil.WarnHandlerExt
loc *time.Location
exprCtx exprctx.BuildContext
Expand All @@ -168,26 +167,40 @@ type backfillCtx struct {
}

func newBackfillCtx(id int, rInfo *reorgInfo,
schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isDistributed bool) (*backfillCtx, error) {
// TODO: remove newReorgSessCtx
sessCtx := newReorgSessCtx(rInfo.jobCtx.store)
if err := initSessCtx(sessCtx, rInfo.ReorgMeta); err != nil {
schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isDistributed bool, isUpdateColumn bool) (*backfillCtx, error) {
warnHandler := contextutil.NewStaticWarnHandler(0)
exprCtx, err := newReorgExprCtxWithReorgMeta(rInfo.ReorgMeta, warnHandler)
if err != nil {
return nil, errors.Trace(err)
}

if isUpdateColumn {
// The below case should be compatible with mysql behavior:
// > create table t (a int);
// > insert into t values (0);
// > alter table t modify column a date;
// The alter DDL should return an error in strict mode and success in non-strict mode.
// See: https://github.com/pingcap/tidb/pull/25728 for more details.
hasStrictMode := rInfo.ReorgMeta.SQLMode.HasStrictMode()
tc := exprCtx.GetStaticEvalCtx().TypeCtx()
evalCtx := exprCtx.GetStaticEvalCtx().Apply(contextstatic.WithTypeFlags(
tc.Flags().WithIgnoreZeroDateErr(!hasStrictMode),
))
exprCtx = exprCtx.Apply(contextstatic.WithEvalCtx(evalCtx))
}

tblCtx := newReorgTableMutateContext(exprCtx)
if isDistributed {
id = int(backfillContextID.Add(1))
}

exprCtx := sessCtx.GetExprCtx()
batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
return &backfillCtx{
id: id,
ddlCtx: rInfo.jobCtx.oldDDLCtx,
sessCtx: sessCtx,
warnings: sessCtx.GetSessionVars().StmtCtx.WarnHandler,
warnings: warnHandler,
exprCtx: exprCtx,
tblCtx: sessCtx.GetTableCtx(),
tblCtx: tblCtx,
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
Expand Down
11 changes: 2 additions & 9 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/ppcpuusage"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
Expand Down Expand Up @@ -220,12 +219,6 @@ func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) erro
typeFlags := reorgTypeFlagsWithSQLMode(sqlMode)
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags)
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName

// Prevent initializing the mock context in the workers concurrently.
// For details, see https://github.com/pingcap/tidb/issues/40879.
if _, ok := sessCtx.(*mock.Context); ok {
_ = sessCtx.GetDomainInfoSchema()
}
return nil
}

Expand Down Expand Up @@ -278,7 +271,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false, false)
if err != nil {
return err
}
Expand All @@ -291,7 +284,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
runner = newBackfillWorker(b.ctx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false, false)
if err != nil {
return err
}
Expand Down
89 changes: 83 additions & 6 deletions pkg/ddl/backfilling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ddl
import (
"bytes"
"testing"
"time"

"github.com/pingcap/tidb/pkg/ddl/ingest"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/privilege"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/deeptest"
Expand Down Expand Up @@ -224,13 +227,25 @@ func assertStaticExprContextEqual(t *testing.T, sctx sessionctx.Context, exprCtx
)
}

// newMockReorgSessCtx creates a mock session context for reorg test.
// In old implementations, DDL is using `mock.Context` to construct the contexts used in DDL reorg.
// After refactoring, we just need it to do test the new implementation is compatible with the old one.
func newMockReorgSessCtx(store kv.Storage) sessionctx.Context {
c := mock.NewContext()
c.Store = store
c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false)
tz := *time.UTC
c.ResetSessionAndStmtTimeZone(&tz)
return c
}

// TestReorgExprContext is used in refactor stage to make sure the newReorgExprCtx() is
// compatible with newReorgSessCtx(nil).GetExprCtx() to make it safe to replace `mock.Context` usage.
// compatible with newMockReorgSessCtx(nil).GetExprCtx() to make it safe to replace `mock.Context` usage.
// After refactor, the TestReorgExprContext can be removed.
func TestReorgExprContext(t *testing.T) {
// test default expr context
store := &mockStorage{client: &mock.Client{}}
sctx := newReorgSessCtx(store)
sctx := newMockReorgSessCtx(store)
defaultCtx := newReorgExprCtx()
// should use an empty static warn handler by default
evalCtx := defaultCtx.GetStaticEvalCtx()
Expand All @@ -255,7 +270,7 @@ func TestReorgExprContext(t *testing.T) {
ResourceGroupName: "rg2",
},
} {
sctx = newReorgSessCtx(store)
sctx = newMockReorgSessCtx(store)
require.NoError(t, initSessCtx(sctx, &reorg))
ctx, err := newReorgExprCtxWithReorgMeta(&reorg, sctx.GetSessionVars().StmtCtx.WarnHandler)
require.NoError(t, err)
Expand All @@ -278,6 +293,68 @@ func TestReorgExprContext(t *testing.T) {
}
}

func TestReorgTableMutateContext(t *testing.T) {
originalRowFmt := variable.GetDDLReorgRowFormat()
defer variable.SetDDLReorgRowFormat(originalRowFmt)

exprCtx := contextstatic.NewStaticExprContext()

assertTblCtxMatchSessionCtx := func(ctx table.MutateContext, sctx sessionctx.Context) {
sctxTblCtx := sctx.GetTableCtx()
require.Equal(t, uint64(0), ctx.ConnectionID())
require.Equal(t, sctxTblCtx.ConnectionID(), ctx.ConnectionID())

require.False(t, ctx.InRestrictedSQL())
require.Equal(t, sctxTblCtx.InRestrictedSQL(), ctx.InRestrictedSQL())

require.Equal(t, variable.AssertionLevelOff, ctx.TxnAssertionLevel())
require.Equal(t, sctxTblCtx.TxnAssertionLevel(), ctx.TxnAssertionLevel())

require.Equal(t, variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1, ctx.GetRowEncodingConfig().IsRowLevelChecksumEnabled)
require.Equal(t, variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1, ctx.GetRowEncodingConfig().RowEncoder.Enable)
require.Equal(t, sctxTblCtx.GetRowEncodingConfig(), ctx.GetRowEncodingConfig())

require.NotNil(t, ctx.GetMutateBuffers())
require.Equal(t, sctxTblCtx.GetMutateBuffers(), ctx.GetMutateBuffers())

require.Equal(t, variable.DefTiDBShardAllocateStep, ctx.GetRowIDShardGenerator().GetShardStep())
sctx.GetSessionVars().TxnCtx.StartTS = 123 // make sure GetRowIDShardGenerator() pass assert
require.Equal(t, sctxTblCtx.GetRowIDShardGenerator().GetShardStep(), ctx.GetRowIDShardGenerator().GetShardStep())
require.GreaterOrEqual(t, ctx.GetRowIDShardGenerator().GetCurrentShard(1), int64(0))

alloc1, ok := sctxTblCtx.GetReservedRowIDAlloc()
require.True(t, ok)
alloc2, ok := ctx.GetReservedRowIDAlloc()
require.True(t, ok)
require.Equal(t, alloc1, alloc2)
require.True(t, alloc2.Exhausted())

binlog, ok := ctx.GetBinlogSupport()
require.False(t, ok)
require.Nil(t, binlog)
statistics, ok := ctx.GetStatisticsSupport()
require.False(t, ok)
require.Nil(t, statistics)
cached, ok := ctx.GetCachedTableSupport()
require.False(t, ok)
require.Nil(t, cached)
temp, ok := ctx.GetTemporaryTableSupport()
require.False(t, ok)
require.Nil(t, temp)
dml, ok := ctx.GetExchangePartitionDMLSupport()
require.False(t, ok)
require.Nil(t, dml)
}

// test when the row format is v1
variable.SetDDLReorgRowFormat(variable.DefTiDBRowFormatV1)
sctx := newMockReorgSessCtx(&mockStorage{client: &mock.Client{}})
require.NoError(t, initSessCtx(sctx, &model.DDLReorgMeta{}))
ctx := newReorgTableMutateContext(exprCtx)
require.Same(t, exprCtx, ctx.GetExprCtx())
assertTblCtxMatchSessionCtx(ctx, sctx)
}

type mockStorage struct {
kv.Storage
client kv.Client
Expand Down Expand Up @@ -313,13 +390,13 @@ func assertDistSQLCtxEqual(t *testing.T, expected *distsqlctx.DistSQLContext, ac
}

// TestReorgExprContext is used in refactor stage to make sure the newDefaultReorgDistSQLCtx() is
// compatible with newReorgSessCtx(nil).GetDistSQLCtx() to make it safe to replace `mock.Context` usage.
// compatible with newMockReorgSessCtx(nil).GetDistSQLCtx() to make it safe to replace `mock.Context` usage.
// After refactor, the TestReorgExprContext can be removed.
func TestReorgDistSQLCtx(t *testing.T) {
store := &mockStorage{client: &mock.Client{}}

// test default dist sql context
expected := newReorgSessCtx(store).GetDistSQLCtx()
expected := newMockReorgSessCtx(store).GetDistSQLCtx()
defaultCtx := newDefaultReorgDistSQLCtx(store.client, expected.WarnHandler)
assertDistSQLCtxEqual(t, expected, defaultCtx)

Expand All @@ -339,7 +416,7 @@ func TestReorgDistSQLCtx(t *testing.T) {
ResourceGroupName: "rg2",
},
} {
sctx := newReorgSessCtx(store)
sctx := newMockReorgSessCtx(store)
require.NoError(t, initSessCtx(sctx, &reorg))
expected = sctx.GetDistSQLCtx()
ctx, err := newReorgDistSQLCtxWithReorgMeta(store.client, &reorg, expected.WarnHandler)
Expand Down
9 changes: 1 addition & 8 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,18 +606,11 @@ type updateColumnWorker struct {
}

func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*updateColumnWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false)
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false, true)
if err != nil {
return nil, err
}

sessCtx := bCtx.sessCtx
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(
sessCtx.GetSessionVars().StmtCtx.TypeFlags().
WithIgnoreZeroDateErr(!reorgInfo.ReorgMeta.SQLMode.HasStrictMode()))
bCtx.exprCtx = bCtx.sessCtx.GetExprCtx()
bCtx.tblCtx = bCtx.sessCtx.GetTableCtx()

if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
logutil.DDLLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.Stringer("reorgInfo", reorgInfo))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2397,7 +2397,7 @@ type cleanUpIndexWorker struct {
}

func newCleanUpIndexWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*cleanUpIndexWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false)
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3600,7 +3600,7 @@ type reorgPartitionWorker struct {
}

func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) {
bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false)
bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false, false)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 7c7458b

Please sign in to comment.