diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index 4c52bc7f2..e3671a7ff 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -60,12 +60,12 @@ type ClusterController struct { Logger logr.Logger Options *options.Options - ControllerManagers map[string]*manager.Manager + ControllerManagers map[string]manager.Manager ManagerCancelFuncs map[string]*context.CancelFunc ControllerManagersLock sync.Mutex - mgr *manager.Manager RootResourceManager *utils.ResourceManager + mgr manager.Manager } func isRootCluster(cluster *clusterlinkv1alpha1.Cluster) bool { @@ -111,11 +111,11 @@ var predicatesFunc = predicate.Funcs{ func (c *ClusterController) SetupWithManager(mgr manager.Manager) error { c.ManagerCancelFuncs = make(map[string]*context.CancelFunc) - c.ControllerManagers = make(map[string]*manager.Manager) + c.ControllerManagers = make(map[string]manager.Manager) c.Logger = mgr.GetLogger() // TODO this may not be a good idea - c.mgr = &mgr + c.mgr = mgr return controllerruntime.NewControllerManagedBy(mgr). Named(ControllerName). WithOptions(controller.Options{}). @@ -209,11 +209,11 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req subContext, cancel := context.WithCancel(ctx) c.ControllerManagersLock.Lock() - c.ControllerManagers[cluster.Name] = &mgr + c.ControllerManagers[cluster.Name] = mgr c.ManagerCancelFuncs[cluster.Name] = &cancel c.ControllerManagersLock.Unlock() - if err = c.setupControllers(&mgr, cluster, node, leafDynamic, leafClient, kosmosClient); err != nil { + if err = c.setupControllers(mgr, cluster, node, leafDynamic, leafClient, kosmosClient); err != nil { return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err) } @@ -240,8 +240,7 @@ func (c *ClusterController) clearClusterControllers(cluster *clusterlinkv1alpha1 delete(c.ControllerManagers, cluster.Name) } -func (c *ClusterController) setupControllers(m *manager.Manager, cluster *clusterlinkv1alpha1.Cluster, node *corev1.Node, clientDynamic *dynamic.DynamicClient, leafClient kubernetes.Interface, kosmosClient kosmosversioned.Interface) error { - mgr := *m +func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *clusterlinkv1alpha1.Cluster, node *corev1.Node, clientDynamic *dynamic.DynamicClient, leafClient kubernetes.Interface, kosmosClient kosmosversioned.Interface) error { nodeResourcesController := controllers.NodeResourcesController{ Leaf: mgr.GetClient(), Root: c.Root, @@ -282,33 +281,34 @@ func (c *ClusterController) setupControllers(m *manager.Manager, cluster *cluste DynamicRootClient: c.RootDynamic, DynamicLeafClient: clientDynamic, } - if err := RootPodReconciler.SetupWithManager(*c.mgr); err != nil { + if err := RootPodReconciler.SetupWithManager(c.mgr); err != nil { return fmt.Errorf("error starting RootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err) } - podUpstreamController := podcontrollers.LeafPodReconciler{ + leafPodController := podcontrollers.LeafPodReconciler{ RootClient: c.Root, Namespace: cluster.Spec.Namespace, } - if err := podUpstreamController.SetupWithManager(*c.mgr); err != nil { + if err := leafPodController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting podUpstreamReconciler %s: %v", podcontrollers.LeafPodControllerName, err) } - err := c.setupStorageControllers(m, node, leafClient) + err := c.setupStorageControllers(mgr, node, leafClient) if err != nil { return err } - for i, gvr := range podcontrollers.SYNC_GVRS { - demoController := podcontrollers.SyncResourcesReconciler{ + for i, gvr := range controllers.SYNC_GVRS { + demoController := controllers.SyncResourcesReconciler{ GroupVersionResource: gvr, - Object: podcontrollers.SYNC_OBJS[i], + Object: controllers.SYNC_OBJS[i], DynamicRootClient: c.RootDynamic, DynamicLeafClient: clientDynamic, ControllerName: "async-controller-" + gvr.Resource, + Namespace: cluster.Spec.Namespace, } - if err := demoController.SetupWithManager(mgr, gvr); err != nil { + if err := demoController.SetupWithManager(c.mgr, gvr); err != nil { klog.Errorf("Unable to create cluster node controller: %v", err) return err } @@ -317,15 +317,13 @@ func (c *ClusterController) setupControllers(m *manager.Manager, cluster *cluste return nil } -func (c *ClusterController) setupStorageControllers(m *manager.Manager, node *corev1.Node, leafClient kubernetes.Interface) error { - mgr := *m - +func (c *ClusterController) setupStorageControllers(mgr manager.Manager, node *corev1.Node, leafClient kubernetes.Interface) error { rootPVCController := pvc.RootPVCController{ LeafClient: mgr.GetClient(), RootClient: c.Root, LeafClientSet: leafClient, } - if err := rootPVCController.SetupWithManager(*c.mgr); err != nil { + if err := rootPVCController.SetupWithManager(c.mgr); err != nil { return fmt.Errorf("error starting root pvc controller %v", err) } @@ -334,7 +332,7 @@ func (c *ClusterController) setupStorageControllers(m *manager.Manager, node *co RootClient: c.Root, LeafClientSet: leafClient, } - if err := rootPVController.SetupWithManager(*c.mgr); err != nil { + if err := rootPVController.SetupWithManager(c.mgr); err != nil { return fmt.Errorf("error starting root pv controller %v", err) } diff --git a/pkg/clustertree/cluster-manager/controllers/pod/sync_resources_controller.go b/pkg/clustertree/cluster-manager/controllers/common_controller.go similarity index 79% rename from pkg/clustertree/cluster-manager/controllers/pod/sync_resources_controller.go rename to pkg/clustertree/cluster-manager/controllers/common_controller.go index 3b6bb550d..bbbd3ae28 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/sync_resources_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/common_controller.go @@ -1,13 +1,13 @@ -package pod +package controllers import ( "context" - "fmt" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/klog/v2" @@ -45,17 +45,17 @@ type SyncResourcesReconciler struct { } func (r *SyncResourcesReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - obj, err := r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{}) + // skip namespace + if len(r.Namespace) > 0 && r.Namespace != request.Namespace { + return reconcile.Result{}, nil + } + + _, err := r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{}) if err != nil { klog.Errorf("get %s error: %v", request.NamespacedName, err) return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil } - // skip namespace - if len(r.Namespace) > 0 && r.Namespace != obj.GetNamespace() { - return reconcile.Result{}, nil - } - if err = r.SyncResource(ctx, request); err != nil { klog.Errorf("sync resource %s error: %v", request.NamespacedName, err) return reconcile.Result{RequeueAfter: SyncResourcesRequeueTime}, nil @@ -128,7 +128,7 @@ func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reco return nil } - old, err := r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{}) + old, err := r.DynamicLeafClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { @@ -140,36 +140,22 @@ func (r *SyncResourcesReconciler) SyncResource(ctx context.Context, request reco return err } - objGenerator := func() (interface{}, error) { - switch old.GetKind() { - case SYNC_KIND_CONFIGMAP: - return &corev1.ConfigMap{}, nil - case SYNC_KIND_SECRET: - return &corev1.Secret{}, nil - } - return nil, fmt.Errorf("[objGenerator] not match kind") - } - - convertSelector := func(oldObj, newObj interface{}) error { - switch old.GetKind() { - case SYNC_KIND_CONFIGMAP: - utils.UpdateConfigMap(oldObj.(*corev1.ConfigMap), newObj.(*corev1.ConfigMap)) - return nil - case SYNC_KIND_SECRET: - utils.UpdateSecret(oldObj.(*corev1.Secret), newObj.(*corev1.Secret)) - return nil - } - return fmt.Errorf("[convertSelector] not match kind") + var latest *unstructured.Unstructured + var unstructerr error + switch old.GetKind() { + case SYNC_KIND_CONFIGMAP: + latest, unstructerr = utils.UpdateUnstructured(old, obj, &corev1.ConfigMap{}, &corev1.ConfigMap{}, utils.UpdateConfigMap) + case SYNC_KIND_SECRET: + latest, unstructerr = utils.UpdateUnstructured(old, obj, &corev1.Secret{}, &corev1.Secret{}, utils.UpdateSecret) } - latest, err := utils.UpdateUnstructured(old, obj, objGenerator, convertSelector) - if err != nil { - return err + if unstructerr != nil { + return unstructerr } - if utils.IsObjectUnstructuredGlobal(latest.GetAnnotations()) { + if !utils.IsObjectUnstructuredGlobal(old.GetAnnotations()) { return nil } - _, err = r.DynamicRootClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Update(ctx, latest, metav1.UpdateOptions{}) + _, err = r.DynamicLeafClient.Resource(r.GroupVersionResource).Namespace(request.Namespace).Update(ctx, latest, metav1.UpdateOptions{}) if err != nil { klog.Errorf("update %s from client cluster failed, error: %v", latest.GetKind(), err) return err diff --git a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go index 56c753e76..e0d412760 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go @@ -20,6 +20,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( @@ -62,8 +63,8 @@ func (r *LeafPodReconciler) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{}, nil } - if utils.IsKosmosPod(podCopy) { - utils.FitObjectMeta(&podCopy.ObjectMeta) + if podutils.IsKosmosPod(podCopy) { + podutils.FitObjectMeta(&podCopy.ObjectMeta) podCopy.ResourceVersion = "0" if err := r.RootClient.Status().Update(ctx, podCopy); err != nil && !apierrors.IsNotFound(err) { klog.Info(errors.Wrap(err, "error while updating pod status in kubernetes")) @@ -73,21 +74,39 @@ func (r *LeafPodReconciler) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{}, nil } +type rootDeleteOption struct { + GracePeriodSeconds *int64 +} + +func (dopt *rootDeleteOption) ApplyToDelete(opt *client.DeleteOptions) { + opt.GracePeriodSeconds = dopt.GracePeriodSeconds +} + +func NewRootDeleteOption(pod *corev1.Pod) client.DeleteOption { + gracePeriodSeconds := pod.DeletionGracePeriodSeconds + + current := metav1.NewTime(time.Now()) + if pod.DeletionTimestamp.Before(¤t) { + gracePeriodSeconds = new(int64) + } + + return &rootDeleteOption{ + GracePeriodSeconds: gracePeriodSeconds, + } +} + func (r *LeafPodReconciler) safeDeletePodInRootCluster(ctx context.Context, request reconcile.Request) error { rPod := corev1.Pod{} err := r.RootClient.Get(ctx, request.NamespacedName, &rPod) if err == nil || !apierrors.IsNotFound(err) { rPodCopy := rPod.DeepCopy() - deleteOptions := metav1.DeleteOptions{ - GracePeriodSeconds: rPodCopy.DeletionGracePeriodSeconds, - } - current := metav1.NewTime(time.Now()) - if rPodCopy.DeletionTimestamp.Before(¤t) { - deleteOptions.GracePeriodSeconds = new(int64) - } - if err := r.RootClient.Delete(ctx, rPodCopy); err != nil && !apierrors.IsNotFound(err) { - return err + deleteOption := NewRootDeleteOption(rPodCopy) + + if err := r.RootClient.Delete(ctx, rPodCopy, deleteOption); err != nil { + if !apierrors.IsNotFound(err) { + return err + } } } return nil diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index 22279c332..3068836a9 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -10,6 +10,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -27,6 +28,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( @@ -45,13 +47,80 @@ type RootPodReconciler struct { IgnoreLabels []string EnableServiceAccount bool - DynamicLeafClient dynamic.Interface + DynamicLeafClient dynamic.Interface + DynamicRootClient dynamic.Interface + envResourceManager utils.EnvResourceManager +} + +type envResourceManager struct { DynamicRootClient dynamic.Interface } +// GetConfigMap retrieves the specified config map from the cache. +func (rm *envResourceManager) GetConfigMap(name, namespace string) (*corev1.ConfigMap, error) { + // return rm.configMapLister.ConfigMaps(namespace).Get(name) + obj, err := rm.DynamicRootClient.Resource(utils.GVR_CONFIGMAP).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + retObj := &corev1.ConfigMap{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &retObj); err != nil { + return nil, err + } + + return retObj, nil +} + +// GetSecret retrieves the specified secret from Kubernetes. +func (rm *envResourceManager) GetSecret(name, namespace string) (*corev1.Secret, error) { + // return rm.secretLister.Secrets(namespace).Get(name) + obj, err := rm.DynamicRootClient.Resource(utils.GVR_SECRET).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + retObj := &corev1.Secret{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &retObj); err != nil { + return nil, err + } + + return retObj, nil +} + +// ListServices retrieves the list of services from Kubernetes. +func (rm *envResourceManager) ListServices() ([]*corev1.Service, error) { + // return rm.serviceLister.List(labels.Everything()) + objs, err := rm.DynamicRootClient.Resource(utils.GVR_SERVICE).List(context.TODO(), metav1.ListOptions{ + LabelSelector: labels.Everything().String(), + }) + + if err != nil { + return nil, err + } + + retObj := make([]*corev1.Service, 0) + + for _, obj := range objs.Items { + tmpObj := &corev1.Service{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &tmpObj); err != nil { + return nil, err + } + retObj = append(retObj, tmpObj) + } + + return retObj, nil +} + +func NewEnvResourceManager(client dynamic.Interface) utils.EnvResourceManager { + return &envResourceManager{ + DynamicRootClient: client, + } +} + func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - var pod corev1.Pod - if err := r.Get(ctx, request.NamespacedName, &pod); err != nil { + var cachepod corev1.Pod + if err := r.Get(ctx, request.NamespacedName, &cachepod); err != nil { if errors.IsNotFound(err) { leafPod := &corev1.Pod{} err := r.LeafClient.Get(ctx, request.NamespacedName, leafPod) @@ -69,6 +138,8 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil } + pod := *(cachepod.DeepCopy()) + // belongs to the current node if pod.Spec.NodeName != r.NodeName { return reconcile.Result{}, nil @@ -113,7 +184,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req } } - if utils.ShouldEnqueue(leafPod, &pod) { + if podutils.ShouldEnqueue(leafPod, &pod) { if err := r.UpdatePodInLeafCluster(ctx, &pod); err != nil { return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil } @@ -127,6 +198,8 @@ func (r *RootPodReconciler) SetupWithManager(mgr manager.Manager) error { r.Client = mgr.GetClient() } + r.envResourceManager = NewEnvResourceManager(r.DynamicRootClient) + return ctrl.NewControllerManagedBy(mgr). Named(RootPodControllerName). WithOptions(controller.Options{}). @@ -159,7 +232,7 @@ func (p *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, gvr return fmt.Errorf("find gvr(%v) %v error %v", gvr, rname, err) } - utils.FitUnstructuredObjMeta(unstructuredObj) + podutils.FitUnstructuredObjMeta(unstructuredObj) if gvr.Resource == "secrets" { secretObj := &corev1.Secret{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, secretObj) @@ -174,7 +247,7 @@ func (p *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, gvr } } - utils.SetUnstructuredObjGlobal(unstructuredObj) + podutils.SetUnstructuredObjGlobal(unstructuredObj) _, err = p.DynamicLeafClient.Resource(gvr).Namespace(ns).Create(ctx, unstructuredObj, metav1.CreateOptions{}) if err != nil { @@ -311,7 +384,7 @@ func (p *RootPodReconciler) createCAInLeafCluster(ctx context.Context, ns string newCA := ca.DeepCopy() newCA.Name = utils.MasterRooTCAName - utils.FitObjectMeta(&newCA.ObjectMeta) + podutils.FitObjectMeta(&newCA.ObjectMeta) err = p.LeafClient.Create(ctx, newCA) if err != nil && !errors.IsAlreadyExists(err) { @@ -452,7 +525,12 @@ func (p *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, pod *cor return nil } - basicPod := utils.FitPod(pod, p.IgnoreLabels) + if err := podutils.PopulateEnvironmentVariables(ctx, pod, p.envResourceManager); err != nil { + // span.SetStatus(err) + return err + } + + basicPod := podutils.FitPod(pod, p.IgnoreLabels) klog.Infof("Creating pod %v/%+v", pod.Namespace, pod.Name) ns := &corev1.Namespace{} nsKey := types.NamespacedName{ @@ -474,9 +552,9 @@ func (p *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, pod *cor return err } } - secretNames := utils.GetSecrets(pod) - configMaps := utils.GetConfigmaps(pod) - pvcs := utils.GetPVCs(pod) + secretNames := podutils.GetSecrets(pod) + configMaps := podutils.GetConfigmaps(pod) + pvcs := podutils.GetPVCs(pod) // nolint:errcheck go wait.PollImmediate(500*time.Millisecond, 10*time.Minute, func() (bool, error) { klog.Info("Trying to creating base dependent") @@ -530,15 +608,15 @@ func (p *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, pod *cor if err != nil { return fmt.Errorf("could not get current pod") } - if !utils.IsKosmosPod(pod) { + if !podutils.IsKosmosPod(pod) { klog.Info("Pod is not created by vk, ignore") return nil } - utils.FitLabels(currentPod.ObjectMeta.Labels, p.IgnoreLabels) + podutils.FitLabels(currentPod.ObjectMeta.Labels, p.IgnoreLabels) podCopy := currentPod.DeepCopy() // util.GetUpdatedPod update PodCopy container image, annotations, labels. // recover toleration, affinity, tripped ignore labels. - utils.GetUpdatedPod(podCopy, pod, p.IgnoreLabels) + podutils.GetUpdatedPod(podCopy, pod, p.IgnoreLabels) if reflect.DeepEqual(currentPod.Spec, podCopy.Spec) && reflect.DeepEqual(currentPod.Annotations, podCopy.Annotations) && reflect.DeepEqual(currentPod.Labels, podCopy.Labels) { @@ -559,7 +637,7 @@ func (p *RootPodReconciler) DeletePodInLeafCluster(ctx context.Context, pod *cor } klog.Infof("Deleting pod %v/%+v", pod.Namespace, pod.Name) - if !utils.IsKosmosPod(pod) { + if !podutils.IsKosmosPod(pod) { klog.Info("Pod is not create by vk, ignore") return nil } @@ -597,6 +675,6 @@ func (p *RootPodReconciler) GetPodInLeafCluster(ctx context.Context, namespace s return nil, fmt.Errorf("could not get pod %s/%s: %v", namespace, name, err) } podCopy := pod.DeepCopy() - utils.RecoverLabels(podCopy.Labels, podCopy.Annotations) + podutils.RecoverLabels(podCopy.Labels, podCopy.Annotations) return podCopy, nil } diff --git a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go index 7f720301b..64dad4b86 100644 --- a/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pvc/leaf_pvc_controller.go @@ -22,6 +22,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( @@ -116,7 +117,7 @@ func filterPVC(leafPVC *v1.PersistentVolumeClaim, nodeName string) error { leafPVC.ObjectMeta.ResourceVersion = "" leafPVC.ObjectMeta.OwnerReferences = nil - utils.SetObjectGlobal(&leafPVC.ObjectMeta) + podutils.SetObjectGlobal(&leafPVC.ObjectMeta) if labelSelector != nil { labelStr, err := json.Marshal(labelSelector) if err != nil { diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index df7c8b649..7bbda5bac 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -105,3 +105,9 @@ var GVR_SECRET = schema.GroupVersionResource{ Version: "v1", Resource: "secrets", } + +var GVR_SERVICE = schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", +} diff --git a/pkg/utils/k8s.go b/pkg/utils/k8s.go index 15a069e21..e4586f960 100644 --- a/pkg/utils/k8s.go +++ b/pkg/utils/k8s.go @@ -18,12 +18,21 @@ import ( kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" ) +const SYNC_KIND_CONFIGMAP = "ConfigMap" +const SYNC_KIND_SECRET = "SECRET" + type ClustersNodeSelection struct { NodeSelector map[string]string `json:"nodeSelector,omitempty"` Affinity *corev1.Affinity `json:"affinity,omitempty"` Tolerations []corev1.Toleration `json:"tolerations,omitempty"` } +type EnvResourceManager interface { + GetConfigMap(name, namespace string) (*corev1.ConfigMap, error) + GetSecret(name, namespace string) (*corev1.Secret, error) + ListServices() ([]*corev1.Service, error) +} + func CreateMergePatch(original, new interface{}) ([]byte, error) { pvByte, err := json.Marshal(original) if err != nil { @@ -336,26 +345,16 @@ func UpdateSecret(old, new *corev1.Secret) { old.Type = new.Type } -func UpdateUnstructured(old, new *unstructured.Unstructured, g func() (interface{}, error), cb func(old, new interface{}) error) (*unstructured.Unstructured, error) { - oldObj, err := g() - if err != nil { - return nil, err - } +func UpdateUnstructured[T *corev1.ConfigMap | *corev1.Secret](old, new *unstructured.Unstructured, oldObj T, newObj T, update func(old, new T)) (*unstructured.Unstructured, error) { if err := runtime.DefaultUnstructuredConverter.FromUnstructured(old.UnstructuredContent(), &oldObj); err != nil { return nil, err } - newObj, err := g() - if err != nil { - return nil, err - } if err := runtime.DefaultUnstructuredConverter.FromUnstructured(new.UnstructuredContent(), &newObj); err != nil { return nil, err } - if err := cb(oldObj, newObj); err != nil { - return nil, err - } + update(oldObj, newObj) if retObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(oldObj); err == nil { return &unstructured.Unstructured{ diff --git a/pkg/utils/podutils/env.go b/pkg/utils/podutils/env.go new file mode 100644 index 000000000..4cb777048 --- /dev/null +++ b/pkg/utils/podutils/env.go @@ -0,0 +1,699 @@ +// This code is directly lifted from the karmada +// For reference: +// https://github.com/virtual-kubelet/virtual-kubelet/blob/master/internal/podutils/env.go + +package podutils + +import ( + "context" + "fmt" + "net" + "sort" + "strconv" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + apivalidation "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/klog" + + "github.com/kosmos.io/kosmos/pkg/utils" +) + +const ( + // ReasonOptionalConfigMapNotFound is the reason used in events emitted when an optional configmap is not found. + ReasonOptionalConfigMapNotFound = "OptionalConfigMapNotFound" + // ReasonOptionalConfigMapKeyNotFound is the reason used in events emitted when an optional configmap key is not found. + ReasonOptionalConfigMapKeyNotFound = "OptionalConfigMapKeyNotFound" + // ReasonFailedToReadOptionalConfigMap is the reason used in events emitted when an optional configmap could not be read. + ReasonFailedToReadOptionalConfigMap = "FailedToReadOptionalConfigMap" + + // ReasonOptionalSecretNotFound is the reason used in events emitted when an optional secret is not found. + ReasonOptionalSecretNotFound = "OptionalSecretNotFound" + // ReasonOptionalSecretKeyNotFound is the reason used in events emitted when an optional secret key is not found. + ReasonOptionalSecretKeyNotFound = "OptionalSecretKeyNotFound" + // ReasonFailedToReadOptionalSecret is the reason used in events emitted when an optional secret could not be read. + ReasonFailedToReadOptionalSecret = "FailedToReadOptionalSecret" + + // ReasonMandatoryConfigMapNotFound is the reason used in events emitted when a mandatory configmap is not found. + ReasonMandatoryConfigMapNotFound = "MandatoryConfigMapNotFound" + // ReasonMandatoryConfigMapKeyNotFound is the reason used in events emitted when a mandatory configmap key is not found. + ReasonMandatoryConfigMapKeyNotFound = "MandatoryConfigMapKeyNotFound" + // ReasonFailedToReadMandatoryConfigMap is the reason used in events emitted when a mandatory configmap could not be read. + ReasonFailedToReadMandatoryConfigMap = "FailedToReadMandatoryConfigMap" + + // ReasonMandatorySecretNotFound is the reason used in events emitted when a mandatory secret is not found. + ReasonMandatorySecretNotFound = "MandatorySecretNotFound" + // ReasonMandatorySecretKeyNotFound is the reason used in events emitted when a mandatory secret key is not found. + ReasonMandatorySecretKeyNotFound = "MandatorySecretKeyNotFound" + // ReasonFailedToReadMandatorySecret is the reason used in events emitted when a mandatory secret could not be read. + ReasonFailedToReadMandatorySecret = "FailedToReadMandatorySecret" + + // ReasonInvalidEnvironmentVariableNames is the reason used in events emitted when a configmap/secret referenced in a ".spec.containers[*].envFrom" field contains invalid environment variable names. + ReasonInvalidEnvironmentVariableNames = "InvalidEnvironmentVariableNames" +) + +var masterServices = sets.NewString("kubernetes") + +// PopulateEnvironmentVariables populates the environment of each container (and init container) in the specified pod. +func PopulateEnvironmentVariables(ctx context.Context, pod *corev1.Pod, rm utils.EnvResourceManager) error { + // Populate each init container's environment. + for idx := range pod.Spec.InitContainers { + if err := populateContainerEnvironment(ctx, pod, &pod.Spec.InitContainers[idx], rm); err != nil { + return err + } + } + // Populate each container's environment. + for idx := range pod.Spec.Containers { + if err := populateContainerEnvironment(ctx, pod, &pod.Spec.Containers[idx], rm); err != nil { + return err + } + } + return nil +} + +// populateContainerEnvironment populates the environment of a single container in the specified pod. +func populateContainerEnvironment(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager) error { + // Create an "environment map" based on the value of the specified container's ".envFrom" field. + tmpEnv, err := makeEnvironmentMapBasedOnEnvFrom(ctx, pod, container, rm) + if err != nil { + return err + } + // Create the final "environment map" for the container using the ".env" and ".envFrom" field + // and service environment variables. + err = makeEnvironmentMap(ctx, pod, container, rm, tmpEnv) + if err != nil { + return err + } + // Empty the container's ".envFrom" field and replace its ".env" field with the final, merged environment. + // Values in "env" (sourced from ".env") will override any values with the same key defined in "envFrom" (sourced from ".envFrom"). + // This is in accordance with what the Kubelet itself does. + // https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L557-L558 + container.EnvFrom = []corev1.EnvFromSource{} + + res := make([]corev1.EnvVar, 0, len(tmpEnv)) + + for key, val := range tmpEnv { + res = append(res, corev1.EnvVar{ + Name: key, + Value: val, + }) + } + container.Env = res + + return nil +} + +// getServiceEnvVarMap makes a map[string]string of env vars for services a +// pod in namespace ns should see. +// Based on getServiceEnvVarMap in kubelet_pods.go. +func getServiceEnvVarMap(rm utils.EnvResourceManager, ns string, enableServiceLinks bool) (map[string]string, error) { + var ( + serviceMap = make(map[string]*corev1.Service) + m = make(map[string]string) + ) + + services, err := rm.ListServices() + if err != nil { + return nil, err + } + + // project the services in namespace ns onto the master services + for i := range services { + service := services[i] + // ignore services where ClusterIP is "None" or empty + if !IsServiceIPSet(service) { + continue + } + serviceName := service.Name + + // We always want to add environment variables for master kubernetes service + // from the default namespace, even if enableServiceLinks is false. + // We also add environment variables for other services in the same + // namespace, if enableServiceLinks is true. + if service.Namespace == metav1.NamespaceDefault && masterServices.Has(serviceName) { + if _, exists := serviceMap[serviceName]; !exists { + serviceMap[serviceName] = service + } + } else if service.Namespace == ns && enableServiceLinks { + serviceMap[serviceName] = service + } + } + + mappedServices := make([]*corev1.Service, 0, len(serviceMap)) + for key := range serviceMap { + mappedServices = append(mappedServices, serviceMap[key]) + } + + for _, e := range FromServices(mappedServices) { + m[e.Name] = e.Value + } + return m, nil +} + +// makeEnvironmentMapBasedOnEnvFrom returns a map representing the resolved environment of the specified container after being populated from the entries in the ".envFrom" field. +func makeEnvironmentMapBasedOnEnvFrom(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager) (map[string]string, error) { + // Create a map to hold the resulting environment. + res := make(map[string]string) + // Iterate over "envFrom" references in order to populate the environment. +loop: + for _, envFrom := range container.EnvFrom { + switch { + // Handle population from a configmap. + case envFrom.ConfigMapRef != nil: + ef := envFrom.ConfigMapRef + // Check whether the configmap reference is optional. + // This will control whether we fail when unable to read the configmap. + optional := ef.Optional != nil && *ef.Optional + // Try to grab the referenced configmap. + m, err := rm.GetConfigMap(ef.Name, pod.Namespace) + if err != nil { + // We couldn't fetch the configmap. + // However, if the configmap reference is optional we should not fail. + if optional { + if errors.IsNotFound(err) { + klog.Warningf("configmap %q not found", ef.Name) + } else { + klog.Warningf("failed to read configmap %q: %v", ef.Name, err) + } + // Continue on to the next reference. + continue loop + } + // At this point we know the configmap reference is mandatory. + // Hence, we should return a meaningful error. + if errors.IsNotFound(err) { + klog.Warningf("configmap %q not found", ef.Name) + return nil, fmt.Errorf("configmap %q not found", ef.Name) + } + klog.Warningf("failed to read configmap %q", ef.Name) + return nil, fmt.Errorf("failed to fetch configmap %q: %v", ef.Name, err) + } + // At this point we have successfully fetched the target configmap. + // Iterate over the keys defined in the configmap and populate the environment accordingly. + // https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L581-L595 + invalidKeys := make([]string, 0) + mKeys: + for key, val := range m.Data { + // If a prefix has been defined, prepend it to the environment variable's name. + if len(envFrom.Prefix) > 0 { + key = envFrom.Prefix + key + } + // Make sure that the resulting key is a valid environment variable name. + // If it isn't, it should be appended to the list of invalid keys and skipped. + if errMsgs := apivalidation.IsEnvVarName(key); len(errMsgs) != 0 { + invalidKeys = append(invalidKeys, key) + continue mKeys + } + // Add the key and its value to the environment. + res[key] = val + } + // Report any invalid keys. + if len(invalidKeys) > 0 { + sort.Strings(invalidKeys) + klog.Warningf("keys [%s] from configmap %s/%s were skipped since they are invalid as environment variable names", strings.Join(invalidKeys, ", "), m.Namespace, m.Name) + } + // Handle population from a secret. + case envFrom.SecretRef != nil: + ef := envFrom.SecretRef + // Check whether the secret reference is optional. + // This will control whether we fail when unable to read the secret. + optional := ef.Optional != nil && *ef.Optional + // Try to grab the referenced secret. + s, err := rm.GetSecret(ef.Name, pod.Namespace) + if err != nil { + // We couldn't fetch the secret. + // However, if the secret reference is optional we should not fail. + if optional { + if errors.IsNotFound(err) { + klog.Warningf("secret %q not found", ef.Name) + } else { + klog.Warningf("failed to read secret %q: %v", ef.Name, err) + } + // Continue on to the next reference. + continue loop + } + // At this point we know the secret reference is mandatory. + // Hence, we should return a meaningful error. + if errors.IsNotFound(err) { + klog.Warningf("secret %q not found", ef.Name) + return nil, fmt.Errorf("secret %q not found", ef.Name) + } + klog.Warningf("failed to read secret %q", ef.Name) + return nil, fmt.Errorf("failed to fetch secret %q: %v", ef.Name, err) + } + // At this point we have successfully fetched the target secret. + // Iterate over the keys defined in the secret and populate the environment accordingly. + // https://github.com/kubernetes/kubernetes/blob/v1.13.1/pkg/kubelet/kubelet_pods.go#L581-L595 + invalidKeys := make([]string, 0) + sKeys: + for key, val := range s.Data { + // If a prefix has been defined, prepend it to the environment variable's name. + if len(envFrom.Prefix) > 0 { + key = envFrom.Prefix + key + } + // Make sure that the resulting key is a valid environment variable name. + // If it isn't, it should be appended to the list of invalid keys and skipped. + if errMsgs := apivalidation.IsEnvVarName(key); len(errMsgs) != 0 { + invalidKeys = append(invalidKeys, key) + continue sKeys + } + // Add the key and its value to the environment. + res[key] = string(val) + } + // Report any invalid keys. + if len(invalidKeys) > 0 { + sort.Strings(invalidKeys) + klog.Warningf("keys [%s] from secret %s/%s were skipped since they are invalid as environment variable names", strings.Join(invalidKeys, ", "), s.Namespace, s.Name) + } + } + } + // Return the populated environment. + return res, nil +} + +// makeEnvironmentMap returns a map representing the resolved environment of the specified container after being populated from the entries in the ".env" and ".envFrom" field. +func makeEnvironmentMap(ctx context.Context, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager, res map[string]string) error { + // TODO If pod.Spec.EnableServiceLinks is nil then fail as per 1.14 kubelet. + enableServiceLinks := corev1.DefaultEnableServiceLinks + if pod.Spec.EnableServiceLinks != nil { + enableServiceLinks = *pod.Spec.EnableServiceLinks + } + + // Note that there is a race between Kubelet seeing the pod and kubelet seeing the service. + // To avoid this users can: (1) wait between starting a service and starting; or (2) detect + // missing service env var and exit and be restarted; or (3) use DNS instead of env vars + // and keep trying to resolve the DNS name of the service (recommended). + svcEnv, err := getServiceEnvVarMap(rm, pod.Namespace, enableServiceLinks) + if err != nil { + return err + } + + // If the variable's Value is set, expand the `$(var)` references to other + // variables in the .Value field; the sources of variables are the declared + // variables of the container and the service environment variables. + // mappingFunc := expansion.MappingFuncFor(res, svcEnv) + + // Iterate over environment variables in order to populate the map. + for _, env := range container.Env { + envptr := env + val, err := getEnvironmentVariableValue(ctx, &envptr, pod, container, rm) + if err != nil { + return err + } + if val != nil { + res[env.Name] = *val + } + } + + // Append service env vars. + for k, v := range svcEnv { + if _, present := res[k]; !present { + res[k] = v + } + } + + return nil +} + +func getEnvironmentVariableValue(ctx context.Context, env *corev1.EnvVar, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager) (*string, error) { + if env.ValueFrom != nil { + return getEnvironmentVariableValueWithValueFrom(ctx, env, pod, container, rm) + } + // Handle values that have been directly provided after expanding variable references. + return &env.Value, nil +} + +func getEnvironmentVariableValueWithValueFrom(ctx context.Context, env *corev1.EnvVar, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager) (*string, error) { + // Handle population from a configmap key. + if env.ValueFrom.ConfigMapKeyRef != nil { + return getEnvironmentVariableValueWithValueFromConfigMapKeyRef(ctx, env, pod, container, rm) + } + + // Handle population from a secret key. + if env.ValueFrom.SecretKeyRef != nil { + return getEnvironmentVariableValueWithValueFromSecretKeyRef(ctx, env, pod, container, rm) + } + + // Handle population from a field (downward API). + if env.ValueFrom.FieldRef != nil { + return getEnvironmentVariableValueWithValueFromFieldRef(ctx, env, pod, container, rm) + } + if env.ValueFrom.ResourceFieldRef != nil { + // TODO Implement populating resource requests. + return nil, nil + } + + klog.Error("Unhandled environment variable with non-nil env.ValueFrom, do not know how to populate") + return nil, nil +} + +func getEnvironmentVariableValueWithValueFromConfigMapKeyRef(ctx context.Context, env *corev1.EnvVar, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager) (*string, error) { + // The environment variable must be set from a configmap. + vf := env.ValueFrom.ConfigMapKeyRef + // Check whether the key reference is optional. + // This will control whether we fail when unable to read the requested key. + optional := vf != nil && vf.Optional != nil && *vf.Optional + // Try to grab the referenced configmap. + m, err := rm.GetConfigMap(vf.Name, pod.Namespace) + if err != nil { + // We couldn't fetch the configmap. + // However, if the key reference is optional we should not fail. + if optional { + if errors.IsNotFound(err) { + klog.Warningf("skipping optional envvar %q: configmap %q not found", env.Name, vf.Name) + } else { + klog.Warningf("failed to read configmap %q: %v", vf.Name, err) + } + // Continue on to the next reference. + return nil, nil + } + // At this point we know the key reference is mandatory. + // Hence, we should return a meaningful error. + if errors.IsNotFound(err) { + klog.Warningf("configmap %q not found", vf.Name) + return nil, fmt.Errorf("configmap %q not found", vf.Name) + } + klog.Warningf("failed to read configmap %q", vf.Name) + return nil, fmt.Errorf("failed to read configmap %q: %v", vf.Name, err) + } + // At this point we have successfully fetched the target configmap. + // We must now try to grab the requested key. + var ( + keyExists bool + keyValue string + ) + if keyValue, keyExists = m.Data[vf.Key]; !keyExists { + // The requested key does not exist. + // However, we should not fail if the key reference is optional. + if optional { + // Continue on to the next reference. + klog.Warningf("skipping optional envvar %q: key %q does not exist in configmap %q", env.Name, vf.Key, vf.Name) + return nil, nil + } + // At this point we know the key reference is mandatory. + // Hence, we should fail. + klog.Warningf("key %q does not exist in configmap %q", vf.Key, vf.Name) + return nil, fmt.Errorf("configmap %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name) + } + // Populate the environment variable and continue on to the next reference. + return &keyValue, nil +} + +func getEnvironmentVariableValueWithValueFromSecretKeyRef(ctx context.Context, env *corev1.EnvVar, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager) (*string, error) { + vf := env.ValueFrom.SecretKeyRef + // Check whether the key reference is optional. + // This will control whether we fail when unable to read the requested key. + optional := vf != nil && vf.Optional != nil && *vf.Optional + // Try to grab the referenced secret. + s, err := rm.GetSecret(vf.Name, pod.Namespace) + if err != nil { + // We couldn't fetch the secret. + // However, if the key reference is optional we should not fail. + if optional { + if errors.IsNotFound(err) { + klog.Warningf("skipping optional envvar %q: secret %q not found", env.Name, vf.Name) + } else { + klog.Warningf("failed to read secret %q: %v", vf.Name, err) + klog.Warningf("skipping optional envvar %q: failed to read secret %q", env.Name, vf.Name) + } + // Continue on to the next reference. + return nil, nil + } + // At this point we know the key reference is mandatory. + // Hence, we should return a meaningful error. + if errors.IsNotFound(err) { + klog.Warningf("secret %q not found", vf.Name) + return nil, fmt.Errorf("secret %q not found", vf.Name) + } + klog.Warningf("failed to read secret %q", vf.Name) + return nil, fmt.Errorf("failed to read secret %q: %v", vf.Name, err) + } + // At this point we have successfully fetched the target secret. + // We must now try to grab the requested key. + var ( + keyExists bool + keyValue []byte + ) + if keyValue, keyExists = s.Data[vf.Key]; !keyExists { + // The requested key does not exist. + // However, we should not fail if the key reference is optional. + if optional { + // Continue on to the next reference. + klog.Warningf("skipping optional envvar %q: key %q does not exist in secret %q", env.Name, vf.Key, vf.Name) + return nil, nil + } + // At this point we know the key reference is mandatory. + // Hence, we should fail. + klog.Warningf("key %q does not exist in secret %q", vf.Key, vf.Name) + return nil, fmt.Errorf("secret %q doesn't contain the %q key required by pod %s", vf.Name, vf.Key, pod.Name) + } + // Populate the environment variable and continue on to the next reference. + ret := string(keyValue) + return &ret, nil +} + +// Handle population from a field (downward API). +func getEnvironmentVariableValueWithValueFromFieldRef(ctx context.Context, env *corev1.EnvVar, pod *corev1.Pod, container *corev1.Container, rm utils.EnvResourceManager) (*string, error) { + // https://github.com/virtual-kubelet/virtual-kubelet/issues/123 + vf := env.ValueFrom.FieldRef + + runtimeVal, err := podFieldSelectorRuntimeValue(vf, pod) + if err != nil { + return nil, err + } + + return &runtimeVal, nil +} + +// podFieldSelectorRuntimeValue returns the runtime value of the given +// selector for a pod. +func podFieldSelectorRuntimeValue(fs *corev1.ObjectFieldSelector, pod *corev1.Pod) (string, error) { + internalFieldPath, _, err := ConvertDownwardAPIFieldLabel(fs.APIVersion, fs.FieldPath, "") + if err != nil { + return "", err + } + switch internalFieldPath { + case "spec.nodeName": + return pod.Spec.NodeName, nil + case "spec.serviceAccountName": + return pod.Spec.ServiceAccountName, nil + } + return ExtractFieldPathAsString(pod, internalFieldPath) +} + +// ExtractFieldPathAsString extracts the field from the given object +// and returns it as a string. The object must be a pointer to an +// API type. +func ExtractFieldPathAsString(obj interface{}, fieldPath string) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } + + if path, subscript, ok := SplitMaybeSubscriptedPath(fieldPath); ok { + switch path { + case "metadata.annotations": + if errs := apivalidation.IsQualifiedName(strings.ToLower(subscript)); len(errs) != 0 { + return "", fmt.Errorf("invalid key subscript in %s: %s", fieldPath, strings.Join(errs, ";")) + } + return accessor.GetAnnotations()[subscript], nil + case "metadata.labels": + if errs := apivalidation.IsQualifiedName(subscript); len(errs) != 0 { + return "", fmt.Errorf("invalid key subscript in %s: %s", fieldPath, strings.Join(errs, ";")) + } + return accessor.GetLabels()[subscript], nil + default: + return "", fmt.Errorf("fieldPath %q does not support subscript", fieldPath) + } + } + + switch fieldPath { + case "metadata.annotations": + return FormatMap(accessor.GetAnnotations()), nil + case "metadata.labels": + return FormatMap(accessor.GetLabels()), nil + case "metadata.name": + return accessor.GetName(), nil + case "metadata.namespace": + return accessor.GetNamespace(), nil + case "metadata.uid": + return string(accessor.GetUID()), nil + } + + return "", fmt.Errorf("unsupported fieldPath: %v", fieldPath) +} + +// FormatMap formats map[string]string to a string. +func FormatMap(m map[string]string) (fmtStr string) { + // output with keys in sorted order to provide stable output + keys := sets.NewString() + for key := range m { + keys.Insert(key) + } + for _, key := range keys.List() { + fmtStr += fmt.Sprintf("%v=%q\n", key, m[key]) + } + fmtStr = strings.TrimSuffix(fmtStr, "\n") + + return +} + +// SplitMaybeSubscriptedPath checks whether the specified fieldPath is +// subscripted, and +// - if yes, this function splits the fieldPath into path and subscript, and +// returns (path, subscript, true). +// - if no, this function returns (fieldPath, "", false). +// +// Example inputs and outputs: +// - "metadata.annotations['myKey']" --> ("metadata.annotations", "myKey", true) +// - "metadata.annotations['a[b]c']" --> ("metadata.annotations", "a[b]c", true) +// - "metadata.labels[”]" --> ("metadata.labels", "", true) +// - "metadata.labels" --> ("metadata.labels", "", false) +func SplitMaybeSubscriptedPath(fieldPath string) (string, string, bool) { + if !strings.HasSuffix(fieldPath, "']") { + return fieldPath, "", false + } + s := strings.TrimSuffix(fieldPath, "']") + parts := strings.SplitN(s, "['", 2) + if len(parts) < 2 { + return fieldPath, "", false + } + if len(parts[0]) == 0 { + return fieldPath, "", false + } + return parts[0], parts[1], true +} + +// ConvertDownwardAPIFieldLabel converts the specified downward API field label +// and its value in the pod of the specified version to the internal version, +// and returns the converted label and value. This function returns an error if +// the conversion fails. +func ConvertDownwardAPIFieldLabel(version, label, value string) (string, string, error) { + if version != "v1" { + return "", "", fmt.Errorf("unsupported pod version: %s", version) + } + + if path, _, ok := SplitMaybeSubscriptedPath(label); ok { + switch path { + case "metadata.annotations", "metadata.labels": + return label, value, nil + default: + return "", "", fmt.Errorf("field label does not support subscript: %s", label) + } + } + + switch label { + case "metadata.annotations", + "metadata.labels", + "metadata.name", + "metadata.namespace", + "metadata.uid", + "spec.nodeName", + "spec.restartPolicy", + "spec.serviceAccountName", + "spec.schedulerName", + "status.phase", + "status.hostIP", + "status.podIP", + "status.podIPs": + return label, value, nil + // This is for backwards compatibility with old v1 clients which send spec.host + case "spec.host": + return "spec.nodeName", value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } +} + +// this function aims to check if the service's ClusterIP is set or not +// the objective is not to perform validation here +func IsServiceIPSet(service *corev1.Service) bool { + return service.Spec.ClusterIP != corev1.ClusterIPNone && service.Spec.ClusterIP != "" +} + +// provided as an argument. +func FromServices(services []*corev1.Service) []corev1.EnvVar { + var result []corev1.EnvVar + for i := range services { + service := services[i] + + // ignore services where ClusterIP is "None" or empty + // the services passed to this method should be pre-filtered + // only services that have the cluster IP set should be included here + if !IsServiceIPSet(service) { + continue + } + + // Host + name := makeEnvVariableName(service.Name) + "_SERVICE_HOST" + result = append(result, corev1.EnvVar{Name: name, Value: service.Spec.ClusterIP}) + // First port - give it the backwards-compatible name + name = makeEnvVariableName(service.Name) + "_SERVICE_PORT" + result = append(result, corev1.EnvVar{Name: name, Value: strconv.Itoa(int(service.Spec.Ports[0].Port))}) + // All named ports (only the first may be unnamed, checked in validation) + for i := range service.Spec.Ports { + sp := &service.Spec.Ports[i] + if sp.Name != "" { + pn := name + "_" + makeEnvVariableName(sp.Name) + result = append(result, corev1.EnvVar{Name: pn, Value: strconv.Itoa(int(sp.Port))}) + } + } + // Docker-compatible vars. + result = append(result, makeLinkVariables(service)...) + } + return result +} + +func makeEnvVariableName(str string) string { + // TODO: If we simplify to "all names are DNS1123Subdomains" this + // will need two tweaks: + // 1) Handle leading digits + // 2) Handle dots + return strings.ToUpper(strings.Replace(str, "-", "_", -1)) +} + +func makeLinkVariables(service *corev1.Service) []corev1.EnvVar { + prefix := makeEnvVariableName(service.Name) + all := []corev1.EnvVar{} + for i := range service.Spec.Ports { + sp := &service.Spec.Ports[i] + + protocol := string(corev1.ProtocolTCP) + if sp.Protocol != "" { + protocol = string(sp.Protocol) + } + + hostPort := net.JoinHostPort(service.Spec.ClusterIP, strconv.Itoa(int(sp.Port))) + + if i == 0 { + // Docker special-cases the first port. + all = append(all, corev1.EnvVar{ + Name: prefix + "_PORT", + Value: fmt.Sprintf("%s://%s", strings.ToLower(protocol), hostPort), + }) + } + portPrefix := fmt.Sprintf("%s_PORT_%d_%s", prefix, sp.Port, strings.ToUpper(protocol)) + all = append(all, []corev1.EnvVar{ + { + Name: portPrefix, + Value: fmt.Sprintf("%s://%s", strings.ToLower(protocol), hostPort), + }, + { + Name: portPrefix + "_PROTO", + Value: strings.ToLower(protocol), + }, + { + Name: portPrefix + "_PORT", + Value: strconv.Itoa(int(sp.Port)), + }, + { + Name: portPrefix + "_ADDR", + Value: service.Spec.ClusterIP, + }, + }...) + } + return all +} diff --git a/pkg/utils/pod.go b/pkg/utils/podutils/pod.go similarity index 91% rename from pkg/utils/pod.go rename to pkg/utils/podutils/pod.go index 472090f5f..5c5df47db 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/podutils/pod.go @@ -1,4 +1,4 @@ -package utils +package podutils import ( "encoding/json" @@ -9,6 +9,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/klog" + + "github.com/kosmos.io/kosmos/pkg/utils" ) func GetSecrets(pod *corev1.Pod) []string { @@ -72,7 +74,7 @@ func SetObjectGlobal(obj *metav1.ObjectMeta) { if obj.Annotations == nil { obj.Annotations = map[string]string{} } - obj.Annotations[KosmosGlobalLabel] = "true" + obj.Annotations[utils.KosmosGlobalLabel] = "true" } func SetUnstructuredObjGlobal(unstructuredObj *unstructured.Unstructured) { @@ -80,7 +82,7 @@ func SetUnstructuredObjGlobal(unstructuredObj *unstructured.Unstructured) { if annotationsMap == nil { annotationsMap = map[string]string{} } - annotationsMap[KosmosGlobalLabel] = "true" + annotationsMap[utils.KosmosGlobalLabel] = "true" unstructuredObj.SetAnnotations(annotationsMap) } @@ -147,7 +149,7 @@ func FitPod(pod *corev1.Pod, ignoreLabels []string) *corev1.Pod { if podCopy.Annotations == nil { podCopy.Annotations = make(map[string]string) } - podCopy.Labels[KosmosPodLabel] = "true" + podCopy.Labels[utils.KosmosPodLabel] = "true" cns := ConvertAnnotations(pod.Annotations) recoverSelectors(podCopy, cns) podCopy.Spec.Containers = fitContainers(pod.Spec.Containers) @@ -164,7 +166,7 @@ func FitPod(pod *corev1.Pod, ignoreLabels []string) *corev1.Pod { if err != nil { return podCopy } - podCopy.Annotations[KosmosTrippedLabels] = string(trippedStr) + podCopy.Annotations[utils.KosmosTrippedLabels] = string(trippedStr) } return podCopy @@ -189,14 +191,14 @@ func fitContainers(containers []corev1.Container) []corev1.Container { } func IsKosmosPod(pod *corev1.Pod) bool { - if pod.Labels != nil && pod.Labels[KosmosPodLabel] == "true" { + if pod.Labels != nil && pod.Labels[utils.KosmosPodLabel] == "true" { return true } return false } func RecoverLabels(labels map[string]string, annotations map[string]string) { - trippedLabels := annotations[KosmosTrippedLabels] + trippedLabels := annotations[utils.KosmosTrippedLabels] if trippedLabels == "" { return } @@ -234,7 +236,7 @@ func GetUpdatedPod(orig, update *corev1.Pod, ignoreLabels []string) { if update.Annotations == nil { update.Annotations = make(map[string]string) } - if orig.Annotations[KosmosSelectorKey] != update.Annotations[KosmosSelectorKey] { + if orig.Annotations[utils.KosmosSelectorKey] != update.Annotations[utils.KosmosSelectorKey] { if cns := ConvertAnnotations(update.Annotations); cns != nil { orig.Spec.Tolerations = cns.Tolerations } @@ -247,16 +249,16 @@ func GetUpdatedPod(orig, update *corev1.Pod, ignoreLabels []string) { } } -func ConvertAnnotations(annotation map[string]string) *ClustersNodeSelection { +func ConvertAnnotations(annotation map[string]string) *utils.ClustersNodeSelection { if annotation == nil { return nil } - val := annotation[KosmosSelectorKey] + val := annotation[utils.KosmosSelectorKey] if len(val) == 0 { return nil } - var cns ClustersNodeSelection + var cns utils.ClustersNodeSelection err := json.Unmarshal([]byte(val), &cns) if err != nil { return nil @@ -264,7 +266,7 @@ func ConvertAnnotations(annotation map[string]string) *ClustersNodeSelection { return &cns } -func recoverSelectors(pod *corev1.Pod, cns *ClustersNodeSelection) { +func recoverSelectors(pod *corev1.Pod, cns *utils.ClustersNodeSelection) { if cns != nil { pod.Spec.NodeSelector = cns.NodeSelector pod.Spec.Tolerations = cns.Tolerations