Skip to content

Commit

Permalink
support multi cluster for one kubernetes
Browse files Browse the repository at this point in the history
Signed-off-by: duanmengkk <[email protected]>
  • Loading branch information
duanmengkk committed Aug 11, 2024
1 parent 0b5b75e commit b63545d
Show file tree
Hide file tree
Showing 13 changed files with 429 additions and 153 deletions.
75 changes: 43 additions & 32 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func leaderElectionRun(ctx context.Context, opts *options.Options) error {
}

func run(ctx context.Context, opts *options.Options) error {
globalleafManager := leafUtils.GetGlobalLeafResourceManager()
globalLeafResourceManager := leafUtils.GetGlobalLeafResourceManager()
globalLeafClientManager := leafUtils.GetGlobalLeafClientResourceManager()

config, err := clientcmd.BuildConfigFromFlags(opts.KubernetesOptions.Master, opts.KubernetesOptions.KubeConfig)
if err != nil {
Expand Down Expand Up @@ -163,13 +164,14 @@ func run(ctx context.Context, opts *options.Options) error {

// add cluster controller
clusterController := clusterManager.ClusterController{
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
RootClientset: rootClient,
EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName),
Options: opts,
RootResourceManager: rootResourceManager,
GlobalLeafManager: globalleafManager,
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
RootClientset: rootClient,
EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName),
Options: opts,
RootResourceManager: rootResourceManager,
GlobalLeafResourceManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
}
if err = clusterController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", clusterManager.ControllerName, err)
Expand All @@ -191,13 +193,14 @@ func run(ctx context.Context, opts *options.Options) error {

// add auto create mcs resources controller
autoCreateMCSController := mcs.AutoCreateMCSController{
RootClient: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.AutoCreateMCSControllerName),
Logger: mgr.GetLogger(),
AutoCreateMCSPrefix: opts.AutoCreateMCSPrefix,
RootKosmosClient: rootKosmosClient,
GlobalLeafManager: globalleafManager,
ReservedNamespaces: opts.ReservedNamespaces,
RootClient: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.AutoCreateMCSControllerName),
Logger: mgr.GetLogger(),
AutoCreateMCSPrefix: opts.AutoCreateMCSPrefix,
RootKosmosClient: rootKosmosClient,
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
ReservedNamespaces: opts.ReservedNamespaces,
}
if err = autoCreateMCSController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.AutoCreateMCSControllerName, err)
Expand All @@ -217,7 +220,9 @@ func run(ctx context.Context, opts *options.Options) error {

// init rootPodController
rootPodReconciler := podcontrollers.RootPodReconciler{
GlobalLeafManager: globalleafManager,
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,

RootClient: mgr.GetClient(),
DynamicRootClient: dynamicClient,
Options: opts,
Expand All @@ -227,35 +232,39 @@ func run(ctx context.Context, opts *options.Options) error {
}

rootPVCController := pvc.RootPVCController{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalleafManager,
RootClient: mgr.GetClient(),
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
}
if err := rootPVCController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting root pvc controller %v", err)
}

rootPVController := pv.RootPVController{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalleafManager,
RootClient: mgr.GetClient(),
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
}
if err := rootPVController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting root pv controller %v", err)
}

if len(os.Getenv("USE-ONEWAY-STORAGE")) > 0 {
onewayPVController := pv.OnewayPVController{
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
GlobalLeafManager: globalleafManager,
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
}
if err := onewayPVController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting oneway pv controller %v", err)
}

