disttask: Refactor taskExecutor slotManager to handle task occupation #49713
Conversation
Codecov Report
Additional details and impacted files

@@ Coverage Diff @@
## master #49713 +/- ##
================================================
+ Coverage 70.9732% 71.1871% +0.2138%
================================================
Files 1368 1430 +62
Lines 398095 427368 +29273
================================================
+ Hits 282541 304231 +21690
- Misses 95813 104068 +8255
+ Partials 19741 19069 -672
@@ -218,7 +218,9 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
	}
	logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))

-	if !m.slotManager.canAlloc(task) {
+	canAlloc, tasksNeedFree := m.slotManager.canAlloc(task)
+	m.onCanceledTasks(context.Background(), tasksNeedFree)
As discussed in the group, this method is used to mark the task as cancelled, not to cancel the task executor.
Also, we only need to cancel the tasks that have a lower rank and occupy the slots we need to run the current task, not cancel all of them.
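For illustration, a minimal sketch of that selection with hypothetical types and field names (the real records live in the slot manager): collect only lower-ranked tasks, and stop as soon as enough slots would be freed.

package main

import "fmt"

// slotInfo is a hypothetical stand-in for the executor's per-task slot record.
type slotInfo struct {
	taskID   int64
	priority int // smaller value = higher priority
	slots    int
}

// pickTasksToFree returns only the lower-priority tasks whose slots, together
// with the currently available ones, are enough to run the new task; it stops
// as soon as the requirement is covered instead of cancelling everything.
func pickTasksToFree(running []slotInfo, available, need, newPriority int) []slotInfo {
	var toFree []slotInfo
	for _, si := range running {
		if available >= need {
			break
		}
		if si.priority > newPriority { // only tasks ranked below the new task
			toFree = append(toFree, si)
			available += si.slots
		}
	}
	if available < need {
		return nil // freeing every lower-priority task still isn't enough
	}
	return toFree
}

func main() {
	running := []slotInfo{{taskID: 1, priority: 1, slots: 8}, {taskID: 2, priority: 5, slots: 4}}
	fmt.Println(pickTasksToFree(running, 2, 6, 3)) // frees only task 2
}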
cancel(nil) can cancel the running task without modifying the task state.
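The distinction can be sketched with the standard library's context.WithCancelCause; ErrCancelSubtask below is a hypothetical sentinel standing in for whatever cause the real executor inspects.

package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrCancelSubtask is a hypothetical sentinel: only this cause should move
// the subtask to the canceled state.
var ErrCancelSubtask = errors.New("cancel subtasks")

func run(ctx context.Context) {
	<-ctx.Done()
	if errors.Is(context.Cause(ctx), ErrCancelSubtask) {
		fmt.Println("mark subtask as canceled")
		return
	}
	// cancel(nil): the context is done, but the task state is untouched,
	// so the task can be picked up and resumed later.
	fmt.Println("stop running without changing task state")
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(nil) // pause the executor; do not mark the subtask canceled
	run(ctx)
}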
fixed in 202ea21
// priority will be used in the future
priority int
slotCount int
taskWaitAlloc *proto.Task
The slot manager shouldn't track which task is waiting; it isn't needed.
Do you mean manage it in taskexecutor.Manager?
No need to manage the task, since we fetch runnable tasks in a loop.
})

usedSlots := 0
for _, slotInfo := range allSlotInfos {
We should iterate backward, so that we free the tasks with the lowest rank first.
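A sketch of that backward scan, assuming the slice is kept sorted with the highest-ranked task first:

package main

import "fmt"

// taskSlot is a hypothetical record; the slice is assumed to be sorted with
// the highest-ranked task first, so the lowest-ranked tasks sit at the tail.
type taskSlot struct {
	taskID int64
	slots  int
}

func main() {
	sorted := []taskSlot{{1, 8}, {2, 4}, {3, 2}} // rank: task 1 > 2 > 3
	need, available := 5, 0
	var toFree []int64
	// Walk backward so the lowest-ranked task (3) is freed before task 2.
	for i := len(sorted) - 1; i >= 0 && available < need; i-- {
		toFree = append(toFree, sorted[i].taskID)
		available += sorted[i].slots
	}
	fmt.Println(toFree) // [3 2]
}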
	return false, nil
}

allSlotInfos := make([]*proto.Task, 0, len(sm.executorSlotInfos))
We can maintain this slice during alloc/free instead of rebuilding it here.
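For example, the slot manager could keep the slice ordered as tasks are allocated and freed, so canAlloc can scan it directly; the names below are assumptions, not the actual fields:

package main

import (
	"fmt"
	"sort"
)

type slotInfo struct {
	taskID   int64
	priority int
}

// slotManager keeps infos ordered at all times, so canAlloc can scan it
// directly instead of rebuilding and re-sorting a slice on each call.
type slotManager struct {
	infos []slotInfo // kept sorted, highest priority (smallest value) first
}

func (sm *slotManager) alloc(si slotInfo) {
	// Insert at the sorted position.
	i := sort.Search(len(sm.infos), func(i int) bool {
		return sm.infos[i].priority > si.priority
	})
	sm.infos = append(sm.infos, slotInfo{})
	copy(sm.infos[i+1:], sm.infos[i:])
	sm.infos[i] = si
}

func (sm *slotManager) free(taskID int64) {
	for i, si := range sm.infos {
		if si.taskID == taskID {
			sm.infos = append(sm.infos[:i], sm.infos[i+1:]...)
			return
		}
	}
}

func main() {
	sm := &slotManager{}
	sm.alloc(slotInfo{taskID: 1, priority: 2})
	sm.alloc(slotInfo{taskID: 2, priority: 1})
	sm.free(1)
	fmt.Println(sm.infos) // [{2 1}]
}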
usedSlots += slotInfo.Concurrency
if sm.available+usedSlots >= task.Concurrency {
	sm.taskWaitAlloc = task
	return false, tasksNeedFree
Should return true, tasksNeedFree, so the caller knows it can allocate after freeing the slots taken by those tasks.
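Under that contract, a hedged sketch of canAlloc looks roughly like this (field names assumed):

package main

import "fmt"

type runInfo struct {
	taskID int64
	slots  int
}

type slotManager struct {
	available int
	running   []runInfo // assumed ordered so victims come first
}

// canAlloc mirrors the contract suggested in the review: (true, nil) when
// free slots already suffice; (true, tasksNeedFree) when they suffice only
// after cancelling the listed tasks; (false, nil) when even that won't help.
func (sm *slotManager) canAlloc(need int) (canAlloc bool, tasksNeedFree []int64) {
	if sm.available >= need {
		return true, nil
	}
	freed := sm.available
	for _, r := range sm.running {
		tasksNeedFree = append(tasksNeedFree, r.taskID)
		if freed += r.slots; freed >= need {
			return true, tasksNeedFree
		}
	}
	return false, nil
}

func main() {
	sm := &slotManager{available: 2, running: []runInfo{{taskID: 1, slots: 4}}}
	fmt.Println(sm.canAlloc(5)) // true [1]
}

The caller can then cancel exactly those executors and retry the allocation.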
Co-authored-by: D3Hunter <[email protected]>
/test all
/retest
// 4. task1 is released, task3 alloc success, start to run
ch <- context.Canceled
time.Sleep(time.Second)
We can't rely on time.Sleep to wait for free().
Timing is unreliable; there's still a chance of a flaky test.
I think this needs to be analyzed case by case. There is clearly a wait for checkTime, and I don't see a problem with waiting (sleep). What I can do is refactor it to use require.Eventually. What do you think?
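A sketch of that refactor, polling the observable state instead of sleeping for a fixed second (freed stands in for "task1's slots have been released"):

package slotmanager_test

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestFreeEventually shows the pattern: instead of time.Sleep(time.Second),
// poll the condition until it holds or the deadline passes.
func TestFreeEventually(t *testing.T) {
	var freed atomic.Bool
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated async free()
		freed.Store(true)
	}()
	require.Eventually(t, func() bool {
		return freed.Load()
	}, time.Second, 10*time.Millisecond, "slots were not freed in time")
}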
	break
}

if !canAlloc {
	logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
This should be debug level, or removed, to avoid printing too much.
	m.logErr(err)
	return
} else if !exist {
	continue
}
switch task.State {
case proto.TaskStateRunning:
	if taskCtx.Err() != nil {
Seems there's no need to print this.
I'd prefer to keep it; that way we know some subtasks have been canceled.
There's already a log at line 386, and we also log in cancelTaskExecutors.
updated log level to debug
just remove it
for _, task := range tasks {
	logutil.Logger(m.logCtx).Info("cancelTasks", zap.Any("task_id", task.ID))
	if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
		// Pause all running subtasks, don't mark subtasks as canceled.
Please change the comment.
- // Pause all running subtasks, don't mark subtasks as canceled.
+ // Pause given subtasks temporarily, don't mark subtasks as canceled.

Like this?
// Pause all running subtasks, don't mark subtasks as canceled.
// Should not change the subtask's state.
cancel(nil)
}
Why not m.slotManager.free(t.ID) here?
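A toy sketch of that suggestion, pairing cancel(nil) with an immediate slot release so the bookkeeping cannot drift from the running set (all names here are assumptions, not the actual taskexecutor.Manager API):

package main

import (
	"context"
	"fmt"
)

type slotManager struct{ used map[int64]int }

func (sm *slotManager) free(taskID int64) { delete(sm.used, taskID) }

type manager struct {
	slotManager   *slotManager
	handlingTasks map[int64]context.CancelCauseFunc
}

// cancelTaskExecutors pauses each executor and, per the review suggestion,
// frees its slots in the same step.
func (m *manager) cancelTaskExecutors(taskIDs []int64) {
	for _, id := range taskIDs {
		if cancel, ok := m.handlingTasks[id]; ok && cancel != nil {
			cancel(nil) // pause; do not mark subtasks canceled
			m.slotManager.free(id)
		}
	}
}

func main() {
	_, cancel := context.WithCancelCause(context.Background())
	m := &manager{
		slotManager:   &slotManager{used: map[int64]int{7: 4}},
		handlingTasks: map[int64]context.CancelCauseFunc{7: cancel},
	}
	m.cancelTaskExecutors([]int64{7})
	fmt.Println(m.slotManager.used) // map[]
}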
Rest LGTM
rest lgtm
Co-authored-by: D3Hunter <[email protected]>
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: D3Hunter, ywqzzy
/retest
What problem does this PR solve?
Issue Number: ref #49008
Problem Summary: we need to free slots (resources) when a higher-priority task arrives.
What changed and how does it work?
Check List
Tests
Side effects
Documentation
Release note