diff --git a/apis/apps/v1alpha1/node_pod_probe_types.go b/apis/apps/v1alpha1/node_pod_probe_types.go index 0f7fdaa0ff..9de5a80eae 100644 --- a/apis/apps/v1alpha1/node_pod_probe_types.go +++ b/apis/apps/v1alpha1/node_pod_probe_types.go @@ -37,14 +37,12 @@ type PodProbe struct { } type ContainerProbe struct { - // probe name, unique within the Pod(Even between different containers, they cannot be the same) + // Name is podProbeMarker.Name#probe.Name Name string `json:"name"` // container name ContainerName string `json:"containerName"` // container probe spec Probe ContainerProbeSpec `json:"probe"` - // Used for NodeProbeProbe to quickly find the corresponding PodProbeMarker resource. - PodProbeMarkerName string `json:"podProbeMarkerName,omitempty"` } type NodePodProbeStatus struct { @@ -64,7 +62,7 @@ type PodProbeStatus struct { } type ContainerProbeState struct { - // probe name + // Name is podProbeMarker.Name#probe.Name Name string `json:"name"` // container probe exec state, True or False State ProbeState `json:"state"` diff --git a/config/crd/bases/apps.kruise.io_nodepodprobes.yaml b/config/crd/bases/apps.kruise.io_nodepodprobes.yaml index 2b34e034a4..4719de38a3 100644 --- a/config/crd/bases/apps.kruise.io_nodepodprobes.yaml +++ b/config/crd/bases/apps.kruise.io_nodepodprobes.yaml @@ -54,12 +54,7 @@ spec: description: container name type: string name: - description: probe name, unique within the Pod(Even between - different containers, they cannot be the same) - type: string - podProbeMarkerName: - description: Used for NodeProbeProbe to quickly find the - corresponding PodProbeMarker resource. 
+ description: Name is podProbeMarker.Name#probe.Name type: string probe: description: container probe spec @@ -243,7 +238,7 @@ spec: error message type: string name: - description: probe name + description: Name is podProbeMarker.Name#probe.Name type: string state: description: container probe exec state, True or False diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 327405f53d..ec0e7fd368 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -17,6 +17,8 @@ resources: - bases/apps.kruise.io_workloadspreads.yaml - bases/apps.kruise.io_ephemeraljobs.yaml - bases/apps.kruise.io_persistentpodstates.yaml +- bases/apps.kruise.io_podprobemarkers.yaml +- bases/apps.kruise.io_nodepodprobes.yaml # +kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index a6c4eac58b..8aae74fbbb 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -305,6 +305,26 @@ rules: - get - patch - update +- apiGroups: + - apps.kruise.io + resources: + - nodepodprobes + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - apps.kruise.io + resources: + - nodepodprobes/status + verbs: + - get + - patch + - update - apiGroups: - apps.kruise.io resources: @@ -325,6 +345,26 @@ rules: - get - patch - update +- apiGroups: + - apps.kruise.io + resources: + - podprobemarkers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - apps.kruise.io + resources: + - podprobemarkers/status + verbs: + - get + - patch + - update - apiGroups: - apps.kruise.io resources: diff --git a/pkg/controller/controllers.go b/pkg/controller/controllers.go index acfa849c29..081ecd22f5 100644 --- a/pkg/controller/controllers.go +++ b/pkg/controller/controllers.go @@ -26,7 +26,9 @@ import ( "github.com/openkruise/kruise/pkg/controller/ephemeraljob" "github.com/openkruise/kruise/pkg/controller/imagepulljob" 
"github.com/openkruise/kruise/pkg/controller/nodeimage" + "github.com/openkruise/kruise/pkg/controller/nodepodprobe" "github.com/openkruise/kruise/pkg/controller/persistentpodstate" + "github.com/openkruise/kruise/pkg/controller/podprobemarker" "github.com/openkruise/kruise/pkg/controller/podreadiness" "github.com/openkruise/kruise/pkg/controller/podunavailablebudget" "github.com/openkruise/kruise/pkg/controller/resourcedistribution" @@ -59,6 +61,8 @@ func init() { controllerAddFuncs = append(controllerAddFuncs, ephemeraljob.Add) controllerAddFuncs = append(controllerAddFuncs, containerlauchpriority.Add) controllerAddFuncs = append(controllerAddFuncs, persistentpodstate.Add) + controllerAddFuncs = append(controllerAddFuncs, podprobemarker.Add) + controllerAddFuncs = append(controllerAddFuncs, nodepodprobe.Add) } func SetupWithManager(m manager.Manager) error { diff --git a/pkg/controller/nodeimage/nodeimage_event_handler.go b/pkg/controller/nodeimage/nodeimage_event_handler.go index 09bc3034de..06c2860497 100644 --- a/pkg/controller/nodeimage/nodeimage_event_handler.go +++ b/pkg/controller/nodeimage/nodeimage_event_handler.go @@ -33,6 +33,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) +const ( + VirtualKubelet = "virtual-kubelet" +) + type nodeHandler struct { client.Reader } @@ -41,6 +45,9 @@ var _ handler.EventHandler = &nodeHandler{} func (e *nodeHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { node := evt.Object.(*v1.Node) + if node.Labels["type"] == VirtualKubelet { + return + } if node.DeletionTimestamp != nil { e.nodeDelete(node, q) return @@ -53,6 +60,9 @@ func (e *nodeHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingIn func (e *nodeHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { node := evt.ObjectNew.(*v1.Node) + if node.Labels["type"] == VirtualKubelet { + return + } if node.DeletionTimestamp != nil { e.nodeDelete(node, q) } else { @@ -62,6 +72,9 @@ func (e 
*nodeHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInte
 
 func (e *nodeHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
 	node := evt.Object.(*v1.Node)
+	if node.Labels["type"] == VirtualKubelet {
+		return
+	}
 	e.nodeDelete(node, q)
 }
 
diff --git a/pkg/controller/nodepodprobe/node_pod_probe_controller.go b/pkg/controller/nodepodprobe/node_pod_probe_controller.go
new file mode 100644
index 0000000000..1eca44383a
--- /dev/null
+++ b/pkg/controller/nodepodprobe/node_pod_probe_controller.go
@@ -0,0 +1,377 @@
+/*
+Copyright 2022 The Kruise Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodepodprobe
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"reflect"
+	"strings"
+	"time"
+
+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"github.com/openkruise/kruise/pkg/util"
+	utilclient "github.com/openkruise/kruise/pkg/util/client"
+	"github.com/openkruise/kruise/pkg/util/controllerfinder"
+	utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
+	"github.com/openkruise/kruise/pkg/util/ratelimiter"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/retry"
+	"k8s.io/klog/v2"
+	kubecontroller "k8s.io/kubernetes/pkg/controller"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+var (
+	nodePodProbeCreationDelayAfterNodeReady = time.Second * 30
+)
+
+func init() {
+	flag.IntVar(&concurrentReconciles, "nodepodprobe-workers", concurrentReconciles, "Max concurrent workers for NodePodProbe controller.")
+}
+
+var (
+	concurrentReconciles = 3
+	controllerKind       = appsv1alpha1.SchemeGroupVersion.WithKind("NodePodProbe")
+)
+
+// Add creates a new NodePodProbe Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
+// and Start it when the Manager is Started.
+func Add(mgr manager.Manager) error {
+	if !utildiscovery.DiscoverGVK(controllerKind) {
+		return nil
+	}
+	return add(mgr, newReconciler(mgr))
+}
+
+// newReconciler returns a new reconcile.Reconciler
+func newReconciler(mgr manager.Manager) reconcile.Reconciler {
+	cli := utilclient.NewClientFromManager(mgr, "NodePodProbe-controller")
+	return &ReconcileNodePodProbe{
+		Client: cli,
+		scheme: mgr.GetScheme(),
+		finder: controllerfinder.Finder,
+	}
+}
+
+// add adds a new Controller to mgr with r as the reconcile.Reconciler
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+	// Create a new controller
+	c, err := controller.New("NodePodProbe-controller", mgr, controller.Options{
+		Reconciler: r, MaxConcurrentReconciles: concurrentReconciles,
+		RateLimiter: ratelimiter.DefaultControllerRateLimiter()})
+	if err != nil {
+		return err
+	}
+
+	// watch for changes to NodePodProbe
+	if err = c.Watch(&source.Kind{Type: &appsv1alpha1.NodePodProbe{}}, &enqueueRequestForNodePodProbe{}); err != nil {
+		return err
+	}
+
+	// watch for changes to pod
+	if err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &enqueueRequestForPod{reader: mgr.GetClient()}); err != nil {
+		return err
+	}
+
+	// watch for changes to node
+	if err = c.Watch(&source.Kind{Type: &corev1.Node{}}, &enqueueRequestForNode{Reader: mgr.GetClient()}); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+var _ reconcile.Reconciler = &ReconcileNodePodProbe{}
+
+// ReconcileNodePodProbe reconciles a NodePodProbe object
+type ReconcileNodePodProbe struct {
+	client.Client
+	scheme *runtime.Scheme
+
+	finder *controllerfinder.ControllerFinder
+}
+
+// +kubebuilder:rbac:groups=apps.kruise.io,resources=nodepodprobes,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=apps.kruise.io,resources=nodepodprobes/status,verbs=get;update;patch
+
+// Reconcile reads that state of the cluster for a NodePodProbe object and makes changes based on the state read
+// and what is in the NodePodProbe.Spec
+func (r *ReconcileNodePodProbe) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) {
+	err := r.syncNodePodProbe(req.Name)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+	return ctrl.Result{}, nil
+}
+
+// syncNodePodProbe reconciles the NodePodProbe named after a node: it creates the object when missing, deletes it
+// when the node is gone or being deleted, prunes stale pods from the spec and writes probe results back to pods.
+func (r *ReconcileNodePodProbe) syncNodePodProbe(name string) error {
+	// Fetch the NodePodProbe instance
+	npp := &appsv1alpha1.NodePodProbe{}
+	err := r.Get(context.TODO(), client.ObjectKey{Name: name}, npp)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			npp.Name = name
+			return r.Create(context.TODO(), npp)
+		}
+		// Error reading the object - requeue the request.
+		return err
+	}
+	// Fetch Node
+	node := &corev1.Node{}
+	err = r.Get(context.TODO(), client.ObjectKey{Name: name}, node)
+	if err != nil && !errors.IsNotFound(err) {
+		return err
+	} else if errors.IsNotFound(err) || !node.DeletionTimestamp.IsZero() {
+		return r.Delete(context.TODO(), npp)
+	}
+	// If Pod is deleted, then remove podProbe from NodePodProbe.Spec
+	matchedPods, err := r.syncPodFromNodePodProbe(npp)
+	if err != nil {
+		return err
+	}
+	for _, status := range npp.Status.PodProbeStatuses {
+		pod, ok := matchedPods[status.UID]
+		if !ok {
+			continue
+		}
+		// Write podProbe state to Pod metadata and condition
+		if err = r.updatePodProbeStatus(pod, status); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// syncPodFromNodePodProbe drops from NodePodProbe.Spec every pod that no longer exists, is inactive or has a
+// different UID, and returns the remaining pods keyed by UID.
+func (r *ReconcileNodePodProbe) syncPodFromNodePodProbe(npp *appsv1alpha1.NodePodProbe) (map[string]*corev1.Pod, error) {
+	// map[pod.uid]=Pod
+	matchedPods := map[string]*corev1.Pod{}
+	for _, obj := range npp.Spec.PodProbes {
+		pod := &corev1.Pod{}
+		err := r.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, pod)
+		if err != nil && !errors.IsNotFound(err) {
+			klog.Errorf("NodePodProbe get pod(%s/%s) failed: %s", obj.Namespace, obj.Name, err.Error())
+			return nil, err
+		}
+		if errors.IsNotFound(err) || !kubecontroller.IsPodActive(pod) || string(pod.UID) != obj.UID {
+			continue
+		}
+		matchedPods[string(pod.UID)] = pod
+	}
+
+	newSpec := appsv1alpha1.NodePodProbeSpec{}
+	for i := range npp.Spec.PodProbes {
+		obj := npp.Spec.PodProbes[i]
+		if _, ok := matchedPods[obj.UID]; ok {
+			newSpec.PodProbes = append(newSpec.PodProbes, obj)
+		}
+	}
+	if reflect.DeepEqual(newSpec, npp.Spec) {
+		return matchedPods, nil
+	}
+
+	nppClone := &appsv1alpha1.NodePodProbe{}
+	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		// Return the read error instead of updating a stale or empty clone.
+		if err := r.Client.Get(context.TODO(), types.NamespacedName{Name: npp.Name}, nppClone); err != nil {
+			klog.Errorf("error getting updated npp %s from client", npp.Name)
+			return err
+		}
+		if reflect.DeepEqual(newSpec, nppClone.Spec) {
+			return nil
+		}
+		nppClone.Spec = newSpec
+		return r.Client.Update(context.TODO(), nppClone)
+	})
+	if err != nil {
+		klog.Errorf("NodePodProbe update NodePodProbe(%s) failed:%s", npp.Name, err.Error())
+		return nil, err
+	}
+	klog.V(3).Infof("NodePodProbe update NodePodProbe(%s) from(%s) -> to(%s) success", npp.Name, util.DumpJSON(npp.Spec), util.DumpJSON(newSpec))
+	return matchedPods, nil
+}
+
+// updatePodProbeStatus records the probe results of status as pod conditions of type
+// PodProbeMarker#{podProbeMarker.Name}#{probe.Name} and applies the labels and annotations of the matching
+// marker policy of the corresponding PodProbeMarker to the pod.
+func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status appsv1alpha1.PodProbeStatus) error {
+	// map[probe.name]->probeState
+	// NOTE(review): keys are condition types (PodProbeMarker#ppm#probe) while the lookup below uses ppm#probe,
+	// so the "unchanged" short-circuit looks like it never matches - confirm the intended key before relying on it.
+	currentConditions := make(map[string]appsv1alpha1.ProbeState)
+	for _, condition := range pod.Status.Conditions {
+		currentConditions[string(condition.Type)] = appsv1alpha1.ProbeState(condition.Status)
+	}
+	type metadata struct {
+		Labels      map[string]interface{} `json:"labels,omitempty"`
+		Annotations map[string]interface{} `json:"annotations,omitempty"`
+	}
+	// patch labels or annotations in pod
+	probeMetadata := metadata{
+		Labels:      map[string]interface{}{},
+		Annotations: map[string]interface{}{},
+	}
+	// pod status condition record probe result
+	var probeConditions []corev1.PodCondition
+	var err error
+	for i := range status.ProbeStates {
+		probeState := status.ProbeStates[i]
+		// ignore the probe state
+		if probeState.State == "" || probeState.State == currentConditions[probeState.Name] {
+			continue
+		}
+		// Name must be podProbeMarker.Name#probe.Name; skip malformed names instead of panicking on the index.
+		nameParts := strings.SplitN(probeState.Name, "#", 2)
+		if len(nameParts) != 2 {
+			klog.Warningf("NodePodProbe pod(%s/%s) probe name(%s) is invalid, ignore it", pod.Namespace, pod.Name, probeState.Name)
+			continue
+		}
+		ppmName, probeName := nameParts[0], nameParts[1]
+
+		var conStatus corev1.ConditionStatus
+		if probeState.State == appsv1alpha1.ProbeSucceeded {
+			conStatus = corev1.ConditionTrue
+		} else {
+			conStatus = corev1.ConditionFalse
+		}
+		probeConditions = append(probeConditions, corev1.PodCondition{
+			// type -> PodProbeMarker#podProbeMarker.Name#probe.Name
+			Type:               corev1.PodConditionType(fmt.Sprintf("PodProbeMarker#%s#%s", ppmName, probeName)),
+			Status:             conStatus,
+			LastProbeTime:      probeState.LastProbeTime,
+			LastTransitionTime: probeState.LastTransitionTime,
+			Message:            probeState.Message,
+		})
+		// marker pod labels & annotations according to probe state
+		// fetch PodProbeMarker
+		ppm := &appsv1alpha1.PodProbeMarker{}
+		err = r.Get(context.TODO(), client.ObjectKey{Namespace: pod.Namespace, Name: ppmName}, ppm)
+		if err != nil {
+			// when PodProbeMarker is deleted, only the condition is recorded and no marker is patched
+			if errors.IsNotFound(err) {
+				continue
+			}
+			klog.Errorf("NodePodProbe get PodProbeMarker(%s/%s) for pod(%s/%s) failed: %s", pod.Namespace, ppmName, pod.Namespace, pod.Name, err.Error())
+			return err
+		} else if !ppm.DeletionTimestamp.IsZero() {
+			continue
+		}
+		var policy []appsv1alpha1.ProbeMarkerPolicy
+		for _, probe := range ppm.Spec.Probes {
+			if probe.Name == probeName {
+				policy = probe.MarkerPolicy
+				break
+			}
+		}
+		if len(policy) == 0 {
+			continue
+		}
+		// patch pod labels & annotations
+		var matchedPolicy *appsv1alpha1.ProbeMarkerPolicy
+		for j := range policy {
+			if policy[j].State == probeState.State {
+				matchedPolicy = &policy[j]
+				break
+			}
+		}
+		// find matched policy
+		if matchedPolicy != nil {
+			for k, v := range matchedPolicy.Labels {
+				probeMetadata.Labels[k] = v
+			}
+			for k, v := range matchedPolicy.Annotations {
+				probeMetadata.Annotations[k] = v
+			}
+			continue
+		}
+		// If only one Marker Policy is defined, for example: only define State=Succeeded, Patch Labels[healthy]='true'.
+		// When the probe execution success, kruise will patch labels[healthy]='true' to pod.
+		// And when the probe execution fails, Label[healthy] will be deleted.
+		for k := range policy[0].Labels {
+			probeMetadata.Labels[k] = nil
+		}
+		for k := range policy[0].Annotations {
+			probeMetadata.Annotations[k] = nil
+		}
+	}
+	// probe condition no changed, continue
+	if len(probeConditions) == 0 {
+		return nil
+	}
+
+	//update pod metadata and status condition
+	podClone := pod.DeepCopy()
+	if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		if err = r.Client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, podClone); err != nil {
+			klog.Errorf("error getting updated pod(%s/%s) from client", pod.Namespace, pod.Name)
+			return err
+		}
+		// snapshot status and metadata before mutating, so the no-op check below compares like with like
+		oldStatus := podClone.Status.DeepCopy()
+		oldMetadata := podClone.ObjectMeta.DeepCopy()
+		for i := range probeConditions {
+			util.SetPodCondition(podClone, probeConditions[i])
+		}
+		for k, v := range probeMetadata.Labels {
+			// delete the label
+			if v == nil {
+				delete(podClone.Labels, k)
+				continue
+			}
+			// patch the label; the pod may have no labels map yet
+			if podClone.Labels == nil {
+				podClone.Labels = map[string]string{}
+			}
+			podClone.Labels[k] = v.(string)
+		}
+		for k, v := range probeMetadata.Annotations {
+			// delete the annotation
+			if v == nil {
+				delete(podClone.Annotations, k)
+				continue
+			}
+			// patch the annotation; the pod may have no annotations map yet
+			if podClone.Annotations == nil {
+				podClone.Annotations = map[string]string{}
+			}
+			podClone.Annotations[k] = v.(string)
+		}
+		if reflect.DeepEqual(*oldStatus, podClone.Status) && reflect.DeepEqual(oldMetadata.Labels, podClone.Labels) &&
+			reflect.DeepEqual(oldMetadata.Annotations, podClone.Annotations) {
+			return nil
+		}
+		return r.Client.Status().Update(context.TODO(), podClone)
+	}); err != nil {
+		klog.Errorf("NodePodProbe patch pod(%s/%s) status failed: %s", podClone.Namespace, podClone.Name, err.Error())
+		return err
+	}
+	klog.V(3).Infof("NodePodProbe update pod(%s/%s) status conditions(%s) success", podClone.Namespace, podClone.Name, util.DumpJSON(probeConditions))
+	return nil
+}
diff --git a/pkg/controller/nodepodprobe/node_pod_probe_controller_test.go b/pkg/controller/nodepodprobe/node_pod_probe_controller_test.go
new file mode 100644
index 0000000000..9ba4198ae2
--- /dev/null
+++ b/pkg/controller/nodepodprobe/node_pod_probe_controller_test.go
@@ -0,0 +1,707 @@
+/*
+Copyright 2022 The Kruise Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodepodprobe
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"github.com/openkruise/kruise/pkg/util/controllerfinder"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+)
+
+func init() {
+	scheme = runtime.NewScheme()
+	_ = corev1.AddToScheme(scheme)
+	_ = appsv1alpha1.AddToScheme(scheme)
+}
+
+var (
+	scheme *runtime.Scheme
+
+	demoPodProbeMarker = appsv1alpha1.PodProbeMarker{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "ppm-1",
+		},
+		Spec: appsv1alpha1.PodProbeMarkerSpec{
+			Selector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"app": "test",
+				},
+			},
+			Probes: []appsv1alpha1.PodContainerProbe{
+				{
+					Name:          "healthy",
+					ContainerName: "main",
+					Probe: appsv1alpha1.ContainerProbeSpec{
+						Probe: corev1.Probe{
+							Handler: corev1.Handler{
+								Exec:
&corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsv1alpha1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + } + + demoNodePodProbe = appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +) + +func TestSyncNodePodProbe(t *testing.T) { + cases := []struct { + name string + req ctrl.Request + getPods func() []*corev1.Pod + getPodProbeMarkers func() []*appsv1alpha1.PodProbeMarker + getNodePodProbes func() []*appsv1alpha1.NodePodProbe + getNode func() []*corev1.Node + expectPods func() []*corev1.Pod + }{ + { + name: "test1, probe success", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoNodePodProbe.Name, + }, + }, + getNode: func() []*corev1.Node { + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + } + return nodes + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Labels: map[string]string{ + "app": "test", + }, + UID: types.UID("pod-1-uid"), + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + }, + { + ObjectMeta: 
metav1.ObjectMeta{ + Name: "pod-2", + Labels: map[string]string{ + "app": "test", + }, + UID: types.UID("pod-2-uid"), + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarker.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + podProbe2 := appsv1alpha1.PodProbe{ + Name: "pod-2", + UID: "pod-2-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + }, + } + demo.Spec.PodProbes = append(demo.Spec.PodProbes, podProbe2) + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + { + Name: "pod-2", + UID: "pod-2-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + expectPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Labels: map[string]string{ + "app": "test", + "server-healthy": "true", + }, + UID: types.UID("pod-1-uid"), + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodConditionType("PodProbeMarker#ppm-1#healthy"), + Status: corev1.ConditionTrue, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ 
+ Name: "pod-2", + Labels: map[string]string{ + "app": "test", + "server-healthy": "false", + }, + UID: types.UID("pod-2-uid"), + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodConditionType("PodProbeMarker#ppm-1#healthy"), + Status: corev1.ConditionFalse, + }, + }, + }, + }, + } + return pods + }, + }, + { + name: "test2, probe failed", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoNodePodProbe.Name, + }, + }, + getNode: func() []*corev1.Node { + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + } + return nodes + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + "server-healthy": "true", + }, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodConditionType("PodProbeMarker#ppm-1#healthy"), + Status: corev1.ConditionTrue, + }, + { + Type: corev1.PodConditionType("PodProbeMarker#ppm-2#other"), + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + demo := demoPodProbeMarker.DeepCopy() + demo.Spec.Probes[0].MarkerPolicy = demo.Spec.Probes[0].MarkerPolicy[:1] + return []*appsv1alpha1.PodProbeMarker{demo} + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: 
"ppm-1#healthy", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + expectPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodConditionType("PodProbeMarker#ppm-1#healthy"), + Status: corev1.ConditionFalse, + }, + { + Type: corev1.PodConditionType("PodProbeMarker#ppm-2#other"), + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + for _, obj := range cs.getPods() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create Pod failed: %s", err.Error()) + } + } + for _, obj := range cs.getPodProbeMarkers() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create PodProbeMarker failed: %s", err.Error()) + } + } + for _, obj := range cs.getNodePodProbes() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create NodePodProbes failed: %s", err.Error()) + } + } + for _, obj := range cs.getNode() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create Node failed: %s", err.Error()) + } + } + controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient} + recon := ReconcileNodePodProbe{Client: fakeClient} + _, err := recon.Reconcile(context.TODO(), cs.req) + if err != nil { + t.Fatalf("Reconcile failed: %s", err.Error()) + } + if !checkPodMarkerEqual(fakeClient, t, cs.expectPods()) { + t.Fatalf("Reconcile failed") + } + }) + } +} + +func checkPodMarkerEqual(c client.WithWatch, t *testing.T, expect 
[]*corev1.Pod) bool { + for i := range expect { + obj := expect[i] + pod := &corev1.Pod{} + err := c.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, pod) + if err != nil { + t.Fatalf("get NodePodProbe failed: %s", err.Error()) + return false + } + if !reflect.DeepEqual(obj.Labels, pod.Labels) || !reflect.DeepEqual(obj.Annotations, pod.Annotations) || + !reflect.DeepEqual(obj.Status.Conditions, pod.Status.Conditions) { + return false + } + } + return true +} + +func TestSyncPodFromNodePodProbe(t *testing.T) { + cases := []struct { + name string + req ctrl.Request + getPods func() []*corev1.Pod + getNodePodProbe func() *appsv1alpha1.NodePodProbe + expectNodePodProbe func() *appsv1alpha1.NodePodProbe + getNode func() []*corev1.Node + }{ + { + name: "test1, pod no changed", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: "node-1", + }, + }, + getNode: func() []*corev1.Node { + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + } + return nodes + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + }, + } + return pods + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := &appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + }, + { + Name: "pod-2", + }, + }, + }, + } + return demo + }, + expectNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := &appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: 
[]appsv1alpha1.PodProbe{ + { + Name: "pod-1", + }, + { + Name: "pod-2", + }, + }, + }, + } + return demo + }, + }, + { + name: "test2, pod not found", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: "node-1", + }, + }, + getNode: func() []*corev1.Node { + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + } + return nodes + }, + getPods: func() []*corev1.Pod { + now := metav1.Now() + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Labels: map[string]string{ + "app": "test", + }, + DeletionTimestamp: &now, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + }, + } + return pods + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := &appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + }, + { + Name: "pod-2", + }, + }, + }, + } + return demo + }, + expectNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := &appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{}, + } + return demo + }, + }, + { + name: "test3, pod uid changed", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: "node-1", + }, + }, + getNode: func() []*corev1.Node { + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + } + return nodes + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid-02"), + Labels: map[string]string{ + "app": "test", + }, + }, + }, + } + return pods + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := &appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid-01", 
+ }, + }, + }, + } + return demo + }, + expectNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := &appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{}, + } + return demo + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + for _, obj := range cs.getPods() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create Pod failed: %s", err.Error()) + } + } + for _, obj := range cs.getNode() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err != nil { + t.Fatalf("create Node failed: %s", err.Error()) + } + } + err := fakeClient.Create(context.TODO(), cs.getNodePodProbe()) + if err != nil { + t.Fatalf("create NodePodProbes failed: %s", err.Error()) + } + + controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient} + recon := ReconcileNodePodProbe{Client: fakeClient} + _, err = recon.Reconcile(context.TODO(), cs.req) + if err != nil { + t.Fatalf("Reconcile failed: %s", err.Error()) + } + if !checkNodePodProbeEqual(fakeClient, t, []*appsv1alpha1.NodePodProbe{cs.expectNodePodProbe()}) { + t.Fatalf("Reconcile failed") + } + }) + } +} + +func checkNodePodProbeEqual(c client.WithWatch, t *testing.T, expect []*appsv1alpha1.NodePodProbe) bool { + for i := range expect { + obj := expect[i] + npp := &appsv1alpha1.NodePodProbe{} + err := c.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, npp) + if err != nil { + t.Fatalf("get NodePodProbe failed: %s", err.Error()) + return false + } + if !reflect.DeepEqual(obj.Spec, npp.Spec) { + return false + } + } + return true +} diff --git a/pkg/controller/nodepodprobe/nodepodprobe_event_handler.go b/pkg/controller/nodepodprobe/nodepodprobe_event_handler.go new file mode 100644 index 0000000000..8e30e56939 --- /dev/null +++ 
b/pkg/controller/nodepodprobe/nodepodprobe_event_handler.go @@ -0,0 +1,250 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodepodprobe + +import ( + "context" + "reflect" + "time" + + appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + kubecontroller "k8s.io/kubernetes/pkg/controller" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var _ handler.EventHandler = &enqueueRequestForNodePodProbe{} + +type enqueueRequestForNodePodProbe struct{} + +func (p *enqueueRequestForNodePodProbe) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + obj, ok := evt.Object.(*appsalphav1.NodePodProbe) + if !ok { + return + } + p.queue(q, obj) +} + +func (p *enqueueRequestForNodePodProbe) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +} + +func (p *enqueueRequestForNodePodProbe) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +} + +func (p *enqueueRequestForNodePodProbe) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + // must be deep copy before update the 
objection + new, ok := evt.ObjectNew.(*appsalphav1.NodePodProbe) + if !ok { + return + } + old, ok := evt.ObjectOld.(*appsalphav1.NodePodProbe) + if !ok { + return + } + if checkNodePodProbeStatusEqual(new.Status.DeepCopy(), old.Status.DeepCopy()) { + return + } + p.queue(q, new) +} + +func checkNodePodProbeStatusEqual(obj1, obj2 *appsalphav1.NodePodProbeStatus) bool { + // ignore LastProbeTime, LastTransitionTime and Message + t := metav1.Now() + for i := range obj1.PodProbeStatuses { + podProbe := &obj1.PodProbeStatuses[i] + for j := range podProbe.ProbeStates { + state := &podProbe.ProbeStates[j] + state.LastProbeTime = t + state.LastTransitionTime = t + state.Message = "" + } + } + for i := range obj2.PodProbeStatuses { + podProbe := &obj2.PodProbeStatuses[i] + for j := range podProbe.ProbeStates { + state := &podProbe.ProbeStates[j] + state.LastProbeTime = t + state.LastTransitionTime = t + state.Message = "" + } + } + return reflect.DeepEqual(obj1, obj2) +} + +func (p *enqueueRequestForNodePodProbe) queue(q workqueue.RateLimitingInterface, npp *appsalphav1.NodePodProbe) { + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: npp.Name, + }, + }) +} + +var _ handler.EventHandler = &enqueueRequestForPod{} + +type enqueueRequestForPod struct { + reader client.Reader +} + +func (p *enqueueRequestForPod) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {} + +func (p *enqueueRequestForPod) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + obj, ok := evt.Object.(*corev1.Pod) + if !ok { + return + } + // remove pod probe from nodePodProbe.spec + if obj.Spec.NodeName == "" { + return + } + npp := &appsalphav1.NodePodProbe{} + if err := p.reader.Get(context.TODO(), client.ObjectKey{Name: obj.Spec.NodeName}, npp); err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("Get NodePodProbe(%s) failed: %s", obj.Spec.NodeName) + } + return + } + for _, probe := range npp.Spec.PodProbes { + if probe.Namespace == 
obj.Namespace && probe.Name == obj.Name { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: obj.Spec.NodeName}}) + break + } + } +} + +func (p *enqueueRequestForPod) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +} + +func (p *enqueueRequestForPod) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + new, ok := evt.ObjectNew.(*corev1.Pod) + if !ok { + return + } + old, ok := evt.ObjectOld.(*corev1.Pod) + if !ok { + return + } + // remove pod probe from nodePodProbe.spec + if new.Spec.NodeName != "" && kubecontroller.IsPodActive(old) && !kubecontroller.IsPodActive(new) { + npp := &appsalphav1.NodePodProbe{} + if err := p.reader.Get(context.TODO(), client.ObjectKey{Name: new.Spec.NodeName}, npp); err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("Get NodePodProbe(%s) failed: %s", new.Spec.NodeName) + } + return + } + for _, probe := range npp.Spec.PodProbes { + if probe.Namespace == new.Namespace && probe.Name == new.Name { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: new.Spec.NodeName}}) + break + } + } + } +} + +const ( + VirtualKubelet = "virtual-kubelet" +) + +type enqueueRequestForNode struct { + client.Reader +} + +var _ handler.EventHandler = &enqueueRequestForNode{} + +func (e *enqueueRequestForNode) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + node := evt.Object.(*corev1.Node) + if node.Labels["type"] == VirtualKubelet { + return + } + if node.DeletionTimestamp != nil { + e.nodeDelete(node, q) + return + } + e.nodeCreate(node, q) +} + +func (e *enqueueRequestForNode) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +} + +func (e *enqueueRequestForNode) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + node := evt.ObjectNew.(*corev1.Node) + if node.Labels["type"] == VirtualKubelet { + return + } + if node.DeletionTimestamp != nil { + e.nodeDelete(node, q) + } +} + +func (e *enqueueRequestForNode) Delete(evt 
event.DeleteEvent, q workqueue.RateLimitingInterface) { + node := evt.Object.(*corev1.Node) + if node.Labels["type"] == VirtualKubelet { + return + } + e.nodeDelete(node, q) +} + +func (e *enqueueRequestForNode) nodeCreate(node *corev1.Node, q workqueue.RateLimitingInterface) { + npp := &appsalphav1.NodePodProbe{} + if err := e.Get(context.TODO(), client.ObjectKey{Name: node.Name}, npp); err != nil { + if errors.IsNotFound(err) { + klog.Infof("Node create event for nodePodProbe %v", node.Name) + namespacedName := types.NamespacedName{Name: node.Name} + if isReady, delay := getNodeReadyAndDelayTime(node); !isReady { + klog.Infof("Skip to enqueue Node %s with not nodePodProbe, for not ready yet.", node.Name) + return + } else if delay > 0 { + klog.Infof("Enqueue Node %s with not nodePodProbe after %v.", node.Name, delay) + q.AddAfter(reconcile.Request{NamespacedName: namespacedName}, delay) + return + } + klog.Infof("Enqueue Node %s with not nodePodProbe.", node.Name) + q.Add(reconcile.Request{NamespacedName: namespacedName}) + return + } + klog.Errorf("Failed to get nodePodProbe for Node %s: %v", node.Name, err) + } +} + +func (e *enqueueRequestForNode) nodeDelete(node *corev1.Node, q workqueue.RateLimitingInterface) { + nodePodProbe := &appsalphav1.NodePodProbe{} + if err := e.Get(context.TODO(), client.ObjectKey{Name: node.Name}, nodePodProbe); errors.IsNotFound(err) { + return + } + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: node.Name}}) +} + +func getNodeReadyAndDelayTime(node *corev1.Node) (bool, time.Duration) { + _, condition := nodeutil.GetNodeCondition(&node.Status, corev1.NodeReady) + if condition == nil || condition.Status != corev1.ConditionTrue { + return false, 0 + } + delay := nodePodProbeCreationDelayAfterNodeReady - time.Since(condition.LastTransitionTime.Time) + if delay > 0 { + return true, delay + } + return true, 0 +} diff --git a/pkg/controller/podprobemarker/pod_probe_marker_controller.go 
b/pkg/controller/podprobemarker/pod_probe_marker_controller.go new file mode 100644 index 0000000000..f3e9d27673 --- /dev/null +++ b/pkg/controller/podprobemarker/pod_probe_marker_controller.go @@ -0,0 +1,337 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobemarker + +import ( + "context" + "flag" + "fmt" + "reflect" + "strings" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" + "github.com/openkruise/kruise/pkg/util/controllerfinder" + utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + "github.com/openkruise/kruise/pkg/util/ratelimiter" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + kubecontroller "k8s.io/kubernetes/pkg/controller" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +func init() { + flag.IntVar(&concurrentReconciles, "podprobemarker-workers", concurrentReconciles, "Max concurrent workers for PodProbeMarker controller.") +} + +var ( + concurrentReconciles = 3 + controllerKind = 
appsv1alpha1.SchemeGroupVersion.WithKind("PodProbeMarker") +) + +const ( + // PodProbeMarkerFinalizer is on PodProbeMarker, and used to remove podProbe from NodePodProbe.Spec + PodProbeMarkerFinalizer = "kruise.io/node-pod-probe-cleanup" +) + +/** +* USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller +* business logic. Delete these comments after modifying this file.* + */ + +// Add creates a new PodProbeMarker Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller +// and Start it when the Manager is Started. +func Add(mgr manager.Manager) error { + if !utildiscovery.DiscoverGVK(controllerKind) { + return nil + } + return add(mgr, newReconciler(mgr)) +} + +// newReconciler returns a new reconcile.Reconciler +func newReconciler(mgr manager.Manager) reconcile.Reconciler { + cli := utilclient.NewClientFromManager(mgr, "PodProbeMarker-controller") + return &ReconcilePodProbeMarker{ + Client: cli, + scheme: mgr.GetScheme(), + finder: controllerfinder.Finder, + } +} + +// add adds a new Controller to mgr with r as the reconcile.Reconciler +func add(mgr manager.Manager, r reconcile.Reconciler) error { + // Create a new controller + c, err := controller.New("PodProbeMarker-controller", mgr, controller.Options{ + Reconciler: r, MaxConcurrentReconciles: concurrentReconciles, + RateLimiter: ratelimiter.DefaultControllerRateLimiter()}) + if err != nil { + return err + } + + // watch for changes to PodProbeMarker + if err = c.Watch(&source.Kind{Type: &appsv1alpha1.PodProbeMarker{}}, &enqueueRequestForPodProbeMarker{}); err != nil { + return err + } + // watch for changes to pod + if err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &enqueueRequestForPod{reader: mgr.GetClient()}); err != nil { + return err + } + + return nil +} + +var _ reconcile.Reconciler = &ReconcilePodProbeMarker{} + +// ReconcilePodProbeMarker reconciles a PodProbeMarker object +type ReconcilePodProbeMarker 
struct { + client.Client + scheme *runtime.Scheme + + finder *controllerfinder.ControllerFinder +} + +// +kubebuilder:rbac:groups=apps.kruise.io,resources=podprobemarkers,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps.kruise.io,resources=podprobemarkers/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps.kruise.io,resources=nodepodprobes,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps.kruise.io,resources=nodepodprobes/status,verbs=get;update;patch + +// Reconcile reads that state of the cluster for a PodProbeMarker object and makes changes based on the state read +// and what is in the PodProbeMarker.Spec +func (r *ReconcilePodProbeMarker) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) { + err := r.syncPodProbeMarker(req.Namespace, req.Name) + if err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil +} + +func (r *ReconcilePodProbeMarker) syncPodProbeMarker(ns, name string) error { + // Fetch the PodProbeMarker instance + ppm := &appsv1alpha1.PodProbeMarker{} + err := r.Get(context.TODO(), client.ObjectKey{Namespace: ns, Name: name}, ppm) + if err != nil { + if errors.IsNotFound(err) { + // Object not found, return. Created objects are automatically garbage collected. + // For additional cleanup logic use finalizers. + return nil + } + // Error reading the object - requeue the request. 
+ return err + } + pods, err := r.getMatchingPods(ppm) + if err != nil { + klog.Errorf("PodProbeMarker ppm(%s/%s) list pods failed: %s", ppm.Namespace, ppm.Name, err.Error()) + return err + } + // remove podProbe from NodePodProbe.Spec + if !ppm.DeletionTimestamp.IsZero() { + return r.handlerPodProbeMarkerFinalizer(ppm, pods) + } + // add finalizer + if !controllerutil.ContainsFinalizer(ppm, PodProbeMarkerFinalizer) { + err = util.UpdateFinalizer(r.Client, ppm, util.AddFinalizerOpType, PodProbeMarkerFinalizer) + if err != nil { + klog.Errorf("add PodProbeMarker(%s/%s) finalizer failed: %s", ppm.Namespace, ppm.Name, err.Error()) + return err + } + klog.V(3).Infof("add PodProbeMarker(%s/%s) finalizer success", ppm.Namespace, ppm.Name) + } + // add podProbe in NodePodProbe + for _, pod := range pods { + // add podProbe to NodePodProbe.Spec + if err = r.updateNodePodProbe(ppm, pod); err != nil { + return err + } + } + // update podProbeMarker status + ppmClone := ppm.DeepCopy() + if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: ppm.Namespace, Name: ppm.Name}, ppmClone); err != nil { + klog.Errorf("error getting updated podProbeMarker %s from client", ppm.Name) + } + if ppmClone.Status.ObservedGeneration == ppmClone.Generation && int(ppmClone.Status.MatchedPods) == len(pods) { + return nil + } + ppmClone.Status.ObservedGeneration = ppmClone.Generation + ppmClone.Status.MatchedPods = int64(len(pods)) + return r.Client.Status().Update(context.TODO(), ppmClone) + }); err != nil { + klog.Errorf("PodProbeMarker(%s/%s) update status failed: %s", ppm.Namespace, ppm.Name, err.Error()) + return err + } + klog.V(3).Infof("PodProbeMarker(%s/%s) update status(%s) success", ppm.Namespace, ppm.Name, util.DumpJSON(ppmClone.Status)) + return nil +} + +func (r *ReconcilePodProbeMarker) handlerPodProbeMarkerFinalizer(ppm *appsv1alpha1.PodProbeMarker, pods []*corev1.Pod) error { + if 
!controllerutil.ContainsFinalizer(ppm, PodProbeMarkerFinalizer) { + return nil + } + for _, pod := range pods { + if err := r.removePodProbeFromNodePodProbe(ppm.Name, pod.Spec.NodeName); err != nil { + return err + } + } + err := util.UpdateFinalizer(r.Client, ppm, util.RemoveFinalizerOpType, PodProbeMarkerFinalizer) + if err != nil { + klog.Errorf("remove PodProbeMarker(%s/%s) finalizer failed: %s", ppm.Namespace, ppm.Name, err.Error()) + return err + } + klog.V(3).Infof("remove PodProbeMarker(%s/%s) finalizer success", ppm.Namespace, ppm.Name) + return nil +} + +func (r *ReconcilePodProbeMarker) updateNodePodProbe(ppm *appsv1alpha1.PodProbeMarker, pod *corev1.Pod) error { + npp := &appsv1alpha1.NodePodProbe{} + err := r.Get(context.TODO(), client.ObjectKey{Name: pod.Spec.NodeName}, npp) + if err != nil { + if errors.IsNotFound(err) { + klog.Warningf("PodProbeMarker ppm(%s/%s) NodePodProbe(%s) is Not Found", ppm.Namespace, ppm.Name, npp.Name) + return nil + } + // Error reading the object - requeue the request. 
+ klog.Errorf("PodProbeMarker ppm(%s/%s) get NodePodProbe(%s) failed: %s", ppm.Namespace, ppm.Name, pod.Spec.NodeName, err.Error()) + return err + } + + oldSpec := npp.Spec.DeepCopy() + exist := false + for i := range npp.Spec.PodProbes { + podProbe := &npp.Spec.PodProbes[i] + if podProbe.Name == pod.Name && podProbe.Namespace == pod.Namespace && podProbe.UID == string(pod.UID) { + exist = true + for j := range ppm.Spec.Probes { + probe := ppm.Spec.Probes[j] + setPodContainerProbes(podProbe, probe, ppm.Name) + } + break + } + } + if !exist { + podProbe := appsv1alpha1.PodProbe{Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID)} + for j := range ppm.Spec.Probes { + probe := ppm.Spec.Probes[j] + podProbe.Probes = append(podProbe.Probes, appsv1alpha1.ContainerProbe{ + Name: fmt.Sprintf("%s#%s", ppm.Name, probe.Name), + ContainerName: probe.ContainerName, + Probe: probe.Probe, + }) + } + npp.Spec.PodProbes = append(npp.Spec.PodProbes, podProbe) + } + + if reflect.DeepEqual(npp.Spec, oldSpec) { + return nil + } + err = r.Update(context.TODO(), npp) + if err != nil { + klog.Errorf("PodProbeMarker ppm(%s/%s) Update NodePodProbe(%s) failed: %s", ppm.Namespace, ppm.Name, npp.Name, err.Error()) + return err + } + klog.V(3).Infof("PodProbeMarker ppm(%s/%s) update NodePodProbe(%s) from(%s) -> to(%s) success", + ppm.Namespace, ppm.Name, npp.Name, util.DumpJSON(oldSpec), util.DumpJSON(npp.Spec)) + return nil +} + +func setPodContainerProbes(podProbe *appsv1alpha1.PodProbe, probe appsv1alpha1.PodContainerProbe, ppmName string) { + newProbe := appsv1alpha1.ContainerProbe{ + Name: fmt.Sprintf("%s#%s", ppmName, probe.Name), + ContainerName: probe.ContainerName, + Probe: probe.Probe, + } + for i, obj := range podProbe.Probes { + if obj.Name == newProbe.Name { + if !reflect.DeepEqual(obj, newProbe) { + podProbe.Probes[i] = newProbe + } + return + } + } + podProbe.Probes = append(podProbe.Probes, newProbe) +} + +// If you need update the pod object, you must DeepCopy it 
+func (r *ReconcilePodProbeMarker) getMatchingPods(ppm *appsv1alpha1.PodProbeMarker) ([]*corev1.Pod, error) { + // get more faster selector + selector, err := util.ValidatedLabelSelectorAsSelector(ppm.Spec.Selector) + if err != nil { + return nil, err + } + // DisableDeepCopy:true, indicates must be deep copy before update pod objection + listOpts := &client.ListOptions{LabelSelector: selector, Namespace: ppm.Namespace} + podList := &corev1.PodList{} + if listErr := r.Client.List(context.TODO(), podList, listOpts, utilclient.DisableDeepCopy); listErr != nil { + return nil, err + } + pods := make([]*corev1.Pod, 0, len(podList.Items)) + for i := range podList.Items { + pod := &podList.Items[i] + condition := util.GetCondition(pod, corev1.PodInitialized) + if kubecontroller.IsPodActive(pod) && pod.Spec.NodeName != "" && + condition != nil && condition.Status == corev1.ConditionTrue { + pods = append(pods, pod) + } + } + return pods, nil +} + +func (r *ReconcilePodProbeMarker) removePodProbeFromNodePodProbe(ppmName, nppName string) error { + npp := &appsv1alpha1.NodePodProbe{} + err := r.Get(context.TODO(), client.ObjectKey{Name: nppName}, npp) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + + newSpec := appsv1alpha1.NodePodProbeSpec{} + for _, podProbe := range npp.Spec.PodProbes { + newPodProbe := appsv1alpha1.PodProbe{Name: podProbe.Name, Namespace: podProbe.Namespace, UID: podProbe.UID} + for i := range podProbe.Probes { + probe := podProbe.Probes[i] + // probe.Name -> podProbeMarker.Name#probe.Name + if !strings.Contains(probe.Name, fmt.Sprintf("%s#", ppmName)) { + newPodProbe.Probes = append(newPodProbe.Probes, probe) + } + } + if len(newPodProbe.Probes) > 0 { + newSpec.PodProbes = append(newSpec.PodProbes, newPodProbe) + } + } + if reflect.DeepEqual(npp.Spec, newSpec) { + return nil + } + npp.Spec = newSpec + err = r.Update(context.TODO(), npp) + if err != nil { + klog.Errorf("NodePodProbe(%s) remove PodProbe(%s) failed: %s", 
nppName, ppmName, err.Error()) + return err + } + klog.V(3).Infof("NodePodProbe(%s) remove PodProbe(%s) success", nppName, ppmName) + return nil +} diff --git a/pkg/controller/podprobemarker/pod_probe_marker_controller_test.go b/pkg/controller/podprobemarker/pod_probe_marker_controller_test.go new file mode 100644 index 0000000000..c1fe12314e --- /dev/null +++ b/pkg/controller/podprobemarker/pod_probe_marker_controller_test.go @@ -0,0 +1,825 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podprobemarker + +import ( + "context" + "reflect" + "testing" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util/controllerfinder" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func init() { + scheme = runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + _ = appsv1alpha1.AddToScheme(scheme) +} + +var ( + scheme *runtime.Scheme + + demoPodProbeMarker = appsv1alpha1.PodProbeMarker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppm-1", + }, + Spec: appsv1alpha1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + }, + }, + Probes: []appsv1alpha1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + Labels: map[string]string{ + "server-healthy": "true", + }, + }, + { + State: appsv1alpha1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + Labels: map[string]string{ + "server-healthy": "false", + }, + }, + }, + }, + }, + }, + } + + demoNodePodProbe = appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: 
appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +) + +func TestSyncPodProbeMarker(t *testing.T) { + cases := []struct { + name string + req ctrl.Request + getPods func() []*corev1.Pod + getPodProbeMarkers func() []*appsv1alpha1.PodProbeMarker + getNodePodProbes func() []*appsv1alpha1.NodePodProbe + expectNodePodProbes func() []*appsv1alpha1.NodePodProbe + }{ + { + name: "test1, merge NodePodProbes", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarker.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarker.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = 
[]appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + { + name: "test2, no NodePodProbes", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarker.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + UID: types.UID("pod-2-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{}, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarker.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + }, + } + }, + }, + { + name: "test3, remove podProbe from NodePodProbes", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarker.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + UID: types.UID("pod-2-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: 
corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + demo := demoPodProbeMarker.DeepCopy() + now := metav1.Now() + demo.DeletionTimestamp = &now + demo.Finalizers = []string{PodProbeMarkerFinalizer} + ppms := []*appsv1alpha1.PodProbeMarker{ + demo, + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + { + name: "test4, remove podProbe from NodePodProbes", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarker.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + UID: types.UID("pod-2-uid"), + Labels: 
map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + demo := demoPodProbeMarker.DeepCopy() + now := metav1.Now() + demo.DeletionTimestamp = &now + demo.Finalizers = []string{PodProbeMarkerFinalizer} + ppms := []*appsv1alpha1.PodProbeMarker{ + demo, + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + }, + } + }, + }, + { + name: "test5, merge NodePodProbes", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarker.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarker.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return 
[]*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + { + name: "test6, merge NodePodProbes", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarker.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarker.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() 
[]*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-2", + UID: "pod-2-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes = []appsv1alpha1.PodProbe{ + { + Name: "pod-2", + UID: "pod-2-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + { + name: "test7, NodePodProbes changed", + req: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: demoPodProbeMarker.Name, + }, + }, + getPods: func() []*corev1.Pod { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + UID: types.UID("pod-1-uid"), + Labels: map[string]string{ + "app": "test", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodInitialized, + Status: corev1.ConditionTrue, + }, + }, + }, + }, 
+ } + return pods + }, + getPodProbeMarkers: func() []*appsv1alpha1.PodProbeMarker { + ppms := []*appsv1alpha1.PodProbeMarker{ + demoPodProbeMarker.DeepCopy(), + } + return ppms + }, + getNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + return []*appsv1alpha1.NodePodProbe{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/home/admin/healthy.sh"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + }, + expectNodePodProbes: func() []*appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-2#idle", + ContainerName: "log", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/idle.sh"}, + }, + }, + }, + }, + }, + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + }, + }, + }, + } + return []*appsv1alpha1.NodePodProbe{demo} + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + for _, obj := range cs.getPods() { + err := fakeClient.Create(context.TODO(), obj.DeepCopy()) + if err 
!= nil {
+					t.Fatalf("create Pod failed: %s", err.Error())
+				}
+			}
+			for _, obj := range cs.getPodProbeMarkers() {
+				err := fakeClient.Create(context.TODO(), obj.DeepCopy())
+				if err != nil {
+					t.Fatalf("create PodProbeMarker failed: %s", err.Error())
+				}
+			}
+			for _, obj := range cs.getNodePodProbes() {
+				err := fakeClient.Create(context.TODO(), obj.DeepCopy())
+				if err != nil {
+					t.Fatalf("create NodePodProbes failed: %s", err.Error())
+				}
+			}
+
+			controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient}
+			recon := ReconcilePodProbeMarker{Client: fakeClient}
+			_, err := recon.Reconcile(context.TODO(), cs.req)
+			if err != nil {
+				t.Fatalf("Reconcile failed: %s", err.Error())
+			}
+			if !checkNodePodProbeEqual(fakeClient, t, cs.expectNodePodProbes()) {
+				t.Fatalf("Reconcile failed")
+			}
+		})
+	}
+}
+
+// checkNodePodProbeEqual reports whether every NodePodProbe in expect can be
+// fetched through c and carries a Spec deeply equal to the expected one.
+//
+// A failed Get aborts the calling test via t.Fatalf. A Spec mismatch is logged
+// (so the caller's generic failure message can be diagnosed) and reported by
+// returning false.
+func checkNodePodProbeEqual(c client.WithWatch, t *testing.T, expect []*appsv1alpha1.NodePodProbe) bool {
+	// Attribute failures to the test case that called this helper.
+	t.Helper()
+	for _, want := range expect {
+		got := &appsv1alpha1.NodePodProbe{}
+		key := client.ObjectKey{Namespace: want.Namespace, Name: want.Name}
+		if err := c.Get(context.TODO(), key, got); err != nil {
+			// Fatalf stops the test goroutine; no return is needed after it.
+			t.Fatalf("get NodePodProbe failed: %s", err.Error())
+		}
+		if !reflect.DeepEqual(want.Spec, got.Spec) {
+			t.Logf("NodePodProbe(%s) spec mismatch: expect %+v, got %+v", want.Name, want.Spec, got.Spec)
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/controller/podprobemarker/podprobemarker_event_handler.go b/pkg/controller/podprobemarker/podprobemarker_event_handler.go
new file mode 100644
index 0000000000..7afdfa5898
--- /dev/null
+++ b/pkg/controller/podprobemarker/podprobemarker_event_handler.go
@@ -0,0 +1,136 @@
+/*
+Copyright 2022 The Kruise Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podprobemarker
+
+import (
+	"context"
+
+	appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"github.com/openkruise/kruise/pkg/util"
+	utilclient "github.com/openkruise/kruise/pkg/util/client"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+	kubecontroller "k8s.io/kubernetes/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+var _ handler.EventHandler = &enqueueRequestForPodProbeMarker{}
+
+// enqueueRequestForPodProbeMarker enqueues a reconcile request for the
+// PodProbeMarker carried by a create or update event.
+type enqueueRequestForPodProbeMarker struct{}
+
+// Create enqueues the newly created PodProbeMarker.
+func (p *enqueueRequestForPodProbeMarker) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
+	p.queue(q, evt.Object)
+}
+
+// Delete is a no-op: delete events do not enqueue anything.
+func (p *enqueueRequestForPodProbeMarker) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
+}
+
+// Generic is a no-op: generic events do not enqueue anything.
+func (p *enqueueRequestForPodProbeMarker) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+}
+
+// Update enqueues the new version of the updated PodProbeMarker.
+func (p *enqueueRequestForPodProbeMarker) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+	p.queue(q, evt.ObjectNew)
+}
+
+// queue adds a request keyed by obj's namespace and name to q. Objects that
+// are not a *PodProbeMarker are ignored.
+func (p *enqueueRequestForPodProbeMarker) queue(q workqueue.RateLimitingInterface, obj runtime.Object) {
+	ppm, ok := obj.(*appsalphav1.PodProbeMarker)
+	if !ok {
+		return
+	}
+	q.Add(reconcile.Request{
+		NamespacedName: types.NamespacedName{
+			Namespace: ppm.Namespace,
+			Name:      ppm.Name,
+		},
+	})
+}
+
+var _ handler.EventHandler = &enqueueRequestForPod{}
+
+// enqueueRequestForPod enqueues the PodProbeMarkers that select a Pod when the
+// Pod's PodInitialized condition turns True.
+type enqueueRequestForPod struct {
+	// reader lists the PodProbeMarkers in the Pod's namespace.
+	reader client.Reader
+}
+
+// Create is a no-op: pod create events do not enqueue anything.
+func (p *enqueueRequestForPod) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {}
+
+// Delete is a no-op: pod delete events do not enqueue anything.
+func (p *enqueueRequestForPod) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
+}
+
+// Generic is a no-op: generic events do not enqueue anything.
+func (p *enqueueRequestForPod) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+}
+
+// Update enqueues every PodProbeMarker matching the Pod when the update is the
+// transition of PodInitialized from absent/False to True on an active Pod.
+// All other pod updates are ignored.
+func (p *enqueueRequestForPod) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+	newPod, ok := evt.ObjectNew.(*corev1.Pod)
+	if !ok {
+		return
+	}
+	oldPod, ok := evt.ObjectOld.(*corev1.Pod)
+	if !ok {
+		return
+	}
+	// add pod probe to nodePodProbe.spec
+	oldInitialCondition := util.GetCondition(oldPod, corev1.PodInitialized)
+	newInitialCondition := util.GetCondition(newPod, corev1.PodInitialized)
+	if newInitialCondition == nil || newInitialCondition.Status != corev1.ConditionTrue {
+		return
+	}
+	// Only the edge counts: the condition was missing or False before.
+	if oldInitialCondition != nil && oldInitialCondition.Status != corev1.ConditionFalse {
+		return
+	}
+	if !kubecontroller.IsPodActive(newPod) {
+		return
+	}
+
+	ppms, err := p.getPodProbeMarkerForPod(newPod)
+	if err != nil {
+		klog.Errorf("List PodProbeMarker for pod(%s/%s) failed: %s", newPod.Namespace, newPod.Name, err.Error())
+		return
+	}
+	for _, ppm := range ppms {
+		q.Add(reconcile.Request{
+			NamespacedName: types.NamespacedName{
+				Namespace: ppm.Namespace,
+				Name:      ppm.Name,
+			},
+		})
+	}
+}
+
+// getPodProbeMarkerForPod lists the PodProbeMarkers in pod's namespace and
+// returns those whose selector is valid, non-empty and matches pod's labels.
+// It returns an error only when the list call fails.
+func (p *enqueueRequestForPod) getPodProbeMarkerForPod(pod *corev1.Pod) ([]*appsalphav1.PodProbeMarker, error) {
+	ppmList := &appsalphav1.PodProbeMarkerList{}
+	// NOTE(review): DisableDeepCopy presumably hands back objects shared with
+	// the cache, so the returned items must be treated as read-only - confirm.
+	if err := p.reader.List(context.TODO(), ppmList, &client.ListOptions{Namespace: pod.Namespace}, utilclient.DisableDeepCopy); err != nil {
+		return nil, err
+	}
+	var ppms []*appsalphav1.PodProbeMarker
+	for i := range ppmList.Items {
+		ppm := &ppmList.Items[i]
+		// An invalid selector is irreversible for this event, so skip the
+		// marker, but leave a trace instead of dropping it silently.
+		labelSelector, err := util.ValidatedLabelSelectorAsSelector(ppm.Spec.Selector)
+		if err != nil {
+			klog.Warningf("PodProbeMarker(%s/%s) has an invalid selector: %s", ppm.Namespace, ppm.Name, err.Error())
+			continue
+		}
+		// If a PodProbeMarker with a nil or empty selector creeps in, it should match nothing, not everything.
+		if labelSelector.Empty() || !labelSelector.Matches(labels.Set(pod.Labels)) {
+			continue
+		}
+		ppms = append(ppms, ppm)
+	}
+	return ppms, nil
+}
diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go
index 69ba24865e..ec93929bf1 100644
--- a/pkg/controller/statefulset/stateful_set_control.go
+++ b/pkg/controller/statefulset/stateful_set_control.go
@@ -193,9 +193,6 @@ func (ssc *defaultStatefulSetControl) truncateHistory(
 	current *apps.ControllerRevision,
 	update *apps.ControllerRevision) error {
 	history := make([]*apps.ControllerRevision, 0, len(revisions))
-	if current == nil || update == nil {
-		return nil
-	}
 	// mark all live revisions
 	live := map[string]bool{}
 	if current != nil {
diff --git a/pkg/util/finalizer.go b/pkg/util/finalizer.go
new file mode 100644
index 0000000000..b4e244437e
--- /dev/null
+++ b/pkg/util/finalizer.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2022 The Kruise Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"context"
+
+	"k8s.io/client-go/util/retry"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+)
+
+// FinalizerOpType selects the operation UpdateFinalizer performs.
+type FinalizerOpType string
+
+const (
+	// AddFinalizerOpType makes UpdateFinalizer add the finalizer.
+	AddFinalizerOpType FinalizerOpType = "Add"
+	// RemoveFinalizerOpType makes UpdateFinalizer remove the finalizer.
+	RemoveFinalizerOpType FinalizerOpType = "Remove"
+)
+
+// UpdateFinalizer adds finalizer to, or removes it from, object and persists
+// the change through c, retrying on update conflicts.
+//
+// Every attempt re-reads the object by its key into a deep copy, so the
+// caller's object is never mutated. The call returns nil without issuing an
+// update when the fetched object is already in the requested state. The
+// relative order of the other finalizers is left untouched.
+//
+// op must be AddFinalizerOpType or RemoveFinalizerOpType; any other value
+// panics.
+func UpdateFinalizer(c client.Client, object client.Object, op FinalizerOpType, finalizer string) error {
+	switch op {
+	case AddFinalizerOpType, RemoveFinalizerOpType:
+	default:
+		panic("UpdateFinalizer Func 'op' parameter must be 'Add' or 'Remove'")
+	}
+
+	key := client.ObjectKeyFromObject(object)
+	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		// Work on a fresh copy each attempt so a conflict retry starts from
+		// the latest stored version.
+		fetchedObject := object.DeepCopyObject().(client.Object)
+		if err := c.Get(context.TODO(), key, fetchedObject); err != nil {
+			return err
+		}
+		present := controllerutil.ContainsFinalizer(fetchedObject, finalizer)
+		switch op {
+		case AddFinalizerOpType:
+			if present {
+				return nil
+			}
+			controllerutil.AddFinalizer(fetchedObject, finalizer)
+		case RemoveFinalizerOpType:
+			if !present {
+				return nil
+			}
+			// Unlike a set round-trip, this keeps the remaining finalizers
+			// in their original order.
+			controllerutil.RemoveFinalizer(fetchedObject, finalizer)
+		}
+		return c.Update(context.TODO(), fetchedObject)
+	})
+}
diff --git a/test/e2e/apps/sidecarset.go b/test/e2e/apps/sidecarset.go
index 228e944586..b0a2f0b0fa 100644
--- a/test/e2e/apps/sidecarset.go
+++ b/test/e2e/apps/sidecarset.go
@@ -608,6 +608,7 @@ var _ = SIGDescribe("SidecarSet", func() {
 			// create sidecarSet again
 			sidecarSetIn, err = tester.CreateSidecarSet(sidecarSetIn)
 			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			time.Sleep(time.Second * 3)
 
 			// create deployment
 			deploymentIn := tester.NewBaseDeployment(ns)