Skip to content

Commit

Permalink
cleanup: add finalizers for mcs to make code graceful
Browse files Browse the repository at this point in the history
Signed-off-by: duanmengkk <[email protected]>
  • Loading branch information
duanmengkk committed Dec 25, 2023
1 parent 04938bf commit 5888413
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 107 deletions.
2 changes: 2 additions & 0 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func run(ctx context.Context, opts *options.Options) error {
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
Logger: mgr.GetLogger(),
ReservedNamespaces: opts.ReservedNamespaces,
RateLimiterOptions: opts.RateLimiterOpts,
BackoffOptions: opts.BackoffOpts,
}
if err = ServiceExportController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err)
Expand Down
13 changes: 13 additions & 0 deletions cmd/clustertree/cluster-manager/app/options/options.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package options

import (
"time"

"github.com/spf13/pflag"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/component-base/config/options"
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"

"github.com/kosmos.io/kosmos/pkg/utils/flags"
)

const (
Expand Down Expand Up @@ -40,6 +44,12 @@ type Options struct {

// ReservedNamespaces are the protected namespaces to prevent Kosmos from deleting system resources
ReservedNamespaces []string

RateLimiterOpts flags.Options

BackoffOpts flags.BackoffOptions

SyncPeriod time.Duration
}

type KubernetesOptions struct {
Expand Down Expand Up @@ -82,5 +92,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.OnewayStorageControllers, "oneway-storage-controllers", false, "Turn on or off oneway storage controllers.")
flags.StringSliceVar(&o.AutoCreateMCSPrefix, "auto-mcs-prefix", []string{}, "The prefix of namespace for service to auto create mcs resources")
flags.StringSliceVar(&o.ReservedNamespaces, "reserved-namespaces", []string{"kube-system"}, "The namespaces protected by Kosmos that the controller-manager will skip.")
flags.DurationVar(&o.SyncPeriod, "sync-period", 0, "the sync period for informer to resync.")
o.RateLimiterOpts.AddFlags(flags)
o.BackoffOpts.AddFlags(flags)
options.BindLeaderElectionFlags(&o.LeaderElection, flags)
}
12 changes: 7 additions & 5 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, fmt.Errorf("could not build dynamic client for cluster %s: %v", cluster.Name, err)
}

kosmosClient, err := kosmosversioned.NewForConfig(config)
leafKosmosClient, err := kosmosversioned.NewForConfig(config)
if err != nil {
return reconcile.Result{}, fmt.Errorf("could not build kosmos clientset for cluster %s: %v", cluster.Name, err)
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
c.ManagerCancelFuncs[cluster.Name] = &cancel
c.ControllerManagersLock.Unlock()

if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil {
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, leafKosmosClient, config); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
}

Expand Down Expand Up @@ -242,13 +242,13 @@ func (c *ClusterController) setupControllers(
clientDynamic *dynamic.DynamicClient,
leafNodeSelector map[string]kosmosv1alpha1.NodeSelector,
leafClientset kubernetes.Interface,
kosmosClient kosmosversioned.Interface,
leafKosmosClient kosmosversioned.Interface,
leafRestConfig *rest.Config) error {
c.GlobalLeafManager.AddLeafResource(&leafUtils.LeafResource{
Client: mgr.GetClient(),
DynamicClient: clientDynamic,
Clientset: leafClientset,
KosmosClient: kosmosClient,
KosmosClient: leafKosmosClient,
ClusterName: cluster.Name,
// TODO: define node options
Namespace: "",
Expand Down Expand Up @@ -279,14 +279,16 @@ func (c *ClusterController) setupControllers(
if c.Options.MultiClusterService {
serviceImportController := &mcs.ServiceImportController{
LeafClient: mgr.GetClient(),
RootKosmosClient: kosmosClient,
LeafKosmosClient: leafKosmosClient,
EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName),
Logger: mgr.GetLogger(),
LeafNodeName: cluster.Name,
// todo @wyz
IPFamilyType: cluster.Spec.ClusterLinkOptions.IPFamily,
RootResourceManager: c.RootResourceManager,
ReservedNamespaces: c.Options.ReservedNamespaces,
BackoffOptions: c.Options.BackoffOpts,
SyncPeriod: c.Options.SyncPeriod,
}
if err := serviceImportController.AddController(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ func (c *AutoCreateMCSController) Reconcile(ctx context.Context, request reconci

// The service is being deleted, in which case we should clear serviceExport and serviceImport.
if shouldDelete || !service.DeletionTimestamp.IsZero() {
if err := c.cleanUpMcsResources(ctx, request.Namespace, request.Name, clusterList); err != nil {
if err := c.cleanUpMcsResources(request.Namespace, request.Name, clusterList); err != nil {
klog.Errorf("Cleanup MCS resources failed, err: %v", err)
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
return controllerruntime.Result{}, nil
}

err := c.autoCreateMcsResources(ctx, service, clusterList)
err := c.autoCreateMcsResources(service, clusterList)
if err != nil {
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
Expand Down Expand Up @@ -209,9 +210,9 @@ func (c *AutoCreateMCSController) SetupWithManager(mgr manager.Manager) error {
Complete(c)
}

func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, namespace string, name string, clusterList *kosmosv1alpha1.ClusterList) error {
func (c *AutoCreateMCSController) cleanUpMcsResources(namespace string, name string, clusterList *kosmosv1alpha1.ClusterList) error {
// delete serviceExport in root cluster
if err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
if err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Delete serviceExport in root cluster failed %s/%s, Error: %v", namespace, name, err)
return err
Expand All @@ -229,7 +230,7 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, names
klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err)
return err
}
if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Delete serviceImport in leaf cluster failed %s/%s, Error: %v", namespace, name, err)
return err
Expand All @@ -239,15 +240,15 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, names
return nil
}

func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, service *corev1.Service, clusterList *kosmosv1alpha1.ClusterList) error {
func (c *AutoCreateMCSController) autoCreateMcsResources(service *corev1.Service, clusterList *kosmosv1alpha1.ClusterList) error {
// create serviceExport in root cluster
serviceExport := &mcsv1alpha1.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Namespace: service.Namespace,
},
}
if _, err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(service.Namespace).Create(ctx, serviceExport, metav1.CreateOptions{}); err != nil {
if _, err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(service.Namespace).Create(context.TODO(), serviceExport, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Could not create serviceExport(%s/%s) in root cluster, Error: %v", service.Namespace, service.Name, err)
return err
Expand All @@ -266,6 +267,14 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se
klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err)
return err
}

if err = c.createNamespace(leafManager.Client, service.Namespace); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Create namespace %s in leaf cluster failed, Error: %v", service.Namespace, err)
return err
}
}

serviceImport := &mcsv1alpha1.ServiceImport{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Expand All @@ -281,7 +290,7 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se
},
},
}
if _, err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(service.Namespace).Create(ctx, serviceImport, metav1.CreateOptions{}); err != nil {
if _, err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(service.Namespace).Create(context.TODO(), serviceImport, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Create serviceImport in leaf cluster failed %s/%s, Error: %v", service.Namespace, service.Name, err)
return err
Expand All @@ -290,3 +299,16 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se
}
return nil
}

// createNamespace asks the given client to create a Namespace object whose
// only populated field is its name.
//
// The error returned by the Create call is handed back unchanged (nil on
// success), so the caller decides how to treat specific failures such as an
// already-existing namespace.
func (c *AutoCreateMCSController) createNamespace(client client.Client, namespace string) error {
	return client.Create(context.TODO(), &corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: namespace},
	})
}
Loading

0 comments on commit 5888413

Please sign in to comment.