diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 69ab527f0cedc..05f2f4a4cb59b 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/ddl/ingest" sess "github.com/pingcap/tidb/pkg/ddl/internal/session" ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" @@ -34,15 +35,22 @@ import ( "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" poolutil "github.com/pingcap/tidb/pkg/resourcemanager/util" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" + contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/mock" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" + "github.com/pingcap/tidb/pkg/util/sqlkiller" + "github.com/pingcap/tidb/pkg/util/tiflash" + tikvstore "github.com/tikv/client-go/v2/kv" "go.uber.org/zap" ) @@ -148,6 +156,31 @@ func newSessCtx( return sessCtx, nil } +func newDefaultReorgDistSQLCtx(kvClient kv.Client) *distsqlctx.DistSQLContext { + warnHandler := contextutil.NewStaticWarnHandler(0) + var sqlKiller sqlkiller.SQLKiller + var execDetails execdetails.SyncExecDetails + return &distsqlctx.DistSQLContext{ + WarnHandler: warnHandler, + Client: kvClient, + EnableChunkRPC: true, + EnabledRateLimitAction: variable.DefTiDBEnableRateLimitAction, + KVVars: tikvstore.NewVariables(&sqlKiller.Signal), + SessionMemTracker: memory.NewTracker(memory.LabelForSession, -1), + Location: time.UTC, + SQLKiller: &sqlKiller, + ErrCtx: errctx.NewContextWithLevels(stmtctx.DefaultStmtErrLevels, warnHandler), + TiFlashReplicaRead: tiflash.GetTiFlashReplicaReadByStr(variable.DefTiFlashReplicaRead), + TiFlashMaxThreads: variable.DefTiFlashMaxThreads, + TiFlashMaxBytesBeforeExternalJoin: variable.DefTiFlashMaxBytesBeforeExternalJoin, + TiFlashMaxBytesBeforeExternalGroupBy: variable.DefTiFlashMaxBytesBeforeExternalGroupBy, + TiFlashMaxBytesBeforeExternalSort: variable.DefTiFlashMaxBytesBeforeExternalSort, + TiFlashMaxQueryMemoryPerNode: variable.DefTiFlashMemQuotaQueryPerNode, + TiFlashQuerySpillRatio: variable.DefTiFlashQuerySpillRatio, + ExecDetails: &execDetails, + } +} + // initSessCtx initializes the session context. Be careful to the timezone. func initSessCtx( sessCtx sessionctx.Context, diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index 4702031c67e86..21b8255b12e95 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -20,10 +20,13 @@ import ( "testing" "time" + "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" + contextutil "github.com/pingcap/tidb/pkg/util/context" + "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) @@ -144,3 +147,48 @@ func TestReorgExprContext(t *testing.T) { require.Equal(t, evalCtx1.GetDefaultWeekFormatMode(), evalCtx.GetDefaultWeekFormatMode()) require.Equal(t, evalCtx1.GetDivPrecisionIncrement(), evalCtx.GetDivPrecisionIncrement()) } + +type mockStorage struct { + kv.Storage + client kv.Client +} + +func (s *mockStorage) GetClient() kv.Client { + return s.client +} + +// TestReorgExprContext is used in refactor stage to make sure the newDefaultReorgDistSQLCtx() is +// compatible with newReorgSessCtx(nil).GetDistSQLCtx() to make it safe to replace `mock.Context` usage. +// After refactor, the TestReorgExprContext can be removed. +func TestReorgDistSQLCtx(t *testing.T) { + store := &mockStorage{client: &mock.Client{}} + ctx1 := newReorgSessCtx(store).GetDistSQLCtx() + ctx2 := newDefaultReorgDistSQLCtx(store.client) + + // set the same warnHandler to make two contexts equal + ctx1.WarnHandler = ctx2.WarnHandler + + // set the same KVVars to make two contexts equal + require.Equal(t, uint32(0), *ctx1.KVVars.Killed) + require.Equal(t, uint32(0), *ctx2.KVVars.Killed) + ctx1.KVVars.Killed = ctx2.KVVars.Killed + + // set the same SessionMemTracker to make two contexts equal + require.Equal(t, ctx1.SessionMemTracker.Label(), ctx2.SessionMemTracker.Label()) + require.Equal(t, ctx1.SessionMemTracker.GetBytesLimit(), ctx2.SessionMemTracker.GetBytesLimit()) + ctx1.SessionMemTracker = ctx2.SessionMemTracker + + // set the same ErrCtx to make two contexts equal + require.Equal(t, ctx1.ErrCtx.LevelMap(), ctx2.ErrCtx.LevelMap()) + require.Equal(t, 0, ctx2.WarnHandler.(contextutil.WarnHandler).WarningCount()) + ctx2.ErrCtx.AppendWarning(errors.New("warn")) + require.Equal(t, 1, ctx2.WarnHandler.(contextutil.WarnHandler).WarningCount()) + ctx1.ErrCtx = ctx2.ErrCtx + + // set the same ExecDetails to make two contexts equal + require.NotNil(t, ctx1.ExecDetails) + require.NotNil(t, ctx2.ExecDetails) + ctx1.ExecDetails = ctx2.ExecDetails + + require.Equal(t, ctx1, ctx2) +} diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 7cf13f60fbb01..2ec65db314eaf 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -29,6 +29,7 @@ import ( sess "github.com/pingcap/tidb/pkg/ddl/internal/session" "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/distsql" + distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" exprctx "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/expression/contextstatic" "github.com/pingcap/tidb/pkg/kv" @@ -510,7 +511,7 @@ func constructLimitPB(count uint64) *tipb.Executor { return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec} } -func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) { +func buildDescTableScanDAG(distSQLCtx *distsqlctx.DistSQLContext, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} _, timeZoneOffset := time.Now().In(time.UTC).Zone() dagReq.TimeZoneOffset = int64(timeZoneOffset) @@ -522,7 +523,7 @@ func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, hand tblScanExec := constructDescTableScanPB(tbl.GetPhysicalID(), tbl.Meta(), handleCols) dagReq.Executors = append(dagReq.Executors, tblScanExec) dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit)) - distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq) + distsql.SetEncodeType(distSQLCtx, dagReq) return dagReq, nil } @@ -537,8 +538,8 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType { // buildDescTableScan builds a desc table scan upon tblInfo. func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.PhysicalTable, handleCols []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) { - sctx := newReorgSessCtx(dc.store) - dagPB, err := buildDescTableScanDAG(sctx, tbl, handleCols, limit) + distSQLCtx := newDefaultReorgDistSQLCtx(dc.store.GetClient()) + dagPB, err := buildDescTableScanDAG(distSQLCtx, tbl, handleCols, limit) if err != nil { return nil, errors.Trace(err) } @@ -550,7 +551,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table. } else { ranges = ranger.FullIntRange(false) } - builder = b.SetHandleRanges(sctx.GetDistSQLCtx(), tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges) + builder = b.SetHandleRanges(distSQLCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges) builder.SetDAGRequest(dagPB). SetStartTS(startTS). SetKeepOrder(true). @@ -569,7 +570,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table. return nil, errors.Trace(err) } - result, err := distsql.Select(ctx.ddlJobCtx, sctx.GetDistSQLCtx(), kvReq, getColumnsTypes(handleCols)) + result, err := distsql.Select(ctx.ddlJobCtx, distSQLCtx, kvReq, getColumnsTypes(handleCols)) if err != nil { return nil, errors.Trace(err) }