Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: remove mock.Context from DDL reorg #55873

Merged
merged 2 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ go_library(
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/table",
"//pkg/table/context",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/tidb-binlog/pump_client",
Expand Down
33 changes: 23 additions & 10 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ import (
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/expression/contextstatic"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -155,7 +155,6 @@ type backfillTaskContext struct {
type backfillCtx struct {
id int
*ddlCtx
sessCtx sessionctx.Context
warnings contextutil.WarnHandlerExt
loc *time.Location
exprCtx exprctx.BuildContext
Expand All @@ -168,26 +167,40 @@ type backfillCtx struct {
}

func newBackfillCtx(id int, rInfo *reorgInfo,
schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isDistributed bool) (*backfillCtx, error) {
// TODO: remove newReorgSessCtx
sessCtx := newReorgSessCtx(rInfo.jobCtx.store)
if err := initSessCtx(sessCtx, rInfo.ReorgMeta); err != nil {
schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isDistributed bool, isUpdateColumn bool) (*backfillCtx, error) {
warnHandler := contextutil.NewStaticWarnHandler(0)
exprCtx, err := newReorgExprCtxWithReorgMeta(rInfo.ReorgMeta, warnHandler)
if err != nil {
return nil, errors.Trace(err)
}

if isUpdateColumn {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// The below case should be compatible with mysql behavior:
// > create table t (a int);
// > insert into t values (0);
// > alter table t modify column a date;
// The alter DDL should return an error in strict mode and success in non-strict mode.
// See: https://github.com/pingcap/tidb/pull/25728 for more details.
hasStrictMode := rInfo.ReorgMeta.SQLMode.HasStrictMode()
tc := exprCtx.GetStaticEvalCtx().TypeCtx()
evalCtx := exprCtx.GetStaticEvalCtx().Apply(contextstatic.WithTypeFlags(
tc.Flags().WithIgnoreZeroDateErr(!hasStrictMode),
))
exprCtx = exprCtx.Apply(contextstatic.WithEvalCtx(evalCtx))
}

tblCtx := newReorgTableMutateContext(exprCtx)
if isDistributed {
id = int(backfillContextID.Add(1))
}

exprCtx := sessCtx.GetExprCtx()
batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
return &backfillCtx{
id: id,
ddlCtx: rInfo.jobCtx.oldDDLCtx,
sessCtx: sessCtx,
warnings: sessCtx.GetSessionVars().StmtCtx.WarnHandler,
warnings: warnHandler,
exprCtx: exprCtx,
tblCtx: sessCtx.GetTableCtx(),
tblCtx: tblCtx,
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
Expand Down
11 changes: 2 additions & 9 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/ppcpuusage"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
Expand Down Expand Up @@ -220,12 +219,6 @@ func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) erro
typeFlags := reorgTypeFlagsWithSQLMode(sqlMode)
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags)
sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName

// Prevent initializing the mock context in the workers concurrently.
// For details, see https://github.com/pingcap/tidb/issues/40879.
if _, ok := sessCtx.(*mock.Context); ok {
_ = sessCtx.GetDomainInfoSchema()
}
Comment on lines -224 to -228
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it because mock.Context is not used now.

return nil
}

Expand Down Expand Up @@ -278,7 +271,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false, false)
if err != nil {
return err
}
Expand All @@ -291,7 +284,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
runner = newBackfillWorker(b.ctx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false, false)
if err != nil {
return err
}
Expand Down
89 changes: 83 additions & 6 deletions pkg/ddl/backfilling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ddl
import (
"bytes"
"testing"
"time"

"github.com/pingcap/tidb/pkg/ddl/ingest"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/privilege"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/deeptest"
Expand Down Expand Up @@ -224,13 +227,25 @@ func assertStaticExprContextEqual(t *testing.T, sctx sessionctx.Context, exprCtx
)
}

// newMockReorgSessCtx creates a mock session context for reorg test.
// In old implementations, DDL is using `mock.Context` to construct the contexts used in DDL reorg.
// After refactoring, we just need it to do test the new implementation is compatible with the old one.
func newMockReorgSessCtx(store kv.Storage) sessionctx.Context {
c := mock.NewContext()
c.Store = store
c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false)
tz := *time.UTC
c.ResetSessionAndStmtTimeZone(&tz)
return c
}

// TestReorgExprContext is used in refactor stage to make sure the newReorgExprCtx() is
// compatible with newReorgSessCtx(nil).GetExprCtx() to make it safe to replace `mock.Context` usage.
// compatible with newMockReorgSessCtx(nil).GetExprCtx() to make it safe to replace `mock.Context` usage.
// After refactor, the TestReorgExprContext can be removed.
func TestReorgExprContext(t *testing.T) {
// test default expr context
store := &mockStorage{client: &mock.Client{}}
sctx := newReorgSessCtx(store)
sctx := newMockReorgSessCtx(store)
defaultCtx := newReorgExprCtx()
// should use an empty static warn handler by default
evalCtx := defaultCtx.GetStaticEvalCtx()
Expand All @@ -255,7 +270,7 @@ func TestReorgExprContext(t *testing.T) {
ResourceGroupName: "rg2",
},
} {
sctx = newReorgSessCtx(store)
sctx = newMockReorgSessCtx(store)
require.NoError(t, initSessCtx(sctx, &reorg))
ctx, err := newReorgExprCtxWithReorgMeta(&reorg, sctx.GetSessionVars().StmtCtx.WarnHandler)
require.NoError(t, err)
Expand All @@ -278,6 +293,68 @@ func TestReorgExprContext(t *testing.T) {
}
}

func TestReorgTableMutateContext(t *testing.T) {
originalRowFmt := variable.GetDDLReorgRowFormat()
defer variable.SetDDLReorgRowFormat(originalRowFmt)

exprCtx := contextstatic.NewStaticExprContext()

assertTblCtxMatchSessionCtx := func(ctx table.MutateContext, sctx sessionctx.Context) {
sctxTblCtx := sctx.GetTableCtx()
require.Equal(t, uint64(0), ctx.ConnectionID())
require.Equal(t, sctxTblCtx.ConnectionID(), ctx.ConnectionID())

require.False(t, ctx.InRestrictedSQL())
require.Equal(t, sctxTblCtx.InRestrictedSQL(), ctx.InRestrictedSQL())

require.Equal(t, variable.AssertionLevelOff, ctx.TxnAssertionLevel())
require.Equal(t, sctxTblCtx.TxnAssertionLevel(), ctx.TxnAssertionLevel())

require.Equal(t, variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1, ctx.GetRowEncodingConfig().IsRowLevelChecksumEnabled)
require.Equal(t, variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1, ctx.GetRowEncodingConfig().RowEncoder.Enable)
require.Equal(t, sctxTblCtx.GetRowEncodingConfig(), ctx.GetRowEncodingConfig())

require.NotNil(t, ctx.GetMutateBuffers())
require.Equal(t, sctxTblCtx.GetMutateBuffers(), ctx.GetMutateBuffers())

require.Equal(t, variable.DefTiDBShardAllocateStep, ctx.GetRowIDShardGenerator().GetShardStep())
sctx.GetSessionVars().TxnCtx.StartTS = 123 // make sure GetRowIDShardGenerator() pass assert
require.Equal(t, sctxTblCtx.GetRowIDShardGenerator().GetShardStep(), ctx.GetRowIDShardGenerator().GetShardStep())
require.GreaterOrEqual(t, ctx.GetRowIDShardGenerator().GetCurrentShard(1), int64(0))

alloc1, ok := sctxTblCtx.GetReservedRowIDAlloc()
require.True(t, ok)
alloc2, ok := ctx.GetReservedRowIDAlloc()
require.True(t, ok)
require.Equal(t, alloc1, alloc2)
require.True(t, alloc2.Exhausted())

binlog, ok := ctx.GetBinlogSupport()
require.False(t, ok)
require.Nil(t, binlog)
statistics, ok := ctx.GetStatisticsSupport()
require.False(t, ok)
require.Nil(t, statistics)
cached, ok := ctx.GetCachedTableSupport()
require.False(t, ok)
require.Nil(t, cached)
temp, ok := ctx.GetTemporaryTableSupport()
require.False(t, ok)
require.Nil(t, temp)
dml, ok := ctx.GetExchangePartitionDMLSupport()
require.False(t, ok)
require.Nil(t, dml)
}

// test when the row format is v1
variable.SetDDLReorgRowFormat(variable.DefTiDBRowFormatV1)
sctx := newMockReorgSessCtx(&mockStorage{client: &mock.Client{}})
require.NoError(t, initSessCtx(sctx, &model.DDLReorgMeta{}))
ctx := newReorgTableMutateContext(exprCtx)
require.Same(t, exprCtx, ctx.GetExprCtx())
assertTblCtxMatchSessionCtx(ctx, sctx)
}

type mockStorage struct {
kv.Storage
client kv.Client
Expand Down Expand Up @@ -313,13 +390,13 @@ func assertDistSQLCtxEqual(t *testing.T, expected *distsqlctx.DistSQLContext, ac
}

// TestReorgExprContext is used in refactor stage to make sure the newDefaultReorgDistSQLCtx() is
// compatible with newReorgSessCtx(nil).GetDistSQLCtx() to make it safe to replace `mock.Context` usage.
// compatible with newMockReorgSessCtx(nil).GetDistSQLCtx() to make it safe to replace `mock.Context` usage.
// After refactor, the TestReorgExprContext can be removed.
func TestReorgDistSQLCtx(t *testing.T) {
store := &mockStorage{client: &mock.Client{}}

// test default dist sql context
expected := newReorgSessCtx(store).GetDistSQLCtx()
expected := newMockReorgSessCtx(store).GetDistSQLCtx()
defaultCtx := newDefaultReorgDistSQLCtx(store.client, expected.WarnHandler)
assertDistSQLCtxEqual(t, expected, defaultCtx)

Expand All @@ -339,7 +416,7 @@ func TestReorgDistSQLCtx(t *testing.T) {
ResourceGroupName: "rg2",
},
} {
sctx := newReorgSessCtx(store)
sctx := newMockReorgSessCtx(store)
require.NoError(t, initSessCtx(sctx, &reorg))
expected = sctx.GetDistSQLCtx()
ctx, err := newReorgDistSQLCtxWithReorgMeta(store.client, &reorg, expected.WarnHandler)
Expand Down
9 changes: 1 addition & 8 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,18 +606,11 @@ type updateColumnWorker struct {
}

func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*updateColumnWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false)
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false, true)
if err != nil {
return nil, err
}

sessCtx := bCtx.sessCtx
sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(
sessCtx.GetSessionVars().StmtCtx.TypeFlags().
WithIgnoreZeroDateErr(!reorgInfo.ReorgMeta.SQLMode.HasStrictMode()))
bCtx.exprCtx = bCtx.sessCtx.GetExprCtx()
bCtx.tblCtx = bCtx.sessCtx.GetTableCtx()

Comment on lines -614 to -620
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
logutil.DDLLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.Stringer("reorgInfo", reorgInfo))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2397,7 +2397,7 @@ type cleanUpIndexWorker struct {
}

func newCleanUpIndexWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*cleanUpIndexWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false)
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3600,7 +3600,7 @@ type reorgPartitionWorker struct {
}

func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) {
bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false)
bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false, false)
if err != nil {
return nil, err
}
Expand Down
Loading