Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Record the assigned port in the virtualcluster #556

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,12 @@ func run(ctx context.Context, opts *options.Options) error {
return fmt.Errorf("could not create clientset: %v", err)
}

hostPortManager, err := vcnodecontroller.NewHostPortManager(hostKubeClient)
if err != nil {
return fmt.Errorf("failed to create host port manager: %v", err)
}

VirtualClusterInitController := controller.VirtualClusterInitController{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName),
HostPortManager: hostPortManager,
RootClientSet: hostKubeClient,
KosmosClient: kosmosClient,
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
EventRecorder: mgr.GetEventRecorderFor(constants.InitControllerName),
RootClientSet: hostKubeClient,
KosmosClient: kosmosClient,
}
if err = VirtualClusterInitController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", constants.InitControllerName, err)
Expand Down
3 changes: 3 additions & 0 deletions deploy/crds/kosmos.io_virtualclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ spec:
phase:
description: Phase is the phase of kosmos-operator handling the VirtualCluster
type: string
port:
format: int32
type: integer
reason:
type: string
updateTime:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type VirtualClusterStatus struct {
Reason string `json:"reason,omitempty" protobuf:"bytes,4,opt,name=reason"`
// +optional
UpdateTime *metav1.Time `json:"updateTime,omitempty" protobuf:"bytes,7,opt,name=updateTime"`
// +optional
Port int32 `json:"port,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
6 changes: 6 additions & 0 deletions pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

7 changes: 3 additions & 4 deletions pkg/kubenest/controller/virtualcluster_execute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
)

Expand All @@ -22,7 +21,7 @@ type Executor struct {
config *rest.Config
}

func NewExecutor(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config, hostPortManager *vcnodecontroller.HostPortManager) (*Executor, error) {
func NewExecutor(virtualCluster *v1alpha1.VirtualCluster, c client.Client, config *rest.Config) (*Executor, error) {
var phase *workflow.Phase

opts := []kubenest.InitOpt{
Expand All @@ -33,9 +32,9 @@ func NewExecutor(virtualCluster *v1alpha1.VirtualCluster, c client.Client, confi
action := recognizeActionFor(virtualCluster)
switch action {
case constants.InitAction:
phase = kubenest.NewInitPhase(options, hostPortManager)
phase = kubenest.NewInitPhase(options)
case constants.DeInitAction:
phase = kubenest.UninstallPhase(options, hostPortManager)
phase = kubenest.UninstallPhase(options)
default:
return nil, fmt.Errorf("failed to recognize action for virtual cluster %s", virtualCluster.Name)
}
Expand Down
81 changes: 72 additions & 9 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/pkg/errors"
"gopkg.in/yaml.v3"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -29,18 +30,16 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

type VirtualClusterInitController struct {
client.Client
Config *rest.Config
EventRecorder record.EventRecorder
HostPortManager *vcnodecontroller.HostPortManager
RootClientSet kubernetes.Interface
KosmosClient versioned.Interface
lock sync.Mutex
Config *rest.Config
EventRecorder record.EventRecorder
RootClientSet kubernetes.Interface
KosmosClient versioned.Interface
lock sync.Mutex
}

type NodePool struct {
Expand All @@ -50,6 +49,10 @@ type NodePool struct {
State string `json:"state" yaml:"state"`
}

type HostPortPool struct {
PortsPool []int32 `yaml:"portsPool"`
}

const (
VirtualClusterControllerFinalizer = "kosmos.io/virtualcluster-controller"
RequeueTime = 10 * time.Second
Expand Down Expand Up @@ -230,7 +233,13 @@ func (c *VirtualClusterInitController) removeFinalizer(virtualCluster *v1alpha1.
func (c *VirtualClusterInitController) createVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) error {
klog.V(2).Infof("Reconciling virtual cluster", "name", virtualCluster.Name)

executer, err := NewExecutor(virtualCluster, c.Client, c.Config, c.HostPortManager)
//Assign host port
_, err := c.AllocateHostPort(virtualCluster)
if err != nil {
return errors.Wrap(err, "Error in assign host port!")
}

executer, err := NewExecutor(virtualCluster, c.Client, c.Config)
if err != nil {
return err
}
Expand Down Expand Up @@ -258,7 +267,7 @@ func (c *VirtualClusterInitController) createVirtualCluster(virtualCluster *v1al

func (c *VirtualClusterInitController) destroyVirtualCluster(virtualCluster *v1alpha1.VirtualCluster) error {
klog.V(2).Infof("Destroying virtual cluster %s", virtualCluster.Name)
execute, err := NewExecutor(virtualCluster, c.Client, c.Config, c.HostPortManager)
execute, err := NewExecutor(virtualCluster, c.Client, c.Config)
if err != nil {
return err
}
Expand Down Expand Up @@ -453,3 +462,57 @@ func mapContains(big map[string]string, small map[string]string) bool {
}
return true
}

func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataKey string) (*HostPortPool, error) {
hostPorts, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), cmName, metav1.GetOptions{})
if err != nil {
return nil, err
}

yamlData, exist := hostPorts.Data[dataKey]
if !exist {
return nil, fmt.Errorf("key '%s' not found in ConfigMap '%s'", dataKey, cmName)
}

var hostPool HostPortPool
if err := yaml.Unmarshal([]byte(yamlData), &hostPool); err != nil {
return nil, err
}

return &hostPool, nil
}

func (c *VirtualClusterInitController) isPortAllocated(port int32) bool {
vcList := &v1alpha1.VirtualClusterList{}
err := c.List(context.Background(), vcList)
if err != nil {
klog.Errorf("list virtual cluster error: %v", err)
return false
}

for _, vc := range vcList.Items {
if vc.Status.Port == port {
return true
}
}

return false
}

func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1.VirtualCluster) (int32, error) {
c.lock.Lock()
defer c.lock.Unlock()

hostPool, err := GetHostPortPoolFromConfigMap(c.RootClientSet, constants.KosmosNs, constants.HostPortsCMName, constants.HostPortsCMDataName)
if err != nil {
return 0, err
}

for _, port := range hostPool.PortsPool {
if !c.isPortAllocated(port) {
virtualCluster.Status.Port = port
return port, nil
}
}
return 0, fmt.Errorf("no available ports to allocate")
}
15 changes: 2 additions & 13 deletions pkg/kubenest/controlplane/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,24 @@ import (
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/yaml"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error {
port, err := manager.AllocateHostPort(name)
if err != nil {
return fmt.Errorf("failed to allocate host ip for virtual cluster apiserver, err: %w", err)
}

func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, port int32) error {
if err := installAPIServer(client, name, namespace, port); err != nil {
return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err)
}
return nil
}

func DeleteVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error {
func DeleteVirtualClusterAPIServer(client clientset.Interface, name, namespace string) error {
deployName := fmt.Sprintf("%s-%s", name, "apiserver")
if err := util.DeleteDeployment(client, deployName, namespace); err != nil {
return errors.Wrapf(err, "Failed to delete deployment %s/%s", deployName, namespace)
}
err := manager.ReleaseHostPort(name)
if err != nil {
klog.Warningf("Error releasing host port for cluster %s: %v", name, err)
}
return nil
}

Expand Down
Loading
Loading