Skip to content

Commit

Permalink
feat: externalIp support array and optimize api-server-external-service
Browse files Browse the repository at this point in the history
Signed-off-by: qiuwei <[email protected]>
  • Loading branch information
qiuwei68 committed Aug 6, 2024
1 parent a8c7eae commit c827cb7
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 82 deletions.
6 changes: 6 additions & 0 deletions deploy/crds/kosmos.io_virtualclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ spec:
description: ExternalIP is the external ip of the virtual kubernetes's
control plane
type: string
externalIps:
description: ExternalIps is the external ips of the virtual kubernetes's
control plane
items:
type: string
type: array
kubeconfig:
description: Kubeconfig is the kubeconfig of the virtual kubernetes's
control plane
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/kosmos/v1alpha1/virtualcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type VirtualClusterSpec struct {
// +optional
ExternalIP string `json:"externalIP,omitempty"`

// ExternalIps is the external ips of the virtual kubernetes's control plane
// +optional
ExternalIps []string `json:"externalIps,omitempty"`

// PromotePolicies definites the policies for promote to the kubernetes's control plane
// +required
PromotePolicies []PromotePolicy `json:"promotePolicies,omitempty"`
Expand Down
144 changes: 66 additions & 78 deletions pkg/kubenest/controlplane/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,85 +6,77 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/virtualcluster"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

func EnsureApiServerExternalEndPoint(dynamicClient dynamic.Interface) error {
err := installApiServerExternalEndpointInVirtualCluster(dynamicClient)
func EnsureApiServerExternalEndPoint(kubeClient kubernetes.Interface) error {
err := CreateOrUpdateApiServerExternalEndpoint(kubeClient)
if err != nil {
return err
}

err = installApiServerExternalServiceInVirtualCluster(dynamicClient)
err = CreateOrUpdateApiServerExternalService(kubeClient)
if err != nil {
return err
}
return nil
}

func installApiServerExternalEndpointInVirtualCluster(dynamicClient dynamic.Interface) error {
func CreateOrUpdateApiServerExternalEndpoint(kubeClient kubernetes.Interface) error {
klog.V(4).Info("begin to get kubernetes endpoint")
kubeEndpointUnstructured, err := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
kubeEndpoint, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil {
klog.Error("get Kubernetes endpoint failed", err)
return errors.Wrap(err, "failed to get kubernetes endpoint")
}
klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpointUnstructured)
klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpoint)

if kubeEndpointUnstructured != nil {
kubeEndpoint := &corev1.Endpoints{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeEndpointUnstructured.Object, kubeEndpoint)
if err != nil {
klog.Error("switch Kubernetes endpoint to typed object failed", err)
return errors.Wrap(err, "failed to convert kubernetes endpoint to typed object")
newEndpoint := kubeEndpoint.DeepCopy()
newEndpoint.Name = constants.ApiServerExternalService
newEndpoint.Namespace = constants.DefaultNs
newEndpoint.ResourceVersion = ""

// Try to create the endpoint
_, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Create(context.TODO(), newEndpoint, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Error("create api-server-external-service endpoint failed", err)
return errors.Wrap(err, "failed to create api-server-external-service endpoint")
}

newEndpoint := kubeEndpoint.DeepCopy()
newEndpoint.Name = constants.ApiServerExternalService
newEndpoint.Namespace = constants.DefaultNs
newEndpoint.ResourceVersion = ""
newEndpointUnstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newEndpoint)
// Endpoint already exists, retrieve it
existingEndpoint, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil {
klog.Error("switch new endpoint to unstructured object failed", err)
return errors.Wrap(err, "failed to convert new endpoint to unstructured object")
klog.Error("get existing api-server-external-service endpoint failed", err)
return errors.Wrap(err, "failed to get existing api-server-external-service endpoint")
}
klog.V(4).Info("after switch the Endpoint unstructured is:", newEndpointUnstructuredObj)

newEndpointUnstructured := &unstructured.Unstructured{Object: newEndpointUnstructuredObj}
createResult, err := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs).Create(context.TODO(), newEndpointUnstructured, metav1.CreateOptions{})

// Update the existing endpoint
newEndpoint.SetResourceVersion(existingEndpoint.ResourceVersion)
_, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Update(context.TODO(), newEndpoint, metav1.UpdateOptions{})
if err != nil {
klog.Error("create api-server-external-service endpoint failed", err)
return errors.Wrap(err, "failed to create api-server-external-service endpoint")
klog.Error("update api-server-external-service endpoint failed", err)
return errors.Wrap(err, "failed to update api-server-external-service endpoint")
} else {
klog.V(4).Info("success create api-server-external-service endpoint:", createResult)
klog.V(4).Info("successfully updated api-server-external-service endpoint")
}
} else {
return errors.New("kubernetes endpoint does not exist")
klog.V(4).Info("successfully created api-server-external-service endpoint")
}

return nil
}

