Skip to content

Commit

Permalink
disttask: storage.util move to testutil (#50252)
Browse files Browse the repository at this point in the history
ref #48795
  • Loading branch information
ywqzzy authored Jan 11, 2024
1 parent 2fdc3a3 commit 986b008
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 222 deletions.
10 changes: 6 additions & 4 deletions pkg/disttask/framework/framework_pause_and_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import (
func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state proto.SubtaskState, expectedCnt int64) {
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
mgr.PrintSubtaskInfo(ctx, taskID)
cntByStates, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepTwo)
testutil.PrintSubtaskInfo(ctx, mgr, taskID)
cntByStatesStepOne, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepOne)
require.NoError(t, err)
historySubTasksCnt, err := storage.GetSubtasksFromHistoryByTaskIDForTest(ctx, mgr, taskID)
cntByStatesStepTwo, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepTwo)
require.NoError(t, err)
require.Equal(t, expectedCnt, cntByStates[state]+int64(historySubTasksCnt))
historySubTasksCnt, err := testutil.GetSubtasksFromHistoryByTaskID(ctx, mgr, taskID)
require.NoError(t, err)
require.Equal(t, expectedCnt, cntByStatesStepOne[state]+cntByStatesStepTwo[state]+int64(historySubTasksCnt))
}

