disttask: task executor slot manager #49136
…k-rc/scheduler-slot-manager
Skipping CI for Draft Pull Request.
/ok-to-test
@@ -179,7 +185,7 @@ func TestManager(t *testing.T) {
	mockTaskTable := mock.NewMockTaskTable(ctrl)
	mockInternalExecutor := mock.NewMockTaskExecutor(ctrl)
	mockPool := mock.NewMockPool(ctrl)
	b := NewManagerBuilder()
Add a manager test that covers slot_manager.
pkg/ddl/constant.go
Outdated
@@ -79,6 +80,7 @@ const (
		exec_id varchar(256),
		exec_expired timestamp,
		state varchar(64) not null,
		priority int,
upgrade
it is useless, I will remove it
priorityQueue := make(proto.TaskPriorityQueue, 0)
heap.Init(&priorityQueue)
for _, task := range tasks {
	heap.Push(&priorityQueue, proto.WrapPriorityQueue(task))
}
This does not take tasks that are already being handled into account.
has been filtered
But we should take running tasks into consideration.
Do you mean after TiDB restarts?
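For readers following this thread, here is a minimal sketch of how a task priority queue can be built on `container/heap`. The `task` type, the `taskQueue` type, and the ordering rule (lower `Priority` value first, ties broken by smaller `ID`) are assumptions made for illustration; they are not the PR's `proto.TaskPriorityQueue` or `proto.WrapPriorityQueue`.

```go
package main

import (
	"container/heap"
	"fmt"
)

// task is a hypothetical stand-in for proto.Task.
type task struct {
	ID       int64
	Priority int
}

// taskQueue implements heap.Interface over task pointers.
type taskQueue []*task

func (q taskQueue) Len() int { return len(q) }

// Less orders by Priority first (assumed: lower value runs first), then by ID.
func (q taskQueue) Less(i, j int) bool {
	if q[i].Priority != q[j].Priority {
		return q[i].Priority < q[j].Priority
	}
	return q[i].ID < q[j].ID
}

func (q taskQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }

func (q *taskQueue) Push(x any) { *q = append(*q, x.(*task)) }

func (q *taskQueue) Pop() any {
	old := *q
	n := len(old)
	t := old[n-1]
	old[n-1] = nil // drop the reference so it can be collected
	*q = old[:n-1]
	return t
}

// popOrder pushes all tasks onto a heap and returns their IDs in pop order.
func popOrder(tasks []*task) []int64 {
	q := make(taskQueue, 0, len(tasks))
	for _, t := range tasks {
		heap.Push(&q, t)
	}
	ids := make([]int64, 0, len(tasks))
	for q.Len() > 0 {
		ids = append(ids, heap.Pop(&q).(*task).ID)
	}
	return ids
}

func main() {
	tasks := []*task{{ID: 3, Priority: 2}, {ID: 1, Priority: 2}, {ID: 2, Priority: 1}}
	fmt.Println(popOrder(tasks)) // prints [2 1 3]
}
```

Calling `heap.Init` on an empty queue, as in the diff above, is harmless but does nothing; pushing each element with `heap.Push` already maintains the heap invariant.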
type slotInfo struct {
	taskID int
	priority int
Unused for now.
it is reserved, I can add a comment for it.
@@ -45,6 +47,7 @@ var (
// ManagerBuilder is used to build a Manager.
type ManagerBuilder struct {
	newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)
	// slotAlloctor alloctor.Alloctor
What for?
func NewTestManagerBuilder(ctrl *gomock.Controller) *ManagerBuilder {
	b := NewManagerBuilder()
	return b
}
Why add this function?
type MockAlloctor struct {
	ctrl *gomock.Controller
	recorder *MockAlloctorMockRecorder
}
Seems not used for now
pkg/resourcemanager/BUILD.bazel
Outdated
@@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
    name = "resourcemanager",
    srcs = [
        "rm.go",
        "resource_manager.go",
revert the change?
resource_manager.go is more meaningful
Not the code of disttask
// See the License for the specific language governing permissions and
// limitations under the License.

package taskexecutor
slot_manager.go ----> slots.go
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
slot_manager_test.go ----> slots_test.go
This commit refactors the slot management in the task executor module. It introduces a new `slotManager` struct that handles the reservation and release of slots for executing tasks. The `slotManager` keeps track of the available slots and the tasks currently occupying the slots. The changes include renaming some files and functions for better clarity. The `slot_manager.go` file is renamed to `slot.go`, and the `slot_manager_test.go` file is renamed to `slot_test.go`. The `slotManager` struct now has a `reserve` method to reserve slots for a task and an `unReserve` method to release the slots when the task is completed. These changes improve the readability and maintainability of the code, making it easier to understand and modify the slot management logic in the task executor.
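As a rough illustration of the reserve/release bookkeeping described in that summary, the sketch below shows one way such a manager could look. Everything here is an assumption for illustration: the `task` type stands in for `proto.Task`, the field names and the `newSlotManager` constructor are hypothetical, and the locking strategy is only one reasonable choice rather than what the PR actually merged.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for proto.Task; only the fields used here are modeled.
type task struct {
	ID          int64
	Concurrency int
}

// slotManager tracks how many slots are free and how many each task holds.
type slotManager struct {
	sync.RWMutex
	available int
	reserved  map[int64]int // task ID -> slots held
}

func newSlotManager(capacity int) *slotManager {
	return &slotManager{available: capacity, reserved: make(map[int64]int)}
}

// canReserve reports whether enough slots are free. It takes the read lock
// so it does not race with concurrent reserve/unReserve calls.
func (sm *slotManager) canReserve(t *task) bool {
	sm.RLock()
	defer sm.RUnlock()
	return sm.available >= t.Concurrency
}

// reserve checks and takes the slots under one write lock, so the check
// cannot go stale between canReserve and the actual reservation.
func (sm *slotManager) reserve(t *task) bool {
	sm.Lock()
	defer sm.Unlock()
	if _, held := sm.reserved[t.ID]; held || sm.available < t.Concurrency {
		return false
	}
	sm.reserved[t.ID] = t.Concurrency
	sm.available -= t.Concurrency
	return true
}

// unReserve returns a task's slots; unknown task IDs are ignored.
func (sm *slotManager) unReserve(taskID int64) {
	sm.Lock()
	defer sm.Unlock()
	if n, held := sm.reserved[taskID]; held {
		sm.available += n
		delete(sm.reserved, taskID)
	}
}

func main() {
	sm := newSlotManager(8)
	fmt.Println(sm.reserve(&task{ID: 1, Concurrency: 6}))    // true
	fmt.Println(sm.canReserve(&task{ID: 2, Concurrency: 4})) // false
	sm.unReserve(1)
	fmt.Println(sm.canReserve(&task{ID: 2, Concurrency: 4})) // true
}
```

Having `reserve` return a boolean keeps the check and the update atomic; a separate `canReserve` followed by an unconditional reservation is only safe when a single goroutine drives both calls.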
Rest LGTM
Co-authored-by: EasonBall <[email protected]>
/retest
// canReserve is used to check whether the instance has enough slots to run the task.
func (sm *slotManager) canReserve(task *proto.Task) bool {
	return sm.available >= task.Concurrency
sm.RLock?
const (
	managerID = "test"
	taskID = int64(1)
	taskID2 = int64(2)
)
Those simple vars can just be duplicated in the tests where they are used.
defer func() {
	failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick")
}()
go m.fetchAndHandleRunnableTasksLoop()
You can call onRunnableTasks directly to simplify the mock, and then we will not need the failpoint anymore.
If we do not use the failpoint, how can we check the status of slotManager? 🤔
You can call onRunnableTasks with 1 task and check the status in that case.
failpoint.Inject("taskTick", func() {
	<-onRunnableTaskTick
})
Not needed; just let the executor ext block on Run using a mock.
type slotInfo struct {
	taskID int
	// priority will be used in future
	priority int
Better to store the task itself; priority alone cannot determine task order. OK to do it later.
/remove-label needs-rebase
	continue
}
m.addHandlingTask(task.ID)
m.slotManager.alloc(task)
if it's moved out, we have to free on error
yes👍
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: D3Hunter, ywqzzy
What problem does this PR solve?
Issue Number: close #49135
Problem Summary:
In the distributed task framework, we introduce a slot manager to control resource usage.
What changed and how does it work?
In the scheduler (task executor).