func installApiServerExternalServiceInVirtualCluster(dynamicClient dynamic.Interface) error {
port, err := getEndPointPort(dynamicClient)
func CreateOrUpdateApiServerExternalService(kubeClient kubernetes.Interface) error {
port, err := getEndPointPort(kubeClient)
if err != nil {
return fmt.Errorf("error when getEndPointPort: %w", err)
}
Expand All @@ -97,62 +89,58 @@ func installApiServerExternalServiceInVirtualCluster(dynamicClient dynamic.Inter
return fmt.Errorf("error when parsing api-server-external-serive template: %w", err)
}

var obj unstructured.Unstructured
if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &obj); err != nil {
return fmt.Errorf("err when decoding api-server-external service in virtual cluster: %w", err)
var svc corev1.Service
if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &svc); err != nil {
return fmt.Errorf("err when decoding api-server-external-service in virtual cluster: %w", err)
}

err = util.CreateObject(dynamicClient, "default", "api-server-external-service", &obj)
// Try to create the service
_, err = kubeClient.CoreV1().Services(constants.DefaultNs).Create(context.TODO(), &svc, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("error when creating api-server-external service in virtual cluster err: %w", err)
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create api-server-external-service service: %w", err)
}

// Service already exists, retrieve it
existingSvc, err := kubeClient.CoreV1().Services(constants.DefaultNs).Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get existing api-server-external-service service: %w", err)
}

// Update the existing service
svc.ResourceVersion = existingSvc.ResourceVersion
_, err = kubeClient.CoreV1().Services(constants.DefaultNs).Update(context.TODO(), &svc, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update api-server-external-service service: %w", err)
}
klog.V(4).Info("successfully updated api-server-external-service service")
} else {
klog.V(4).Info("successfully created api-server-external-service service")
}

return nil
}