func TestFrameworkPauseAndResume(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/framework_role_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func checkSubtaskOnNodes(ctx context.Context, t *testing.T, taskID int64, expectedNodes []string) {
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
nodes, err := storage.GetSubtaskNodesForTest(ctx, mgr, taskID)
nodes, err := testutil.GetSubtaskNodes(ctx, mgr, taskID)
require.NoError(t, err)
slices.Sort(nodes)
slices.Sort(expectedNodes)
Expand Down
4 changes: 2 additions & 2 deletions pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestGC(t *testing.T) {

var historySubTasksCnt int
require.Eventually(t, func() bool {
historySubTasksCnt, err = storage.GetSubtasksFromHistoryForTest(ctx, mgr)
historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, mgr)
if err != nil {
return false
}
Expand All @@ -261,7 +261,7 @@ func TestGC(t *testing.T) {
scheduler.WaitTaskFinished <- struct{}{}

require.Eventually(t, func() bool {
historySubTasksCnt, err := storage.GetSubtasksFromHistoryForTest(ctx, mgr)
historySubTasksCnt, err := testutil.GetSubtasksFromHistory(ctx, mgr)
if err != nil {
return false
}
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go_library(
srcs = [
"task_state.go",
"task_table.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/storage",
visibility = ["//visibility:public"],
Expand Down
22 changes: 11 additions & 11 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestTaskTable(t *testing.T) {
require.NoError(t, err)
require.Equal(t, proto.TaskStateFailed, task.State)
require.ErrorContains(t, task.Error, "test error")
endTime, err := storage.GetTaskEndTimeForTest(ctx, gm, id)
endTime, err := testutil.GetTaskEndTime(ctx, gm, id)
require.NoError(t, err)
require.LessOrEqual(t, endTime.Sub(curTime), time.Since(curTime))
require.GreaterOrEqual(t, endTime, curTime)
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestSubTaskTable(t *testing.T) {
require.Greater(t, subtask.StartTime, ts)
require.Greater(t, subtask.UpdateTime, ts)

endTime, err := storage.GetSubtaskEndTimeForTest(ctx, sm, subtask.ID)
endTime, err := testutil.GetSubtaskEndTime(ctx, sm, subtask.ID)
require.NoError(t, err)
require.Greater(t, endTime, ts)

Expand All @@ -641,7 +641,7 @@ func TestSubTaskTable(t *testing.T) {
require.NoError(t, err)
require.Equal(t, subtask2.StartTime, subtask.StartTime)
require.Greater(t, subtask2.UpdateTime, subtask.UpdateTime)
endTime, err = storage.GetSubtaskEndTimeForTest(ctx, sm, subtask.ID)
endTime, err = testutil.GetSubtaskEndTime(ctx, sm, subtask.ID)
require.NoError(t, err)
require.Greater(t, endTime, ts)

Expand Down Expand Up @@ -962,10 +962,10 @@ func TestSubtaskHistoryTable(t *testing.T) {
testutil.CreateSubTask(t, sm, taskID, proto.StepInit, tidb3, []byte(meta), proto.TaskTypeExample, 11, false)
require.NoError(t, sm.UpdateSubtaskStateAndError(ctx, tidb3, subTask3, proto.SubtaskStateFailed, nil))

subTasks, err := storage.GetSubtasksByTaskIDForTest(ctx, sm, taskID)
subTasks, err := testutil.GetSubtasksByTaskID(ctx, sm, taskID)
require.NoError(t, err)
require.Len(t, subTasks, 3)
historySubTasksCnt, err := storage.GetSubtasksFromHistoryForTest(ctx, sm)
historySubTasksCnt, err := testutil.GetSubtasksFromHistory(ctx, sm)
require.NoError(t, err)
require.Equal(t, 0, historySubTasksCnt)
subTasks, err = sm.GetSubtasksForImportInto(ctx, taskID, proto.StepInit)
Expand All @@ -975,10 +975,10 @@ func TestSubtaskHistoryTable(t *testing.T) {
// test TransferSubTasks2History
require.NoError(t, sm.TransferSubTasks2History(ctx, taskID))

subTasks, err = storage.GetSubtasksByTaskIDForTest(ctx, sm, taskID)
subTasks, err = testutil.GetSubtasksByTaskID(ctx, sm, taskID)
require.NoError(t, err)
require.Len(t, subTasks, 0)
historySubTasksCnt, err = storage.GetSubtasksFromHistoryForTest(ctx, sm)
historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, sm)
require.NoError(t, err)
require.Equal(t, 3, historySubTasksCnt)
subTasks, err = sm.GetSubtasksForImportInto(ctx, taskID, proto.StepInit)
Expand All @@ -998,7 +998,7 @@ func TestSubtaskHistoryTable(t *testing.T) {

require.NoError(t, sm.GCSubtasks(ctx))

historySubTasksCnt, err = storage.GetSubtasksFromHistoryForTest(ctx, sm)
historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, sm)
require.NoError(t, err)
require.Equal(t, 1, historySubTasksCnt)
}
Expand All @@ -1021,7 +1021,7 @@ func TestTaskHistoryTable(t *testing.T) {
tasks, err = gm.GetTasksInStates(ctx, proto.TaskStatePending)
require.NoError(t, err)
require.Equal(t, 0, len(tasks))
num, err := storage.GetTasksFromHistoryForTest(ctx, gm)
num, err := testutil.GetTasksFromHistory(ctx, gm)
require.NoError(t, err)
require.Equal(t, 2, num)

Expand All @@ -1041,7 +1041,7 @@ func TestTaskHistoryTable(t *testing.T) {
require.Equal(t, 1, len(tasks))
tasks[0].Error = errors.New("mock err")
require.NoError(t, gm.TransferTasks2History(ctx, tasks))
num, err = storage.GetTasksFromHistoryForTest(ctx, gm)
num, err = testutil.GetTasksFromHistory(ctx, gm)
require.NoError(t, err)
require.Equal(t, 3, num)
}
Expand Down Expand Up @@ -1094,7 +1094,7 @@ func TestCancelAndExecIdChanged(t *testing.T) {

// 2. change the exec_id
// exec_id changed
require.NoError(t, sm.UpdateSubtaskExecID(ctx, "tidb2", subtask.ID))
require.NoError(t, testutil.UpdateSubtaskExecID(ctx, sm, "tidb2", subtask.ID))
// exec_id in memory unchanged, call UpdateSubtaskStateAndError.
require.NoError(t, sm.UpdateSubtaskStateAndError(ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateFailed, nil))
subtask, err = sm.GetFirstSubtaskInStates(ctx, "tidb2", 1, proto.StepInit, proto.SubtaskStatePending)
Expand Down
8 changes: 4 additions & 4 deletions pkg/disttask/framework/storage/task_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

// CancelTask cancels task.
func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error {
_, err := mgr.executeSQLWithNewSession(ctx,
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP()
Expand All @@ -47,7 +47,7 @@ func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Co

// FailTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error {
_, err := mgr.executeSQLWithNewSession(ctx,
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
error = %?,
Expand All @@ -61,7 +61,7 @@ func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState

// RevertedTask implements the scheduler.TaskManager interface.
func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error {
_, err := mgr.executeSQLWithNewSession(ctx,
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP(),
Expand Down Expand Up @@ -99,7 +99,7 @@ func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, er

// PausedTask update the task state from pausing to paused.
func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error {
_, err := mgr.executeSQLWithNewSession(ctx,
_, err := mgr.ExecuteSQLWithNewSession(ctx,
`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP(),
Expand Down
Loading

0 comments on commit 986b008

Please sign in to comment.