Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: concurrent execution of change node tasks #623

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ func run(ctx context.Context, opts *options.Options) error {
return err
}

VirtualClusterNodeController := vcnodecontroller.NodeController{
Client: mgr.GetClient(),
RootClientSet: hostKubeClient,
KosmosClient: kosmosClient,
EventRecorder: mgr.GetEventRecorderFor(constants.NodeControllerName),
Options: &opts.KubeNestOptions,
}
VirtualClusterNodeController := vcnodecontroller.NewNodeController(
mgr.GetClient(),
hostKubeClient,
mgr.GetEventRecorderFor(constants.NodeControllerName),
kosmosClient,
&opts.KubeNestOptions,
)

if err = VirtualClusterNodeController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", constants.NodeControllerName, err)
Expand Down
66 changes: 0 additions & 66 deletions hack/k8s-in-k8s/free_globalnodes.sh

This file was deleted.

115 changes: 115 additions & 0 deletions hack/k8s-in-k8s/globalnodes_helper.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#!/bin/bash

# Input file: one global-node name per line.
filename="nodes.txt"
# Fail fast with a clear message instead of letting readarray
# silently produce an empty node list when the file is missing.
if [ ! -f "$filename" ]; then
    echo "error: $filename not found" >&2
    exit 1
fi
readarray -t globalnodes < "$filename"

# Patch the .spec.state field of a GlobalNode custom resource.
# $1: global node name, $2: target state (e.g. "free" or "reserved").
function updateState() {
    local nodename="$1"
    local state="$2"
    # Quote all expansions so names containing glob/IFS characters
    # cannot be word-split or expanded by the shell.
    kubectl patch globalnodes "$nodename" -p '{"spec": {"state": "'"$state"'"}}' --type=merge
}

# Mirror the state onto the corresponding Kubernetes Node object as
# the kosmos-io/state label. $1: node name, $2: state value.
function updateNodeState() {
    local nodename="$1"
    local state="$2"
    # Quoted expansions prevent word splitting on unusual node names.
    kubectl patch node "$nodename" -p '{"metadata": {"labels": {"kosmos-io/state": "'"$state"'"}}}'
}

# Make a node schedulable again: uncordon it and drop the kosmos
# unschedulable taint (trailing '-' removes the taint). $1: node name.
function uncordon() {
    local nodename="$1"
    kubectl uncordon "$nodename"
    kubectl taint nodes "$nodename" node.kosmos.io/unschedulable-
}


# Update the state of the global nodes
# Transition one global node to the "free" state, updating both the
# GlobalNode resource and the Node label. $1: node name.
function free_globalnodes() {
    local node="$1"
    local target="free"
    updateState "$node" "$target"
    updateNodeState "$node" "$target"
}



# Update the state of the global nodes
# Transition one global node to the "reserved" state and make it
# schedulable again. $1: node name.
function reserved_globalnodes() {
    local node="$1"
    local target="reserved"
    updateState "$node" "$target"
    updateNodeState "$node" "$target"
    uncordon "$node"
}


# Function to display progress bar
# Render an in-place textual progress bar.
# $1: completed steps, $2: total steps, $3: bar width in characters.
show_progress() {
    local progress=$1
    local total=$2
    local width=$3

    # Guard against division by zero when nodes.txt is empty:
    # treat zero total work as 100% complete.
    if ((total <= 0)); then
        total=1
        progress=$total
    fi

    # Integer percentage and the number of '#' cells it maps to.
    local percent=$((progress * 100 / total))
    local num_hashes=$((percent * width / 100))

    # Build the bar: filled cells first, then padding spaces.
    local bar="["
    for ((i = 0; i < width; i++)); do
        if ((i < num_hashes)); then
            bar+="#"
        else
            bar+=" "
        fi
    done
    bar+="]"

    # \r rewrites the current line so the bar animates in place.
    printf "\rProgress: %s %d%%" "$bar" "$percent"
}