onewayPVCController := pvc.OnewayPVCController{
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
GlobalLeafManager: globalleafManager,
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
}
if err := onewayPVCController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting oneway pvc controller %v", err)
Expand All @@ -265,10 +274,11 @@ func run(ctx context.Context, opts *options.Options) error {
// init commonController
for i, gvr := range controllers.SYNC_GVRS {
commonController := controllers.SyncResourcesReconciler{
GlobalLeafManager: globalleafManager,
GroupVersionResource: gvr,
Object: controllers.SYNC_OBJS[i],
DynamicRootClient: dynamicClient,
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
GroupVersionResource: gvr,
Object: controllers.SYNC_OBJS[i],
DynamicRootClient: dynamicClient,
// DynamicLeafClient: clientDynamic,
ControllerName: "async-controller-" + gvr.Resource,
// Namespace: cluster.Spec.Namespace,
Expand All @@ -286,8 +296,9 @@ func run(ctx context.Context, opts *options.Options) error {
}()

nodeServer := nodeserver.NodeServer{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalleafManager,
RootClient: mgr.GetClient(),
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
}
go func() {
if err := nodeServer.Start(ctx, opts); err != nil {
Expand Down
27 changes: 16 additions & 11 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ type ClusterController struct {

RootResourceManager *utils.ResourceManager

GlobalLeafManager leafUtils.LeafResourceManager
GlobalLeafResourceManager leafUtils.LeafResourceManager

GlobalLeafClientManager leafUtils.LeafClientResourceManager

LeafModelHandler leafUtils.LeafModelHandler
}
Expand Down Expand Up @@ -232,7 +234,7 @@ func (c *ClusterController) clearClusterControllers(cluster *kosmosv1alpha1.Clus
delete(c.ManagerCancelFuncs, cluster.Name)
delete(c.ControllerManagers, cluster.Name)

c.GlobalLeafManager.RemoveLeafResource(cluster.Name)
c.GlobalLeafResourceManager.RemoveLeafResource(cluster.Name)
}

func (c *ClusterController) setupControllers(
Expand All @@ -244,22 +246,25 @@ func (c *ClusterController) setupControllers(
leafClientset kubernetes.Interface,
leafKosmosClient kosmosversioned.Interface,
leafRestConfig *rest.Config) error {
c.GlobalLeafManager.AddLeafResource(&leafUtils.LeafResource{
Client: mgr.GetClient(),
DynamicClient: clientDynamic,
Clientset: leafClientset,
KosmosClient: leafKosmosClient,
ClusterName: cluster.Name,
c.GlobalLeafResourceManager.AddLeafResource(&leafUtils.LeafResource{
Cluster: cluster,
// TODO: define node options
Namespace: "",
IgnoreLabels: strings.Split("", ","),
EnableServiceAccount: true,
RestConfig: leafRestConfig,
}, cluster, nodes)
}, nodes)

c.GlobalLeafClientManager.AddLeafClientResource(&leafUtils.LeafClientResource{
Client: mgr.GetClient(),
DynamicClient: clientDynamic,
Clientset: leafClientset,
KosmosClient: leafKosmosClient,
RestConfig: leafRestConfig,
}, cluster)

nodeResourcesController := controllers.NodeResourcesController{
Leaf: mgr.GetClient(),
GlobalLeafManager: c.GlobalLeafManager,
GlobalLeafManager: c.GlobalLeafResourceManager,
Root: c.Root,
RootClientset: c.RootClientset,
Nodes: nodes,
Expand Down
23 changes: 20 additions & 3 deletions pkg/clustertree/cluster-manager/controllers/common_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -40,7 +41,8 @@ type SyncResourcesReconciler struct {

client.Client

GlobalLeafManager leafUtils.LeafResourceManager
GlobalLeafManager leafUtils.LeafResourceManager
GlobalLeafClientManager leafUtils.LeafClientResourceManager
}

func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand All @@ -65,7 +67,13 @@ func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconci
klog.Errorf("get lr(cluster: %s) err: %v", cluster, err)
return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil
}
if err = r.SyncResource(ctx, request, lr); err != nil {
lcr, err := r.leafClientResource(lr)
if err != nil {
klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

if err = r.SyncResource(ctx, request, lcr); err != nil {
klog.Errorf("sync resource %s error: %v", request.NamespacedName, err)
return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil
}
Expand Down Expand Up @@ -114,7 +122,7 @@ func (r *SyncResourcesReconciler) SetupWithManager(mgr manager.Manager, gvr sche
return nil
}

func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reconcile.Request, lr *leafUtils.LeafResource) error {
func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reconcile.Request, lr *leafUtils.LeafClientResource) error {
klog.V(4).Infof("Started sync resource processing, ns: %s, name: %s", request.Namespace, request.Name)

deleteSecretInClient := false
Expand Down Expand Up @@ -191,3 +199,12 @@ func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reco
}
return nil
}

func (r *SyncResourcesReconciler) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) {
actualClusterName := leafUtils.GetActualClusterName(lr.Cluster)
lcr, err := r.GlobalLeafClientManager.GetLeafResource(actualClusterName)
if err != nil {
return nil, fmt.Errorf("get leaf client resource err: %v", err)
}
return lcr, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ const AutoCreateMCSControllerName = "auto-mcs-controller"

// AutoCreateMCSController watches services in root cluster and auto create serviceExport and serviceImport in leaf cluster
type AutoCreateMCSController struct {
RootClient client.Client
RootKosmosClient kosmosversioned.Interface
EventRecorder record.EventRecorder
Logger logr.Logger
GlobalLeafManager clustertreeutils.LeafResourceManager
RootClient client.Client
RootKosmosClient kosmosversioned.Interface
EventRecorder record.EventRecorder
Logger logr.Logger
GlobalLeafManager clustertreeutils.LeafResourceManager
GlobalLeafClientManager clustertreeutils.LeafClientResourceManager
// AutoCreateMCSPrefix are the prefix of the namespace for service to auto create in leaf cluster
AutoCreateMCSPrefix []string
// ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources
Expand Down Expand Up @@ -225,11 +226,13 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(namespace string, name str
continue
}

leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster.Name)
actualClusterName := clustertreeutils.GetActualClusterName(newCluster)
leafManager, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName)
if err != nil {
klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err)
return err
}

if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Delete serviceImport in leaf cluster failed %s/%s, Error: %v", namespace, name, err)
Expand Down Expand Up @@ -262,9 +265,10 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(service *corev1.Service
continue
}

leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster.Name)
actualClusterName := clustertreeutils.GetActualClusterName(newCluster)
leafManager, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName)
if err != nil {
klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err)
klog.Errorf("get leafClientManager for cluster %s failed,Error: %v", cluster.Name, err)
return err
}

Expand Down
Loading

0 comments on commit b63545d

Please sign in to comment.