Skip to content

Commit

Permalink
Merge pull request kosmos-io#403 from rxy0210/feature_sync_leaf
Browse files Browse the repository at this point in the history
fix: update projected in feature_sync_leaf
  • Loading branch information
kosmos-robot authored Feb 22, 2024
2 parents 6c4f417 + 032453f commit 4dbedef
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 71 deletions.
188 changes: 117 additions & 71 deletions pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

const (
RootPodControllerName = "root-pod-controller"
RootPodRequeueTime = 10 * time.Second
)

type RootPodReconciler struct {
Expand Down Expand Up @@ -128,17 +127,17 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
lr, err := r.GlobalLeafManager.GetLeafResourceByNodeName(nodeName)
if err != nil {
// wait for leaf resource init
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
if err := r.DeletePodInLeafCluster(ctx, lr, request.NamespacedName, false); err != nil {
klog.Errorf("delete pod in leaf error[1]: %v, %s", err, request.NamespacedName)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
}
return reconcile.Result{}, nil
}
klog.Errorf("get %s error: %v", request.NamespacedName, err)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

rootpod := *(cachepod.DeepCopy())
Expand All @@ -154,7 +153,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req

targetNode := &corev1.Node{}
if err := r.RootClient.Get(ctx, nn, targetNode); err != nil {
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

if targetNode.Annotations == nil {
Expand All @@ -171,13 +170,13 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
// TODO: GlobalLeafResourceManager may not inited....
// belongs to the current node
if !r.GlobalLeafManager.HasNode(rootpod.Spec.NodeName) {
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

lr, err := r.GlobalLeafManager.GetLeafResourceByNodeName(rootpod.Spec.NodeName)
if err != nil {
// wait for leaf resource init
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

// skip namespace
Expand All @@ -189,7 +188,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
if !rootpod.GetDeletionTimestamp().IsZero() {
if err := r.DeletePodInLeafCluster(ctx, lr, request.NamespacedName, true); err != nil {
klog.Errorf("delete pod in leaf error[1]: %v, %s", err, request.NamespacedName)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
return reconcile.Result{}, nil
}
Expand All @@ -202,21 +201,20 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
if errors.IsNotFound(err) {
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
klog.Errorf("create pod inleaf error, err: %s", err)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
} else {
return reconcile.Result{}, nil
}
} else {
klog.Errorf("get pod in leaf error[3]: %v, %s", err, request.NamespacedName)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
}

// update pod in leaf
if podutils.ShouldEnqueue(leafPod, &rootpod) {
if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
klog.Errorf("Error Update pod in leafcluster. %v", err)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
}

Expand Down Expand Up @@ -561,73 +559,122 @@ func (r *RootPodReconciler) changeToMasterCoreDNS(ctx context.Context, pod *core
}
}

// projectedHandler Process the project volume, creating and mounting secret, configmap, DownwardAPI,
// createProjectedHandler Process the project volume, creating and mounting secret, configmap, DownwardAPI,
// and ServiceAccountToken from the project volume in the member cluster to the pod of the host cluster
func (r *RootPodReconciler) projectedHandler(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) {
func (r *RootPodReconciler) createProjectedHandler(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) {
if pod.Spec.Volumes == nil {
return
}

for _, volume := range pod.Spec.Volumes {
if volume.Projected != nil {
falseValue := false
pod.Spec.AutomountServiceAccountToken = &falseValue

saName := pod.Spec.ServiceAccountName
var sources []corev1.VolumeProjection

for _, projectedVolumeSource := range volume.Projected.Sources {
// Process all resources for the rootpod
if projectedVolumeSource.ServiceAccountToken != nil {
tokenSecretName, err := r.createSATokenInLeafCluster(ctx, lr, saName, pod)
if err != nil {
klog.Errorf("[convertAuth] create sa secret failed, ns: %s, pod: %s, err: %s", pod.Namespace, pod.Name, err)
return
}
secretProjection := corev1.VolumeProjection{
Secret: &corev1.SecretProjection{
Items: []corev1.KeyToPath{
{
Key: "token",
Path: projectedVolumeSource.ServiceAccountToken.Path,
},
},
},
}
secretProjection.Secret.Name = tokenSecretName
sources = append(sources, secretProjection)
}
if projectedVolumeSource.ConfigMap != nil {
cmName, err := r.createConfigMapInLeafCluster(ctx, lr, projectedVolumeSource.ConfigMap.Name, pod)
if err != nil {
klog.Errorf("[convertAuth] create configmap failed, ns: %s, cm: %s, err: %s", pod.Namespace, cmName, err)
return
}
cmDeepCopy := projectedVolumeSource.DeepCopy()
cmDeepCopy.ConfigMap.Name = cmName
sources = append(sources, *cmDeepCopy)
}
if projectedVolumeSource.Secret != nil {
Secret := projectedVolumeSource.Secret
seName, err := r.createSecretInLeafCluster(ctx, lr, Secret.Name, pod)
if err != nil {
klog.Errorf("[convertAuth] create secret failed, ns: %s, cm: %s, err: %s", pod.Namespace, seName, err)
return
}
secretDeepCopy := projectedVolumeSource.DeepCopy()
secretDeepCopy.Secret.Name = seName
sources = append(sources, *secretDeepCopy)
}
if projectedVolumeSource.DownwardAPI != nil {
DownwardAPIProjection := corev1.VolumeProjection{
DownwardAPI: projectedVolumeSource.DownwardAPI,
}
sources = append(sources, DownwardAPIProjection)
if sources := r.projectedHandler(ctx, lr, volume, pod); sources != nil {
volume.Projected.Sources = sources
}
}
}
}

// updateProjectedHandler update projected volume
func (r *RootPodReconciler) updateProjectedHandler(ctx context.Context, lr *leafUtils.LeafResource, rootPod, podCopy *corev1.Pod) {
if rootPod.Spec.Volumes == nil {
return
}
var leafPodVolumes []corev1.Volume
if podCopy.Spec.Volumes == nil {
leafPodVolumes = nil
} else {
leafPodVolumes = podCopy.Spec.Volumes
}

var volumeCopy []corev1.Volume

for _, volume := range rootPod.Spec.Volumes {
if volume.Projected != nil {
if _, flag := findVolumeInClient(volume, leafPodVolumes); !flag {
if sources := r.projectedHandler(ctx, lr, volume, podCopy); sources != nil {
volume.Projected.Sources = sources
}
}
volume.Projected.Sources = sources
}
volumeCopy = append(volumeCopy, volume)
}
podCopy.Spec.Volumes = volumeCopy
}

func (r *RootPodReconciler) projectedHandler(ctx context.Context, lr *leafUtils.LeafResource, volume corev1.Volume, pod *corev1.Pod) []corev1.VolumeProjection {
falseValue := false
pod.Spec.AutomountServiceAccountToken = &falseValue

saName := pod.Spec.ServiceAccountName
var sources []corev1.VolumeProjection

for _, projectedVolumeSource := range volume.Projected.Sources {
// Process all resources for the rootpod
if projectedVolumeSource.ServiceAccountToken != nil {
tokenSecretName, err := r.createSATokenInLeafCluster(ctx, lr, saName, pod)
if err != nil {
klog.Errorf("[convertAuth] create sa secret failed, ns: %s, pod: %s, err: %s", pod.Namespace, pod.Name, err)
return nil
}
secretProjection := corev1.VolumeProjection{
Secret: &corev1.SecretProjection{
Items: []corev1.KeyToPath{
{
Key: "token",
Path: projectedVolumeSource.ServiceAccountToken.Path,
},
},
},
}
secretProjection.Secret.Name = tokenSecretName
sources = append(sources, secretProjection)
}
if projectedVolumeSource.ConfigMap != nil {
cmName, err := r.createConfigMapInLeafCluster(ctx, lr, projectedVolumeSource.ConfigMap.Name, pod)
if err != nil {
klog.Errorf("[convertAuth] create configmap failed, ns: %s, cm: %s, err: %s", pod.Namespace, cmName, err)
return nil
}
cmDeepCopy := projectedVolumeSource.DeepCopy()
cmDeepCopy.ConfigMap.Name = cmName
sources = append(sources, *cmDeepCopy)
}
if projectedVolumeSource.Secret != nil {
Secret := projectedVolumeSource.Secret
seName, err := r.createSecretInLeafCluster(ctx, lr, Secret.Name, pod)
if err != nil {
klog.Errorf("[convertAuth] create secret failed, ns: %s, cm: %s, err: %s", pod.Namespace, seName, err)
return nil
}
secretDeepCopy := projectedVolumeSource.DeepCopy()
secretDeepCopy.Secret.Name = seName
sources = append(sources, *secretDeepCopy)
}
if projectedVolumeSource.DownwardAPI != nil {
DownwardAPIProjection := corev1.VolumeProjection{
DownwardAPI: projectedVolumeSource.DownwardAPI,
}
sources = append(sources, DownwardAPIProjection)
}
}
return sources
}

func findVolumeInClient(volumeInRoot corev1.Volume, volumes []corev1.Volume) (corev1.Volume, bool) {
if volumes == nil {
return corev1.Volume{}, false
}

for _, volume := range volumes {
if volume.Projected != nil && volume.Name == volumeInRoot.Name {
if reflect.DeepEqual(volume.Projected, volumeInRoot.Projected) {
return volume, true
}
}
}

return corev1.Volume{}, false
}

// createServiceAccountInLeafCluster Create an sa corresponding to token-secret in member cluster
Expand Down Expand Up @@ -883,7 +930,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
klog.V(4).Infof("Creating Volumes successed %+v", basicPod)
}

r.projectedHandler(ctx, lr, basicPod)
r.createProjectedHandler(ctx, lr, basicPod)

if !r.Options.MultiClusterService {
r.changeToMasterCoreDNS(ctx, basicPod, r.Options)
Expand Down Expand Up @@ -921,11 +968,10 @@ func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leaf
if reflect.DeepEqual(leafPod.Spec, podCopy.Spec) &&
reflect.DeepEqual(leafPod.Annotations, podCopy.Annotations) &&
reflect.DeepEqual(leafPod.Labels, podCopy.Labels) {
klog.V(4).Info("Skipping leaf pod update")
return nil
}

r.projectedHandler(ctx, lr, podCopy)
r.updateProjectedHandler(ctx, lr, rootPod, podCopy)

if !r.Options.MultiClusterService {
r.changeToMasterCoreDNS(ctx, podCopy, r.Options)
Expand Down
4 changes: 4 additions & 0 deletions pkg/utils/constants.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package utils

import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
Expand Down Expand Up @@ -152,6 +154,8 @@ const (

// LabelNodeRoleNode specifies that a node hosts node components
LabelNodeRoleNode = "node-role.kubernetes.io/node"

DefaultRequeueTime = 10 * time.Second
)

const (
Expand Down

0 comments on commit 4dbedef

Please sign in to comment.