Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add hostport pool manager #474

Merged
merged 1 commit into from
Apr 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
add hostport pool manager
Signed-off-by: duanmengkk <duanmeng_yewu@cmss.chinamobile.com>
duanmengkk committed Apr 21, 2024
commit b0f34784fcc9a4060845227aea3b826c5b132524
18 changes: 15 additions & 3 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (

"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2"
@@ -75,10 +76,21 @@ func run(ctx context.Context, opts *options.Options) error {
return fmt.Errorf("failed to build controller manager: %v", err)
}

hostKubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("could not create clientset: %v", err)
}

hostPortManager, err := vcnodecontroller.NewHostPortManager(hostKubeClient)
if err != nil {
return fmt.Errorf("failed to create host port manager: %v", err)
}

VirtualClusterInitController := controller.VirtualClusterInitController{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName),
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName),
HostPortManager: hostPortManager,
}
if err = VirtualClusterInitController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", constants.InitControllerName, err)
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package vcnodecontroller

import (
"context"
"fmt"
"sync"

"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

/**
apiVersion: v1
kind: ConfigMap
metadata:
name: kosmos-hostports
namespace: kosmos-system
data:
config.yaml: |
PortsPool:
- 5443
- 6443
- 7443
ClusterPorts:
- Port: 5443
Cluster: "cluster1"
- Port: 6443
Cluster: "cluster2"
*/

type HostPortManager struct {
HostPortPool *HostPortPool
kubeClient kubernetes.Interface
lock sync.Mutex
}

type HostPortPool struct {
PortsPool []int32 `yaml:"portsPool"`
ClusterPorts []ClusterPort `yaml:"clusterPorts"`
}

type ClusterPort struct {
Port int32 `yaml:"port"`
Cluster string `yaml:"cluster"`
}

func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) {
//todo magic Value
hostPorts, err := client.CoreV1().ConfigMaps("kosmos-system").Get(context.TODO(), "kosmos-hostports", metav1.GetOptions{})
if err != nil {
return nil, err
}
//todo magic Value
yamlData, exist := hostPorts.Data["config.yaml"]
if !exist {
return nil, fmt.Errorf("hostports not found in configmap")
}

var hostPool HostPortPool
if err := yaml.Unmarshal([]byte(yamlData), &hostPool); err != nil {
return nil, err
}
manager := &HostPortManager{
HostPortPool: &hostPool,
kubeClient: client,
}
return manager, nil
}

func (m *HostPortManager) AllocateHostIP(clusterName string) (int32, error) {
m.lock.Lock()
defer m.lock.Unlock()
for _, port := range m.HostPortPool.PortsPool {
if !m.isPortAllocated(port) {
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
return port, nil
}
}
// todo 更新 cm
return 0, fmt.Errorf("no available ports to allocate")
}

func (m *HostPortManager) ReleaseHostIP(clusterName string) error {
m.lock.Lock()
defer m.lock.Unlock()

for i, cp := range m.HostPortPool.ClusterPorts {
if cp.Cluster == clusterName {
// Remove the entry from the slice
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts[:i], m.HostPortPool.ClusterPorts[i+1:]...)
return nil
}
}
// todo 更新 cm
return fmt.Errorf("no port found for cluster %s", clusterName)
}

func (m *HostPortManager) isPortAllocated(port int32) bool {
for _, cp := range m.HostPortPool.ClusterPorts {
if cp.Port == port {
return true
}
}
return false
}
13 changes: 7 additions & 6 deletions pkg/kubenest/controller/virtualcluster_execute_controller.go
Original file line number Diff line number Diff line change
@@ -14,17 +14,18 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
)

type Executer struct {
type Executor struct {
client.Client
virtualCluster *v1alpha1.VirtualCluster
phase *workflow.Phase
config *rest.Config
}

func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config) (*Executer, error) {
func NewExecutor(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config, hostPortManager *vcnodecontroller.HostPortManager) (*Executor, error) {
var phase *workflow.Phase

action := recognizeActionFor(virtualCluster)
@@ -35,22 +36,22 @@ func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, confi
kubenest.NewInitOptWithKubeconfig(config),
}
options := kubenest.NewPhaseInitOptions(opts...)
phase = kubenest.NewInitPhase(options)
phase = kubenest.NewInitPhase(options, hostPortManager)
case constants.DeInitAction:
//TODO deinit
default:
return nil, fmt.Errorf("failed to recognize action for virtual cluster %s", virtualCluster.Name)
}

return &Executer{
return &Executor{
virtualCluster: virtualCluster,
Client: c,
phase: phase,
config: config,
}, nil
}

