Skip to content

Commit

Permalink
Merge pull request #474 from duanmengkk/main
Browse files Browse the repository at this point in the history
add hostport pool manager
  • Loading branch information
kosmos-robot authored Apr 21, 2024
2 parents 9bbb7b3 + b0f3478 commit f18f7e4
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 18 deletions.
18 changes: 15 additions & 3 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -75,10 +76,21 @@ func run(ctx context.Context, opts *options.Options) error {
return fmt.Errorf("failed to build controller manager: %v", err)
}

hostKubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("could not create clientset: %v", err)
}

hostPortManager, err := vcnodecontroller.NewHostPortManager(hostKubeClient)
if err != nil {
return fmt.Errorf("failed to create host port manager: %v", err)
}

VirtualClusterInitController := controller.VirtualClusterInitController{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName),
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName),
HostPortManager: hostPortManager,
}
if err = VirtualClusterInitController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", constants.InitControllerName, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package vcnodecontroller

import (
"context"
"fmt"
"sync"

"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

/**
apiVersion: v1
kind: ConfigMap
metadata:
name: kosmos-hostports
namespace: kosmos-system
data:
config.yaml: |
PortsPool:
- 5443
- 6443
- 7443
ClusterPorts:
- Port: 5443
Cluster: "cluster1"
- Port: 6443
Cluster: "cluster2"
*/

type HostPortManager struct {
HostPortPool *HostPortPool
kubeClient kubernetes.Interface
lock sync.Mutex
}

type HostPortPool struct {
PortsPool []int32 `yaml:"portsPool"`
ClusterPorts []ClusterPort `yaml:"clusterPorts"`
}

type ClusterPort struct {
Port int32 `yaml:"port"`
Cluster string `yaml:"cluster"`
}

func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) {
//todo magic Value
hostPorts, err := client.CoreV1().ConfigMaps("kosmos-system").Get(context.TODO(), "kosmos-hostports", metav1.GetOptions{})
if err != nil {
return nil, err
}
//todo magic Value
yamlData, exist := hostPorts.Data["config.yaml"]
if !exist {
return nil, fmt.Errorf("hostports not found in configmap")
}

var hostPool HostPortPool
if err := yaml.Unmarshal([]byte(yamlData), &hostPool); err != nil {
return nil, err
}
manager := &HostPortManager{
HostPortPool: &hostPool,
kubeClient: client,
}
return manager, nil
}

func (m *HostPortManager) AllocateHostIP(clusterName string) (int32, error) {
m.lock.Lock()
defer m.lock.Unlock()
for _, port := range m.HostPortPool.PortsPool {
if !m.isPortAllocated(port) {
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
return port, nil
}
}
// todo 更新 cm
return 0, fmt.Errorf("no available ports to allocate")
}

func (m *HostPortManager) ReleaseHostIP(clusterName string) error {
m.lock.Lock()
defer m.lock.Unlock()

for i, cp := range m.HostPortPool.ClusterPorts {
if cp.Cluster == clusterName {
// Remove the entry from the slice
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts[:i], m.HostPortPool.ClusterPorts[i+1:]...)
return nil
}
}
// todo 更新 cm
return fmt.Errorf("no port found for cluster %s", clusterName)
}

func (m *HostPortManager) isPortAllocated(port int32) bool {
for _, cp := range m.HostPortPool.ClusterPorts {
if cp.Port == port {
return true
}
}
return false
}
13 changes: 7 additions & 6 deletions pkg/kubenest/controller/virtualcluster_execute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
)

