Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add select host port for apiserver #479

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/kubenest/constants/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
EtcdDataVolumeName = "etcd-data"
EtcdListenClientPort = 2379
EtcdListenPeerPort = 2380
EtcdSuffix = "-etcd-client"

//controlplane kube-controller
KubeControllerReplicas = 2
Expand Down Expand Up @@ -80,6 +81,10 @@ const (
// DeInitAction represents delete virtual cluster instance
DeInitAction Action = "deInit"

//host_port_manager
HostPortsCMName = "kosmos-hostports"
HostPortsCMDataName = "config.yaml"

ManifestComponentsConfigmap = "components-manifest-cm"
NodePoolConfigmap = "node-pool"
NodeShareState = "share"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
)

/**
Expand Down Expand Up @@ -46,13 +48,11 @@ type ClusterPort struct {
}

func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) {
//todo magic Value
hostPorts, err := client.CoreV1().ConfigMaps("kosmos-system").Get(context.TODO(), "kosmos-hostports", metav1.GetOptions{})
hostPorts, err := client.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.HostPortsCMName, metav1.GetOptions{})
if err != nil {
return nil, err
}
//todo magic Value
yamlData, exist := hostPorts.Data["config.yaml"]
yamlData, exist := hostPorts.Data[constants.HostPortsCMDataName]
if !exist {
return nil, fmt.Errorf("hostports not found in configmap")
}
Expand All @@ -68,35 +68,72 @@ func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) {
return manager, nil
}

func (m *HostPortManager) AllocateHostIP(clusterName string) (int32, error) {
func (m *HostPortManager) AllocateHostPort(clusterName string) (int32, error) {
m.lock.Lock()
defer m.lock.Unlock()

//使用临时变量存储原来的cm
oldHostPool := m.HostPortPool

for _, port := range m.HostPortPool.PortsPool {
if !m.isPortAllocated(port) {
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
return port, nil
err := updateConfigMapAndRollback(m, oldHostPool)
if err != nil {
return 0, err
}
return port, err
}
}
// todo 更新 cm
return 0, fmt.Errorf("no available ports to allocate")
}

func (m *HostPortManager) ReleaseHostIP(clusterName string) error {
func (m *HostPortManager) ReleaseHostPort(clusterName string) error {
m.lock.Lock()
defer m.lock.Unlock()

oldHostPool := m.HostPortPool

for i, cp := range m.HostPortPool.ClusterPorts {
if cp.Cluster == clusterName {
// Remove the entry from the slice
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts[:i], m.HostPortPool.ClusterPorts[i+1:]...)
err := updateConfigMapAndRollback(m, oldHostPool)
if err != nil {
return err
}
return nil
}
}
// todo 更新 cm
return fmt.Errorf("no port found for cluster %s", clusterName)
}

func updateConfigMapAndRollback(m *HostPortManager, oldHostPool *HostPortPool) error {
data, err := yaml.Marshal(m.HostPortPool)
if err != nil {
m.HostPortPool = oldHostPool
return err
}

configMap, err := m.kubeClient.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.HostPortsCMName, metav1.GetOptions{})
if err != nil {
m.HostPortPool = oldHostPool
return err
}

configMap.Data[constants.HostPortsCMDataName] = string(data)

_, updateErr := m.kubeClient.CoreV1().ConfigMaps(constants.KosmosNs).Update(context.TODO(), configMap, metav1.UpdateOptions{})

if updateErr != nil {
// 回滚 HostPortPool
m.HostPortPool = oldHostPool
return updateErr
}

return nil
}

func (m *HostPortManager) isPortAllocated(port int32) bool {
for _, cp := range m.HostPortPool.ClusterPorts {
if cp.Port == port {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) err

func (c *VirtualClusterInitController) Update(original, updated *v1alpha1.VirtualCluster) error {
now := metav1.Now()
updated.Status.TimeStamp = &now
updated.Status.UpdateTime = &now
return c.Client.Patch(context.TODO(), updated, client.MergeFrom(original))
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/kubenest/controlplane/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error {
_, err := manager.AllocateHostIP(name)
port, err := manager.AllocateHostPort(name)
if err != nil {
return fmt.Errorf("failed to allocate host ip for virtual cluster apiserver, err: %w", err)
}

if err := installAPIServer(client, name, namespace); err != nil {
if err := installAPIServer(client, name, namespace, port); err != nil {
return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err)
}
return nil
Expand All @@ -35,9 +35,9 @@ func DeleteVirtualClusterAPIServer(client clientset.Interface, name, namespace s
return nil
}

func installAPIServer(client clientset.Interface, name, namespace string) error {
func installAPIServer(client clientset.Interface, name, namespace string, port int32) error {
imageRepository, imageVersion := util.GetImageMessage()
err, clusterIps := util.GetEtcdServiceClusterIp(namespace, client)
clusterIp, err := util.GetEtcdServiceClusterIp(namespace, name+constants.EtcdSuffix, client)
if err != nil {
return nil
}
Expand All @@ -47,17 +47,19 @@ func installAPIServer(client clientset.Interface, name, namespace string) error
ServiceSubnet, VirtualClusterCertsSecret, EtcdCertsSecret string
Replicas int32
EtcdListenClientPort int32
ClusterPort int32
}{
DeploymentName: fmt.Sprintf("%s-%s", name, "apiserver"),
Namespace: namespace,
ImageRepository: imageRepository,
Version: imageVersion,
EtcdClientService: clusterIps[1],
EtcdClientService: clusterIp,
ServiceSubnet: constants.ApiServerServiceSubnet,
VirtualClusterCertsSecret: fmt.Sprintf("%s-%s", name, "cert"),
EtcdCertsSecret: fmt.Sprintf("%s-%s", name, "etcd-cert"),
Replicas: constants.ApiServerReplicas,
EtcdListenClientPort: constants.ApiServerEtcdListenClientPort,
ClusterPort: port,
})
if err != nil {
return fmt.Errorf("error when parsing virtual cluster apiserver deployment template: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ spec:
- --kubelet-client-certificate=/etc/virtualcluster/pki/virtualCluster.crt
- --kubelet-client-key=/etc/virtualcluster/pki/virtualCluster.key
- --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname
- --secure-port=5443
- --secure-port={{ .ClusterPort }}
- --service-account-issuer=https://kubernetes.default.svc.cluster.local
- --service-account-key-file=/etc/virtualcluster/pki/virtualCluster.key
- --service-account-signing-key-file=/etc/virtualcluster/pki/virtualCluster.key
Expand All @@ -64,7 +64,7 @@ spec:
failureThreshold: 8
httpGet:
path: /livez
port: 5443
port: {{ .ClusterPort }}
scheme: HTTPS
initialDelaySeconds: 10
periodSeconds: 10
Expand All @@ -74,7 +74,7 @@ spec:
failureThreshold: 3
httpGet:
path: /readyz
port: 5443
port: {{ .ClusterPort }}
scheme: HTTPS
initialDelaySeconds: 10
periodSeconds: 10
Expand All @@ -91,7 +91,7 @@ spec:
- apiserver
topologyKey: kubernetes.io/hostname
ports:
- containerPort: 5443
- containerPort: {{ .ClusterPort }}
name: http
protocol: TCP
volumeMounts:
Expand Down
20 changes: 9 additions & 11 deletions pkg/kubenest/util/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,16 @@ func GetServiceClusterIp(namespace string, client clientset.Interface) (error, [
return nil, clusterIps
}

func GetEtcdServiceClusterIp(namespace string, client clientset.Interface) (error, []string) {
serviceLists, err := client.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{})
func GetEtcdServiceClusterIp(namespace string, serviceName string, client clientset.Interface) (string, error) {
service, err := client.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
return err, nil
return "", err
}
var clusterIps []string
if serviceLists != nil {
for _, service := range serviceLists.Items {
if service.Spec.Type == constants.EtcdServiceType && service.Spec.ClusterIP != "" {
clusterIps = append(clusterIps, service.Spec.ClusterIP)
}
}

// 检查服务是否是期望的类型并且具有有效的 ClusterIP
if service.Spec.Type == constants.EtcdServiceType && service.Spec.ClusterIP != "" {
return service.Spec.ClusterIP, nil
}
return nil, clusterIps

return "", fmt.Errorf("Service %s not found or does not have a valid ClusterIP for Etcd", serviceName)
}
Loading