Skip to content

Commit

Permalink
Debug
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Nov 27, 2024
1 parent 21968b4 commit 5f40b20
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 12 deletions.
7 changes: 3 additions & 4 deletions pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type NodeController struct {
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Node]]
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Node]]
parallelism *queue.Parallelism[resourceStageJob[*corev1.Node]]
backoff wait.Backoff
recorder record.EventRecorder
readOnlyFunc func(nodeName string) bool
Expand Down Expand Up @@ -143,9 +144,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
// if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed
func (c *NodeController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Node]) error {
go c.preprocessWorker(ctx)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
c.parallelism = queue.NewParallelismQueue(ctx, c.delayQueue, c.playStageWorker)
go c.watchResources(ctx, events)
return nil
}
Expand Down Expand Up @@ -323,7 +322,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
node, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
node, ok := c.parallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/kwok/controllers/node_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type NodeLeaseController struct {

delayQueue queue.WeightDelayingQueue[string]
holdLeaseSet maps.SyncMap[string, bool]
parallelism *queue.Parallelism[string]

holderIdentity string
onNodeManagedFunc func(nodeName string)
Expand Down Expand Up @@ -99,16 +100,14 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle

// Start starts the NodeLeaseController
func (c *NodeLeaseController) Start(ctx context.Context) error {
for i := uint(0); i < c.leaseParallelism; i++ {
go c.syncWorker(ctx)
}
c.parallelism = queue.NewParallelismQueue(ctx, c.delayQueue, c.syncWorker)
return nil
}

func (c *NodeLeaseController) syncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.parallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type PodController struct {
playStageParallelism uint
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Pod]]
parallelism *queue.Parallelism[resourceStageJob[*corev1.Pod]]
backoff wait.Backoff
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Pod]]
recorder record.EventRecorder
Expand Down Expand Up @@ -148,9 +149,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
// It will modify the pods status to we want
func (c *PodController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) error {
go c.preprocessWorker(ctx)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
c.parallelism = queue.NewParallelismQueue(ctx, c.delayQueue, c.playStageWorker)
go c.watchResources(ctx, events)
return nil
}
Expand Down Expand Up @@ -258,7 +257,7 @@ func (c *PodController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
pod, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
pod, ok := c.parallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/utils/queue/parallelism.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"context"
"time"
)

type Parallelism[T any] struct {
ctx context.Context
startFunc func(ctx context.Context)
latestStart time.Time
Queue[T]
}

func NewParallelismQueue[T any](ctx context.Context, q Queue[T], startFunc func(ctx context.Context)) *Parallelism[T] {
go startFunc(ctx)

return &Parallelism[T]{
ctx: ctx,
startFunc: startFunc,
latestStart: time.Now(),
Queue: q,
}
}

func (p *Parallelism[T]) GetOrWaitWithDone(done <-chan struct{}) (T, bool) {
t, ok := p.Queue.GetOrWaitWithDone(done)
if !ok {
return t, false
}

if p.Len() > 10 {
now := time.Now()
if now.Sub(p.latestStart) > time.Second {
go p.startFunc(p.ctx)
p.latestStart = now
}
}
return t, true
}

0 comments on commit 5f40b20

Please sign in to comment.