From b0f34784fcc9a4060845227aea3b826c5b132524 Mon Sep 17 00:00:00 2001 From: duanmengkk Date: Sun, 21 Apr 2024 22:12:28 +0800 Subject: [PATCH] add hostport pool manager Signed-off-by: duanmengkk --- cmd/kubenest/operator/app/operator.go | 18 ++- .../host_port_manager.go | 107 ++++++++++++++++++ .../virtualcluster_execute_controller.go | 13 ++- .../virtualcluster_init_controller.go | 8 +- pkg/kubenest/controlplane/apiserver.go | 8 +- pkg/kubenest/init.go | 13 ++- pkg/kubenest/tasks/apiserver.go | 1 + pkg/kubenest/tasks/data.go | 6 +- 8 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 pkg/kubenest/controller/virtualcluster.node.controller/host_port_manager.go diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go index 41fda9911..df69aca52 100644 --- a/cmd/kubenest/operator/app/operator.go +++ b/cmd/kubenest/operator/app/operator.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" cliflag "k8s.io/component-base/cli/flag" "k8s.io/klog/v2" @@ -75,10 +76,21 @@ func run(ctx context.Context, opts *options.Options) error { return fmt.Errorf("failed to build controller manager: %v", err) } + hostKubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("could not create clientset: %v", err) + } + + hostPortManager, err := vcnodecontroller.NewHostPortManager(hostKubeClient) + if err != nil { + return fmt.Errorf("failed to create host port manager: %v", err) + } + VirtualClusterInitController := controller.VirtualClusterInitController{ - Client: mgr.GetClient(), - Config: mgr.GetConfig(), - EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName), + Client: mgr.GetClient(), + Config: mgr.GetConfig(), + EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName), + HostPortManager: hostPortManager, } if err = VirtualClusterInitController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting %s: %v", constants.InitControllerName, err) diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/host_port_manager.go b/pkg/kubenest/controller/virtualcluster.node.controller/host_port_manager.go new file mode 100644 index 000000000..aee51c25a --- /dev/null +++ b/pkg/kubenest/controller/virtualcluster.node.controller/host_port_manager.go @@ -0,0 +1,107 @@ +package vcnodecontroller + +import ( + "context" + "fmt" + "sync" + + "gopkg.in/yaml.v3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +/** +apiVersion: v1 +kind: ConfigMap +metadata: + name: kosmos-hostports + namespace: kosmos-system +data: + config.yaml: | + PortsPool: + - 5443 + - 6443 + - 7443 + ClusterPorts: + - Port: 5443 + Cluster: "cluster1" + - Port: 6443 + Cluster: "cluster2" +*/ + +type HostPortManager struct { + HostPortPool *HostPortPool + kubeClient kubernetes.Interface + lock sync.Mutex +} + +type HostPortPool struct { + PortsPool []int32 `yaml:"portsPool"` + ClusterPorts []ClusterPort `yaml:"clusterPorts"` +} + +type ClusterPort struct { + Port int32 `yaml:"port"` + Cluster string `yaml:"cluster"` +} + +func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) { + //todo magic Value + hostPorts, err := client.CoreV1().ConfigMaps("kosmos-system").Get(context.TODO(), "kosmos-hostports", metav1.GetOptions{}) + if err != nil { + return nil, err + } + //todo magic Value + yamlData, exist := hostPorts.Data["config.yaml"] + if !exist { + return nil, fmt.Errorf("hostports not found in configmap") + } + + var hostPool HostPortPool + if err := yaml.Unmarshal([]byte(yamlData), &hostPool); err != nil { + return nil, err + } + manager := &HostPortManager{ + HostPortPool: &hostPool, + kubeClient: client, + } + return manager, nil +} + +func (m *HostPortManager) AllocateHostIP(clusterName string) (int32, error) { + m.lock.Lock() + defer m.lock.Unlock() + for _, port := range m.HostPortPool.PortsPool { + if !m.isPortAllocated(port) { + m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName}) + m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName}) + return port, nil + } + } + // todo 更新 cm + return 0, fmt.Errorf("no available ports to allocate") +} + +func (m *HostPortManager) ReleaseHostIP(clusterName string) error { + m.lock.Lock() + defer m.lock.Unlock() + + for i, cp := range m.HostPortPool.ClusterPorts { + if cp.Cluster == clusterName { + // Remove the entry from the slice + m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts[:i], m.HostPortPool.ClusterPorts[i+1:]...) + return nil + } + } + // todo 更新 cm + return fmt.Errorf("no port found for cluster %s", clusterName) +} + +func (m *HostPortManager) isPortAllocated(port int32) bool { + for _, cp := range m.HostPortPool.ClusterPorts { + if cp.Port == port { + return true + } + } + return false +} diff --git a/pkg/kubenest/controller/virtualcluster_execute_controller.go b/pkg/kubenest/controller/virtualcluster_execute_controller.go index 9d4d22e88..2b76daae9 100644 --- a/pkg/kubenest/controller/virtualcluster_execute_controller.go +++ b/pkg/kubenest/controller/virtualcluster_execute_controller.go @@ -14,17 +14,18 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller" "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" ) -type Executer struct { +type Executor struct { client.Client virtualCluster *v1alpha1.VirtualCluster phase *workflow.Phase config *rest.Config } -func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config) (*Executer, error) { +func NewExecutor(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config, hostPortManager *vcnodecontroller.HostPortManager) (*Executor, error) { var phase *workflow.Phase action := recognizeActionFor(virtualCluster) @@ -35,14 +36,14 @@ func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, confi kubenest.NewInitOptWithKubeconfig(config), } options := kubenest.NewPhaseInitOptions(opts...) - phase = kubenest.NewInitPhase(options) + phase = kubenest.NewInitPhase(options, hostPortManager) case constants.DeInitAction: //TODO deinit default: return nil, fmt.Errorf("failed to recognize action for virtual cluster %s", virtualCluster.Name) } - return &Executer{ + return &Executor{ virtualCluster: virtualCluster, Client: c, phase: phase, @@ -50,7 +51,7 @@ func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, confi }, nil } -func (e *Executer) Execute() error { +func (e *Executor) Execute() error { klog.InfoS("Start execute the workflow", "workflow", "virtual cluster", klog.KObj(e.virtualCluster)) if err := e.phase.Run(); err != nil { @@ -64,7 +65,7 @@ func (e *Executer) Execute() error { return nil } -func (e *Executer) afterRunPhase() error { +func (e *Executor) afterRunPhase() error { localClusterClient, err := clientset.NewForConfig(e.config) if err != nil { return fmt.Errorf("error when creating local cluster client, err: %w", err) diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 95168da20..5aa4198ea 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -19,12 +19,14 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller" ) type VirtualClusterInitController struct { client.Client - Config *rest.Config - EventRecorder record.EventRecorder + Config *rest.Config + EventRecorder record.EventRecorder + HostPortManager *vcnodecontroller.HostPortManager } func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { @@ -74,7 +76,7 @@ func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) err func (c *VirtualClusterInitController) syncVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) error { klog.V(2).Infof("Reconciling virtual cluster", "name", virtualCluster.Name) - executer, err := NewExecuter(virtualCluster, c.Client, c.Config) + executer, err := NewExecutor(virtualCluster, c.Client, c.Config, c.HostPortManager) if err != nil { return err } diff --git a/pkg/kubenest/controlplane/apiserver.go b/pkg/kubenest/controlplane/apiserver.go index 141480cde..ded8de7fb 100644 --- a/pkg/kubenest/controlplane/apiserver.go +++ b/pkg/kubenest/controlplane/apiserver.go @@ -10,13 +10,19 @@ import ( clientsetscheme "k8s.io/client-go/kubernetes/scheme" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller" "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver" "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) var errAllocated = errors.New("provided port is already allocated") -func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string) error { +func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error { + _, err := manager.AllocateHostIP(name) + if err != nil { + return fmt.Errorf("failed to allocate host ip for virtual cluster apiserver, err: %w", err) + } + if err := installAPIServer(client, name, namespace); err != nil { return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err) } diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index 4fd1b98e1..54f9a49ca 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -11,6 +11,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller" "github.com/kosmos.io/kosmos/pkg/kubenest/tasks" "github.com/kosmos.io/kosmos/pkg/kubenest/util" "github.com/kosmos.io/kosmos/pkg/kubenest/util/cert" @@ -31,6 +32,7 @@ type initData struct { kosmosClient versioned.Interface virtualClusterDataDir string privateRegistry string + hostPortManager *vcnodecontroller.HostPortManager } type InitOptions struct { @@ -42,7 +44,7 @@ type InitOptions struct { virtualCluster *v1alpha1.VirtualCluster } -func NewInitPhase(opts *InitOptions) *workflow.Phase { +func NewInitPhase(opts *InitOptions, hostPortManager *vcnodecontroller.HostPortManager) *workflow.Phase { initPhase := workflow.NewPhase() initPhase.AppendTask(tasks.NewVirtualClusterServiceTask()) @@ -56,7 +58,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase { initPhase.AppendTask(tasks.NewCheckControlPlaneTask()) initPhase.SetDataInitializer(func() (workflow.RunData, error) { - return newRunData(opts) + return newRunData(opts, hostPortManager) }) return initPhase } @@ -95,7 +97,7 @@ func NewInitOptWithKubeconfig(config *rest.Config) InitOpt { } } -func newRunData(opt *InitOptions) (*initData, error) { +func newRunData(opt *InitOptions, hostPortManager *vcnodecontroller.HostPortManager) (*initData, error) { if err := opt.Validate(); err != nil { return nil, err } @@ -138,6 +140,7 @@ func newRunData(opt *InitOptions) (*initData, error) { virtualClusterDataDir: opt.virtualClusterDataDir, privateRegistry: utils.DefaultImageRepository, CertStore: cert.NewCertStore(), + hostPortManager: hostPortManager, }, nil } @@ -165,6 +168,10 @@ func (i initData) GetNamespace() string { return i.namespace } +func (i initData) GetHostPortManager() *vcnodecontroller.HostPortManager { + return i.hostPortManager +} + func (i initData) ControlplaneAddress() string { return i.controlplaneAddr } diff --git a/pkg/kubenest/tasks/apiserver.go b/pkg/kubenest/tasks/apiserver.go index d98fe2340..ed861f893 100644 --- a/pkg/kubenest/tasks/apiserver.go +++ b/pkg/kubenest/tasks/apiserver.go @@ -55,6 +55,7 @@ func runVirtualClusterAPIServer(r workflow.RunData) error { data.RemoteClient(), data.GetName(), data.GetNamespace(), + data.GetHostPortManager(), ) if err != nil { return fmt.Errorf("failed to install virtual cluster apiserver component, err: %w", err) diff --git a/pkg/kubenest/tasks/data.go b/pkg/kubenest/tasks/data.go index b813d9352..4ca405c2b 100644 --- a/pkg/kubenest/tasks/data.go +++ b/pkg/kubenest/tasks/data.go @@ -1,16 +1,18 @@ package tasks import ( - clientset "k8s.io/client-go/kubernetes" - "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/util/cert" + clientset "k8s.io/client-go/kubernetes" + + vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller" ) type InitData interface { cert.CertStore GetName() string GetNamespace() string + GetHostPortManager() *vcnodecontroller.HostPortManager ControlplaneAddress() string ServiceClusterIp() []string RemoteClient() clientset.Interface