diff --git a/pkg/disttask/framework/handle/handle.go b/pkg/disttask/framework/handle/handle.go
index a40a2afd01d43..4ab9ff6e4313e 100644
--- a/pkg/disttask/framework/handle/handle.go
+++ b/pkg/disttask/framework/handle/handle.go
@@ -77,7 +77,7 @@ func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, co
 	if err != nil {
 		return nil, err
 	}
-	metrics.UpdateMetricsForAddTask(task)
+	metrics.UpdateMetricsForAddTask(&task.TaskBase)
 
 	NotifyTaskChange()
 	return task, nil
diff --git a/pkg/disttask/importinto/job.go b/pkg/disttask/importinto/job.go
index a94d00ade23ad..3b16f04be34fd 100644
--- a/pkg/disttask/importinto/job.go
+++ b/pkg/disttask/importinto/job.go
@@ -19,7 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 
-	"github.com/google/uuid"
+	"github.com/docker/go-units"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
 	"github.com/pingcap/tidb/pkg/disttask/framework/handle"
@@ -38,105 +38,26 @@ import (
 	"go.uber.org/zap"
 )
 
-// DistImporter is a JobImporter for distributed IMPORT INTO.
-type DistImporter struct {
-	*importer.JobImportParam
-	plan   *importer.Plan
-	stmt   string
-	logger *zap.Logger
-	// the instance to import data, used for single-node import, nil means import data on all instances.
-	instance *infosync.ServerInfo
-	// the files to import, when import from server file, we need to pass those file to the framework.
-	chunkMap       map[int32][]Chunk
-	sourceFileSize int64
-	// only set after submit task
-	jobID  int64
-	taskID int64
-}
-
-// NewDistImporter creates a new DistImporter.
-func NewDistImporter(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error) {
-	return &DistImporter{
-		JobImportParam: param,
-		plan:           plan,
-		stmt:           stmt,
-		logger:         logutil.BgLogger(),
-		sourceFileSize: sourceFileSize,
-	}, nil
-}
-
-// NewDistImporterCurrNode creates a new DistImporter to import data on current node.
-func NewDistImporterCurrNode(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error) {
+// SubmitStandaloneTask submits a task to the distributed framework that runs only on the current node.
+// When importing from server disk, pass engine checkpoints too, as the scheduler might run on another
+// node that cannot access the data files.
+func SubmitStandaloneTask(ctx context.Context, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint) (int64, *proto.TaskBase, error) {
 	serverInfo, err := infosync.GetServerInfo()
 	if err != nil {
-		return nil, err
-	}
-	return &DistImporter{
-		JobImportParam: param,
-		plan:           plan,
-		stmt:           stmt,
-		logger:         logutil.BgLogger(),
-		instance:       serverInfo,
-		sourceFileSize: sourceFileSize,
-	}, nil
-}
-
-// NewDistImporterServerFile creates a new DistImporter to import given files on current node.
-// we also run import on current node.
-// todo: merge all 3 ctor into one.
-func NewDistImporterServerFile(param *importer.JobImportParam, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint, sourceFileSize int64) (*DistImporter, error) {
-	distImporter, err := NewDistImporterCurrNode(param, plan, stmt, sourceFileSize)
-	if err != nil {
-		return nil, err
-	}
-	distImporter.chunkMap = toChunkMap(ecp)
-	return distImporter, nil
-}
-
-// Param implements JobImporter.Param.
-func (ti *DistImporter) Param() *importer.JobImportParam {
-	return ti.JobImportParam
-}
-
-// Import implements JobImporter.Import.
-func (*DistImporter) Import() {
-	// todo: remove it
-}
-
-// ImportTask import task.
-func (ti *DistImporter) ImportTask(task *proto.Task) {
-	ti.logger.Info("start distribute IMPORT INTO")
-	ti.Group.Go(func() error {
-		defer close(ti.Done)
-		// task is run using distribute framework, so we only wait for the task to finish.
-		return handle.WaitTaskDoneOrPaused(ti.GroupCtx, task.ID)
-	})
-}
-
-// Result implements JobImporter.Result.
-func (ti *DistImporter) Result(ctx context.Context) importer.JobImportResult {
-	var result importer.JobImportResult
-	taskMeta, err := getTaskMeta(ctx, ti.jobID)
-	if err != nil {
-		return result
-	}
-
-	return importer.JobImportResult{
-		Affected:   taskMeta.Result.LoadedRowCnt,
-		ColSizeMap: taskMeta.Result.ColSizeMap,
+		return 0, nil, err
 	}
+	return doSubmitTask(ctx, plan, stmt, serverInfo, toChunkMap(ecp))
 }
 
-// Close implements the io.Closer interface.
-func (*DistImporter) Close() error {
-	return nil
+// SubmitTask submits a task to the distributed framework that runs on all managed nodes.
+func SubmitTask(ctx context.Context, plan *importer.Plan, stmt string) (int64, *proto.TaskBase, error) {
+	return doSubmitTask(ctx, plan, stmt, nil, nil)
 }
 
-// SubmitTask submits a task to the distribute framework.
-func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, error) {
+func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instance *infosync.ServerInfo, chunkMap map[int32][]Chunk) (int64, *proto.TaskBase, error) {
 	var instances []*infosync.ServerInfo
-	if ti.instance != nil {
-		instances = append(instances, ti.instance)
+	if instance != nil {
+		instances = append(instances, instance)
 	}
 	// we use taskManager to submit task, user might not have the privilege to system tables.
 	taskManager, err := storage.GetTaskManager()
@@ -146,7 +67,6 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
 	}
 
 	var jobID, taskID int64
-	plan := ti.plan
 	if err = taskManager.WithNewTxn(ctx, func(se sessionctx.Context) error {
 		var err2 error
 		exec := se.(sqlexec.SQLExecutor)
@@ -163,7 +83,7 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
 			return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("there's pending or running jobs")
 		}
 		jobID, err2 = importer.CreateJob(ctx, exec, plan.DBName, plan.TableInfo.Name.L, plan.TableInfo.ID,
-			plan.User, plan.Parameters, ti.sourceFileSize)
+			plan.User, plan.Parameters, plan.TotalFileSize)
 		if err2 != nil {
 			return err2
 		}
@@ -173,9 +93,9 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
 		logicalPlan := &LogicalPlan{
 			JobID:             jobID,
 			Plan:              *plan,
-			Stmt:              ti.stmt,
+			Stmt:              stmt,
 			EligibleInstances: instances,
-			ChunkMap:          ti.chunkMap,
+			ChunkMap:          chunkMap,
 		}
 		planCtx := planner.PlanCtx{
 			Ctx:        ctx,
@@ -194,51 +114,22 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
 		return 0, nil, err
 	}
 	handle.NotifyTaskChange()
-	task, err := taskManager.GetTaskByID(ctx, taskID)
+	task, err := taskManager.GetTaskBaseByID(ctx, taskID)
 	if err != nil {
 		return 0, nil, err
 	}
 	metrics.UpdateMetricsForAddTask(task)
-	// update logger with task id.
-	ti.jobID = jobID
-	ti.taskID = taskID
-	ti.logger = ti.logger.With(zap.Int64("task-id", task.ID))
-	ti.logger.Info("job submitted to task queue",
-		zap.Int64("job-id", jobID), zap.Int("thread-cnt", plan.ThreadCnt))
+	logutil.BgLogger().Info("job submitted to task queue",
+		zap.Int64("job-id", jobID),
+		zap.Int64("task-id", task.ID),
+		zap.String("data-size", units.BytesSize(float64(plan.TotalFileSize))),
+		zap.Int("thread-cnt", plan.ThreadCnt))
 	return jobID, task, nil
 }
 
-func (*DistImporter) taskKey() string {
-	// task key is meaningless to IMPORT INTO, so we use a random uuid.
-	return fmt.Sprintf("%s/%s", proto.ImportInto, uuid.New().String())
-}
-
-// JobID returns the job id.
-func (ti *DistImporter) JobID() int64 {
-	return ti.jobID
-}
-
-func getTaskMeta(ctx context.Context, jobID int64) (*TaskMeta, error) {
-	taskManager, err := storage.GetTaskManager()
-	ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
-	if err != nil {
-		return nil, err
-	}
-	taskKey := TaskKey(jobID)
-	task, err := taskManager.GetTaskByKey(ctx, taskKey)
-	if err != nil {
-		return nil, err
-	}
-	var taskMeta TaskMeta
-	if err := json.Unmarshal(task.Meta, &taskMeta); err != nil {
-		return nil, errors.Trace(err)
-	}
-	return &taskMeta, nil
-}
-
 // GetTaskImportedRows gets the number of imported rows of a job.
 // Note: for finished job, we can get the number of imported rows from task meta.
 func GetTaskImportedRows(ctx context.Context, jobID int64) (uint64, error) {
diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go
index 6d40051664edf..b5cfe9c034d8a 100644
--- a/pkg/disttask/importinto/scheduler.go
+++ b/pkg/disttask/importinto/scheduler.go
@@ -638,6 +638,12 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
 	return handle.RunWithRetry(ctx, scheduler.RetrySQLTimes, backoffer, logger,
 		func(ctx context.Context) (bool, error) {
 			return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
+				if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
+					Affected:   taskMeta.Result.LoadedRowCnt,
+					ColSizeMap: taskMeta.Result.ColSizeMap,
+				}); err != nil {
+					logger.Warn("flush table stats failed", zap.Error(err))
+				}
 				exec := se.(sqlexec.SQLExecutor)
 				return importer.FinishJob(ctx, exec, taskMeta.JobID, summary)
 			})
diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go
index 13ad3dbc7834a..743490d1fdb5b 100644
--- a/pkg/disttask/importinto/task_executor.go
+++ b/pkg/disttask/importinto/task_executor.go
@@ -80,11 +80,7 @@ func getTableImporter(ctx context.Context, taskID int64, taskMeta *TaskMeta) (*i
 		return nil, err
 	}
 
-	return importer.NewTableImporter(&importer.JobImportParam{
-		GroupCtx: ctx,
-		Progress: importer.NewProgress(),
-		Job:      &importer.Job{},
-	}, controller, strconv.FormatInt(taskID, 10))
+	return importer.NewTableImporter(ctx, controller, strconv.FormatInt(taskID, 10))
 }
 
 func (s *importStepExecutor) Init(ctx context.Context) error {
diff --git a/pkg/executor/import_into.go b/pkg/executor/import_into.go
index a2d5c5260f4fc..10e0bac49aecf 100644
--- a/pkg/executor/import_into.go
+++ b/pkg/executor/import_into.go
@@ -17,7 +17,6 @@ package executor
 import (
 	"context"
 	"fmt"
-	"sync/atomic"
 
 	"github.com/google/uuid"
 	"github.com/pingcap/errors"
@@ -37,7 +36,6 @@ import (
 	"github.com/pingcap/tidb/pkg/privilege"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
-	"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" @@ -49,8 +47,6 @@ import ( ) var ( - // TestDetachedTaskFinished is a flag for test. - TestDetachedTaskFinished atomic.Bool // TestCancelFunc for test. TestCancelFunc context.CancelFunc ) @@ -62,7 +58,6 @@ type ImportIntoExec struct { exec.BaseExecutor selectExec exec.Executor userSctx sessionctx.Context - importPlan *importer.Plan controller *importer.LoadDataController stmt string @@ -104,7 +99,6 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) if err != nil { return err } - e.importPlan = importPlan e.controller = controller if e.selectExec != nil { @@ -127,7 +121,7 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) return err2 } - if err := e.importPlan.InitTiKVConfigs(ctx, newSCtx); err != nil { + if err := e.controller.InitTiKVConfigs(ctx, newSCtx); err != nil { return err } @@ -137,53 +131,16 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) ctx = newCtx TestCancelFunc = cancel }) - // todo: we don't need Job now, remove it later. - parentCtx := ctx - if e.controller.Detached { - parentCtx = context.Background() - } - group, groupCtx := errgroup.WithContext(parentCtx) - groupCtx = kv.WithInternalSourceType(groupCtx, kv.InternalDistTask) - - param := &importer.JobImportParam{ - Job: &importer.Job{}, - Group: group, - GroupCtx: groupCtx, - Done: make(chan struct{}), - Progress: importer.NewProgress(), - } - distImporter, err := e.getJobImporter(ctx, param) - if err != nil { - return err - } - defer func() { - _ = distImporter.Close() - }() - param.Progress.SourceFileSize = e.controller.TotalFileSize - jobID, task, err := distImporter.SubmitTask(ctx) + + jobID, task, err := e.submitTask(ctx) if err != nil { return err } - if e.controller.Detached { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalImportInto) - se, err := CreateSession(e.userSctx) - if err != nil { + if !e.controller.Detached { + if err = e.waitTask(ctx, jobID, task); err != nil { return err } - go func() { - defer CloseSession(se) - // error is stored in system table, so we can ignore it here - //nolint: errcheck - _ = e.doImport(ctx, se, distImporter, task) - failpoint.Inject("testDetachedTaskFinished", func() { - TestDetachedTaskFinished.Store(true) - }) - }() - return e.fillJobInfo(ctx, jobID, req) - } - if err = e.doImport(ctx, e.userSctx, distImporter, task); err != nil { - return err } return e.fillJobInfo(ctx, jobID, req) } @@ -209,32 +166,32 @@ func (e *ImportIntoExec) fillJobInfo(ctx context.Context, jobID int64, req *chun return nil } -func (e *ImportIntoExec) getJobImporter(ctx context.Context, param *importer.JobImportParam) (*importinto.DistImporter, error) { +func (e *ImportIntoExec) submitTask(ctx context.Context) (int64, *proto.TaskBase, error) { importFromServer, err := storage.IsLocalPath(e.controller.Path) if err != nil { // since we have checked this during creating controller, this should not happen. 
-		return nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(plannercore.ImportIntoDataSource, err.Error())
+		return 0, nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(plannercore.ImportIntoDataSource, err.Error())
 	}
 	logutil.Logger(ctx).Info("get job importer", zap.Stringer("param", e.controller.Parameters),
 		zap.Bool("dist-task-enabled", variable.EnableDistTask.Load()))
 	if importFromServer {
 		ecp, err2 := e.controller.PopulateChunks(ctx)
 		if err2 != nil {
-			return nil, err2
+			return 0, nil, err2
 		}
-		return importinto.NewDistImporterServerFile(param, e.importPlan, e.stmt, ecp, e.controller.TotalFileSize)
+		return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, ecp)
 	}
 	// if tidb_enable_dist_task=true, we import distributively, otherwise we import on current node.
 	if variable.EnableDistTask.Load() {
-		return importinto.NewDistImporter(param, e.importPlan, e.stmt, e.controller.TotalFileSize)
+		return importinto.SubmitTask(ctx, e.controller.Plan, e.stmt)
 	}
-	return importinto.NewDistImporterCurrNode(param, e.importPlan, e.stmt, e.controller.TotalFileSize)
+	return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, nil)
 }
 