type Executer struct {
type Executor struct {
client.Client
virtualCluster *v1alpha1.VirtualCluster
phase *workflow.Phase
config *rest.Config
}

func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config) (*Executer, error) {
func NewExecutor(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config, hostPortManager *vcnodecontroller.HostPortManager) (*Executor, error) {
var phase *workflow.Phase

action := recognizeActionFor(virtualCluster)
Expand All @@ -35,22 +36,22 @@ func NewExecuter(virtualCluster *v1alpha1.VirtualCluster, c client.Client, confi
kubenest.NewInitOptWithKubeconfig(config),
}
options := kubenest.NewPhaseInitOptions(opts...)
phase = kubenest.NewInitPhase(options)
phase = kubenest.NewInitPhase(options, hostPortManager)
case constants.DeInitAction:
//TODO deinit
default:
return nil, fmt.Errorf("failed to recognize action for virtual cluster %s", virtualCluster.Name)
}

return &Executer{
return &Executor{
virtualCluster: virtualCluster,
Client: c,
phase: phase,
config: config,
}, nil
}

func (e *Executer) Execute() error {
func (e *Executor) Execute() error {
klog.InfoS("Start execute the workflow", "workflow", "virtual cluster", klog.KObj(e.virtualCluster))

if err := e.phase.Run(); err != nil {
Expand All @@ -64,7 +65,7 @@ func (e *Executer) Execute() error {
return nil
}

func (e *Executer) afterRunPhase() error {
func (e *Executor) afterRunPhase() error {
localClusterClient, err := clientset.NewForConfig(e.config)
if err != nil {
return fmt.Errorf("error when creating local cluster client, err: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
)

type VirtualClusterInitController struct {
client.Client
Config *rest.Config
EventRecorder record.EventRecorder
Config *rest.Config
EventRecorder record.EventRecorder
HostPortManager *vcnodecontroller.HostPortManager
}

func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand Down Expand Up @@ -74,7 +76,7 @@ func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) err

func (c *VirtualClusterInitController) syncVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) error {
klog.V(2).Infof("Reconciling virtual cluster", "name", virtualCluster.Name)
executer, err := NewExecuter(virtualCluster, c.Client, c.Config)
executer, err := NewExecutor(virtualCluster, c.Client, c.Config, c.HostPortManager)
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kubenest/controlplane/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@ import (
clientsetscheme "k8s.io/client-go/kubernetes/scheme"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

var errAllocated = errors.New("provided port is already allocated")

func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string) error {
func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error {
_, err := manager.AllocateHostIP(name)
if err != nil {
return fmt.Errorf("failed to allocate host ip for virtual cluster apiserver, err: %w", err)
}

if err := installAPIServer(client, name, namespace); err != nil {
return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err)
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/kubenest/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/tasks"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
"github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
Expand All @@ -31,6 +32,7 @@ type initData struct {
kosmosClient versioned.Interface
virtualClusterDataDir string
privateRegistry string
hostPortManager *vcnodecontroller.HostPortManager
}

type InitOptions struct {
Expand All @@ -42,7 +44,7 @@ type InitOptions struct {
virtualCluster *v1alpha1.VirtualCluster
}

func NewInitPhase(opts *InitOptions) *workflow.Phase {
func NewInitPhase(opts *InitOptions, hostPortManager *vcnodecontroller.HostPortManager) *workflow.Phase {
initPhase := workflow.NewPhase()

initPhase.AppendTask(tasks.NewVirtualClusterServiceTask())
Expand All @@ -56,7 +58,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
initPhase.AppendTask(tasks.NewCheckControlPlaneTask())

initPhase.SetDataInitializer(func() (workflow.RunData, error) {
return newRunData(opts)
return newRunData(opts, hostPortManager)
})
return initPhase
}
Expand Down Expand Up @@ -95,7 +97,7 @@ func NewInitOptWithKubeconfig(config *rest.Config) InitOpt {
}
}

func newRunData(opt *InitOptions) (*initData, error) {
func newRunData(opt *InitOptions, hostPortManager *vcnodecontroller.HostPortManager) (*initData, error) {
if err := opt.Validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,6 +140,7 @@ func newRunData(opt *InitOptions) (*initData, error) {
virtualClusterDataDir: opt.virtualClusterDataDir,
privateRegistry: utils.DefaultImageRepository,
CertStore: cert.NewCertStore(),
hostPortManager: hostPortManager,
}, nil
}

Expand Down Expand Up @@ -165,6 +168,10 @@ func (i initData) GetNamespace() string {
return i.namespace
}

func (i initData) GetHostPortManager() *vcnodecontroller.HostPortManager {
return i.hostPortManager
}

func (i initData) ControlplaneAddress() string {
return i.controlplaneAddr
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kubenest/tasks/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func runVirtualClusterAPIServer(r workflow.RunData) error {
data.RemoteClient(),
data.GetName(),
data.GetNamespace(),
data.GetHostPortManager(),
)
if err != nil {
return fmt.Errorf("failed to install virtual cluster apiserver component, err: %w", err)
Expand Down
6 changes: 4 additions & 2 deletions pkg/kubenest/tasks/data.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package tasks

import (
clientset "k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/util/cert"
clientset "k8s.io/client-go/kubernetes"

vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
)

type InitData interface {
cert.CertStore
GetName() string
GetNamespace() string
GetHostPortManager() *vcnodecontroller.HostPortManager
ControlplaneAddress() string
ServiceClusterIp() []string
RemoteClient() clientset.Interface
Expand Down

0 comments on commit f18f7e4

Please sign in to comment.