# Total steps for the task: one step per node read from nodes.txt.
total_steps=${#globalnodes[@]}
# Width of the progress bar, in characters.
bar_width=50

# Move every node listed in nodes.txt to the "free" state,
# drawing a progress bar as the batch advances.
function free() {
    for ((step = 1; step <= total_steps; step++)); do
        # Steps are 1-based; the array is 0-based.
        index=$((step - 1))
        # Quote the array element so node names are passed as one word.
        free_globalnodes "${globalnodes[index]}"

        show_progress "$step" "$total_steps" "$bar_width"
    done

    # Terminate the progress-bar line.
    echo
}

# Move every node listed in nodes.txt to the "reserved" state,
# drawing a progress bar as the batch advances.
function reserved() {
    for ((step = 1; step <= total_steps; step++)); do
        # Steps are 1-based; the array is 0-based.
        index=$((step - 1))
        # Quote the array element so node names are passed as one word.
        reserved_globalnodes "${globalnodes[index]}"

        show_progress "$step" "$total_steps" "$bar_width"
    done

    # Terminate the progress-bar line.
    echo
}


# Dispatch on the first CLI argument.
case "$1" in
free)
    free
    ;;
reserved)
    reserved
    ;;
*)
    # Usage errors go to stderr; plain double quotes replace the
    # accidental $"..." locale-translation quoting.
    echo "usage: $0 free|reserved" >&2
    exit 1
    ;;
esac
12 changes: 12 additions & 0 deletions pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,15 @@ func GetWaitNodeReadTime() int {
}
return num
}

// GetNodeTaskMaxGoroutines returns the maximum number of node tasks that may
// run concurrently, read from the NODE_TASK_MAX_GOROUTINES environment
// variable (default 10). The process exits via klog.Fatalf when the value is
// not a positive integer: a zero-capacity semaphore channel would deadlock
// the node controller, and a negative capacity would panic make(chan).
func GetNodeTaskMaxGoroutines() int {
	maxGoroutines := os.Getenv("NODE_TASK_MAX_GOROUTINES")
	if len(maxGoroutines) == 0 {
		maxGoroutines = "10"
	}
	num, err := strconv.Atoi(maxGoroutines)
	if err != nil {
		klog.Fatalf("convert NODE_TASK_MAX_GOROUTINES failed, err: %s", err)
	}
	// Reject non-positive values so the semaphore built from this size
	// is always usable.
	if num <= 0 {
		klog.Fatalf("NODE_TASK_MAX_GOROUTINES must be positive, got: %d", num)
	}
	return num
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package vcnodecontroller
import (
"context"
"fmt"
"sync"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
Expand All @@ -37,6 +40,19 @@ type NodeController struct {
EventRecorder record.EventRecorder
KosmosClient versioned.Interface
Options *options.KubeNestOptions
sem chan struct{}
}

// NewNodeController constructs a NodeController wired to the given clients
// and recorder. The sem channel is a counting semaphore sized from the
// NODE_TASK_MAX_GOROUTINES environment variable; it bounds how many node
// tasks BatchProcessNodes runs concurrently.
func NewNodeController(client client.Client, RootClientSet kubernetes.Interface, EventRecorder record.EventRecorder, KosmosClient versioned.Interface, Options *options.KubeNestOptions) *NodeController {
	return &NodeController{
		Client:        client,
		RootClientSet: RootClientSet,
		EventRecorder: EventRecorder,
		KosmosClient:  KosmosClient,
		Options:       Options,
		sem:           make(chan struct{}, env.GetNodeTaskMaxGoroutines()),
	}
}

func (r *NodeController) SetupWithManager(mgr manager.Manager) error {
Expand Down Expand Up @@ -285,20 +301,16 @@ func (r *NodeController) DoNodeClean(ctx context.Context, virtualCluster v1alpha
}

func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, _ kubernetes.Interface) error {
for _, nodeInfo := range nodeInfos {
if err := workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{
return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
return workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
HostClient: r.Client,
HostK8sClient: r.RootClientSet,
Opt: r.Options,
// VirtualK8sClient: _,
}); err != nil {
return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
}
}

return nil
})
})
}

func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
Expand All @@ -314,34 +326,65 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
clusterDNS = dnssvc.Spec.ClusterIP
}

for _, nodeInfo := range nodeInfos {
if err := workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{
return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
return workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
KubeDNSAddress: clusterDNS,
HostClient: r.Client,
HostK8sClient: r.RootClientSet,
VirtualK8sClient: k8sClient,
Opt: r.Options,
}); err != nil {
return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, err)
}
}
return nil
})
})
}

