fix: concurrent execution of change node tasks
Signed-off-by: baoyinghai_yewu <[email protected]>
OrangeBao committed Jun 4, 2024
1 parent 48b80e9 commit 944430d
Showing 9 changed files with 278 additions and 124 deletions.
14 changes: 7 additions & 7 deletions cmd/kubenest/operator/app/operator.go
@@ -139,13 +139,13 @@ func run(ctx context.Context, opts *options.Options) error {
 		return err
 	}

-	VirtualClusterNodeController := vcnodecontroller.NodeController{
-		Client:        mgr.GetClient(),
-		RootClientSet: hostKubeClient,
-		KosmosClient:  kosmosClient,
-		EventRecorder: mgr.GetEventRecorderFor(constants.NodeControllerName),
-		Options:       &opts.KubeNestOptions,
-	}
+	VirtualClusterNodeController := vcnodecontroller.NewNodeController(
+		mgr.GetClient(),
+		hostKubeClient,
+		mgr.GetEventRecorderFor(constants.NodeControllerName),
+		kosmosClient,
+		&opts.KubeNestOptions,
+	)

 	if err = VirtualClusterNodeController.SetupWithManager(mgr); err != nil {
 		return fmt.Errorf("error starting %s: %v", constants.NodeControllerName, err)
66 changes: 0 additions & 66 deletions hack/k8s-in-k8s/free_globalnodes.sh

This file was deleted.
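(Its free_globalnodes logic evidently moves into the new globalnodes_helper.sh added below, which handles both the free and reserved transitions.)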

115 changes: 115 additions & 0 deletions hack/k8s-in-k8s/globalnodes_helper.sh
@@ -0,0 +1,115 @@
#!/bin/bash

filename="nodes.txt"
readarray -t globalnodes < "$filename"

function updateState() {
    local nodename="$1"
    local state="$2"
    kubectl patch globalnodes "$nodename" -p '{"spec": {"state": "'$state'"}}' --type=merge
}

function updateNodeState() {
    local nodename="$1"
    local state="$2"
    kubectl patch node "$nodename" -p '{"metadata": {"labels": {"kosmos-io/state": "'$state'"}}}'
}

function uncordon() {
    local nodename="$1"
    kubectl uncordon "$nodename"
    kubectl taint nodes "$nodename" node.kosmos.io/unschedulable-
}

# Mark a global node as free
function free_globalnodes() {
    local globalnode="$1"
    updateState "$globalnode" "free"
    updateNodeState "$globalnode" "free"
}

# Mark a global node as reserved and make it schedulable again
function reserved_globalnodes() {
    local globalnode="$1"
    updateState "$globalnode" "reserved"
    updateNodeState "$globalnode" "reserved"
    uncordon "$globalnode"
}

# Function to display progress bar
show_progress() {
    local progress=$1
    local total=$2
    local width=$3

    # Calculate percentage
    local percent=$((progress * 100 / total))
    local num_hashes=$((percent * width / 100))

    # Generate progress bar
    local bar="["
    for ((i = 0; i < width; i++)); do
        if ((i < num_hashes)); then
            bar+="#"
        else
            bar+=" "
        fi
    done
    bar+="]"

    # Print progress bar with percentage
    printf "\rProgress: %s %d%%" "$bar" "$percent"
}

# Total steps for the task
total_steps=${#globalnodes[@]}
# Width of the progress bar
bar_width=50

function free() {
    # Free each listed node, updating the progress bar as we go
    for ((step = 1; step <= total_steps; step++)); do
        index=$((step - 1))
        free_globalnodes "${globalnodes[index]}"
        show_progress "$step" "$total_steps" "$bar_width"
    done

    # Print a new line after the progress bar completes
    echo
}

function reserved() {
    # Reserve each listed node, updating the progress bar as we go
    for ((step = 1; step <= total_steps; step++)); do
        index=$((step - 1))
        reserved_globalnodes "${globalnodes[index]}"
        show_progress "$step" "$total_steps" "$bar_width"
    done

    # Print a new line after the progress bar completes
    echo
}

# See how we were called.
case "$1" in
free)
    free
    ;;
reserved)
    reserved
    ;;
*)
    echo $"usage: $0 free|reserved"
    exit 1
esac
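For reference, a typical invocation might look like the following (the node names are hypothetical; the script expects a nodes.txt in the working directory with one GlobalNode name per line):

    $ cat nodes.txt
    node-1
    node-2
    $ ./globalnodes_helper.sh free
    Progress: [##################################################] 100%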
12 changes: 12 additions & 0 deletions pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
@@ -135,3 +135,15 @@ func GetWaitNodeReadTime() int {
 	}
 	return num
 }
+
+func GetNodeTaskMaxGoroutines() int {
+	maxGoroutines := os.Getenv("NODE_TASK_MAX_GOROUTINES")
+	if len(maxGoroutines) == 0 {
+		maxGoroutines = "10"
+	}
+	num, err := strconv.Atoi(maxGoroutines)
+	if err != nil {
+		klog.Fatalf("convert NODE_TASK_MAX_GOROUTINES failed, err: %s", err)
+	}
+	return num
+}
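The cap defaults to 10 concurrent node tasks and is read once from the operator's environment, so it can be tuned before the process starts, for example:

    # allow up to 20 node tasks in flight at once (value is parsed with strconv.Atoi)
    export NODE_TASK_MAX_GOROUTINES=20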
@@ -3,7 +3,9 @@ package vcnodecontroller
 import (
 	"context"
 	"fmt"
+	"sync"

+	"github.com/pkg/errors"
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -25,6 +27,7 @@ import (
 	"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
 	"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
 	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+	env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env"
 	"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow"
 	"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task"
 	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
@@ -37,6 +40,19 @@ type NodeController struct {
 	EventRecorder record.EventRecorder
 	KosmosClient  versioned.Interface
 	Options       *options.KubeNestOptions
+	sem           chan struct{}
 }
+
+func NewNodeController(client client.Client, RootClientSet kubernetes.Interface, EventRecorder record.EventRecorder, KosmosClient versioned.Interface, Options *options.KubeNestOptions) *NodeController {
+	r := NodeController{
+		Client:        client,
+		RootClientSet: RootClientSet,
+		EventRecorder: EventRecorder,
+		KosmosClient:  KosmosClient,
+		Options:       Options,
+		sem:           make(chan struct{}, env.GetNodeTaskMaxGoroutines()),
+	}
+	return &r
+}

func (r *NodeController) SetupWithManager(mgr manager.Manager) error {
@@ -285,20 +301,16 @@ func (r *NodeController) DoNodeClean(ctx context.Context, virtualCluster v1alpha
 }

 func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, _ kubernetes.Interface) error {
-	for _, nodeInfo := range nodeInfos {
-		if err := workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{
+	return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
+		return workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{
 			NodeInfo:       nodeInfo,
 			VirtualCluster: virtualCluster,
 			HostClient:     r.Client,
 			HostK8sClient:  r.RootClientSet,
 			Opt:            r.Options,
 			// VirtualK8sClient: _,
-		}); err != nil {
-			return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
-		}
-	}
-
-	return nil
+		})
+	})
 }

func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
@@ -314,34 +326,65 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
 		clusterDNS = dnssvc.Spec.ClusterIP
 	}

-	for _, nodeInfo := range nodeInfos {
-		if err := workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{
+	return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
+		return workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{
 			NodeInfo:         nodeInfo,
 			VirtualCluster:   virtualCluster,
 			KubeDNSAddress:   clusterDNS,
 			HostClient:       r.Client,
 			HostK8sClient:    r.RootClientSet,
 			VirtualK8sClient: k8sClient,
 			Opt:              r.Options,
-		}); err != nil {
-			return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, err)
-		}
-	}
-	return nil
+		})
+	})
 }

 func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
-	for _, nodeInfo := range nodeInfos {
-		if err := workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
+	return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
+		return workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
 			NodeInfo:         nodeInfo,
 			VirtualCluster:   virtualCluster,
 			HostClient:       r.Client,
 			HostK8sClient:    r.RootClientSet,
 			VirtualK8sClient: k8sClient,
 			Opt:              r.Options,
-		}); err != nil {
-			return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
+		})
+	})
 }

+func (r *NodeController) BatchProcessNodes(nodeInfos []v1alpha1.GlobalNode, f func(v1alpha1.GlobalNode) error) error {
+	var wg sync.WaitGroup
+	errChan := make(chan error, len(nodeInfos))
+
+	for _, nodeInfo := range nodeInfos {
+		wg.Add(1)
+		r.sem <- struct{}{}
+		go func(nodeInfo v1alpha1.GlobalNode) {
+			defer wg.Done()
+			defer func() { <-r.sem }()
+			if err := f(nodeInfo); err != nil {
+				errChan <- fmt.Errorf("[%s] batchprocessnodes failed: %s", nodeInfo.Name, err)
+			}
+		}(nodeInfo)
+	}
+
+	wg.Wait()
+	close(errChan)
+
+	var taskErr error
+	for err := range errChan {
+		if err != nil {
+			if taskErr == nil {
+				taskErr = err
+			} else {
+				taskErr = errors.Wrap(err, taskErr.Error())
+			}
+		}
+	}
+
+	if taskErr != nil {
+		return taskErr
+	}
+
+	return nil
+}
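The pattern behind BatchProcessNodes — a buffered channel used as a counting semaphore to bound in-flight goroutines, a WaitGroup to await completion, and a buffered error channel folded into a single error — can be exercised in isolation. A minimal, self-contained sketch (all names here are illustrative, not part of the commit):

	package main

	import (
		"fmt"
		"sync"
	)

	// runBounded applies f to every item, with at most maxParallel goroutines in flight.
	func runBounded(items []string, maxParallel int, f func(string) error) error {
		var wg sync.WaitGroup
		sem := make(chan struct{}, maxParallel) // counting semaphore
		errChan := make(chan error, len(items)) // buffered so workers never block on send

		for _, it := range items {
			wg.Add(1)
			sem <- struct{}{} // acquire a slot; blocks once maxParallel tasks are running
			go func(it string) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot
				if err := f(it); err != nil {
					errChan <- fmt.Errorf("[%s] failed: %w", it, err)
				}
			}(it)
		}

		wg.Wait()
		close(errChan)

		// Fold all worker errors into one, mirroring BatchProcessNodes.
		var combined error
		for err := range errChan {
			if combined == nil {
				combined = err
			} else {
				combined = fmt.Errorf("%v: %w", combined, err)
			}
		}
		return combined
	}

	func main() {
		nodes := []string{"node-a", "node-b", "node-c"} // hypothetical node names
		err := runBounded(nodes, 2, func(n string) error {
			fmt.Println("processing", n)
			return nil
		})
		fmt.Println("done, err =", err)
	}

Acquiring the semaphore in the loop, rather than inside the goroutine, also throttles goroutine creation itself — the same choice the commit makes, so at most NODE_TASK_MAX_GOROUTINES node tasks ever exist at once.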
@@ -0,0 +1,36 @@
package task

import (
	"fmt"

	"k8s.io/klog/v2"
)

type PrefixedLogger struct {
	level  klog.Verbose
	prefix string
}

func NewPrefixedLogger(level klog.Verbose, prefix string) *PrefixedLogger {
	return &PrefixedLogger{level: level, prefix: prefix}
}

func (p *PrefixedLogger) Info(args ...interface{}) {
	if p.level.Enabled() {
		klog.InfoDepth(1, append([]interface{}{p.prefix}, args...)...)
	}
}

func (p *PrefixedLogger) Infof(format string, args ...interface{}) {
	if p.level.Enabled() {
		klog.InfoDepth(1, fmt.Sprintf(p.prefix+format, args...))
	}
}

func (p *PrefixedLogger) Error(args ...interface{}) {
	klog.ErrorDepth(1, append([]interface{}{p.prefix}, args...)...)
}

func (p *PrefixedLogger) Errorf(format string, args ...interface{}) {
	klog.ErrorDepth(1, fmt.Sprintf(p.prefix+format, args...))
}
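A hypothetical usage sketch (the prefix string is illustrative; klog.V(2) only passes once the process runs with -v=2 or higher):

	logger := task.NewPrefixedLogger(klog.V(2), "[node-task node-1] ")
	logger.Infof("joining node, attempt %d", 1)                 // emitted only at -v>=2
	logger.Error("join failed: timeout waiting for node ready") // errors always emitted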
