Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: unify task order naming to 'rank' & refactor failpoint #51136

Merged
merged 6 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ go_test(
"//pkg/util/backoff",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
Expand Down
6 changes: 1 addition & 5 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
Expand All @@ -35,10 +34,7 @@ import (
)

func TestHandle(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
})
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")

ctx := util.WithInternalSourceType(context.Background(), "handle_test")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

Expand All @@ -45,8 +46,8 @@ func TestFrameworkPauseAndResume(t *testing.T) {

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
// 1. schedule and pause one running task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)")
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()")
task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStatePaused, task1.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
Expand All @@ -64,8 +65,8 @@ func TestFrameworkPauseAndResume(t *testing.T) {
require.Empty(t, errs)

// 2. pause pending task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)")
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()")
task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2", 1)
require.Equal(t, proto.TaskStatePaused, task2.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestRoleBasic(t *testing.T) {
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
Expand All @@ -71,7 +71,7 @@ func TestRoleBasic(t *testing.T) {
// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
time.Sleep(5 * time.Second)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😆", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ package integrationtests
import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

func TestFrameworkRollback(t *testing.T) {
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterRollbackTaskMeta(t, c.MockCtrl, testutil.GetMockRollbackSchedulerExt(c.MockCtrl), c.TestContext)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)")

task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
Expand Down
45 changes: 11 additions & 34 deletions pkg/disttask/framework/integrationtests/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
Expand Down Expand Up @@ -126,10 +125,7 @@ func TestFrameworkCancelTask(t *testing.T) {

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel", "1*return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel", "1*return(1)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
Expand All @@ -138,21 +134,15 @@ func TestFrameworkSubTaskFailed(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}

func TestFrameworkSubTaskInitEnvFailed(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr", "return()"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr", "return()")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
Expand All @@ -163,18 +153,13 @@ func TestOwnerChangeWhenSchedule(t *testing.T) {
scheduler.MockOwnerChange = func() {
c.AsyncChangeOwner()
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange", "1*return(true)"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange", "1*return(true)")
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange"))
}

func TestGC(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)")
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
Expand Down Expand Up @@ -208,10 +193,7 @@ func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
Expand All @@ -220,10 +202,9 @@ func TestFrameworkRunSubtaskCancel(t *testing.T) {
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel", "1*return(true)"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel"))
}

func TestFrameworkCleanUpRoutine(t *testing.T) {
Expand All @@ -234,7 +215,7 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
scheduler.DefaultCleanUpInterval = 500 * time.Millisecond
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")

// normal
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
Expand All @@ -249,7 +230,7 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
require.NotEmpty(t, subtasks)

// transfer err
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()")
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key2", c.TestContext)
<-scheduler.WaitCleanUpFinished
mgr, err = storage.GetTaskManager()
Expand All @@ -260,17 +241,13 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
subtasks, err = testutil.GetSubtasksFromHistory(c.Ctx, mgr)
require.NoError(t, err)
require.NotEmpty(t, subtasks)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
}

func TestTaskCancelledBeforeUpdateTask(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask"))
}
6 changes: 4 additions & 2 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ const (
var MaxConcurrentTask = 4

// Task represents the task of distributed framework.
// tasks are run in the order of: priority asc, create_time asc, id asc.
// tasks are run in the order of rank, and the rank is defined by:
//
// priority asc, create_time asc, id asc.
type Task struct {
ID int64
Key string
Expand Down Expand Up @@ -117,7 +119,7 @@ var (
)

// Compare compares two tasks by task order.
// returns < 0 represents priority of t is higher than other.
// a return value < 0 means the rank of t is higher than that of 'other'.
func (t *Task) Compare(other *Task) int {
if t.Priority != other.Priority {
return t.Priority - other.Priority
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// TaskManager defines the interface to access task table.
type TaskManager interface {
// GetTopUnfinishedTasks returns unfinished tasks, limited by MaxConcurrentTask*2,
// to make sure lower priority tasks can be scheduled if resource is enough.
// to make sure lower rank tasks can be scheduled if resources are sufficient.
// The returned tasks are sorted by task order, see proto.Task, and only contains
// some fields, see row2TaskBasic.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error)
Expand Down
10 changes: 2 additions & 8 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ import (
)

const (
// DefaultSubtaskConcurrency is the default concurrency for handling subtask.
DefaultSubtaskConcurrency = 16
// MaxSubtaskConcurrency is the maximum concurrency for handling subtask.
MaxSubtaskConcurrency = 256
// defaultBalanceSubtaskTicks is the tick interval of fetching all server infos from etcs.
defaultBalanceSubtaskTicks = 2
// for a cancelled task, its terminal state is reverted or reverted_failed,
// so we use a special error message to indicate that the task is cancelled
// by user.
Expand All @@ -55,7 +49,6 @@ var (
// CheckTaskFinishedInterval is the interval for scheduler.
// exported for testing.
CheckTaskFinishedInterval = 500 * time.Millisecond
nonRetrySQLTime = 1
// RetrySQLTimes is the max retry times when executing SQL.
RetrySQLTimes = 30
// RetrySQLInterval is the initial interval between two SQL retries.
Expand Down Expand Up @@ -431,6 +424,8 @@ func (s *BaseScheduler) scheduleSubTask(
// the scheduled node of the subtask might not be optimal, as we run all
// schedulers in parallel, and update might be called too many times when
// multiple tasks are switching to next step.
// balancer will assign the subtasks to the right instance according to
// the system load of all nodes.
if err := s.slotMgr.update(s.ctx, s.nodeMgr, s.taskMgr); err != nil {
return err
}
Expand All @@ -439,7 +434,6 @@ func (s *BaseScheduler) scheduleSubTask(
subTasks := make([]*proto.Subtask, 0, len(metas))
for i, meta := range metas {
// we assign the subtask to the instance in a round-robin way.
// TODO: assign the subtask to the instance according to the system load of each nodes
pos := i % len(adjustedEligibleNodes)
instanceID := adjustedEligibleNodes[pos]
s.logger.Debug("create subtasks", zap.String("instanceID", instanceID))
Expand Down
5 changes: 1 addition & 4 deletions pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Mana

// Start the schedulerManager, start the scheduleTaskLoop to start multiple schedulers.
func (sm *Manager) Start() {
failpoint.Inject("disableSchedulerManager", func() {
failpoint.Return()
})
// init cached managed nodes
sm.nodeMgr.refreshManagedNodes(sm.ctx, sm.taskMgr, sm.slotMgr)

Expand Down Expand Up @@ -257,7 +254,7 @@ func (sm *Manager) scheduleTaskLoop() {
}
reservedExecID, ok := sm.slotMgr.canReserve(task)
if !ok {
// task of lower priority might be able to be scheduled.
// task of lower rank might be able to be scheduled.
continue
}
metrics.DistTaskGauge.WithLabelValues(task.Type.String(), metrics.SchedulingStatus).Inc()
Expand Down
11 changes: 2 additions & 9 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
Expand Down Expand Up @@ -175,10 +174,7 @@ func TestTaskFailInManager(t *testing.T) {
}

func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, isPauseAndResume bool) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)")
// test DispatchTaskLoop
// test parallelism control
var originalConcurrency int
Expand Down Expand Up @@ -423,10 +419,7 @@ func TestIsCancelledErr(t *testing.T) {

func TestManagerScheduleLoop(t *testing.T) {
// Mock 16 cpu node.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
})
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)")
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockScheduler := mock.NewMockScheduler(ctrl)
Expand Down
10 changes: 5 additions & 5 deletions pkg/disttask/framework/scheduler/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type SlotManager struct {
reservedSlots map[string]int
// represents the number of slots taken by task on each node
// on some cases it might be larger than capacity:
// current step of higher priority task A has little subtasks, so we start
// to schedule lower priority task, but next step of A has many subtasks.
// current step of higher rank task A has few subtasks, so we start
// to schedule lower rank task, but next step of A has many subtasks.
// once initialized, the length of usedSlots should be equal to number of nodes
// managed by dist framework.
usedSlots atomic.Pointer[map[string]int]
Expand Down Expand Up @@ -121,14 +121,14 @@ func (sm *SlotManager) canReserve(task *proto.Task) (execID string, ok bool) {
return "", false
}

reservedForHigherPriority := 0
reservedForHigherRank := 0
for _, s := range sm.reservedStripes {
if s.task.Compare(task) >= 0 {
break
}
reservedForHigherPriority += s.stripes
reservedForHigherRank += s.stripes
}
if task.Concurrency+reservedForHigherPriority <= capacity {
if task.Concurrency+reservedForHigherRank <= capacity {
return "", true
}

Expand Down
Loading