func (e *Executer) Execute() error {
func (e *Executor) Execute() error {
klog.InfoS("Start execute the workflow", "workflow", "virtual cluster", klog.KObj(e.virtualCluster))

if err := e.phase.Run(); err != nil {
@@ -64,7 +65,7 @@ func (e *Executer) Execute() error {
return nil
}

func (e *Executer) afterRunPhase() error {
func (e *Executor) afterRunPhase() error {
localClusterClient, err := clientset.NewForConfig(e.config)
if err != nil {
return fmt.Errorf("error when creating local cluster client, err: %w", err)
8 changes: 5 additions & 3 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
@@ -19,12 +19,14 @@ import (

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
)

type VirtualClusterInitController struct {
client.Client
Config *rest.Config
EventRecorder record.EventRecorder
Config *rest.Config
EventRecorder record.EventRecorder
HostPortManager *vcnodecontroller.HostPortManager
}

func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
@@ -74,7 +76,7 @@ func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) err

func (c *VirtualClusterInitController) syncVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) error {
klog.V(2).Infof("Reconciling virtual cluster", "name", virtualCluster.Name)
executer, err := NewExecuter(virtualCluster, c.Client, c.Config)
executer, err := NewExecutor(virtualCluster, c.Client, c.Config, c.HostPortManager)
if err != nil {
return err
}
8 changes: 7 additions & 1 deletion pkg/kubenest/controlplane/apiserver.go
Original file line number Diff line number Diff line change
@@ -10,13 +10,19 @@ import (
clientsetscheme "k8s.io/client-go/kubernetes/scheme"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

var errAllocated = errors.New("provided port is already allocated")

func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string) error {
func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error {
_, err := manager.AllocateHostIP(name)
if err != nil {
return fmt.Errorf("failed to allocate host ip for virtual cluster apiserver, err: %w", err)
}

if err := installAPIServer(client, name, namespace); err != nil {
return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err)
}
13 changes: 10 additions & 3 deletions pkg/kubenest/init.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/tasks"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
"github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
@@ -31,6 +32,7 @@ type initData struct {
kosmosClient versioned.Interface
virtualClusterDataDir string
privateRegistry string
hostPortManager *vcnodecontroller.HostPortManager
}

type InitOptions struct {
@@ -42,7 +44,7 @@ type InitOptions struct {
virtualCluster *v1alpha1.VirtualCluster
}

func NewInitPhase(opts *InitOptions) *workflow.Phase {
func NewInitPhase(opts *InitOptions, hostPortManager *vcnodecontroller.HostPortManager) *workflow.Phase {
initPhase := workflow.NewPhase()

initPhase.AppendTask(tasks.NewVirtualClusterServiceTask())
@@ -56,7 +58,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
initPhase.AppendTask(tasks.NewCheckControlPlaneTask())

initPhase.SetDataInitializer(func() (workflow.RunData, error) {
return newRunData(opts)
return newRunData(opts, hostPortManager)
})
return initPhase
}
@@ -95,7 +97,7 @@ func NewInitOptWithKubeconfig(config *rest.Config) InitOpt {
}
}

func newRunData(opt *InitOptions) (*initData, error) {
func newRunData(opt *InitOptions, hostPortManager *vcnodecontroller.HostPortManager) (*initData, error) {
if err := opt.Validate(); err != nil {
return nil, err
}
@@ -138,6 +140,7 @@ func newRunData(opt *InitOptions) (*initData, error) {
virtualClusterDataDir: opt.virtualClusterDataDir,
privateRegistry: utils.DefaultImageRepository,
CertStore: cert.NewCertStore(),
hostPortManager: hostPortManager,
}, nil
}

@@ -165,6 +168,10 @@ func (i initData) GetNamespace() string {
return i.namespace
}

func (i initData) GetHostPortManager() *vcnodecontroller.HostPortManager {
return i.hostPortManager
}

func (i initData) ControlplaneAddress() string {
return i.controlplaneAddr
}
1 change: 1 addition & 0 deletions pkg/kubenest/tasks/apiserver.go
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ func runVirtualClusterAPIServer(r workflow.RunData) error {
data.RemoteClient(),
data.GetName(),
data.GetNamespace(),
data.GetHostPortManager(),
)
if err != nil {
return fmt.Errorf("failed to install virtual cluster apiserver component, err: %w", err)
6 changes: 4 additions & 2 deletions pkg/kubenest/tasks/data.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package tasks

import (
clientset "k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
clientset "k8s.io/client-go/kubernetes"

vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
)

type InitData interface {
cert.CertStore
GetName() string
GetNamespace() string
GetHostPortManager() *vcnodecontroller.HostPortManager
ControlplaneAddress() string
ServiceClusterIp() []string
RemoteClient() clientset.Interface