Skip to content

Commit

Permalink
disttask: unify task order naming to 'rank' & refactor failpoint (#51136)
Browse files Browse the repository at this point in the history

ref #49008
  • Loading branch information
D3Hunter authored Feb 21, 2024
1 parent 38abd86 commit e34702f
Show file tree
Hide file tree
Showing 35 changed files with 92 additions and 210 deletions.
1 change: 0 additions & 1 deletion pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ go_test(
"//pkg/util/backoff",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
Expand Down
6 changes: 1 addition & 5 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
Expand All @@ -35,10 +34,7 @@ import (
)

func TestHandle(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
})
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")

ctx := util.WithInternalSourceType(context.Background(), "handle_test")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

Expand All @@ -45,8 +46,8 @@ func TestFrameworkPauseAndResume(t *testing.T) {

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
// 1. schedule and pause one running task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)")
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()")
task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStatePaused, task1.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
Expand All @@ -64,8 +65,8 @@ func TestFrameworkPauseAndResume(t *testing.T) {
require.Empty(t, errs)

// 2. pause pending task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)")
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()")
task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2", 1)
require.Equal(t, proto.TaskStatePaused, task2.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestRoleBasic(t *testing.T) {
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
Expand All @@ -71,7 +71,7 @@ func TestRoleBasic(t *testing.T) {
// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
time.Sleep(5 * time.Second)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😆", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ package integrationtests
import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

func TestFrameworkRollback(t *testing.T) {
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterRollbackTaskMeta(t, c.MockCtrl, testutil.GetMockRollbackSchedulerExt(c.MockCtrl), c.TestContext)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)")

task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
Expand Down
45 changes: 11 additions & 34 deletions pkg/disttask/framework/integrationtests/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
Expand Down Expand Up @@ -126,10 +125,7 @@ func TestFrameworkCancelTask(t *testing.T) {

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel", "1*return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel", "1*return(1)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
Expand All @@ -138,21 +134,15 @@ func TestFrameworkSubTaskFailed(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}

func TestFrameworkSubTaskInitEnvFailed(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr", "return()"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr", "return()")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
Expand All @@ -163,18 +153,13 @@ func TestOwnerChangeWhenSchedule(t *testing.T) {
scheduler.MockOwnerChange = func() {
c.AsyncChangeOwner()
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange", "1*return(true)"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange", "1*return(true)")
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange"))
}

func TestGC(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)")
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
Expand Down Expand Up @@ -208,10 +193,7 @@ func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
Expand All @@ -220,10 +202,9 @@ func TestFrameworkRunSubtaskCancel(t *testing.T) {
c := testutil.NewTestDXFContext(t, 3)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel", "1*return(true)"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel"))
}

func TestFrameworkCleanUpRoutine(t *testing.T) {
Expand All @@ -234,7 +215,7 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
scheduler.DefaultCleanUpInterval = 500 * time.Millisecond
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")

// normal
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
Expand All @@ -249,7 +230,7 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
require.NotEmpty(t, subtasks)

// transfer err
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()")
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key2", c.TestContext)
<-scheduler.WaitCleanUpFinished
mgr, err = storage.GetTaskManager()
Expand All @@ -260,17 +241,13 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
subtasks, err = testutil.GetSubtasksFromHistory(c.Ctx, mgr)
require.NoError(t, err)
require.NotEmpty(t, subtasks)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
}

func TestTaskCancelledBeforeUpdateTask(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1)

testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)"))
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask"))
}
6 changes: 4 additions & 2 deletions pkg/disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ const (
var MaxConcurrentTask = 4

// Task represents the task of distributed framework.
// tasks are run in the order of: priority asc, create_time asc, id asc.
// tasks are run in the order of rank, and the rank is defined by:
//
// priority asc, create_time asc, id asc.
type Task struct {
ID int64
Key string
Expand Down Expand Up @@ -117,7 +119,7 @@ var (
)

// Compare compares two tasks by task order.
// returns < 0 represents priority of t is higher than other.
// a return value < 0 means the rank of t is higher than that of 'other'.
func (t *Task) Compare(other *Task) int {
if t.Priority != other.Priority {
return t.Priority - other.Priority
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// TaskManager defines the interface to access task table.
type TaskManager interface {
// GetTopUnfinishedTasks returns unfinished tasks, limited by MaxConcurrentTask*2,
// to make sure lower priority tasks can be scheduled if resource is enough.
// to make sure lower rank tasks can be scheduled if resource is enough.
// The returned tasks are sorted by task order, see proto.Task, and only contain
// some fields, see row2TaskBasic.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error)
Expand Down
10 changes: 2 additions & 8 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ import (
)

const (
// DefaultSubtaskConcurrency is the default concurrency for handling subtask.
DefaultSubtaskConcurrency = 16
// MaxSubtaskConcurrency is the maximum concurrency for handling subtask.
MaxSubtaskConcurrency = 256
// defaultBalanceSubtaskTicks is the tick interval of fetching all server infos from etcs.
defaultBalanceSubtaskTicks = 2
// for a cancelled task, its terminal state is reverted or reverted_failed,
// so we use a special error message to indicate that the task is cancelled
// by user.
Expand All @@ -55,7 +49,6 @@ var (
// CheckTaskFinishedInterval is the interval for scheduler.
// exported for testing.
CheckTaskFinishedInterval = 500 * time.Millisecond
nonRetrySQLTime = 1
// RetrySQLTimes is the max retry times when executing SQL.
RetrySQLTimes = 30
// RetrySQLInterval is the initial interval between two SQL retries.
Expand Down Expand Up @@ -431,6 +424,8 @@ func (s *BaseScheduler) scheduleSubTask(
// the scheduled node of the subtask might not be optimal, as we run all
// scheduler in parallel, and update might be called too many times when
// multiple tasks are switching to next step.
// balancer will assign the subtasks to the right instance according to
// the system load of all nodes.
if err := s.slotMgr.update(s.ctx, s.nodeMgr, s.taskMgr); err != nil {
return err
}
Expand All @@ -439,7 +434,6 @@ func (s *BaseScheduler) scheduleSubTask(
subTasks := make([]*proto.Subtask, 0, len(metas))
for i, meta := range metas {
// we assign the subtask to the instance in a round-robin way.
// TODO: assign the subtask to the instance according to the system load of each nodes
pos := i % len(adjustedEligibleNodes)
instanceID := adjustedEligibleNodes[pos]
s.logger.Debug("create subtasks", zap.String("instanceID", instanceID))
Expand Down
5 changes: 1 addition & 4 deletions pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Mana

// Start the schedulerManager, start the scheduleTaskLoop to start multiple schedulers.
func (sm *Manager) Start() {
failpoint.Inject("disableSchedulerManager", func() {
failpoint.Return()
})
// init cached managed nodes
sm.nodeMgr.refreshManagedNodes(sm.ctx, sm.taskMgr, sm.slotMgr)

Expand Down Expand Up @@ -257,7 +254,7 @@ func (sm *Manager) scheduleTaskLoop() {
}
reservedExecID, ok := sm.slotMgr.canReserve(task)
if !ok {
// task of lower priority might be able to be scheduled.
// task of lower rank might be able to be scheduled.
continue
}
metrics.DistTaskGauge.WithLabelValues(task.Type.String(), metrics.SchedulingStatus).Inc()
Expand Down
11 changes: 2 additions & 9 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
Expand Down Expand Up @@ -175,10 +174,7 @@ func TestTaskFailInManager(t *testing.T) {
}

func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, isPauseAndResume bool) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask"))
}()
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)")
// test DispatchTaskLoop
// test parallelism control
var originalConcurrency int
Expand Down Expand Up @@ -423,10 +419,7 @@ func TestIsCancelledErr(t *testing.T) {

func TestManagerScheduleLoop(t *testing.T) {
// Mock 16 cpu node.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
})
testkit.EnableFailPoint(t, "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)")
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockScheduler := mock.NewMockScheduler(ctrl)
Expand Down
10 changes: 5 additions & 5 deletions pkg/disttask/framework/scheduler/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type SlotManager struct {
reservedSlots map[string]int
// represents the number of slots taken by task on each node
// on some cases it might be larger than capacity:
// current step of higher priority task A has little subtasks, so we start
// to schedule lower priority task, but next step of A has many subtasks.
// current step of higher rank task A has few subtasks, so we start
// to schedule lower rank task, but next step of A has many subtasks.
// once initialized, the length of usedSlots should be equal to number of nodes
// managed by dist framework.
usedSlots atomic.Pointer[map[string]int]
Expand Down Expand Up @@ -121,14 +121,14 @@ func (sm *SlotManager) canReserve(task *proto.Task) (execID string, ok bool) {
return "", false
}

reservedForHigherPriority := 0
reservedForHigherRank := 0
for _, s := range sm.reservedStripes {
if s.task.Compare(task) >= 0 {
break
}
reservedForHigherPriority += s.stripes
reservedForHigherRank += s.stripes
}
if task.Concurrency+reservedForHigherPriority <= capacity {
if task.Concurrency+reservedForHigherRank <= capacity {
return "", true
}

Expand Down
Loading

0 comments on commit e34702f

Please sign in to comment.