func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
for _, nodeInfo := range nodeInfos {
if err := workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
return workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
HostClient: r.Client,
HostK8sClient: r.RootClientSet,
VirtualK8sClient: k8sClient,
Opt: r.Options,
}); err != nil {
return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
})
})
}

// BatchProcessNodes runs f once per node in nodeInfos, concurrently, with
// concurrency bounded by the controller's semaphore channel r.sem. It waits
// for every task to finish and returns a single error aggregating all
// per-node failures (wrapped together via pkg/errors), or nil when every
// task succeeds.
func (r *NodeController) BatchProcessNodes(nodeInfos []v1alpha1.GlobalNode, f func(v1alpha1.GlobalNode) error) error {
	var wg sync.WaitGroup
	// Buffered to len(nodeInfos) so every worker can report a failure
	// without blocking before the channel is drained.
	errChan := make(chan error, len(nodeInfos))

	for _, nodeInfo := range nodeInfos {
		wg.Add(1)
		// Acquire a semaphore slot before spawning the worker so at most
		// cap(r.sem) tasks run at once.
		r.sem <- struct{}{}
		// Pass nodeInfo as an argument to avoid loop-variable capture.
		go func(nodeInfo v1alpha1.GlobalNode) {
			defer wg.Done()
			defer func() { <-r.sem }() // release the slot
			if err := f(nodeInfo); err != nil {
				errChan <- fmt.Errorf("[%s] batchprocessnodes failed: %s", nodeInfo.Name, err)
			}
		}(nodeInfo)
	}

	wg.Wait()
	close(errChan)

	// Fold all failures into one error chain. Only non-nil errors are ever
	// sent on errChan, so no per-item nil check is needed, and taskErr is
	// already nil when the channel drains empty.
	var taskErr error
	for err := range errChan {
		if taskErr == nil {
			taskErr = err
		} else {
			taskErr = errors.Wrap(err, taskErr.Error())
		}
	}

	return taskErr
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package task

import (
"fmt"

"k8s.io/klog/v2"
)

// PrefixedLogger wraps klog so every message carries a fixed prefix.
// Info/Infof output is gated on the stored verbosity level; Error/Errorf
// are always emitted.
type PrefixedLogger struct {
	level  klog.Verbose // verbosity gate checked by Info and Infof
	prefix string       // text prepended to every log message
}

// NewPrefixedLogger returns a PrefixedLogger that gates info output on the
// given verbosity level and prepends prefix to every message.
func NewPrefixedLogger(level klog.Verbose, prefix string) *PrefixedLogger {
	logger := PrefixedLogger{
		level:  level,
		prefix: prefix,
	}
	return &logger
}

// Info logs args at info level with the configured prefix prepended,
// but only when the logger's verbosity level is enabled.
func (p *PrefixedLogger) Info(args ...interface{}) {
	if !p.level.Enabled() {
		return
	}
	// Depth 1 attributes the log line to this method's caller.
	prefixed := append([]interface{}{p.prefix}, args...)
	klog.InfoDepth(1, prefixed...)
}

// Infof logs a formatted message at info level with the configured prefix
// prepended to the format string, but only when the verbosity level is
// enabled.
func (p *PrefixedLogger) Infof(format string, args ...interface{}) {
	if !p.level.Enabled() {
		return
	}
	// Depth 1 attributes the log line to this method's caller.
	msg := fmt.Sprintf(p.prefix+format, args...)
	klog.InfoDepth(1, msg)
}

// Error logs args at error level with the configured prefix prepended.
// Unlike Info, errors are emitted regardless of the verbosity level.
func (p *PrefixedLogger) Error(args ...interface{}) {
	prefixed := append([]interface{}{p.prefix}, args...)
	klog.ErrorDepth(1, prefixed...)
}

// Errorf logs a formatted message at error level with the configured prefix
// prepended to the format string. Errors bypass the verbosity gate.
func (p *PrefixedLogger) Errorf(format string, args ...interface{}) {
	msg := fmt.Sprintf(p.prefix+format, args...)
	klog.ErrorDepth(1, msg)
}
Loading
Loading