func getEndPointPort(dynamicClient dynamic.Interface) (int32, error) {
func getEndPointPort(kubeClient kubernetes.Interface) (int32, error) {
klog.V(4).Info("begin to get Endpoints ports...")
endpointsRes := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs)

endpointsRaw, err := endpointsRes.Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
endpoints, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil {
klog.Errorf("get Endpoints failed: %v", err)
return 0, err
}

subsets, found, err := unstructured.NestedSlice(endpointsRaw.Object, "subsets")
if !found || err != nil {
klog.Errorf("The subsets field was not found or parsing error occurred: %v", err)
return 0, fmt.Errorf("subsets field not found or error parsing it")
}

if len(subsets) == 0 {
if len(endpoints.Subsets) == 0 {
klog.Errorf("subsets is empty")
return 0, fmt.Errorf("No subsets found in the endpoints")
}

subset := subsets[0].(map[string]interface{})
ports, found, err := unstructured.NestedSlice(subset, "ports")
if !found || err != nil {
klog.Errorf("ports field not found or parsing error: %v", err)
return 0, fmt.Errorf("ports field not found or error parsing it")
}

if len(ports) == 0 {
subset := endpoints.Subsets[0]
if len(subset.Ports) == 0 {
klog.Errorf("Port not found in the endpoint")
return 0, fmt.Errorf("No ports found in the endpoint")
}

port := ports[0].(map[string]interface{})
portNum, found, err := unstructured.NestedInt64(port, "port")
if !found || err != nil {
klog.Errorf("ports field not found or parsing error: %v", err)
return 0, fmt.Errorf("port field not found or error parsing it")
}

klog.V(4).Infof("The port number was successfully obtained: %d", portNum)
return int32(portNum), nil
port := subset.Ports[0].Port
klog.V(4).Infof("The port number was successfully obtained: %d", port)
return port, nil
}
4 changes: 4 additions & 0 deletions pkg/kubenest/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type initData struct {
virtualClusterDataDir string
privateRegistry string
externalIP string
externalIps []string
hostPort int32
hostPortMap map[string]int32
kubeNestOptions *ko.KubeNestOptions
Expand Down Expand Up @@ -185,6 +186,7 @@ func newRunData(opt *InitOptions) (*initData, error) {
privateRegistry: utils.DefaultImageRepository,
CertStore: cert.NewCertStore(),
externalIP: opt.virtualCluster.Spec.ExternalIP,
externalIps: opt.virtualCluster.Spec.ExternalIps,
hostPort: opt.virtualCluster.Status.Port,
hostPortMap: opt.virtualCluster.Status.PortMap,
kubeNestOptions: opt.KubeNestOptions,
Expand Down Expand Up @@ -250,6 +252,8 @@ func (i initData) ExternalIP() string {
return i.externalIP
}

func (i initData) ExternalIPs() []string { return i.externalIps }

func (i initData) HostPort() int32 {
return i.hostPort
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kubenest/tasks/cert.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func mutateCertConfig(data InitData, cc *cert.CertConfig) error {
ControlplaneAddr: data.ControlplaneAddress(),
ClusterIps: data.ServiceClusterIp(),
ExternalIP: data.ExternalIP(),
ExternalIPs: data.ExternalIPs(),
}, cc)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/kubenest/tasks/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type InitData interface {
DataDir() string
VirtualCluster() *v1alpha1.VirtualCluster
ExternalIP() string
ExternalIPs() []string
HostPort() int32
HostPortMap() map[string]int32
DynamicClient() *dynamic.DynamicClient
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubenest/tasks/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package tasks
import (
"context"
"fmt"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -54,12 +53,13 @@ func runEndPointInVirtualClusterTask(r workflow.RunData) error {
if err != nil {
return err
}
dynamicClient, err := dynamic.NewForConfig(config)

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}

err = controlplane.EnsureApiServerExternalEndPoint(dynamicClient)
err = controlplane.EnsureApiServerExternalEndPoint(kubeClient)
if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/kubenest/util/cert/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type AltNamesMutatorConfig struct {
ControlplaneAddr string
ClusterIps []string
ExternalIP string
ExternalIPs []string
}

func (config *CertConfig) defaultPublicKeyAlgorithm() {
Expand Down Expand Up @@ -232,6 +233,13 @@ func proxyServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames,
if len(cfg.ExternalIP) > 0 {
appendSANsToAltNames(altNames, []string{cfg.ExternalIP})
}

if len(cfg.ExternalIPs) > 0 {
for _, externalIp := range cfg.ExternalIPs {
appendSANsToAltNames(altNames, []string{externalIp})
}
}

if len(cfg.ClusterIps) > 0 {
for _, clusterIp := range cfg.ClusterIps {
appendSANsToAltNames(altNames, []string{clusterIp})
Expand Down Expand Up @@ -273,6 +281,13 @@ func apiServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, e
if len(cfg.ExternalIP) > 0 {
appendSANsToAltNames(altNames, []string{cfg.ExternalIP})
}

if len(cfg.ExternalIPs) > 0 {
for _, externalIp := range cfg.ExternalIPs {
appendSANsToAltNames(altNames, []string{externalIp})
}
}

if len(cfg.ClusterIps) > 0 {
for _, clusterIp := range cfg.ClusterIps {
appendSANsToAltNames(altNames, []string{clusterIp})
Expand Down

0 comments on commit c827cb7

Please sign in to comment.