Skip to content

Commit

Permalink
feat: Synchronize the pod environment variables of the main cluster t…
Browse files Browse the repository at this point in the history
…o the sub cluster

Signed-off-by: OrangeBao <[email protected]>
  • Loading branch information
OrangeBao committed Oct 27, 2023
1 parent 77baedf commit 3f4ec13
Show file tree
Hide file tree
Showing 9 changed files with 895 additions and 107 deletions.
40 changes: 19 additions & 21 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ type ClusterController struct {
Logger logr.Logger
Options *options.Options

ControllerManagers map[string]*manager.Manager
ControllerManagers map[string]manager.Manager
ManagerCancelFuncs map[string]*context.CancelFunc
ControllerManagersLock sync.Mutex

mgr *manager.Manager
RootResourceManager *utils.ResourceManager
mgr manager.Manager
}

func isRootCluster(cluster *clusterlinkv1alpha1.Cluster) bool {
Expand Down Expand Up @@ -111,11 +111,11 @@ var predicatesFunc = predicate.Funcs{

func (c *ClusterController) SetupWithManager(mgr manager.Manager) error {
c.ManagerCancelFuncs = make(map[string]*context.CancelFunc)
c.ControllerManagers = make(map[string]*manager.Manager)
c.ControllerManagers = make(map[string]manager.Manager)
c.Logger = mgr.GetLogger()

// TODO this may not be a good idea
c.mgr = &mgr
c.mgr = mgr
return controllerruntime.NewControllerManagedBy(mgr).
Named(ControllerName).
WithOptions(controller.Options{}).
Expand Down Expand Up @@ -209,11 +209,11 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
subContext, cancel := context.WithCancel(ctx)

c.ControllerManagersLock.Lock()
c.ControllerManagers[cluster.Name] = &mgr
c.ControllerManagers[cluster.Name] = mgr
c.ManagerCancelFuncs[cluster.Name] = &cancel
c.ControllerManagersLock.Unlock()

if err = c.setupControllers(&mgr, cluster, node, leafDynamic, leafClient, kosmosClient); err != nil {
if err = c.setupControllers(mgr, cluster, node, leafDynamic, leafClient, kosmosClient); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
}

Expand All @@ -240,8 +240,7 @@ func (c *ClusterController) clearClusterControllers(cluster *clusterlinkv1alpha1
delete(c.ControllerManagers, cluster.Name)
}

func (c *ClusterController) setupControllers(m *manager.Manager, cluster *clusterlinkv1alpha1.Cluster, node *corev1.Node, clientDynamic *dynamic.DynamicClient, leafClient kubernetes.Interface, kosmosClient kosmosversioned.Interface) error {
mgr := *m
func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *clusterlinkv1alpha1.Cluster, node *corev1.Node, clientDynamic *dynamic.DynamicClient, leafClient kubernetes.Interface, kosmosClient kosmosversioned.Interface) error {
nodeResourcesController := controllers.NodeResourcesController{
Leaf: mgr.GetClient(),
Root: c.Root,
Expand Down Expand Up @@ -282,33 +281,34 @@ func (c *ClusterController) setupControllers(m *manager.Manager, cluster *cluste
DynamicRootClient: c.RootDynamic,
DynamicLeafClient: clientDynamic,
}
if err := RootPodReconciler.SetupWithManager(*c.mgr); err != nil {
if err := RootPodReconciler.SetupWithManager(c.mgr); err != nil {
return fmt.Errorf("error starting RootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err)
}

podUpstreamController := podcontrollers.LeafPodReconciler{
leafPodController := podcontrollers.LeafPodReconciler{
RootClient: c.Root,
Namespace: cluster.Spec.Namespace,
}

if err := podUpstreamController.SetupWithManager(*c.mgr); err != nil {
if err := leafPodController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting podUpstreamReconciler %s: %v", podcontrollers.LeafPodControllerName, err)
}

err := c.setupStorageControllers(m, node, leafClient)
err := c.setupStorageControllers(mgr, node, leafClient)
if err != nil {
return err
}

for i, gvr := range podcontrollers.SYNC_GVRS {
demoController := podcontrollers.SyncResourcesReconciler{
for i, gvr := range controllers.SYNC_GVRS {
demoController := controllers.SyncResourcesReconciler{
GroupVersionResource: gvr,
Object: podcontrollers.SYNC_OBJS[i],
Object: controllers.SYNC_OBJS[i],
DynamicRootClient: c.RootDynamic,
DynamicLeafClient: clientDynamic,
ControllerName: "async-controller-" + gvr.Resource,
Namespace: cluster.Spec.Namespace,
}
if err := demoController.SetupWithManager(mgr, gvr); err != nil {
if err := demoController.SetupWithManager(c.mgr, gvr); err != nil {
klog.Errorf("Unable to create cluster node controller: %v", err)
return err
}
Expand All @@ -317,15 +317,13 @@ func (c *ClusterController) setupControllers(m *manager.Manager, cluster *cluste
return nil
}

func (c *ClusterController) setupStorageControllers(m *manager.Manager, node *corev1.Node, leafClient kubernetes.Interface) error {
mgr := *m

func (c *ClusterController) setupStorageControllers(mgr manager.Manager, node *corev1.Node, leafClient kubernetes.Interface) error {
rootPVCController := pvc.RootPVCController{
LeafClient: mgr.GetClient(),
RootClient: c.Root,
LeafClientSet: leafClient,
}
if err := rootPVCController.SetupWithManager(*c.mgr); err != nil {
if err := rootPVCController.SetupWithManager(c.mgr); err != nil {
return fmt.Errorf("error starting root pvc controller %v", err)
}

Expand All @@ -334,7 +332,7 @@ func (c *ClusterController) setupStorageControllers(m *manager.Manager, node *co
RootClient: c.Root,
LeafClientSet: leafClient,
}
if err := rootPVController.SetupWithManager(*c.mgr); err != nil {
if err := rootPVController.SetupWithManager(c.mgr); err != nil {
return fmt.Errorf("error starting root pv controller %v", err)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package pod
package controllers

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -45,17 +45,17 @@ type SyncResourcesReconciler struct {
}

func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
obj, err := r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{})
// skip namespace
if len(r.Namespace) > 0 && r.Namespace != request.Namespace {
return reconcile.Result{}, nil
}

_, err := r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("get %s error: %v", request.NamespacedName, err)
return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil
}

// skip namespace
if len(r.Namespace) > 0 && r.Namespace != obj.GetNamespace() {
return reconcile.Result{}, nil
}

if err = r.SyncResource(ctx, request); err != nil {
klog.Errorf("sync resource %s error: %v", request.NamespacedName, err)
return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil
Expand Down Expand Up @@ -128,7 +128,7 @@ func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reco
return nil
}

old, err := r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{})
old, err := r.DynamicLeafClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{})

if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -140,36 +140,22 @@ func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reco
return err
}

objGenerator := func() (interface{}, error) {
switch old.GetKind() {
case SYNC_KIND_CONFIGMAP:
return &corev1.ConfigMap{}, nil
case SYNC_KIND_SECRET:
return &corev1.Secret{}, nil
}
return nil, fmt.Errorf("[objGenerator] not match kind")
}

convertSelector := func(oldObj, newObj interface{}) error {
switch old.GetKind() {
case SYNC_KIND_CONFIGMAP:
utils.UpdateConfigMap(oldObj.(*corev1.ConfigMap), newObj.(*corev1.ConfigMap))
return nil
case SYNC_KIND_SECRET:
utils.UpdateSecret(oldObj.(*corev1.Secret), newObj.(*corev1.Secret))
return nil
}
return fmt.Errorf("[convertSelector] not match kind")
var latest *unstructured.Unstructured
var unstructerr error
switch old.GetKind() {
case SYNC_KIND_CONFIGMAP:
latest, unstructerr = utils.UpdateUnstructured(old, obj, &corev1.ConfigMap{}, &corev1.ConfigMap{}, utils.UpdateConfigMap)
case SYNC_KIND_SECRET:
latest, unstructerr = utils.UpdateUnstructured(old, obj, &corev1.Secret{}, &corev1.Secret{}, utils.UpdateSecret)
}

latest, err := utils.UpdateUnstructured(old, obj, objGenerator, convertSelector)
if err != nil {
return err
if unstructerr != nil {
return unstructerr
}
if utils.IsObjectUnstructuredGlobal(latest.GetAnnotations()) {
if !utils.IsObjectUnstructuredGlobal(old.GetAnnotations()) {
return nil
}
_, err = r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Update(ctx, latest, metav1.UpdateOptions{})
_, err = r.DynamicLeafClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Update(ctx, latest, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("update %s from client cluster failed, error: %v", latest.GetKind(), err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

const (
Expand Down Expand Up @@ -62,8 +63,8 @@ func (r *LeafPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, nil
}

if utils.IsKosmosPod(podCopy) {
utils.FitObjectMeta(&podCopy.ObjectMeta)
if podutils.IsKosmosPod(podCopy) {
podutils.FitObjectMeta(&podCopy.ObjectMeta)
podCopy.ResourceVersion = "0"
if err := r.RootClient.Status().Update(ctx, podCopy); err != nil && !apierrors.IsNotFound(err) {
klog.Info(errors.Wrap(err, "error while updating pod status in kubernetes"))
Expand All @@ -73,21 +74,39 @@ func (r *LeafPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, nil
}

type rootDeleteOption struct {
GracePeriodSeconds *int64
}

func (dopt *rootDeleteOption) ApplyToDelete(opt *client.DeleteOptions) {
opt.GracePeriodSeconds = dopt.GracePeriodSeconds
}

func NewRootDeleteOption(pod *corev1.Pod) client.DeleteOption {
gracePeriodSeconds := pod.DeletionGracePeriodSeconds

current := metav1.NewTime(time.Now())
if pod.DeletionTimestamp.Before(&current) {
gracePeriodSeconds = new(int64)
}

return &rootDeleteOption{
GracePeriodSeconds: gracePeriodSeconds,
}
}

func (r *LeafPodReconciler) safeDeletePodInRootCluster(ctx context.Context, request reconcile.Request) error {
rPod := corev1.Pod{}
err := r.RootClient.Get(ctx, request.NamespacedName, &rPod)
if err == nil || !apierrors.IsNotFound(err) {
rPodCopy := rPod.DeepCopy()

deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: rPodCopy.DeletionGracePeriodSeconds,
}
current := metav1.NewTime(time.Now())
if rPodCopy.DeletionTimestamp.Before(&current) {
deleteOptions.GracePeriodSeconds = new(int64)
}
if err := r.RootClient.Delete(ctx, rPodCopy); err != nil && !apierrors.IsNotFound(err) {
return err
deleteOption := NewRootDeleteOption(rPodCopy)

if err := r.RootClient.Delete(ctx, rPodCopy, deleteOption); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
}
}
return nil
Expand Down
Loading

0 comments on commit 3f4ec13

Please sign in to comment.