From 986b00849c0392bdc5ff3812d92590bf8e2f6fb4 Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Thu, 11 Jan 2024 13:28:55 +0800 Subject: [PATCH] disttask: storage.util move to testutil (#50252) ref pingcap/tidb#48795 --- .../framework_pause_and_resume_test.go | 10 +- pkg/disttask/framework/framework_role_test.go | 2 +- pkg/disttask/framework/framework_test.go | 4 +- pkg/disttask/framework/storage/BUILD.bazel | 1 - pkg/disttask/framework/storage/table_test.go | 22 +-- pkg/disttask/framework/storage/task_state.go | 8 +- pkg/disttask/framework/storage/task_table.go | 99 ++++++------ pkg/disttask/framework/storage/util.go | 145 ------------------ pkg/disttask/framework/testutil/BUILD.bazel | 1 + pkg/disttask/framework/testutil/table_util.go | 121 +++++++++++++++ 10 files changed, 191 insertions(+), 222 deletions(-) delete mode 100644 pkg/disttask/framework/storage/util.go diff --git a/pkg/disttask/framework/framework_pause_and_resume_test.go b/pkg/disttask/framework/framework_pause_and_resume_test.go index b864857215782..28a2ad92d3de0 100644 --- a/pkg/disttask/framework/framework_pause_and_resume_test.go +++ b/pkg/disttask/framework/framework_pause_and_resume_test.go @@ -30,12 +30,14 @@ import ( func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state proto.SubtaskState, expectedCnt int64) { mgr, err := storage.GetTaskManager() require.NoError(t, err) - mgr.PrintSubtaskInfo(ctx, taskID) - cntByStates, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepTwo) + testutil.PrintSubtaskInfo(ctx, mgr, taskID) + cntByStatesStepOne, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepOne) require.NoError(t, err) - historySubTasksCnt, err := storage.GetSubtasksFromHistoryByTaskIDForTest(ctx, mgr, taskID) + cntByStatesStepTwo, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepTwo) require.NoError(t, err) - require.Equal(t, expectedCnt, cntByStates[state]+int64(historySubTasksCnt)) + historySubTasksCnt, err := testutil.GetSubtasksFromHistoryByTaskID(ctx, mgr, taskID) + require.NoError(t, err) + require.Equal(t, expectedCnt, cntByStatesStepOne[state]+cntByStatesStepTwo[state]+int64(historySubTasksCnt)) } func TestFrameworkPauseAndResume(t *testing.T) { diff --git a/pkg/disttask/framework/framework_role_test.go b/pkg/disttask/framework/framework_role_test.go index 9633bd1e28823..337189c02cd82 100644 --- a/pkg/disttask/framework/framework_role_test.go +++ b/pkg/disttask/framework/framework_role_test.go @@ -31,7 +31,7 @@ import ( func checkSubtaskOnNodes(ctx context.Context, t *testing.T, taskID int64, expectedNodes []string) { mgr, err := storage.GetTaskManager() require.NoError(t, err) - nodes, err := storage.GetSubtaskNodesForTest(ctx, mgr, taskID) + nodes, err := testutil.GetSubtaskNodes(ctx, mgr, taskID) require.NoError(t, err) slices.Sort(nodes) slices.Sort(expectedNodes) diff --git a/pkg/disttask/framework/framework_test.go b/pkg/disttask/framework/framework_test.go index f937907f8ad68..1842dcfc932a6 100644 --- a/pkg/disttask/framework/framework_test.go +++ b/pkg/disttask/framework/framework_test.go @@ -251,7 +251,7 @@ func TestGC(t *testing.T) { var historySubTasksCnt int require.Eventually(t, func() bool { - historySubTasksCnt, err = storage.GetSubtasksFromHistoryForTest(ctx, mgr) + historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, mgr) if err != nil { return false } @@ -261,7 +261,7 @@ func TestGC(t *testing.T) { scheduler.WaitTaskFinished <- struct{}{} require.Eventually(t, func() bool { - historySubTasksCnt, err := storage.GetSubtasksFromHistoryForTest(ctx, mgr) + historySubTasksCnt, err := testutil.GetSubtasksFromHistory(ctx, mgr) if err != nil { return false } diff --git a/pkg/disttask/framework/storage/BUILD.bazel b/pkg/disttask/framework/storage/BUILD.bazel index 0af0ea5551dd9..c6bc56af48cd8 100644 --- a/pkg/disttask/framework/storage/BUILD.bazel +++ b/pkg/disttask/framework/storage/BUILD.bazel @@ -5,7 +5,6 @@ go_library( srcs = [ "task_state.go", "task_table.go", - "util.go", ], importpath = "github.com/pingcap/tidb/pkg/disttask/framework/storage", visibility = ["//visibility:public"], diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go index 8b76d60e68066..3e3bce31c9f6f 100644 --- a/pkg/disttask/framework/storage/table_test.go +++ b/pkg/disttask/framework/storage/table_test.go @@ -140,7 +140,7 @@ func TestTaskTable(t *testing.T) { require.NoError(t, err) require.Equal(t, proto.TaskStateFailed, task.State) require.ErrorContains(t, task.Error, "test error") - endTime, err := storage.GetTaskEndTimeForTest(ctx, gm, id) + endTime, err := testutil.GetTaskEndTime(ctx, gm, id) require.NoError(t, err) require.LessOrEqual(t, endTime.Sub(curTime), time.Since(curTime)) require.GreaterOrEqual(t, endTime, curTime) @@ -619,7 +619,7 @@ func TestSubTaskTable(t *testing.T) { require.Greater(t, subtask.StartTime, ts) require.Greater(t, subtask.UpdateTime, ts) - endTime, err := storage.GetSubtaskEndTimeForTest(ctx, sm, subtask.ID) + endTime, err := testutil.GetSubtaskEndTime(ctx, sm, subtask.ID) require.NoError(t, err) require.Greater(t, endTime, ts) @@ -641,7 +641,7 @@ func TestSubTaskTable(t *testing.T) { require.NoError(t, err) require.Equal(t, subtask2.StartTime, subtask.StartTime) require.Greater(t, subtask2.UpdateTime, subtask.UpdateTime) - endTime, err = storage.GetSubtaskEndTimeForTest(ctx, sm, subtask.ID) + endTime, err = testutil.GetSubtaskEndTime(ctx, sm, subtask.ID) require.NoError(t, err) require.Greater(t, endTime, ts) @@ -962,10 +962,10 @@ func TestSubtaskHistoryTable(t *testing.T) { testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb3, []byte(meta), proto.TaskTypeExample, 11, false) require.NoError(t, sm.UpdateSubtaskStateAndError(ctx, tidb3, subTask3, proto.SubtaskStateFailed, nil)) - subTasks, err := storage.GetSubtasksByTaskIDForTest(ctx, sm, taskID) + subTasks, err := testutil.GetSubtasksByTaskID(ctx, sm, taskID) require.NoError(t, err) require.Len(t, subTasks, 3) - historySubTasksCnt, err := storage.GetSubtasksFromHistoryForTest(ctx, sm) + historySubTasksCnt, err := testutil.GetSubtasksFromHistory(ctx, sm) require.NoError(t, err) require.Equal(t, 0, historySubTasksCnt) subTasks, err = sm.GetSubtasksForImportInto(ctx, taskID, proto.StepInit) @@ -975,10 +975,10 @@ func TestSubtaskHistoryTable(t *testing.T) { // test TransferSubTasks2History require.NoError(t, sm.TransferSubTasks2History(ctx, taskID)) - subTasks, err = storage.GetSubtasksByTaskIDForTest(ctx, sm, taskID) + subTasks, err = testutil.GetSubtasksByTaskID(ctx, sm, taskID) require.NoError(t, err) require.Len(t, subTasks, 0) - historySubTasksCnt, err = storage.GetSubtasksFromHistoryForTest(ctx, sm) + historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, sm) require.NoError(t, err) require.Equal(t, 3, historySubTasksCnt) subTasks, err = sm.GetSubtasksForImportInto(ctx, taskID, proto.StepInit) @@ -998,7 +998,7 @@ func TestSubtaskHistoryTable(t *testing.T) { require.NoError(t, sm.GCSubtasks(ctx)) - historySubTasksCnt, err = storage.GetSubtasksFromHistoryForTest(ctx, sm) + historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, sm) require.NoError(t, err) require.Equal(t, 1, historySubTasksCnt) } @@ -1021,7 +1021,7 @@ func TestTaskHistoryTable(t *testing.T) { tasks, err = gm.GetTasksInStates(ctx, proto.TaskStatePending) require.NoError(t, err) require.Equal(t, 0, len(tasks)) - num, err := storage.GetTasksFromHistoryForTest(ctx, gm) + num, err := testutil.GetTasksFromHistory(ctx, gm) require.NoError(t, err) require.Equal(t, 2, num) @@ -1041,7 +1041,7 @@ func TestTaskHistoryTable(t *testing.T) { require.Equal(t, 1, len(tasks)) tasks[0].Error = errors.New("mock err") require.NoError(t, gm.TransferTasks2History(ctx, tasks)) - num, err = storage.GetTasksFromHistoryForTest(ctx, gm) + num, err = testutil.GetTasksFromHistory(ctx, gm) require.NoError(t, err) require.Equal(t, 3, num) } @@ -1094,7 +1094,7 @@ func TestCancelAndExecIdChanged(t *testing.T) { // 2. change the exec_id // exec_id changed - require.NoError(t, sm.UpdateSubtaskExecID(ctx, "tidb2", subtask.ID)) + require.NoError(t, testutil.UpdateSubtaskExecID(ctx, sm, "tidb2", subtask.ID)) // exec_id in memory unchanged, call UpdateSubtaskStateAndError. require.NoError(t, sm.UpdateSubtaskStateAndError(ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateFailed, nil)) subtask, err = sm.GetFirstSubtaskInStates(ctx, "tidb2", 1, proto.StepInit, proto.SubtaskStatePending) diff --git a/pkg/disttask/framework/storage/task_state.go b/pkg/disttask/framework/storage/task_state.go index 0252707185e58..72761aaeb1b5f 100644 --- a/pkg/disttask/framework/storage/task_state.go +++ b/pkg/disttask/framework/storage/task_state.go @@ -24,7 +24,7 @@ import ( // CancelTask cancels task. func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, state_update_time = CURRENT_TIMESTAMP() @@ -47,7 +47,7 @@ func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Co // FailTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error { - _, err := mgr.executeSQLWithNewSession(ctx, + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, error = %?, @@ -61,7 +61,7 @@ func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState // RevertedTask implements the scheduler.TaskManager interface. func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, state_update_time = CURRENT_TIMESTAMP(), @@ -99,7 +99,7 @@ func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, er // PausedTask update the task state from pausing to paused. func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_global_task set state = %?, state_update_time = CURRENT_TIMESTAMP(), diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index 02f3598924954..cc2fc340e5359 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -47,10 +47,10 @@ const ( // TODO: dispatcher_id will update to scheduler_id later taskColumns = basicTaskColumns + `, start_time, state_update_time, meta, dispatcher_id, error` // InsertTaskColumns is the columns used in insert task. - InsertTaskColumns = `task_key, type, state, priority, concurrency, step, meta, create_time` - + InsertTaskColumns = `task_key, type, state, priority, concurrency, step, meta, create_time` basicSubtaskColumns = `id, step, task_key, type, exec_id, state, concurrency, create_time, ordinal` - subtaskColumns = basicSubtaskColumns + `, start_time, state_update_time, meta, summary` + // SubtaskColumns is the columns for subtask. + SubtaskColumns = basicSubtaskColumns + `, start_time, state_update_time, meta, summary` // InsertSubtaskColumns is the columns used in insert subtask. InsertSubtaskColumns = `step, task_key, exec_id, meta, state, type, concurrency, ordinal, create_time, checkpoint, summary` ) @@ -214,7 +214,8 @@ func (mgr *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Co }) } -func (mgr *TaskManager) executeSQLWithNewSession(ctx context.Context, sql string, args ...interface{}) (rs []chunk.Row, err error) { +// ExecuteSQLWithNewSession executes one SQL with new session. +func (mgr *TaskManager) ExecuteSQLWithNewSession(ctx context.Context, sql string, args ...interface{}) (rs []chunk.Row, err error) { err = mgr.WithNewSession(func(se sessionctx.Context) error { rs, err = sqlexec.ExecSQL(ctx, se, sql, args...) return err @@ -267,7 +268,7 @@ func (mgr *TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx // GetOneTask get a task from task table, it's used by scheduler only. func (mgr *TaskManager) GetOneTask(ctx context.Context) (task *proto.Task, err error) { - rs, err := mgr.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where state = %? limit 1", proto.TaskStatePending) + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where state = %? limit 1", proto.TaskStatePending) if err != nil { return task, err } @@ -281,7 +282,7 @@ func (mgr *TaskManager) GetOneTask(ctx context.Context) (task *proto.Task, err e // GetTopUnfinishedTasks implements the scheduler.TaskManager interface. func (mgr *TaskManager) GetTopUnfinishedTasks(ctx context.Context) (task []*proto.Task, err error) { - rs, err := mgr.executeSQLWithNewSession(ctx, + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicTaskColumns+` from mysql.tidb_global_task where state in (%?, %?, %?, %?, %?, %?) order by priority asc, create_time asc, id asc @@ -310,7 +311,7 @@ func (mgr *TaskManager) GetTasksInStates(ctx context.Context, states ...interfac return task, nil } - rs, err := mgr.executeSQLWithNewSession(ctx, + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task "+ "where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)"+ " order by priority asc, create_time asc, id asc", states...) @@ -330,7 +331,7 @@ func (mgr *TaskManager) GetTasksFromHistoryInStates(ctx context.Context, states return task, nil } - rs, err := mgr.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task_history where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)", states...) + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task_history where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)", states...) if err != nil { return task, err } @@ -343,7 +344,7 @@ func (mgr *TaskManager) GetTasksFromHistoryInStates(ctx context.Context, states // GetTaskByID gets the task by the task ID. func (mgr *TaskManager) GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error) { - rs, err := mgr.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where id = %?", taskID) + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where id = %?", taskID) if err != nil { return task, err } @@ -356,7 +357,7 @@ func (mgr *TaskManager) GetTaskByID(ctx context.Context, taskID int64) (task *pr // GetTaskByIDWithHistory gets the task by the task ID from both tidb_global_task and tidb_global_task_history. func (mgr *TaskManager) GetTaskByIDWithHistory(ctx context.Context, taskID int64) (task *proto.Task, err error) { - rs, err := mgr.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where id = %? "+ + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where id = %? "+ "union select "+taskColumns+" from mysql.tidb_global_task_history where id = %?", taskID, taskID) if err != nil { return task, err @@ -370,7 +371,7 @@ func (mgr *TaskManager) GetTaskByIDWithHistory(ctx context.Context, taskID int64 // GetTaskByKey gets the task by the task key. func (mgr *TaskManager) GetTaskByKey(ctx context.Context, key string) (task *proto.Task, err error) { - rs, err := mgr.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where task_key = %?", key) + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where task_key = %?", key) if err != nil { return task, err } @@ -383,7 +384,7 @@ func (mgr *TaskManager) GetTaskByKey(ctx context.Context, key string) (task *pro // GetTaskByKeyWithHistory gets the task from history table by the task key. func (mgr *TaskManager) GetTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error) { - rs, err := mgr.executeSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where task_key = %?"+ + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select "+taskColumns+" from mysql.tidb_global_task where task_key = %?"+ "union select "+taskColumns+" from mysql.tidb_global_task_history where task_key = %?", key, key) if err != nil { return task, err @@ -399,7 +400,7 @@ func (mgr *TaskManager) GetTaskByKeyWithHistory(ctx context.Context, key string) func (mgr *TaskManager) GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error) { // concurrency of subtasks of some step is the same, we use max(concurrency) // to make group by works. - rs, err := mgr.executeSQLWithNewSession(ctx, ` + rs, err := mgr.ExecuteSQLWithNewSession(ctx, ` select exec_id, sum(concurrency) from ( @@ -449,8 +450,8 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask { return subtask } -// row2SubTask converts a row to a subtask. -func row2SubTask(r chunk.Row) *proto.Subtask { +// Row2SubTask converts a row to a subtask. +func Row2SubTask(r chunk.Row) *proto.Subtask { subtask := row2BasicSubTask(r) // subtask defines start/update time as bigint, to ensure backward compatible, // we keep it that way, and we convert it here. @@ -476,7 +477,7 @@ func (mgr *TaskManager) GetSubtasksByStepAndStates(ctx context.Context, tidbID s for _, state := range states { args = append(args, state) } - rs, err := mgr.executeSQLWithNewSession(ctx, `select `+subtaskColumns+` from mysql.tidb_background_subtask + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+SubtaskColumns+` from mysql.tidb_background_subtask where exec_id = %? and task_key = %? and step = %? and state in (`+strings.Repeat("%?,", len(states)-1)+"%?)", args...) if err != nil { @@ -485,7 +486,7 @@ func (mgr *TaskManager) GetSubtasksByStepAndStates(ctx context.Context, tidbID s subtasks := make([]*proto.Subtask, len(rs)) for i, row := range rs { - subtasks[i] = row2SubTask(row) + subtasks[i] = Row2SubTask(row) } return subtasks, nil } @@ -496,7 +497,7 @@ func (mgr *TaskManager) GetSubtasksByExecIdsAndStepAndState(ctx context.Context, for _, execID := range execIDs { args = append(args, execID) } - rs, err := mgr.executeSQLWithNewSession(ctx, `select `+subtaskColumns+` from mysql.tidb_background_subtask + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+SubtaskColumns+` from mysql.tidb_background_subtask where task_key = %? and step = %? and state = %? and exec_id in (`+strings.Repeat("%?,", len(execIDs)-1)+"%?)", args...) if err != nil { @@ -505,7 +506,7 @@ func (mgr *TaskManager) GetSubtasksByExecIdsAndStepAndState(ctx context.Context, subtasks := make([]*proto.Subtask, len(rs)) for i, row := range rs { - subtasks[i] = row2SubTask(row) + subtasks[i] = Row2SubTask(row) } return subtasks, nil } @@ -516,7 +517,7 @@ func (mgr *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID stri for _, state := range states { args = append(args, state) } - rs, err := mgr.executeSQLWithNewSession(ctx, `select `+subtaskColumns+` from mysql.tidb_background_subtask + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+SubtaskColumns+` from mysql.tidb_background_subtask where exec_id = %? and task_key = %? and step = %? and state in (`+strings.Repeat("%?,", len(states)-1)+"%?) limit 1", args...) if err != nil { @@ -526,17 +527,7 @@ func (mgr *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID stri if len(rs) == 0 { return nil, nil } - return row2SubTask(rs[0]), nil -} - -// UpdateSubtaskExecID updates the subtask's exec_id, used for testing now. -func (mgr *TaskManager) UpdateSubtaskExecID(ctx context.Context, tidbID string, subtaskID int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, - `update mysql.tidb_background_subtask - set exec_id = %?, state_update_time = unix_timestamp() - where id = %?`, - tidbID, subtaskID) - return err + return Row2SubTask(rs[0]), nil } // UpdateErrorToSubtask updates the error to subtask. @@ -544,7 +535,7 @@ func (mgr *TaskManager) UpdateErrorToSubtask(ctx context.Context, execID string, if err == nil { return nil } - _, err1 := mgr.executeSQLWithNewSession(ctx, + _, err1 := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_background_subtask set state = %?, error = %?, @@ -566,7 +557,7 @@ func (mgr *TaskManager) UpdateErrorToSubtask(ctx context.Context, execID string, // GetActiveSubtasks implements TaskManager.GetActiveSubtasks. func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.Subtask, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, ` + rs, err := mgr.ExecuteSQLWithNewSession(ctx, ` select `+basicSubtaskColumns+` from mysql.tidb_background_subtask where task_key = %? and state in (%?, %?)`, taskID, proto.TaskStatePending, proto.TaskStateRunning) @@ -582,7 +573,7 @@ func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([] // GetSubtasksByStepAndState gets the subtask by step and state. func (mgr *TaskManager) GetSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.TaskState) ([]*proto.Subtask, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, `select `+subtaskColumns+` from mysql.tidb_background_subtask + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+SubtaskColumns+` from mysql.tidb_background_subtask where task_key = %? and state = %? and step = %?`, taskID, state, step) if err != nil { @@ -593,14 +584,14 @@ func (mgr *TaskManager) GetSubtasksByStepAndState(ctx context.Context, taskID in } subtasks := make([]*proto.Subtask, 0, len(rs)) for _, r := range rs { - subtasks = append(subtasks, row2SubTask(r)) + subtasks = append(subtasks, Row2SubTask(r)) } return subtasks, nil } // GetSubtaskRowCount gets the subtask row count. func (mgr *TaskManager) GetSubtaskRowCount(ctx context.Context, taskID int64, step proto.Step) (int64, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, `select + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select cast(sum(json_extract(summary, '$.row_count')) as signed) as row_count from mysql.tidb_background_subtask where task_key = %? and step = %?`, taskID, step) @@ -615,7 +606,7 @@ func (mgr *TaskManager) GetSubtaskRowCount(ctx context.Context, taskID int64, st // UpdateSubtaskRowCount updates the subtask row count. func (mgr *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int64, rowCount int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_background_subtask set summary = json_set(summary, '$.row_count', %?) where id = %?`, rowCount, subtaskID) @@ -624,7 +615,7 @@ func (mgr *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int // GetSubtaskCntGroupByStates gets the subtask count by states. func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, ` + rs, err := mgr.ExecuteSQLWithNewSession(ctx, ` select state, count(*) from mysql.tidb_background_subtask where task_key = %? and step = %? @@ -645,7 +636,7 @@ func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID i // CollectSubTaskError collects the subtask error. func (mgr *TaskManager) CollectSubTaskError(ctx context.Context, taskID int64) ([]error, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select error from mysql.tidb_background_subtask where task_key = %? AND state in (%?, %?)`, taskID, proto.SubtaskStateFailed, proto.SubtaskStateCanceled) if err != nil { @@ -679,7 +670,7 @@ func (mgr *TaskManager) HasSubtasksInStates(ctx context.Context, tidbID string, for _, state := range states { args = append(args, state) } - rs, err := mgr.executeSQLWithNewSession(ctx, `select 1 from mysql.tidb_background_subtask + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select 1 from mysql.tidb_background_subtask where exec_id = %? and task_key = %? and step = %? and state in (`+strings.Repeat("%?,", len(states)-1)+"%?) limit 1", args...) if err != nil { @@ -738,7 +729,7 @@ func (*TaskManager) InitMetaSession(ctx context.Context, se sessionctx.Context, // if not there might has a data race. func (mgr *TaskManager) RecoverMeta(ctx context.Context, execID string, role string) error { cpuCount := cpu.GetCPUCount() - _, err := mgr.executeSQLWithNewSession(ctx, ` + _, err := mgr.ExecuteSQLWithNewSession(ctx, ` insert into mysql.dist_framework_meta(host, role, cpu_count, keyspace_id) values (%?, %?, %?, -1) on duplicate key @@ -752,7 +743,7 @@ func (mgr *TaskManager) UpdateSubtaskStateAndError( ctx context.Context, execID string, id int64, state proto.SubtaskState, subTaskErr error) error { - _, err := mgr.executeSQLWithNewSession(ctx, `update mysql.tidb_background_subtask + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_background_subtask set state = %?, error = %?, state_update_time = unix_timestamp() where id = %? and exec_id = %?`, state, serializeErr(subTaskErr), id, execID) return err @@ -760,7 +751,7 @@ func (mgr *TaskManager) UpdateSubtaskStateAndError( // FinishSubtask updates the subtask meta and mark state to succeed. func (mgr *TaskManager) FinishSubtask(ctx context.Context, execID string, id int64, meta []byte) error { - _, err := mgr.executeSQLWithNewSession(ctx, `update mysql.tidb_background_subtask + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_background_subtask set meta = %?, state = %?, state_update_time = unix_timestamp(), end_time = CURRENT_TIMESTAMP() where id = %? and exec_id = %?`, meta, proto.TaskStateSucceed, id, execID) @@ -769,7 +760,7 @@ func (mgr *TaskManager) FinishSubtask(ctx context.Context, execID string, id int // DeleteSubtasksByTaskID deletes the subtask of the given task ID. func (mgr *TaskManager) DeleteSubtasksByTaskID(ctx context.Context, taskID int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, `delete from mysql.tidb_background_subtask + _, err := mgr.ExecuteSQLWithNewSession(ctx, `delete from mysql.tidb_background_subtask where task_key = %?`, taskID) if err != nil { return err @@ -780,7 +771,7 @@ func (mgr *TaskManager) DeleteSubtasksByTaskID(ctx context.Context, taskID int64 // GetTaskExecutorIDsByTaskID gets the task executor IDs of the given task ID. func (mgr *TaskManager) GetTaskExecutorIDsByTaskID(ctx context.Context, taskID int64) ([]string, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, `select distinct(exec_id) from mysql.tidb_background_subtask + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select distinct(exec_id) from mysql.tidb_background_subtask where task_key = %?`, taskID) if err != nil { return nil, err @@ -800,7 +791,7 @@ func (mgr *TaskManager) GetTaskExecutorIDsByTaskID(ctx context.Context, taskID i // GetTaskExecutorIDsByTaskIDAndStep gets the task executor IDs of the given global task ID and step. func (mgr *TaskManager) GetTaskExecutorIDsByTaskIDAndStep(ctx context.Context, taskID int64, step proto.Step) ([]string, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, `select distinct(exec_id) from mysql.tidb_background_subtask + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select distinct(exec_id) from mysql.tidb_background_subtask where task_key = %? and step = %?`, taskID, step) if err != nil { return nil, err @@ -820,7 +811,7 @@ func (mgr *TaskManager) GetTaskExecutorIDsByTaskIDAndStep(ctx context.Context, t // IsTaskExecutorCanceled checks if subtask 'execID' of task 'taskID' has been canceled somehow. func (mgr *TaskManager) IsTaskExecutorCanceled(ctx context.Context, execID string, taskID int64) (bool, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, "select 1 from mysql.tidb_background_subtask where task_key = %? and exec_id = %?", taskID, execID) + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select 1 from mysql.tidb_background_subtask where task_key = %? and exec_id = %?", taskID, execID) if err != nil { return false, err } @@ -873,14 +864,14 @@ func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) err // PauseSubtasks update all running/pending subtasks to pasued state. func (mgr *TaskManager) PauseSubtasks(ctx context.Context, execID string, taskID int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_background_subtask set state = "paused" where task_key = %? and state in ("running", "pending") and exec_id = %?`, taskID, execID) return err } // ResumeSubtasks update all paused subtasks to pending state. func (mgr *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error { - _, err := mgr.executeSQLWithNewSession(ctx, + _, err := mgr.ExecuteSQLWithNewSession(ctx, `update mysql.tidb_background_subtask set state = "pending", error = null where task_key = %? and state = "paused"`, taskID) return err } @@ -1113,7 +1104,7 @@ func serializeErr(err error) []byte { // IsTaskCancelling checks whether the task state is cancelling. func (mgr *TaskManager) IsTaskCancelling(ctx context.Context, taskID int64) (bool, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, "select 1 from mysql.tidb_global_task where id=%? and state = %?", + rs, err := mgr.ExecuteSQLWithNewSession(ctx, "select 1 from mysql.tidb_global_task where id=%? and state = %?", taskID, proto.TaskStateCancelling, ) @@ -1132,7 +1123,7 @@ func (mgr *TaskManager) GetSubtasksForImportInto(ctx context.Context, taskID int ) err = mgr.WithNewTxn(ctx, func(se sessionctx.Context) error { rs, err = sqlexec.ExecSQL(ctx, se, - `select `+subtaskColumns+` from mysql.tidb_background_subtask where task_key = %? and step = %?`, + `select `+SubtaskColumns+` from mysql.tidb_background_subtask where task_key = %? and step = %?`, taskID, step, ) if err != nil { @@ -1142,7 +1133,7 @@ func (mgr *TaskManager) GetSubtasksForImportInto(ctx context.Context, taskID int // To avoid the situation that the subtasks has been `TransferSubTasks2History` // when the user show import jobs, we need to check the history table. rsFromHistory, err := sqlexec.ExecSQL(ctx, se, - `select `+subtaskColumns+` from mysql.tidb_background_subtask_history where task_key = %? and step = %?`, + `select `+SubtaskColumns+` from mysql.tidb_background_subtask_history where task_key = %? and step = %?`, taskID, step, ) if err != nil { @@ -1161,7 +1152,7 @@ func (mgr *TaskManager) GetSubtasksForImportInto(ctx context.Context, taskID int } subtasks := make([]*proto.Subtask, 0, len(rs)) for _, r := range rs { - subtasks = append(subtasks, row2SubTask(r)) + subtasks = append(subtasks, Row2SubTask(r)) } return subtasks, nil } @@ -1188,7 +1179,7 @@ func (mgr *TaskManager) GCSubtasks(ctx context.Context) error { subtaskHistoryKeepSeconds = val } }) - _, err := mgr.executeSQLWithNewSession( + _, err := mgr.ExecuteSQLWithNewSession( ctx, fmt.Sprintf("DELETE FROM mysql.tidb_background_subtask_history WHERE state_update_time < UNIX_TIMESTAMP() - %d ;", subtaskHistoryKeepSeconds), ) diff --git a/pkg/disttask/framework/storage/util.go b/pkg/disttask/framework/storage/util.go deleted file mode 100644 index 8102c6a3a0b0e..0000000000000 --- a/pkg/disttask/framework/storage/util.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "context" - "fmt" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/util/logutil" - "go.uber.org/zap" -) - -// GetSubtasksFromHistoryForTest gets subtasks from history table for test. -func GetSubtasksFromHistoryForTest(ctx context.Context, mgr *TaskManager) (int, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, - "select * from mysql.tidb_background_subtask_history") - if err != nil { - return 0, err - } - return len(rs), nil -} - -// GetSubtasksFromHistoryByTaskIDForTest gets subtasks by taskID from history table for test. -func GetSubtasksFromHistoryByTaskIDForTest(ctx context.Context, mgr *TaskManager, taskID int64) (int, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, - `select `+subtaskColumns+` from mysql.tidb_background_subtask_history where task_key = %?`, taskID) - if err != nil { - return 0, err - } - return len(rs), nil -} - -// GetSubtasksByTaskIDForTest gets subtasks by taskID for test. -func GetSubtasksByTaskIDForTest(ctx context.Context, mgr *TaskManager, taskID int64) ([]*proto.Subtask, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, - `select `+subtaskColumns+` from mysql.tidb_background_subtask where task_key = %?`, taskID) - if err != nil { - return nil, err - } - if len(rs) == 0 { - return nil, nil - } - subtasks := make([]*proto.Subtask, 0, len(rs)) - for _, r := range rs { - subtasks = append(subtasks, row2SubTask(r)) - } - return subtasks, nil -} - -// GetTasksFromHistoryForTest gets tasks from history table for test. -func GetTasksFromHistoryForTest(ctx context.Context, mgr *TaskManager) (int, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, - "select * from mysql.tidb_global_task_history") - if err != nil { - return 0, err - } - return len(rs), nil -} - -// GetTaskEndTimeForTest gets task's endTime for test. -func GetTaskEndTimeForTest(ctx context.Context, mgr *TaskManager, taskID int64) (time.Time, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, - `select end_time - from mysql.tidb_global_task - where id = %?`, taskID) - - if err != nil { - return time.Time{}, nil - } - if !rs[0].IsNull(0) { - return rs[0].GetTime(0).GoTime(time.Local) - } - return time.Time{}, nil -} - -// GetSubtaskEndTimeForTest gets subtask's endTime for test. -func GetSubtaskEndTimeForTest(ctx context.Context, mgr *TaskManager, subtaskID int64) (time.Time, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, - `select end_time - from mysql.tidb_background_subtask - where id = %?`, subtaskID) - - if err != nil { - return time.Time{}, nil - } - if !rs[0].IsNull(0) { - return rs[0].GetTime(0).GoTime(time.Local) - } - return time.Time{}, nil -} - -// GetSubtaskNodesForTest gets subtasks running nodes for one task for test. -func GetSubtaskNodesForTest(ctx context.Context, mgr *TaskManager, taskID int64) ([]string, error) { - rs, err := mgr.executeSQLWithNewSession(ctx, - `select distinct(exec_id) from mysql.tidb_background_subtask where task_key=%?`, taskID) - if err != nil { - return nil, err - } - nodes := make([]string, 0, len(rs)) - for _, r := range rs { - if !r.IsNull(0) { - nodes = append(nodes, r.GetString(0)) - } - } - return nodes, nil -} - -// PrintSubtaskInfo log the subtask info by taskKey for test. -func (mgr *TaskManager) PrintSubtaskInfo(ctx context.Context, taskID int64) { - rs, _ := mgr.executeSQLWithNewSession(ctx, - `select `+subtaskColumns+` from mysql.tidb_background_subtask_history where task_key = %?`, taskID) - rs2, _ := mgr.executeSQLWithNewSession(ctx, - `select `+subtaskColumns+` from mysql.tidb_background_subtask where task_key = %?`, taskID) - rs = append(rs, rs2...) - - for _, r := range rs { - errBytes := r.GetBytes(13) - var err error - if len(errBytes) > 0 { - stdErr := errors.Normalize("") - err1 := stdErr.UnmarshalJSON(errBytes) - if err1 != nil { - err = err1 - } else { - err = stdErr - } - } - logutil.BgLogger().Info(fmt.Sprintf("subTask: %v\n", row2SubTask(r)), zap.Error(err)) - } -} diff --git a/pkg/disttask/framework/testutil/BUILD.bazel b/pkg/disttask/framework/testutil/BUILD.bazel index 4365592eccb78..f60efa966e5ef 100644 --- a/pkg/disttask/framework/testutil/BUILD.bazel +++ b/pkg/disttask/framework/testutil/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/sessionctx", "//pkg/store/mockstore", "//pkg/testkit", + "//pkg/util/logutil", "//pkg/util/sqlexec", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_failpoint//:failpoint", diff --git a/pkg/disttask/framework/testutil/table_util.go b/pkg/disttask/framework/testutil/table_util.go index 2b7444ac842c3..a06fc1692a463 100644 --- a/pkg/disttask/framework/testutil/table_util.go +++ b/pkg/disttask/framework/testutil/table_util.go @@ -16,15 +16,18 @@ package testutil import ( "context" + "fmt" "testing" "time" "github.com/ngaut/pools" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/util" ) @@ -74,3 +77,121 @@ func getTaskManager(t *testing.T, pool *pools.ResourcePool) *storage.TaskManager require.NoError(t, err) return manager } + +// GetSubtasksFromHistory gets subtasks from history table for test. +func GetSubtasksFromHistory(ctx context.Context, mgr *storage.TaskManager) (int, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, + "select * from mysql.tidb_background_subtask_history") + if err != nil { + return 0, err + } + return len(rs), nil +} + +// GetSubtasksFromHistoryByTaskID gets subtasks by taskID from history table for test. +func GetSubtasksFromHistoryByTaskID(ctx context.Context, mgr *storage.TaskManager, taskID int64) (int, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, + `select `+storage.SubtaskColumns+` from mysql.tidb_background_subtask_history where task_key = %?`, taskID) + if err != nil { + return 0, err + } + return len(rs), nil +} + +// GetSubtasksByTaskID gets subtasks by taskID for test. +func GetSubtasksByTaskID(ctx context.Context, mgr *storage.TaskManager, taskID int64) ([]*proto.Subtask, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, + `select `+storage.SubtaskColumns+` from mysql.tidb_background_subtask where task_key = %?`, taskID) + if err != nil { + return nil, err + } + if len(rs) == 0 { + return nil, nil + } + subtasks := make([]*proto.Subtask, 0, len(rs)) + for _, r := range rs { + subtasks = append(subtasks, storage.Row2SubTask(r)) + } + return subtasks, nil +} + +// GetTasksFromHistory gets tasks from history table for test. +func GetTasksFromHistory(ctx context.Context, mgr *storage.TaskManager) (int, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, + "select * from mysql.tidb_global_task_history") + if err != nil { + return 0, err + } + return len(rs), nil +} + +// GetTaskEndTime gets task's endTime for test. +func GetTaskEndTime(ctx context.Context, mgr *storage.TaskManager, taskID int64) (time.Time, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, + `select end_time + from mysql.tidb_global_task + where id = %?`, taskID) + + if err != nil { + return time.Time{}, nil + } + if !rs[0].IsNull(0) { + return rs[0].GetTime(0).GoTime(time.Local) + } + return time.Time{}, nil +} + +// GetSubtaskEndTime gets subtask's endTime for test. +func GetSubtaskEndTime(ctx context.Context, mgr *storage.TaskManager, subtaskID int64) (time.Time, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, + `select end_time + from mysql.tidb_background_subtask + where id = %?`, subtaskID) + + if err != nil { + return time.Time{}, nil + } + if !rs[0].IsNull(0) { + return rs[0].GetTime(0).GoTime(time.Local) + } + return time.Time{}, nil +} + +// GetSubtaskNodes gets subtasks running nodes for one task for test. +func GetSubtaskNodes(ctx context.Context, mgr *storage.TaskManager, taskID int64) ([]string, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, + `select distinct(exec_id) from mysql.tidb_background_subtask where task_key=%?`, taskID) + if err != nil { + return nil, err + } + nodes := make([]string, 0, len(rs)) + for _, r := range rs { + if !r.IsNull(0) { + nodes = append(nodes, r.GetString(0)) + } + } + return nodes, nil +} + +// UpdateSubtaskExecID updates the subtask's exec_id, used for testing now. +func UpdateSubtaskExecID(ctx context.Context, mgr *storage.TaskManager, tidbID string, subtaskID int64) error { + _, err := mgr.ExecuteSQLWithNewSession(ctx, + `update mysql.tidb_background_subtask + set exec_id = %?, state_update_time = unix_timestamp() + where id = %?`, + tidbID, subtaskID) + return err +} + +// PrintSubtaskInfo log the subtask info by taskKey for test. +func PrintSubtaskInfo(ctx context.Context, mgr *storage.TaskManager, taskID int64) { + rs, _ := mgr.ExecuteSQLWithNewSession(ctx, + `select `+storage.SubtaskColumns+` from mysql.tidb_background_subtask_history where task_key = %?`, taskID) + rs2, _ := mgr.ExecuteSQLWithNewSession(ctx, + `select `+storage.SubtaskColumns+` from mysql.tidb_background_subtask where task_key = %?`, taskID) + rs = append(rs, rs2...) + + for _, r := range rs { + logutil.BgLogger().Info(fmt.Sprintf("subTask: %v\n", storage.Row2SubTask(r))) + } +}