importinto: remove separate wait routine in detached mode and refactor (
D3Hunter authored Feb 27, 2024
1 parent 707b0a4 commit 332086e
Showing 18 changed files with 144 additions and 320 deletions.
2 changes: 1 addition & 1 deletion pkg/disttask/framework/handle/handle.go
@@ -77,7 +77,7 @@ func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, co
if err != nil {
return nil, err
}
metrics.UpdateMetricsForAddTask(task)
metrics.UpdateMetricsForAddTask(&task.TaskBase)

NotifyTaskChange()
return task, nil
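The one-line change above follows from the refactor: SubmitTask still returns a *proto.Task, but metrics.UpdateMetricsForAddTask now accepts the embedded *proto.TaskBase. A minimal sketch of the struct-embedding pattern this relies on, using simplified stand-in types rather than the real proto definitions:

package main

import "fmt"

// TaskBase carries the lightweight fields shared by all task views
// (a simplified stand-in for proto.TaskBase).
type TaskBase struct {
	ID   int64
	Type string
}

// Task embeds TaskBase and adds heavier fields such as the task meta
// (a simplified stand-in for proto.Task).
type Task struct {
	TaskBase
	Meta []byte
}

// updateMetricsForAddTask only needs the base fields, so it accepts *TaskBase.
func updateMetricsForAddTask(t *TaskBase) {
	fmt.Printf("recorded new task %d of type %q\n", t.ID, t.Type)
}

func main() {
	task := &Task{TaskBase: TaskBase{ID: 1, Type: "ImportInto"}, Meta: []byte("{}")}
	// A caller holding a *Task passes the address of the embedded base,
	// mirroring metrics.UpdateMetricsForAddTask(&task.TaskBase) above.
	updateMetricsForAddTask(&task.TaskBase)
}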
153 changes: 22 additions & 131 deletions pkg/disttask/importinto/job.go
@@ -19,7 +19,7 @@ import (
"encoding/json"
"fmt"

"github.com/google/uuid"
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
@@ -38,105 +38,26 @@ import (
"go.uber.org/zap"
)

// DistImporter is a JobImporter for distributed IMPORT INTO.
type DistImporter struct {
*importer.JobImportParam
plan *importer.Plan
stmt string
logger *zap.Logger
// the instance to import data, used for single-node import, nil means import data on all instances.
instance *infosync.ServerInfo
// the files to import, when import from server file, we need to pass those file to the framework.
chunkMap map[int32][]Chunk
sourceFileSize int64
// only set after submit task
jobID int64
taskID int64
}

// NewDistImporter creates a new DistImporter.
func NewDistImporter(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error) {
return &DistImporter{
JobImportParam: param,
plan: plan,
stmt: stmt,
logger: logutil.BgLogger(),
sourceFileSize: sourceFileSize,
}, nil
}

// NewDistImporterCurrNode creates a new DistImporter to import data on current node.
func NewDistImporterCurrNode(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error) {
// SubmitStandaloneTask submits a task to the distribute framework that only runs on the current node.
// when import from server-disk, pass engine checkpoints too, as scheduler might run on another
// node where we can't access the data files.
func SubmitStandaloneTask(ctx context.Context, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint) (int64, *proto.TaskBase, error) {
serverInfo, err := infosync.GetServerInfo()
if err != nil {
return nil, err
}
return &DistImporter{
JobImportParam: param,
plan: plan,
stmt: stmt,
logger: logutil.BgLogger(),
instance: serverInfo,
sourceFileSize: sourceFileSize,
}, nil
}

// NewDistImporterServerFile creates a new DistImporter to import given files on current node.
// we also run import on current node.
// todo: merge all 3 ctor into one.
func NewDistImporterServerFile(param *importer.JobImportParam, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint, sourceFileSize int64) (*DistImporter, error) {
distImporter, err := NewDistImporterCurrNode(param, plan, stmt, sourceFileSize)
if err != nil {
return nil, err
}
distImporter.chunkMap = toChunkMap(ecp)
return distImporter, nil
}

// Param implements JobImporter.Param.
func (ti *DistImporter) Param() *importer.JobImportParam {
return ti.JobImportParam
}

// Import implements JobImporter.Import.
func (*DistImporter) Import() {
// todo: remove it
}

// ImportTask import task.
func (ti *DistImporter) ImportTask(task *proto.Task) {
ti.logger.Info("start distribute IMPORT INTO")
ti.Group.Go(func() error {
defer close(ti.Done)
// task is run using distribute framework, so we only wait for the task to finish.
return handle.WaitTaskDoneOrPaused(ti.GroupCtx, task.ID)
})
}

// Result implements JobImporter.Result.
func (ti *DistImporter) Result(ctx context.Context) importer.JobImportResult {
var result importer.JobImportResult
taskMeta, err := getTaskMeta(ctx, ti.jobID)
if err != nil {
return result
}

return importer.JobImportResult{
Affected: taskMeta.Result.LoadedRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
return 0, nil, err
}
return doSubmitTask(ctx, plan, stmt, serverInfo, toChunkMap(ecp))
}

// Close implements the io.Closer interface.
func (*DistImporter) Close() error {
return nil
// SubmitTask submits a task to the distribute framework that runs on all managed nodes.
func SubmitTask(ctx context.Context, plan *importer.Plan, stmt string) (int64, *proto.TaskBase, error) {
return doSubmitTask(ctx, plan, stmt, nil, nil)
}

// SubmitTask submits a task to the distribute framework.
func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, error) {
func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instance *infosync.ServerInfo, chunkMap map[int32][]Chunk) (int64, *proto.TaskBase, error) {
var instances []*infosync.ServerInfo
if ti.instance != nil {
instances = append(instances, ti.instance)
if instance != nil {
instances = append(instances, instance)
}
// we use taskManager to submit task, user might not have the privilege to system tables.
taskManager, err := storage.GetTaskManager()
@@ -146,7 +67,6 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
}

var jobID, taskID int64
plan := ti.plan
if err = taskManager.WithNewTxn(ctx, func(se sessionctx.Context) error {
var err2 error
exec := se.(sqlexec.SQLExecutor)
@@ -163,7 +83,7 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("there's pending or running jobs")
}
jobID, err2 = importer.CreateJob(ctx, exec, plan.DBName, plan.TableInfo.Name.L, plan.TableInfo.ID,
plan.User, plan.Parameters, ti.sourceFileSize)
plan.User, plan.Parameters, plan.TotalFileSize)
if err2 != nil {
return err2
}
@@ -173,9 +93,9 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
logicalPlan := &LogicalPlan{
JobID: jobID,
Plan: *plan,
Stmt: ti.stmt,
Stmt: stmt,
EligibleInstances: instances,
ChunkMap: ti.chunkMap,
ChunkMap: chunkMap,
}
planCtx := planner.PlanCtx{
Ctx: ctx,
@@ -194,51 +114,22 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err
return 0, nil, err
}
handle.NotifyTaskChange()
task, err := taskManager.GetTaskByID(ctx, taskID)
task, err := taskManager.GetTaskBaseByID(ctx, taskID)
if err != nil {
return 0, nil, err
}

metrics.UpdateMetricsForAddTask(task)
// update logger with task id.
ti.jobID = jobID
ti.taskID = taskID
ti.logger = ti.logger.With(zap.Int64("task-id", task.ID))

ti.logger.Info("job submitted to task queue",
zap.Int64("job-id", jobID), zap.Int("thread-cnt", plan.ThreadCnt))
logutil.BgLogger().Info("job submitted to task queue",
zap.Int64("job-id", jobID),
zap.Int64("task-id", task.ID),
zap.String("data-size", units.BytesSize(float64(plan.TotalFileSize))),
zap.Int("thread-cnt", plan.ThreadCnt))

return jobID, task, nil
}

func (*DistImporter) taskKey() string {
// task key is meaningless to IMPORT INTO, so we use a random uuid.
return fmt.Sprintf("%s/%s", proto.ImportInto, uuid.New().String())
}

// JobID returns the job id.
func (ti *DistImporter) JobID() int64 {
return ti.jobID
}

func getTaskMeta(ctx context.Context, jobID int64) (*TaskMeta, error) {
taskManager, err := storage.GetTaskManager()
ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
if err != nil {
return nil, err
}
taskKey := TaskKey(jobID)
task, err := taskManager.GetTaskByKey(ctx, taskKey)
if err != nil {
return nil, err
}
var taskMeta TaskMeta
if err := json.Unmarshal(task.Meta, &taskMeta); err != nil {
return nil, errors.Trace(err)
}
return &taskMeta, nil
}

// GetTaskImportedRows gets the number of imported rows of a job.
// Note: for finished job, we can get the number of imported rows from task meta.
func GetTaskImportedRows(ctx context.Context, jobID int64) (uint64, error) {
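Taken together, the job.go changes replace the DistImporter struct and its three constructors with plain package-level entry points: SubmitTask for the normal distributed path and SubmitStandaloneTask for the single-node / server-disk path, both returning the job ID and a *proto.TaskBase. Because the separate wait goroutine is removed, a caller that is not in detached mode now waits on the task itself. A hypothetical caller sketch (not part of this commit; it assumes the importinto, importer, and handle packages shown in the diff are imported):

// runImport is an invented helper for illustration only.
func runImport(ctx context.Context, plan *importer.Plan, stmt string, detached bool) (int64, error) {
	jobID, task, err := importinto.SubmitTask(ctx, plan, stmt)
	if err != nil {
		return 0, err
	}
	if detached {
		// Detached mode: no background wait routine any more; hand the
		// job ID back to the client as soon as the task is queued.
		return jobID, nil
	}
	// Non-detached mode: block until the distributed framework finishes or
	// pauses the task, as the removed ImportTask method used to do.
	return jobID, handle.WaitTaskDoneOrPaused(ctx, task.ID)
}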
6 changes: 6 additions & 0 deletions pkg/disttask/importinto/scheduler.go
@@ -638,6 +638,12 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
return handle.RunWithRetry(ctx, scheduler.RetrySQLTimes, backoffer, logger,
func(ctx context.Context) (bool, error) {
return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
Affected: taskMeta.Result.LoadedRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
}); err != nil {
logger.Warn("flush table stats failed", zap.Error(err))
}
exec := se.(sqlexec.SQLExecutor)
return importer.FinishJob(ctx, exec, taskMeta.JobID, summary)
})
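The new lines in finishJob flush the table statistics before marking the job finished, and deliberately downgrade a flush failure to a warning so it cannot block job completion. A minimal sketch of that best-effort pattern, with flushStats and finish as hypothetical stand-ins for importer.FlushTableStats and importer.FinishJob:

// finishWithStats flushes stats best-effort, then finishes the job; only the
// finish step can fail the call.
func finishWithStats(logger *zap.Logger, flushStats, finish func() error) error {
	if err := flushStats(); err != nil {
		// A failed stats flush is logged but never fails the job.
		logger.Warn("flush table stats failed", zap.Error(err))
	}
	return finish()
}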
6 changes: 1 addition & 5 deletions pkg/disttask/importinto/task_executor.go
@@ -80,11 +80,7 @@ func getTableImporter(ctx context.Context, taskID int64, taskMeta *TaskMeta) (*i
return nil, err
}

return importer.NewTableImporter(&importer.JobImportParam{
GroupCtx: ctx,
Progress: importer.NewProgress(),
Job: &importer.Job{},
}, controller, strconv.FormatInt(taskID, 10))
return importer.NewTableImporter(ctx, controller, strconv.FormatInt(taskID, 10))
}

func (s *importStepExecutor) Init(ctx context.Context) error {
(remaining 14 changed files not shown)
