Skip to content

Commit

Permalink
Debug
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Nov 27, 2024
1 parent c55737a commit 60d25d0
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 7 deletions.
9 changes: 7 additions & 2 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Controller struct {

podOnNodeManageQueue queue.Queue[string]
nodeManageQueue queue.Queue[string]

podOnNodeManageQueueParallelism *queue.Parallelism[string]
nodeManageQueueParallelism *queue.Parallelism[string]
}

// Config is the configuration for the controller
Expand Down Expand Up @@ -295,6 +298,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
c.nodeLeases.ReleaseHold(nodeName)
}

c.nodeManageQueueParallelism = queue.NewParallelismQueue(ctx, c.nodeManageQueue, c.nodeLeaseSyncWorker)
go c.nodeLeaseSyncWorker(ctx)

err = c.nodeLeases.Start(ctx)
Expand All @@ -307,7 +311,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
func (c *Controller) nodeLeaseSyncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.nodeManageQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.nodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down Expand Up @@ -339,6 +343,7 @@ func (c *Controller) startStageController(ctx context.Context, ref internalversi
return fmt.Errorf("failed to init pod controller: %w", err)
}

c.podOnNodeManageQueueParallelism = queue.NewParallelismQueue(ctx, c.podOnNodeManageQueue, c.podsOnNodeSyncWorker)
go c.podsOnNodeSyncWorker(ctx)

case nodeRef:
Expand Down Expand Up @@ -559,7 +564,7 @@ func (c *Controller) Start(ctx context.Context) error {
func (c *Controller) podsOnNodeSyncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.podOnNodeManageQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.podOnNodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
func (c *NodeController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Node]) error {
go c.preprocessWorker(ctx)
c.parallelism = queue.NewParallelismQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
go c.watchResources(ctx, events)
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/kwok/controllers/node_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle
// Start starts the NodeLeaseController
func (c *NodeLeaseController) Start(ctx context.Context) error {
c.parallelism = queue.NewParallelismQueue(ctx, c.delayQueue, c.syncWorker)
for i := uint(0); i < c.leaseParallelism; i++ {
go c.syncWorker(ctx)
}
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
func (c *PodController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) error {
go c.preprocessWorker(ctx)
c.parallelism = queue.NewParallelismQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
go c.watchResources(ctx, events)
return nil
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/utils/queue/parallelism.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ type Parallelism[T any] struct {
}

func NewParallelismQueue[T any](ctx context.Context, q Queue[T], startFunc func(ctx context.Context)) *Parallelism[T] {
go startFunc(ctx)
go startFunc(ctx)
go startFunc(ctx)
go startFunc(ctx)

return &Parallelism[T]{
ctx: ctx,
startFunc: startFunc,
Expand Down

0 comments on commit 60d25d0

Please sign in to comment.