-func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, distImporter *importinto.DistImporter, task *proto.Task) error {
-	distImporter.ImportTask(task)
-	group := distImporter.Param().Group
-	err := group.Wait()
+// waitTask waits for the task to finish.
+// NOTE: WaitTaskDoneOrPaused also returns an error when the task fails.
+func (*ImportIntoExec) waitTask(ctx context.Context, jobID int64, task *proto.TaskBase) error {
+	err := handle.WaitTaskDoneOrPaused(ctx, task.ID)
 	// when user KILL the connection, the ctx will be canceled, we need to cancel the import job.
 	if errors.Cause(err) == context.Canceled {
 		taskManager, err2 := fstorage.GetTaskManager()
@@ -242,11 +199,7 @@ func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, di
 			return err2
 		}
 		// use background, since ctx is canceled already.
-		return cancelAndWaitImportJob(context.Background(), taskManager, distImporter.JobID())
-	}
-	importResult := distImporter.Result(ctx)
-	if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, &importResult); err2 != nil {
-		logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
+		return cancelAndWaitImportJob(context.Background(), taskManager, jobID)
 	}
 	return err
 }
@@ -266,25 +219,16 @@ func (e *ImportIntoExec) importFromSelect(ctx context.Context) error {
 	if err2 = e.controller.CheckRequirements(ctx, sqlExec); err2 != nil {
 		return err2
 	}
-	if err := e.importPlan.InitTiKVConfigs(ctx, newSCtx); err != nil {
+	if err := e.controller.InitTiKVConfigs(ctx, newSCtx); err != nil {
 		return err
 	}
-	// TODO: we didn't use this `group` here, but have to init GroupCtx, refactor this later.
-	group, groupCtx := errgroup.WithContext(ctx)
-	param := &importer.JobImportParam{
-		Job:      &importer.Job{},
-		Group:    group,
-		GroupCtx: groupCtx,
-		Done:     make(chan struct{}),
-		Progress: importer.NewProgress(),
-	}
 	importID := uuid.New().String()
 	logutil.Logger(ctx).Info("importing data from select statement",
 		zap.String("import-id", importID), zap.Int("concurrency", e.controller.ThreadCnt),
 		zap.String("target-table", e.controller.FullTableName()),
 		zap.Int64("target-table-id", e.controller.TableInfo.ID))
-	ti, err2 := importer.NewTableImporter(param, e.controller, importID)
+	ti, err2 := importer.NewTableImporter(ctx, e.controller, importID)
 	if err2 != nil {
 		return err2
 	}
@@ -336,7 +280,7 @@ func (e *ImportIntoExec) importFromSelect(ctx context.Context) error {
 		return err
 	}
 
-	if err2 = flushStats(ctx, newSCtx, e.importPlan.TableInfo.ID, importResult); err2 != nil {
+	if err2 = importer.FlushTableStats(ctx, newSCtx, e.controller.TableInfo.ID, importResult); err2 != nil {
 		logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
 	}
 
@@ -400,19 +344,6 @@ func (e *ImportIntoActionExec) checkPrivilegeAndStatus(ctx context.Context, mana
 	return nil
 }
 
-// flushStats flushes the stats of the table.
-func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result *importer.JobImportResult) error {
-	if err := sessiontxn.NewTxn(ctx, se); err != nil {
-		return err
-	}
-	sessionVars := se.GetSessionVars()
-	sessionVars.TxnCtxMu.Lock()
-	defer sessionVars.TxnCtxMu.Unlock()
-	sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
-	se.StmtCommit(ctx)
-	return se.CommitTxn(ctx)
-}
-
 func cancelAndWaitImportJob(ctx context.Context, manager *fstorage.TaskManager, jobID int64) error {
 	if err := manager.WithNewTxn(ctx, func(se sessionctx.Context) error {
 		ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel
index c2e4e559cc08f..7fff20cd54600 100644
--- a/pkg/executor/importer/BUILD.bazel
+++ b/pkg/executor/importer/BUILD.bazel
@@ -50,6 +50,7 @@ go_library(
         "//pkg/sessionctx",
         "//pkg/sessionctx/stmtctx",
         "//pkg/sessionctx/variable",
+        "//pkg/sessiontxn",
         "//pkg/table",
         "//pkg/table/tables",
         "//pkg/tablecodec",
diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go
index b9bef1cdd20e0..dcfdceb09c7aa 100644
--- a/pkg/executor/importer/import.go
+++ b/pkg/executor/importer/import.go
@@ -64,7 +64,6 @@ import (
 	pd "github.com/tikv/pd/client"
 	pdhttp "github.com/tikv/pd/client/http"
 	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
 )
 
 const (
@@ -433,16 +432,6 @@ func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plann
 	return p, nil
 }
 
-// InitTiKVConfigs initializes some TiKV related configs.
-func (p *Plan) InitTiKVConfigs(ctx context.Context, sctx sessionctx.Context) error {
-	isRaftKV2, err := util.IsRaftKv2(ctx, sctx)
-	if err != nil {
-		return err
-	}
-	p.IsRaftKV2 = isRaftKV2
-	return nil
-}
-
 // ASTArgsFromPlan creates ASTArgs from plan.
 func ASTArgsFromPlan(plan *plannercore.LoadData) *ASTArgs {
 	return &ASTArgs{
@@ -507,6 +496,16 @@ func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*Load
 	return c, nil
 }
 
+// InitTiKVConfigs initializes some TiKV related configs.
+func (e *LoadDataController) InitTiKVConfigs(ctx context.Context, sctx sessionctx.Context) error {
+	isRaftKV2, err := util.IsRaftKv2(ctx, sctx)
+	if err != nil {
+		return err
+	}
+	e.Plan.IsRaftKV2 = isRaftKV2
+	return nil
+}
+
 func (e *LoadDataController) checkFieldParams() error {
 	if e.DataSourceType == DataSourceTypeFile && e.Path == "" {
 		return exeerrors.ErrLoadDataEmptyPath
@@ -1358,17 +1357,6 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {
 	return DataSourceTypeFile
 }
 
-// JobImportParam is the param of the job import.
-type JobImportParam struct {
-	Job      *Job
-	Group    *errgroup.Group
-	GroupCtx context.Context
-	// should be closed in the end of the job.
-	Done chan struct{}
-
-	Progress *Progress
-}
-
 // JobImportResult is the result of the job import.
 type JobImportResult struct {
 	Affected   uint64
@@ -1376,19 +1364,6 @@ type JobImportResult struct {
 	ColSizeMap map[int64]int64
 }
 
-// JobImporter is the interface for importing a job.
-type JobImporter interface {
-	// Param returns the param of the job import.
-	Param() *JobImportParam
-	// Import imports the job.
-	// import should run in routines using param.Group, when import finished, it should close param.Done.
-	// during import, we should use param.GroupCtx, so this method has no context param.
-	Import()
-	// Result returns the result of the job import.
-	Result() JobImportResult
-	io.Closer
-}
-
 // GetMsgFromBRError get msg from BR error.
 // TODO: add GetMsg() to errors package to replace this function.
 // see TestGetMsgFromBRError for more details.
diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go
index f49ae4f5773be..25ab01fe68d67 100644
--- a/pkg/executor/importer/importer_testkit_test.go
+++ b/pkg/executor/importer/importer_testkit_test.go
@@ -290,8 +290,7 @@ func getTableImporter(ctx context.Context, t *testing.T, store kv.Storage, table
 	if path != "" {
 		require.NoError(t, controller.InitDataStore(ctx))
 	}
-	ti, err := importer.NewTableImporterForTest(&importer.JobImportParam{GroupCtx: ctx},
-		controller, "11", store, &storeHelper{kvStore: store})
+	ti, err := importer.NewTableImporterForTest(ctx, controller, "11", store, &storeHelper{kvStore: store})
 	require.NoError(t, err)
 	return ti
 }
diff --git a/pkg/executor/importer/progress.go b/pkg/executor/importer/progress.go
index fc052add4f0c4..8ee0e882ef925 100644
--- a/pkg/executor/importer/progress.go
+++ b/pkg/executor/importer/progress.go
@@ -17,32 +17,18 @@ package importer
 import (
 	"maps"
 	"sync"
-
-	"github.com/pingcap/tidb/pkg/util/sqlexec"
 )
 
-// Job describes a import job.
-type Job struct {
-	ID int64
-	// Job don't manage the life cycle of the connection.
-	Conn sqlexec.SQLExecutor
-	User string
-}
-
 // Progress is the progress of the IMPORT INTO task.
 type Progress struct {
-	// SourceFileSize is the size of the source file in bytes. When we can't get
-	// the size of the source file, it will be set to -1.
-	SourceFileSize int64
-	colSizeMu      sync.Mutex
-	colSizeMap     map[int64]int64
+	colSizeMu  sync.Mutex
+	colSizeMap map[int64]int64
 }
 
 // NewProgress creates a new Progress.
 func NewProgress() *Progress {
 	return &Progress{
-		SourceFileSize: -1,
-		colSizeMap:     make(map[int64]int64),
+		colSizeMap: make(map[int64]int64),
 	}
 }
diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go
index 5f4f58fce5b77..860cd3ef93de4 100644
--- a/pkg/executor/importer/table_import.go
+++ b/pkg/executor/importer/table_import.go
@@ -51,6 +51,7 @@ import (
 	tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
+	"github.com/pingcap/tidb/pkg/sessiontxn"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/table/tables"
 	tidbutil "github.com/pingcap/tidb/pkg/util"
@@ -188,7 +189,7 @@ func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionS
 }
 
 // NewTableImporter creates a new table importer.
-func NewTableImporter(param *JobImportParam, e *LoadDataController, id string) (ti *TableImporter, err error) {
+func NewTableImporter(ctx context.Context, e *LoadDataController, id string) (ti *TableImporter, err error) {
 	idAlloc := kv.NewPanickingAllocators(0)
 	tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
 	if err != nil {
@@ -222,13 +223,12 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController, id string) (
 	backendConfig := e.getLocalBackendCfg(tidbCfg.Path, dir)
 
 	d := kvStore.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
-	localBackend, err := local.NewBackend(param.GroupCtx, tls, backendConfig, d)
+	localBackend, err := local.NewBackend(ctx, tls, backendConfig, d)
 	if err != nil {
 		return nil, err
 	}
 
 	return &TableImporter{
-		JobImportParam:     param,
 		LoadDataController: e,
 		id:                 id,
 		backend:            localBackend,
@@ -253,7 +253,6 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController, id string) (
 
 // TableImporter is a table importer.
 type TableImporter struct {
-	*JobImportParam
 	*LoadDataController
 	// id is the unique id for this importer.
 	// it's the task id if we are running in distributed framework, else it's an
@@ -277,7 +276,7 @@ type TableImporter struct {
 }
 
 // NewTableImporterForTest creates a new table importer for test.
-func NewTableImporterForTest(param *JobImportParam, e *LoadDataController, id string, store tidbkv.Storage, helper local.StoreHelper) (*TableImporter, error) {
+func NewTableImporterForTest(ctx context.Context, e *LoadDataController, id string, store tidbkv.Storage, helper local.StoreHelper) (*TableImporter, error) {
 	idAlloc := kv.NewPanickingAllocators(0)
 	tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
 	if err != nil {
@@ -291,13 +290,12 @@ func NewTableImporterForTest(param *JobImportParam, e *LoadDataController, id st
 	}
 
 	backendConfig := e.getLocalBackendCfg(tidbCfg.Path, dir)
-	localBackend, err := local.NewBackendForTest(param.GroupCtx, backendConfig, helper)
+	localBackend, err := local.NewBackendForTest(ctx, backendConfig, helper)
 	if err != nil {
 		return nil, err
 	}
 
 	return &TableImporter{
-		JobImportParam:     param,
 		LoadDataController: e,
 		id:                 id,
 		backend:            localBackend,
@@ -1001,3 +999,16 @@ func GetImportRootDir(tidbCfg *tidb.Config) string {
 	sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port))
 	return filepath.Join(tidbCfg.TempDir, sortPathSuffix)
 }
+
+// FlushTableStats flushes the stats of the table.
+func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64, result *JobImportResult) error {
+	if err := sessiontxn.NewTxn(ctx, se); err != nil {
+		return err
+	}
+	sessionVars := se.GetSessionVars()
+	sessionVars.TxnCtxMu.Lock()
+	defer sessionVars.TxnCtxMu.Unlock()
+	sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
+	se.StmtCommit(ctx)
+	return se.CommitTxn(ctx)
+}
diff --git a/pkg/executor/importer/table_import_testkit_test.go b/pkg/executor/importer/table_import_testkit_test.go
index 199952efc43d1..fab4f939a0263 100644
--- a/pkg/executor/importer/table_import_testkit_test.go
+++ b/pkg/executor/importer/table_import_testkit_test.go
@@ -96,9 +96,7 @@ func TestImportFromSelectCleanup(t *testing.T) {
 	controller, err := importer.NewLoadDataController(plan, table, &importer.ASTArgs{})
 	require.NoError(t, err)
 	ti, err := importer.NewTableImporterForTest(
-		&importer.JobImportParam{
-			GroupCtx: ctx,
-		},
+		ctx,
 		controller,
 		"11",
 		store,
diff --git a/pkg/metrics/disttask.go b/pkg/metrics/disttask.go
index 2e166c49b2d20..c82f070b0eb28 100644
--- a/pkg/metrics/disttask.go
+++ b/pkg/metrics/disttask.go
@@ -69,7 +69,7 @@ func InitDistTaskMetrics() {
 }
 
 // UpdateMetricsForAddTask update metrics when a task is added
-func UpdateMetricsForAddTask(task *proto.Task) {
+func UpdateMetricsForAddTask(task *proto.TaskBase) {
 	DistTaskGauge.WithLabelValues(task.Type.String(), WaitingStatus).Inc()
 	DistTaskStarttimeGauge.WithLabelValues(task.Type.String(), WaitingStatus, fmt.Sprint(task.ID)).Set(float64(time.Now().UnixMicro()))
 }
diff --git a/tests/realtikvtest/importintotest/detach_test.go b/tests/realtikvtest/importintotest/detach_test.go
index 02c4f2d742cf9..f0787b97e9015 100644
--- a/tests/realtikvtest/importintotest/detach_test.go
+++ b/tests/realtikvtest/importintotest/detach_test.go
@@ -15,13 +15,15 @@
 package importintotest
 
 import (
+	"context"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/fsouza/fake-gcs-server/fakestorage"
-	"github.com/pingcap/tidb/pkg/executor"
-	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/util"
 )
 
 type detachedCase struct {
@@ -41,11 +43,6 @@ var detachedCases = []detachedCase{
 }
 
 func (s *mockGCSSuite) TestSameBehaviourDetachedOrNot() {
-	s.T().Cleanup(func() {
-		executor.TestDetachedTaskFinished.Store(false)
-	})
-
-	testkit.EnableFailPoint(s.T(), "github.com/pingcap/tidb/pkg/executor/testDetachedTaskFinished", "return(true)")
 	s.tk.MustExec("SET SESSION TIME_ZONE = '+08:00';")
 	for _, ca := range detachedCases {
 		s.tk.MustExec("DROP DATABASE IF EXISTS test_detached;")
@@ -60,14 +57,18 @@ func (s *mockGCSSuite) TestSameBehaviourDetachedOrNot() {
 			},
 			Content: []byte(ca.physicalModeData),
 		})
-		executor.TestDetachedTaskFinished.Store(false)
 		s.tk.MustQuery(fmt.Sprintf(`IMPORT INTO test_detached.t1 FROM 'gs://test-detached/1.txt?endpoint=%s' WITH thread=1;`, gcsEndpoint))
 		rows := s.tk.MustQuery(fmt.Sprintf(`IMPORT INTO test_detached.t2 FROM 'gs://test-detached/1.txt?endpoint=%s' WITH DETACHED, thread=1;`, gcsEndpoint)).Rows()
 		require.Len(s.T(), rows, 1)
+		jobID, err := strconv.Atoi(rows[0][0].(string))
+		s.NoError(err)
+		ctx := context.Background()
+		ctx = util.WithInternalSourceType(ctx, "taskManager")
 		require.Eventually(s.T(), func() bool {
-			return executor.TestDetachedTaskFinished.Load()
+			task := s.getTaskByJobID(ctx, int64(jobID))
+			return task.State == proto.TaskStateSucceed
 		}, maxWaitTime, time.Second)
 
 		r1 := s.tk.MustQuery("SELECT * FROM test_detached.t1").Sort().Rows()
diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go
index e766531f9913d..b6b8ecd275f27 100644
--- a/tests/realtikvtest/importintotest/import_into_test.go
+++ b/tests/realtikvtest/importintotest/import_into_test.go
@@ -676,7 +676,9 @@ func (s *mockGCSSuite) TestMaxWriteSpeed() {
 	s.tk.MustQuery("SELECT count(1) FROM load_test_write_speed.t;").Check(testkit.Rows(
 		strconv.Itoa(lineCount),
 	))
-	require.Less(s.T(), duration+5, durationWithLimit)
+	// the previous import might be slower depending on the environment, so we only
+	// require a 4-second gap here; this check might still be unstable.
+	require.Less(s.T(), duration+4, durationWithLimit)
 }
 
 func (s *mockGCSSuite) TestChecksumNotMatch() {
@@ -783,7 +785,6 @@ func (s *mockGCSSuite) checkTaskMetaRedacted(jobID int64) {
 	taskManager, err := storage.GetTaskManager()
 	s.NoError(err)
 	taskKey := importinto.TaskKey(jobID)
-	s.NoError(err)
 	ctx := context.Background()
 	ctx = util.WithInternalSourceType(ctx, "taskManager")
 	task, err2 := taskManager.GetTaskByKeyWithHistory(ctx, taskKey)
diff --git a/tests/realtikvtest/importintotest/job_test.go b/tests/realtikvtest/importintotest/job_test.go
index cbe988056db62..d703bd32ca079 100644
--- a/tests/realtikvtest/importintotest/job_test.go
+++ b/tests/realtikvtest/importintotest/job_test.go
@@ -372,6 +372,15 @@ func (s *mockGCSSuite) TestShowDetachedJob() {
 	s.compareJobInfoWithoutTime(jobInfo, rows[0])
 }
 
+func (s *mockGCSSuite) getTaskByJobID(ctx context.Context, jobID int64) *proto.Task {
+	taskManager, err := storage.GetTaskManager()
+	s.NoError(err)
+	taskKey := importinto.TaskKey(jobID)
+	task, err := taskManager.GetTaskByKeyWithHistory(ctx, taskKey)
+	s.NoError(err)
+	return task
+}
+
 func (s *mockGCSSuite) TestCancelJob() {
 	s.prepareAndUseDB("test_cancel_job")
 	s.tk.MustExec("CREATE TABLE t1 (i INT PRIMARY KEY);")
@@ -400,15 +409,6 @@ func (s *mockGCSSuite) TestCancelJob() {
 	err = s.tk.ExecToErr("cancel import job 9999999999")
 	s.ErrorIs(err, exeerrors.ErrLoadDataJobNotFound)
 
-	getTask := func(jobID int64) *proto.Task {
-		taskManager, err := storage.GetTaskManager()
-		s.NoError(err)
-		taskKey := importinto.TaskKey(jobID)
-		task, err := taskManager.GetTaskByKeyWithHistory(ctx, taskKey)
-		s.NoError(err)
-		return task
-	}
-
 	// cancel a running job created by self
 	testkit.EnableFailPoint(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/waitBeforeSortChunk", "return(true)")
 	testkit.EnableFailPoint(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/syncAfterJobStarted", "return(true)")
@@ -440,7 +440,7 @@ func (s *mockGCSSuite) TestCancelJob() {
 	}
 	s.compareJobInfoWithoutTime(jobInfo, rows[0])
 	s.Require().Eventually(func() bool {
-		task := getTask(int64(jobID1))
+		task := s.getTaskByJobID(ctx, int64(jobID1))
 		return task.State == proto.TaskStateReverted
 	}, maxWaitTime, 500*time.Millisecond)
 
@@ -474,14 +474,14 @@ func (s *mockGCSSuite) TestCancelJob() {
 		s.tk.MustExec(fmt.Sprintf("cancel import job %d", jobID2))
 	}()
 	s.Require().Eventually(func() bool {
-		task := getTask(int64(jobID2))
+		task := s.getTaskByJobID(ctx, int64(jobID2))
 		return task.State != proto.TaskStatePending && task.State != proto.TaskStateRunning
 	}, 10*time.Second, 500*time.Millisecond)
 	// resume the job
 	importinto.TestSyncChan <- struct{}{}
 	wg.Wait()
 	// cancel import job will wait dist task done
-	task := getTask(int64(jobID2))
+	task := s.getTaskByJobID(ctx, int64(jobID2))
 	s.Equal(proto.TaskStateReverted, task.State)
 	rows2 := s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID2)).Rows()
 	s.Len(rows2, 1)
@@ -544,13 +544,13 @@ func (s *mockGCSSuite) TestCancelJob() {
 		s.tk.MustExec(fmt.Sprintf("cancel import job %d", jobID2))
 	}()
 	s.Require().Eventually(func() bool {
-		task := getTask(int64(jobID2))
+		task := s.getTaskByJobID(ctx, int64(jobID2))
 		return task.State != proto.TaskStatePending && task.State != proto.TaskStateRunning
 	}, 10*time.Second, 500*time.Millisecond)
 	// resume the job
 	importinto.TestSyncChan <- struct{}{}
 	wg.Wait()
-	task = getTask(int64(jobID2))
+	task = s.getTaskByJobID(ctx, int64(jobID2))
 	s.Equal(proto.TaskStateReverted, task.State)
 	rows = s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID2)).Rows()
 	s.Len(rows, 1)
diff --git a/tests/realtikvtest/importintotest/one_parquet_test.go b/tests/realtikvtest/importintotest/one_parquet_test.go
index fcebf684f9261..d4f21d0bc5b4e 100644
--- a/tests/realtikvtest/importintotest/one_parquet_test.go
+++ b/tests/realtikvtest/importintotest/one_parquet_test.go
@@ -15,16 +15,19 @@
 package importintotest
 
 import (
+	"context"
 	_ "embed"
 	"fmt"
 	"os"
 	"path"
+	"strconv"
 	"time"
 
 	"github.com/fsouza/fake-gcs-server/fakestorage"
-	"github.com/pingcap/tidb/pkg/executor"
+	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/util"
 )
 
 //go:embed test.parquet
@@ -59,14 +62,17 @@ func (s *mockGCSSuite) TestDetachedLoadParquet() {
 	))
 	s.tk.MustExec("TRUNCATE TABLE t;")
 
-	s.T().Cleanup(func() { executor.TestDetachedTaskFinished.Store(false) })
-	testkit.EnableFailPoint(s.T(), "github.com/pingcap/tidb/pkg/executor/testDetachedTaskFinished", "return(true)")
 	sql := fmt.Sprintf(`IMPORT INTO t FROM 'gs://test-load-parquet/p?endpoint=%s'
 		FORMAT 'parquet' WITH detached;`, gcsEndpoint)
 	rows := s.tk.MustQuery(sql).Rows()
 	require.Len(s.T(), rows, 1)
+	jobID, err := strconv.Atoi(rows[0][0].(string))
+	s.NoError(err)
+	ctx := context.Background()
+	ctx = util.WithInternalSourceType(ctx, "taskManager")
 	require.Eventually(s.T(), func() bool {
-		return executor.TestDetachedTaskFinished.Load()
+		task := s.getTaskByJobID(ctx, int64(jobID))
+		return task.State == proto.TaskStateSucceed
 	}, maxWaitTime, time.Second)
 
 	s.tk.MustQuery("SELECT * FROM t;").Check(testkit.Rows(
diff --git a/tests/realtikvtest/importintotest3/BUILD.bazel b/tests/realtikvtest/importintotest3/BUILD.bazel
index f513a6e4a142f..4f47492272bb6 100644
--- a/tests/realtikvtest/importintotest3/BUILD.bazel
+++ b/tests/realtikvtest/importintotest3/BUILD.bazel
@@ -13,13 +13,17 @@ go_test(
     deps = [
         "//br/pkg/lightning/mydump",
         "//pkg/config",
+        "//pkg/disttask/framework/storage",
         "//pkg/disttask/framework/testutil",
+        "//pkg/disttask/importinto",
        "//pkg/kv",
         "//pkg/testkit",
         "//tests/realtikvtest",
         "@com_github_fsouza_fake_gcs_server//fakestorage",
         "@com_github_golang_snappy//:snappy",
         "@com_github_klauspost_compress//zstd",
+        "@com_github_stretchr_testify//require",
         "@com_github_stretchr_testify//suite",
+        "@com_github_tikv_client_go_v2//util",
     ],
 )
diff --git a/tests/realtikvtest/importintotest3/from_server_test.go b/tests/realtikvtest/importintotest3/from_server_test.go
index be525dd868391..41219cd7e0ca7 100644
--- a/tests/realtikvtest/importintotest3/from_server_test.go
+++ b/tests/realtikvtest/importintotest3/from_server_test.go
@@ -15,12 +15,19 @@
 package importintotest
 
 import (
+	"context"
+	"encoding/json"
 	"fmt"
 	"os"
 	"path"
+	"strconv"
 
 	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
+	"github.com/pingcap/tidb/pkg/disttask/framework/storage"
+	"github.com/pingcap/tidb/pkg/disttask/importinto"
 	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/util"
 )
 
 func (s *mockGCSSuite) TestImportFromServer() {
@@ -53,6 +60,17 @@ func (s *mockGCSSuite) TestImportFromServer() {
 		s.getCompressedData(mydump.CompressionGZ, []byte("1,test1\n2,test2")), 0o644))
 	s.tk.MustExec("truncate table t")
-	s.tk.MustQuery(fmt.Sprintf("IMPORT INTO t FROM '%s'", path.Join(tempDir, "test.csv.gz")))
+	rows := s.tk.MustQuery(fmt.Sprintf("IMPORT INTO t FROM '%s'", path.Join(tempDir, "test.csv.gz"))).Rows()
 	s.tk.MustQuery("SELECT * FROM t;").Sort().Check(testkit.Rows([]string{"1 test1", "2 test2"}...))
+	jobID, err := strconv.Atoi(rows[0][0].(string))
+	s.NoError(err)
+	taskManager, err := storage.GetTaskManager()
+	s.NoError(err)
+	taskKey := importinto.TaskKey(int64(jobID))
+	ctx := util.WithInternalSourceType(context.Background(), "taskManager")
+	task, err2 := taskManager.GetTaskByKeyWithHistory(ctx, taskKey)
+	s.NoError(err2)
+	var taskMeta importinto.TaskMeta
+	require.NoError(s.T(), json.Unmarshal(task.Meta, &taskMeta))
+	require.Len(s.T(), taskMeta.ChunkMap, 2)
 }