Skip to content

Commit

Permalink
distask: refine dispatcher test (#45821)
Browse files Browse the repository at this point in the history
close #45788
  • Loading branch information
ywqzzy authored Aug 4, 2023
1 parent 3fa6b94 commit 64450c2
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,16 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {

// 3s
cnt := 60
checkGetRunningTaskCnt := func() {
checkGetRunningTaskCnt := func(expected int) {
var retCnt int
for i := 0; i < cnt; i++ {
retCnt = dsp.GetRunningTaskCnt()
if retCnt == taskCnt {
if retCnt == expected {
break
}
time.Sleep(time.Millisecond * 50)
}
require.Equal(t, retCnt, taskCnt)
require.Equal(t, retCnt, expected)
}

checkTaskRunningCnt := func() []*proto.Task {
Expand Down Expand Up @@ -215,7 +215,7 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
taskIDs = append(taskIDs, taskID)
}
// test normal flow
checkGetRunningTaskCnt()
checkGetRunningTaskCnt(taskCnt)
tasks := checkTaskRunningCnt()
for i, taskID := range taskIDs {
require.Equal(t, int64(i+1), tasks[i].ID)
Expand All @@ -227,7 +227,7 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
if taskCnt == 1 {
taskID, err := mgr.AddNewGlobalTask(fmt.Sprintf("%d", taskCnt), taskTypeExample, 0, nil)
require.NoError(t, err)
checkGetRunningTaskCnt()
checkGetRunningTaskCnt(taskCnt)
// Clean the task.
deleteTasks(t, store, taskID)
dsp.DelRunningTask(taskID)
Expand All @@ -254,7 +254,8 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
}
checkGetTaskState(proto.TaskStateSucceed)
require.Len(t, tasks, taskCnt)
require.Equal(t, 0, dsp.GetRunningTaskCnt())

checkGetRunningTaskCnt(0)
return
}

Expand Down Expand Up @@ -286,7 +287,7 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
}
checkGetTaskState(proto.TaskStateReverted)
require.Len(t, tasks, taskCnt)
require.Equal(t, 0, dsp.GetRunningTaskCnt())
checkGetRunningTaskCnt(0)
}

func TestSimpleNormalFlow(t *testing.T) {
Expand Down

0 comments on commit 64450c2

Please sign in to comment.