Skip to content

Commit

Permalink
change
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed Jan 23, 2024
1 parent 9f5d216 commit ab62921
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 158 deletions.
3 changes: 1 addition & 2 deletions pkg/ddl/backfilling_dist_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
ctx = util.WithInternalSourceType(ctx, "handle")
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)
schManager, err := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port")
require.NoError(t, err)
schManager := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port")

tk.MustExec("use test")
tk.MustExec("create table t1(id bigint auto_random primary key)")
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ go_library(
"//pkg/domain/infosync",
"//pkg/kv",
"//pkg/metrics",
"//pkg/resourcemanager/pool/spool",
"//pkg/resourcemanager/util",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/backoff",
Expand Down
3 changes: 1 addition & 2 deletions pkg/disttask/framework/scheduler/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,7 @@ func TestBalanceMultipleTasks(t *testing.T) {
}
ctx := context.Background()

manager, err := NewManager(ctx, mockTaskMgr, "1")
require.NoError(t, err)
manager := NewManager(ctx, mockTaskMgr, "1")
manager.slotMgr.updateCapacity(16)
manager.nodeMgr.managedNodes.Store(&[]string{"tidb1", "tidb2", "tidb3"})
b := newBalancer(Param{
Expand Down
17 changes: 5 additions & 12 deletions pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/syncutil"
Expand Down Expand Up @@ -103,7 +101,7 @@ type Manager struct {
cancel context.CancelFunc
taskMgr TaskManager
wg tidbutil.WaitGroupWrapper
gPool *spool.Pool
schedulerWG tidbutil.WaitGroupWrapper
slotMgr *SlotManager
nodeMgr *NodeManager
balancer *balancer
Expand All @@ -122,18 +120,13 @@ type Manager struct {
}

// NewManager creates a scheduler struct.
func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) (*Manager, error) {
func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Manager {
schedulerManager := &Manager{
taskMgr: taskMgr,
serverID: serverID,
slotMgr: newSlotManager(),
nodeMgr: newNodeManager(),
}
gPool, err := spool.NewPool("scheduler_pool", int32(proto.MaxConcurrentTask), util.DistTask, spool.WithBlocking(true))
if err != nil {
return nil, err
}
schedulerManager.gPool = gPool
schedulerManager.ctx, schedulerManager.cancel = context.WithCancel(ctx)
schedulerManager.mu.schedulerMap = make(map[int64]Scheduler)
schedulerManager.finishCh = make(chan struct{}, proto.MaxConcurrentTask)
Expand All @@ -143,7 +136,7 @@ func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) (*Man
slotMgr: schedulerManager.slotMgr,
})

return schedulerManager, nil
return schedulerManager
}

// Start the schedulerManager, start the scheduleTaskLoop to start multiple schedulers.
Expand Down Expand Up @@ -172,7 +165,7 @@ func (sm *Manager) Start() {
// Stop the schedulerManager.
func (sm *Manager) Stop() {
sm.cancel()
sm.gPool.ReleaseAndWait()
sm.schedulerWG.Wait()
sm.wg.Wait()
sm.clearSchedulers()
sm.initialized = false
Expand Down Expand Up @@ -310,7 +303,7 @@ func (sm *Manager) startScheduler(basicTask *proto.Task, reservedExecID string)
sm.addScheduler(task.ID, scheduler)
sm.slotMgr.reserve(basicTask, reservedExecID)
// Using the pool with block, so it wouldn't return an error.
_ = sm.gPool.Run(func() {
sm.schedulerWG.Go(func() {
defer func() {
scheduler.Close()
sm.delScheduler(task.ID)
Expand Down
6 changes: 2 additions & 4 deletions pkg/disttask/framework/scheduler/scheduler_nokit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func TestDispatcherOnNextStage(t *testing.T) {
func TestManagerSchedulersOrdered(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mgr, err := NewManager(context.Background(), nil, "1")
require.NoError(t, err)
mgr := NewManager(context.Background(), nil, "1")
for i := 1; i <= 5; i++ {
task := &proto.Task{
ID: int64(i * 10),
Expand Down Expand Up @@ -227,8 +226,7 @@ func TestSchedulerCleanupTask(t *testing.T) {
defer ctrl.Finish()
taskMgr := mock.NewMockTaskManager(ctrl)
ctx := context.Background()
mgr, err := NewManager(ctx, taskMgr, "1")
require.NoError(t, err)
mgr := NewManager(ctx, taskMgr, "1")

// normal
tasks := []*proto.Task{
Expand Down
3 changes: 1 addition & 2 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ func MockSchedulerManager(t *testing.T, ctrl *gomock.Controller, pool *pools.Res
ctx := context.WithValue(context.Background(), "etcd", true)
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)
sch, err := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port")
require.NoError(t, err)
sch := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port")
scheduler.RegisterSchedulerFactory(proto.TaskTypeExample,
func(ctx context.Context, task *proto.Task, param scheduler.Param) scheduler.Scheduler {
mockScheduler := sch.MockScheduler(task)
Expand Down
4 changes: 0 additions & 4 deletions pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ go_library(
"//pkg/disttask/framework/taskexecutor/execute",
"//pkg/domain/infosync",
"//pkg/metrics",
"//pkg/resourcemanager/pool/spool",
"//pkg/resourcemanager/util",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/cpu",
Expand Down Expand Up @@ -59,8 +57,6 @@ go_test(
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/kv",
"//pkg/resourcemanager/pool/spool",
"//pkg/resourcemanager/util",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util/logutil",
Expand Down
51 changes: 7 additions & 44 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand All @@ -37,7 +35,6 @@ import (
)

var (
executorPoolSize int32 = 4
// same as scheduler
checkTime = 300 * time.Millisecond
recoverMetaInterval = 90 * time.Second
Expand All @@ -49,30 +46,10 @@ var (
}
)

// ManagerBuilder is used to build a Manager.
type ManagerBuilder struct {
newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)
}

// NewManagerBuilder creates a new ManagerBuilder.
func NewManagerBuilder() *ManagerBuilder {
return &ManagerBuilder{
newPool: func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error) {
return spool.NewPool(name, size, component, options...)
},
}
}

// setPoolFactory sets the poolFactory to mock the Pool in unit test.
func (b *ManagerBuilder) setPoolFactory(poolFactory func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)) {
b.newPool = poolFactory
}

// Manager monitors the task table and manages the taskExecutors.
type Manager struct {
taskTable TaskTable
executorPool Pool
mu struct {
taskTable TaskTable
mu struct {
sync.RWMutex
// taskID -> CancelCauseFunc.
// CancelCauseFunc is used to fast cancel the executor.Run.
Expand All @@ -81,18 +58,18 @@ type Manager struct {
// id, it's the same as server id now, i.e. host:port.
id string
wg tidbutil.WaitGroupWrapper
executorWG tidbutil.WaitGroupWrapper
ctx context.Context
cancel context.CancelFunc
logger *zap.Logger
newPool func(name string, size int32, component util.Component, options ...spool.Option) (Pool, error)
slotManager *slotManager

totalCPU int
totalMem int64
}

// BuildManager builds a Manager.
func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error) {
// NewManager creates a new task executor Manager.
func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error) {
totalMem, err := memory.MemTotal()
if err != nil {
// should not happen normally, as in main function of tidb-server, we assert
Expand All @@ -109,7 +86,6 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
id: id,
taskTable: taskTable,
logger: logutil.BgLogger(),
newPool: b.newPool,
slotManager: &slotManager{
taskID2Index: make(map[int64]int),
executorTasks: make([]*proto.Task, 0),
Expand All @@ -121,12 +97,6 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
m.ctx, m.cancel = context.WithCancel(ctx)
m.mu.handlingTasks = make(map[int64]context.CancelCauseFunc)

executorPool, err := m.newPool("executor_pool", executorPoolSize, util.DistTask)
if err != nil {
return nil, err
}
m.executorPool = executorPool

return m, nil
}

Expand Down Expand Up @@ -184,7 +154,7 @@ func (m *Manager) Start() error {
// Stop stops the Manager.
func (m *Manager) Stop() {
m.cancel()
m.executorPool.ReleaseAndWait()
m.executorWG.Wait()
m.wg.Wait()
}

Expand Down Expand Up @@ -270,18 +240,11 @@ func (m *Manager) handleExecutableTasks(tasks []*proto.Task) {
m.addHandlingTask(task.ID)
m.slotManager.alloc(task)
t := task
err = m.executorPool.Run(func() {
m.executorWG.Go(func() {
defer m.slotManager.free(t.ID)
m.handleExecutableTask(t)
m.removeHandlingTask(t.ID)
})
// pool closed.
if err != nil {
m.slotManager.free(t.ID)
m.removeHandlingTask(task.ID)
m.logErr(err)
return
}
}
}

Expand Down
Loading

0 comments on commit ab62921

Please sign in to comment.