diff --git a/.github/workflows/e2e-1.16.yaml b/.github/workflows/e2e-1.16.yaml index aaed35ec6b..a12ed60af2 100644 --- a/.github/workflows/e2e-1.16.yaml +++ b/.github/workflows/e2e-1.16.yaml @@ -85,6 +85,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal pullimages-containerrecreate: @@ -154,6 +166,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal advanced-daemonset: @@ -223,6 +247,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal sidecarset: @@ -292,6 +328,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal podUnavailableBudget: @@ -419,4 +467,16 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal diff --git a/.github/workflows/e2e-1.24.yaml b/.github/workflows/e2e-1.24.yaml index dda59a8615..2bf1c24348 100644 --- a/.github/workflows/e2e-1.24.yaml +++ b/.github/workflows/e2e-1.24.yaml @@ -154,6 +154,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal advanced-daemonset: @@ -223,6 +235,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal sidecarset: @@ -292,6 +316,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal ephemeraljob: @@ -477,4 +513,16 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal diff --git a/config/rbac/daemon_role.yaml b/config/rbac/daemon_role.yaml index a1b5a732f0..e87ce2a1b7 100644 --- a/config/rbac/daemon_role.yaml +++ b/config/rbac/daemon_role.yaml @@ -72,3 +72,19 @@ rules: - get - list - watch +- apiGroups: + - apps.kruise.io + resources: + - nodepodprobes + verbs: + - get + - list + - watch +- apiGroups: + - apps.kruise.io + resources: + - nodepodprobes/status + verbs: + - get + - patch + - update diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index a7c4db69d4..782bee00ef 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -23,6 +23,8 @@ import ( "net/http" "sync" + "github.com/openkruise/kruise/pkg/daemon/podprobe" + "github.com/prometheus/client_golang/prometheus/promhttp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -147,9 +149,16 @@ func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) { return nil, fmt.Errorf("failed to new crr daemon controller: %v", err) } + // node pod probe + nppController, err := podprobe.NewController(opts) + if err != nil { + return nil, fmt.Errorf("failed to new nodePodProbe daemon controller: %v", err) + } + var runnables = []Runnable{ puller, crrController, + nppController, } if utilfeature.DefaultFeatureGate.Enabled(features.DaemonWatchingPod) { diff --git a/pkg/daemon/podprobe/pod_probe_controller.go b/pkg/daemon/podprobe/pod_probe_controller.go new file mode 100644 index 0000000000..c8c539f4cf --- /dev/null +++ b/pkg/daemon/podprobe/pod_probe_controller.go @@ -0,0 +1,428 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "context" + "crypto/rand" + "fmt" + "math/big" + "net/http" + "reflect" + "sync" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client" + kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned" + clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" + daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" + "github.com/openkruise/kruise/pkg/daemon/util" + commonutil "github.com/openkruise/kruise/pkg/util" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/gengo/examples/set-gen/sets" + "k8s.io/klog/v2" + kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" +) + +// Key uniquely identifying container probes +type probeKey struct { + podUID string + containerName string + probeName string +} + +type Controller struct { + queue workqueue.RateLimitingInterface + updateQueue workqueue.RateLimitingInterface + nodePodProbeInformer cache.SharedIndexInformer + nodePodProbeLister listersalpha1.NodePodProbeLister + nodePodProbeClient clientalpha1.NodePodProbeInterface + // Map of active workers for probes + workers map[probeKey]*worker + // Lock for accessing & mutating workers + workerLock sync.RWMutex + runtimeFactory daemonruntime.Factory + // prober executes the probe actions. + prober *prober + // pod probe result manager + result *resultManager + // node name + nodeName string +} + +// NewController returns the controller for pod probe +func NewController(opts daemonoptions.Options) (*Controller, error) { + // pull the next work item from queue. It should be a key we use to lookup + // something in a cache + nodeName, err := util.NodeName() + if err != nil { + return nil, err + } + randInt, _ := rand.Int(rand.Reader, big.NewInt(5000)) + queue := workqueue.NewNamedRateLimitingQueue( + // Backoff duration from 500ms to 50~55s + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(randInt.Int64())), + "sync_node_pod_probe", + ) + updateQueue := workqueue.NewNamedRateLimitingQueue( + // Backoff duration from 500ms to 50~55s + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(randInt.Int64())), + "update_node_pod_probe_status", + ) + genericClient := client.GetGenericClientWithName("kruise-daemon-podprobe") + informer := newNodePodProbeInformer(genericClient.KruiseClient, opts.NodeName) + c := &Controller{ + nodePodProbeInformer: informer, + nodePodProbeLister: listersalpha1.NewNodePodProbeLister(informer.GetIndexer()), + runtimeFactory: opts.RuntimeFactory, + workers: make(map[probeKey]*worker), + queue: queue, + updateQueue: updateQueue, + nodePodProbeClient: genericClient.KruiseClient.AppsV1alpha1().NodePodProbes(), + result: NewResultManager(updateQueue), + nodeName: nodeName, + } + c.prober = newProber(c.runtimeFactory.GetRuntimeService()) + + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npp, ok := obj.(*appsv1alpha1.NodePodProbe) + if ok { + enqueue(queue, npp) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldNodePodProbe, oldOK := oldObj.(*appsv1alpha1.NodePodProbe) + newNodePodProbe, newOK := newObj.(*appsv1alpha1.NodePodProbe) + if !oldOK || !newOK { + return + } + if reflect.DeepEqual(oldNodePodProbe.Spec, newNodePodProbe.Spec) { + return + } + enqueue(queue, newNodePodProbe) + }, + }) + + opts.Healthz.RegisterFunc("nodePodProbeInformerSynced", func(_ *http.Request) error { + if !informer.HasSynced() { + return fmt.Errorf("not synced") + } + return nil + }) + return c, nil +} + +func newNodePodProbeInformer(client kruiseclient.Interface, nodeName string) cache.SharedIndexInformer { + tweakListOptionsFunc := func(opt *metav1.ListOptions) { + opt.FieldSelector = "metadata.name=" + nodeName + } + + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + tweakListOptionsFunc(&options) + return client.AppsV1alpha1().NodePodProbes().List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + tweakListOptionsFunc(&options) + return client.AppsV1alpha1().NodePodProbes().Watch(context.TODO(), options) + }, + }, + &appsv1alpha1.NodePodProbe{}, + 0, // do not resync + cache.Indexers{}, + ) +} + +func enqueue(queue workqueue.Interface, obj *appsv1alpha1.NodePodProbe) { + if obj.DeletionTimestamp != nil { + return + } + key, _ := cache.MetaNamespaceKeyFunc(obj) + queue.Add(key) +} + +func (c *Controller) Run(stop <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Info("Starting informer for NodePodProbe") + go c.nodePodProbeInformer.Run(stop) + if !cache.WaitForCacheSync(stop, c.nodePodProbeInformer.HasSynced) { + return + } + + klog.Infof("Starting NodePodProbe controller") + // Launch one workers to process resources, for there is only one nodePodProbe per Node + go wait.Until(func() { + for c.processNextWorkItem() { + } + }, time.Second, stop) + + go wait.Until(func() { + for c.processUpdateWorkItem() { + } + }, time.Second, stop) + + klog.Info("Started NodePodProbe controller successfully") + <-stop +} + +// run probe worker based on NodePodProbe.Spec configuration +func (c *Controller) processNextWorkItem() bool { + // pull the next work item from queue. It should be a key we use to lookup + // something in a cache + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + err := c.sync() + if err == nil { + // No error, tell the queue to stop tracking history + c.queue.Forget(key) + } else { + // requeue the item to work on later + c.queue.AddRateLimited(key) + } + + return true +} + +func (c *Controller) sync() error { + // indicates must be deep copy before update pod objection + npp, err := c.nodePodProbeLister.Get(c.nodeName) + if errors.IsNotFound(err) { + return nil + } else if err != nil { + klog.Errorf("Failed to get nodePodProbe %s: %v", c.nodeName, err) + return err + } + + // run probe worker + c.workerLock.Lock() + validProbe := map[probeKey]struct{}{} + validSets := sets.NewString() + for _, podProbe := range npp.Spec.PodProbes { + key := probeKey{podUID: podProbe.UID} + for i := range podProbe.Probes { + probe := podProbe.Probes[i] + key.containerName = probe.ContainerName + key.probeName = probe.Name + validSets.Insert(fmt.Sprintf("%s/%s", key.podUID, key.probeName)) + validProbe[key] = struct{}{} + if worker, ok := c.workers[key]; ok { + if !reflect.DeepEqual(probe.Probe, worker.getProbeSpec()) { + klog.Infof("NodePodProbe pod(%s) container(%s) probe changed from(%s) -> to(%s)", + key.podUID, key.containerName, commonutil.DumpJSON(worker.getProbeSpec()), commonutil.DumpJSON(probe.Probe)) + worker.updateProbeSpec(&probe.Probe) + } + continue + } + w := newWorker(c, key, &probe.Probe) + c.workers[key] = w + klog.Infof("NodePodProbe run pod(%s) container(%s) probe(%s) worker", key.podUID, key.containerName, key.probeName) + go w.run() + } + } + for key, worker := range c.workers { + if _, ok := validProbe[key]; !ok { + klog.Infof("NodePodProbe stop pod(%s/%s) container(%s) probe(%s) worker", key.podUID, key.containerName, key.probeName) + worker.stop() + } + } + c.workerLock.Unlock() + + // If the PodProbe is deleted, the corresponding status will be clear + newStatus := appsv1alpha1.NodePodProbeStatus{} + for _, probeStatus := range npp.Status.PodProbeStatuses { + newProbeStatus := appsv1alpha1.PodProbeStatus{ + Namespace: probeStatus.Namespace, + Name: probeStatus.Name, + UID: probeStatus.UID, + } + for i := range probeStatus.ProbeStates { + probeState := probeStatus.ProbeStates[i] + if validSets.Has(fmt.Sprintf("%s/%s", probeStatus.UID, probeState.Name)) { + newProbeStatus.ProbeStates = append(newProbeStatus.ProbeStates, probeState) + } + } + if len(newProbeStatus.ProbeStates) > 0 { + newStatus.PodProbeStatuses = append(newStatus.PodProbeStatuses, newProbeStatus) + } + } + if reflect.DeepEqual(npp.Status, newStatus) { + return nil + } + nppClone := npp.DeepCopy() + nppClone.Status = newStatus + _, err = c.nodePodProbeClient.UpdateStatus(context.TODO(), nppClone, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("NodePodProbe(%s) update status failed: %s", c.nodeName, err.Error()) + return err + } + klog.Infof("NodePodProbe(%s) update status from(%s) -> to(%s) success", c.nodeName, commonutil.DumpJSON(npp.Status), commonutil.DumpJSON(nppClone.Status)) + return nil +} + +// Record the execution result of probe worker to NodePodProbe Status +func (c *Controller) processUpdateWorkItem() bool { + key, quit := c.updateQueue.Get() + if quit { + return false + } + defer c.updateQueue.Done(key) + + err := c.syncUpdateNodePodProbeStatus() + if err == nil { + // No error, tell the queue to stop tracking history + c.queue.Forget(key) + } else { + // requeue the item to work on later + c.queue.AddRateLimited(key) + } + + return true +} + +func (c *Controller) syncUpdateNodePodProbeStatus() error { + // container probe result + updates := c.result.ListResults() + if len(updates) == 0 { + return nil + } + // indicates must be deep copy before update pod objection + npp, err := c.nodePodProbeLister.Get(c.nodeName) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + klog.Errorf("Get NodePodProbe(%s) failed: %s", c.nodeName, err.Error()) + return err + } + + newStatus := npp.Status.DeepCopy() + for _, update := range updates { + // update probe result in status + updateNodePodProbeStatus(update, npp.Spec, newStatus) + } + if reflect.DeepEqual(npp.Status, newStatus) { + return nil + } + nppClone := npp.DeepCopy() + nppClone.Status = *newStatus + _, err = c.nodePodProbeClient.UpdateStatus(context.TODO(), nppClone, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("NodePodProbe(%s) update status failed: %s", c.nodeName, err.Error()) + return err + } + klog.Infof("NodePodProbe(%s) update status from(%s) -> to(%s) success", c.nodeName, commonutil.DumpJSON(npp.Status), commonutil.DumpJSON(nppClone.Status)) + return nil +} + +// Called by the worker after exiting. +func (c *Controller) removeWorker(key probeKey) { + c.workerLock.Lock() + defer c.workerLock.Unlock() + delete(c.workers, key) +} + +func (c *Controller) fetchLatestPodContainer(podUID, name string) (*runtimeapi.Container, error) { + // runtimeService, for example docker + if c.runtimeFactory == nil { + klog.Warningf("NodePodProbe not found runtimeFactory") + return nil, nil + } + runtimeService := c.runtimeFactory.GetRuntimeService() + if runtimeService == nil { + klog.Warningf("NodePodProbe not found runtimeService") + return nil, nil + } + containers, err := runtimeService.ListContainers(&runtimeapi.ContainerFilter{ + LabelSelector: map[string]string{kubelettypes.KubernetesPodUIDLabel: podUID}, + }) + if err != nil { + klog.Errorf("NodePodProbe pod(%s) list containers failed: %s", podUID, err.Error()) + return nil, err + } + var container *runtimeapi.Container + for i := range containers { + obj := containers[i] + if obj.Metadata.Name != name { + continue + } + if container == nil || obj.CreatedAt > container.CreatedAt { + container = obj + } + } + return container, nil +} + +func updateNodePodProbeStatus(update Update, nppSpec appsv1alpha1.NodePodProbeSpec, newStatus *appsv1alpha1.NodePodProbeStatus) { + var podNs, podName string + for _, podProbe := range nppSpec.PodProbes { + if podProbe.UID == update.Key.podUID { + podNs, podName = podProbe.Namespace, podProbe.Name + } + } + if podName == "" { + return + } + var probeStatus *appsv1alpha1.PodProbeStatus + for i := range newStatus.PodProbeStatuses { + status := &newStatus.PodProbeStatuses[i] + if status.UID == update.Key.podUID { + probeStatus = status + } + } + if probeStatus == nil { + newStatus.PodProbeStatuses = append(newStatus.PodProbeStatuses, appsv1alpha1.PodProbeStatus{Namespace: podNs, Name: podName, UID: update.Key.podUID}) + probeStatus = &newStatus.PodProbeStatuses[len(newStatus.PodProbeStatuses)-1] + } + + for i, obj := range probeStatus.ProbeStates { + if obj.Name == update.Key.probeName { + if obj.State != update.State { + probeStatus.ProbeStates[i].State = update.State + probeStatus.ProbeStates[i].Message = update.Msg + probeStatus.ProbeStates[i].LastProbeTime = metav1.Now() + probeStatus.ProbeStates[i].LastTransitionTime = metav1.Now() + } + return + } + } + probeStatus.ProbeStates = append(probeStatus.ProbeStates, appsv1alpha1.ContainerProbeState{ + Name: update.Key.probeName, + State: update.State, + LastProbeTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Message: update.Msg, + }) +} diff --git a/pkg/daemon/podprobe/pod_probe_controller_test.go b/pkg/daemon/podprobe/pod_probe_controller_test.go new file mode 100644 index 0000000000..393e1a93b2 --- /dev/null +++ b/pkg/daemon/podprobe/pod_probe_controller_test.go @@ -0,0 +1,489 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "reflect" + "testing" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + commonutil "github.com/openkruise/kruise/pkg/util" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +var ( + demoNodePodProbe = appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }, + }, + }, + }, + }, + }, + } +) + +func TestUpdateNodePodProbeStatus(t *testing.T) { + cases := []struct { + name string + getUpdate func() Update + getNodePodProbe func() *appsv1alpha1.NodePodProbe + expectNodePodProbeStatus func() appsv1alpha1.NodePodProbeStatus + }{ + { + name: "test1, update pod probe status", + getUpdate: func() Update { + return Update{Key: probeKey{"pod-1-uid", "main", "healthy"}, State: appsv1alpha1.ProbeSucceeded} + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return demo + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + obj := appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return obj + }, + }, + { + name: "test2, update pod probe status", + getUpdate: func() Update { + return Update{Key: probeKey{"pod-1-uid", "main", "healthy"}, State: appsv1alpha1.ProbeSucceeded} + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "healthy", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return demo + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + obj := appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return obj + }, + }, + { + name: "test2, update pod probe status", + getUpdate: func() Update { + return Update{Key: probeKey{"pod-1-uid", "main", "healthy"}, State: appsv1alpha1.ProbeSucceeded} + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "other", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return demo + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + obj := appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return obj + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset(cs.getNodePodProbe()) + informer := newNodePodProbeInformer(fakeClient, "node-1") + c := &Controller{ + nodePodProbeInformer: informer, + nodePodProbeLister: listersalpha1.NewNodePodProbeLister(informer.GetIndexer()), + workers: make(map[probeKey]*worker), + nodePodProbeClient: fakeClient.AppsV1alpha1().NodePodProbes(), + nodeName: "node-1", + result: NewResultManager(workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second), + "sync_node_pod_probe", + )), + } + stopCh := make(chan struct{}, 1) + go c.nodePodProbeInformer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.nodePodProbeInformer.HasSynced) { + return + } + c.result.cache = map[string]Update{ + "container-id-1": cs.getUpdate(), + } + err := c.syncUpdateNodePodProbeStatus() + if err != nil { + t.Fatalf("syncUpdateNodePodProbeStatus failed: %s", err.Error()) + return + } + time.Sleep(time.Second) + if !checkNodePodProbeStatusEqual(c.nodePodProbeLister, cs.expectNodePodProbeStatus()) { + t.Fatalf("checkNodePodProbeStatusEqual failed") + } + }) + } +} + +func checkNodePodProbeStatusEqual(lister listersalpha1.NodePodProbeLister, expect appsv1alpha1.NodePodProbeStatus) bool { + npp, err := lister.Get("node-1") + if err != nil { + klog.Errorf("Get NodePodProbe failed: %s", err.Error()) + return false + } + for i := range npp.Status.PodProbeStatuses { + podProbe := npp.Status.PodProbeStatuses[i] + for j := range podProbe.ProbeStates { + obj := &podProbe.ProbeStates[j] + obj.LastTransitionTime = metav1.Time{} + obj.LastProbeTime = metav1.Time{} + } + } + return reflect.DeepEqual(npp.Status.PodProbeStatuses, expect.PodProbeStatuses) +} + +func TestSyncNodePodProbe(t *testing.T) { + cases := []struct { + name string + getNodePodProbe func() *appsv1alpha1.NodePodProbe + setWorkers func(c *Controller) + expectNodePodProbeStatus func() appsv1alpha1.NodePodProbeStatus + expectWorkers func(c *Controller) map[probeKey]*worker + }{ + { + name: "test1, sync nodePodProbe", + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-2#other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + { + Name: "pod-2", + UID: "pod-2-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-2#other", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return demo + }, + setWorkers: func(c *Controller) { + c.workers = map[probeKey]*worker{} + key1 := probeKey{"pod-1-uid", "main", "ppm-1#check"} + c.workers[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/check.sh"}, + }, + }, + }, + }) + key2 := probeKey{"pod-1-uid", "main", "ppm-1#healthy"} + c.workers[key1] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy2.sh"}, + }, + }, + }, + }) + }, + expectWorkers: func(c *Controller) map[probeKey]*worker { + expect := map[probeKey]*worker{} + key := probeKey{"pod-1-uid", "main", "ppm-1#healthy"} + expect[key] = newWorker(c, key, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + return expect + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + return appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + }, + }, + { + name: "test2, sync nodePodProbe", + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[0].Probes = append(demo.Spec.PodProbes[0].Probes, appsv1alpha1.ContainerProbe{ + Name: "ppm-1#check", + ContainerName: "nginx", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/check.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }, + }) + return demo + }, + setWorkers: func(c *Controller) { + c.workers = map[probeKey]*worker{} + }, + expectWorkers: func(c *Controller) map[probeKey]*worker { + expect := map[probeKey]*worker{} + key1 := probeKey{"pod-1-uid", "main", "ppm-1#healthy"} + expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + key2 := probeKey{"pod-1-uid", "nginx", "ppm-1#check"} + expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/check.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + return expect + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + return appsv1alpha1.NodePodProbeStatus{} + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset(cs.getNodePodProbe()) + informer := newNodePodProbeInformer(fakeClient, "node-1") + c := &Controller{ + nodePodProbeInformer: informer, + nodePodProbeLister: listersalpha1.NewNodePodProbeLister(informer.GetIndexer()), + workers: make(map[probeKey]*worker), + nodePodProbeClient: fakeClient.AppsV1alpha1().NodePodProbes(), + nodeName: "node-1", + result: NewResultManager(workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second), + "sync_node_pod_probe", + )), + } + stopCh := make(chan struct{}, 1) + go c.nodePodProbeInformer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.nodePodProbeInformer.HasSynced) { + return + } + err := c.sync() + if err != nil { + t.Fatalf("NodePodProbe sync failed: %s", err.Error()) + return + } + time.Sleep(time.Second) + if !checkNodePodProbeStatusEqual(c.nodePodProbeLister, cs.expectNodePodProbeStatus()) { + t.Fatalf("checkNodePodProbeStatusEqual failed") + } + if len(c.workers) != len(cs.expectWorkers(c)) { + t.Fatalf("expect(%d), but get(%d)", len(cs.expectWorkers(c)), len(c.workers)) + } + for _, worker := range cs.expectWorkers(c) { + obj, ok := c.workers[worker.key] + if !ok { + t.Fatalf("expect(%v), but not found", worker.key) + } + if !reflect.DeepEqual(worker.spec, obj.spec) { + t.Fatalf("expect(%s), but get(%s)", commonutil.DumpJSON(worker.spec), commonutil.DumpJSON(obj.spec)) + } + } + }) + } +} diff --git a/pkg/daemon/podprobe/prober.go b/pkg/daemon/podprobe/prober.go new file mode 100644 index 0000000000..1bb3e5e933 --- /dev/null +++ b/pkg/daemon/podprobe/prober.go @@ -0,0 +1,169 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "bytes" + "fmt" + "io" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + criapi "k8s.io/cri-api/pkg/apis" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/probe" + execprobe "k8s.io/kubernetes/pkg/probe/exec" + "k8s.io/utils/exec" +) + +const ( + maxProbeRetries = 3 + maxMessageLength = 100 +) + +// Prober helps to check the probe(exec, http, tcp) of a container. +type prober struct { + exec execprobe.Prober + runtimeService criapi.RuntimeService +} + +// NewProber creates a Prober, it takes a command runner and +// several container info managers. +func newProber(runtimeService criapi.RuntimeService) *prober { + return &prober{ + exec: execprobe.New(), + runtimeService: runtimeService, + } +} + +// probe probes the container. +func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.Container, containerID string) (appsv1alpha1.ProbeState, string, error) { + result, msg, err := pb.runProbeWithRetries(p, container, containerID) + if bytes.Count([]byte(msg), nil)-1 > maxMessageLength { + msg = msg[:maxMessageLength] + } + if err != nil || (result != probe.Success && result != probe.Warning) { + return appsv1alpha1.ProbeFailed, msg, err + } + return appsv1alpha1.ProbeSucceeded, msg, nil +} + +// runProbeWithRetries tries to probe the container in a finite loop, it returns the last result +// if it never succeeds. +func (pb *prober) runProbeWithRetries(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.Container, containerID string) (probe.Result, string, error) { + var err error + var result probe.Result + var output string + for i := 0; i < maxProbeRetries; i++ { + result, output, err = pb.runProbe(p, container, containerID) + if err == nil { + return result, output, nil + } + } + return result, output, err +} + +func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.Container, containerID string) (probe.Result, string, error) { + timeSecond := p.TimeoutSeconds + if timeSecond <= 0 { + timeSecond = 1 + } + timeout := time.Duration(timeSecond) * time.Second + // current only support exec + // todo: http, tcp + if p.Exec != nil { + return pb.exec.Probe(pb.newExecInContainer(containerID, p.Exec.Command, timeout)) + } + + klog.InfoS("Failed to find probe builder for container", "containerName", container.Metadata.Name) + return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", container.Metadata.Name) +} + +type execInContainer struct { + // run executes a command in a container. Combined stdout and stderr output is always returned. An + // error is returned if one occurred. + run func() ([]byte, error) + writer io.Writer +} + +func (pb *prober) newExecInContainer(containerID string, cmd []string, timeout time.Duration) exec.Cmd { + return &execInContainer{run: func() ([]byte, error) { + // current ignore stdout + _, stderr, err := pb.runtimeService.ExecSync(containerID, cmd, timeout) + if err != nil { + return stderr, err + } + return stderr, nil + }} +} + +func (eic *execInContainer) Run() error { + return nil +} + +func (eic *execInContainer) CombinedOutput() ([]byte, error) { + return eic.run() +} + +func (eic *execInContainer) Output() ([]byte, error) { + return nil, fmt.Errorf("unimplemented") +} + +func (eic *execInContainer) SetDir(dir string) { + // unimplemented +} + +func (eic *execInContainer) SetStdin(in io.Reader) { + // unimplemented +} + +func (eic *execInContainer) SetStdout(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetStderr(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetEnv(env []string) { + // unimplemented +} + +func (eic *execInContainer) Stop() { + // unimplemented +} + +func (eic *execInContainer) Start() error { + data, err := eic.run() + if eic.writer != nil { + eic.writer.Write(data) + } + return err +} + +func (eic *execInContainer) Wait() error { + return nil +} + +func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) { + return nil, fmt.Errorf("unimplemented") +} + +func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) { + return nil, fmt.Errorf("unimplemented") +} diff --git a/pkg/daemon/podprobe/results_manager.go b/pkg/daemon/podprobe/results_manager.go new file mode 100644 index 0000000000..6bd263a29c --- /dev/null +++ b/pkg/daemon/podprobe/results_manager.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "sync" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +// Update is an enum of the types of updates sent over the Updates channel. +type Update struct { + ContainerID string + Key probeKey + State appsv1alpha1.ProbeState + Msg string +} + +// resultManager implementation, store container probe result +type resultManager struct { + // guards the cache + sync.RWMutex + // map of container ID -> probe Result + cache map[string]Update + queue workqueue.RateLimitingInterface +} + +// NewResultManager creates and returns an empty results resultManager. +func NewResultManager(queue workqueue.RateLimitingInterface) *resultManager { + return &resultManager{ + cache: make(map[string]Update), + queue: queue, + } +} + +func (m *resultManager) ListResults() []Update { + m.RLock() + defer m.RUnlock() + + var results []Update + for _, result := range m.cache { + results = append(results, result) + } + return results +} + +func (m *resultManager) Set(id string, key probeKey, result appsv1alpha1.ProbeState, msg string) { + m.Lock() + defer m.Unlock() + prev, exists := m.cache[id] + if !exists || prev.State != result { + klog.V(5).Infof("Pod(%s) do container(%s) probe(%s) result(%s)", key.podUID, key.containerName, key.probeName, result) + m.cache[id] = Update{id, key, result, msg} + m.queue.Add("update") + } +} + +func (m *resultManager) Remove(id string) { + m.Lock() + defer m.Unlock() + delete(m.cache, id) +} diff --git a/pkg/daemon/podprobe/worker.go b/pkg/daemon/podprobe/worker.go new file mode 100644 index 0000000000..34e2a45a63 --- /dev/null +++ b/pkg/daemon/podprobe/worker.go @@ -0,0 +1,190 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "fmt" + "reflect" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "k8s.io/apimachinery/pkg/util/runtime" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/klog/v2" +) + +// worker handles the periodic probing of its assigned container. Each worker has a go-routine +// associated with it which runs the probe loop until the container permanently terminates, or the +// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date +// container IDs. +type worker struct { + // Channel for stopping the probe. + stopCh chan struct{} + + // pod uid, container name, probe name + key probeKey + + // Describes the probe configuration + spec *appsv1alpha1.ContainerProbeSpec + + // The probe value during the initial delay. + initialValue appsv1alpha1.ProbeState + + probeController *Controller + + // The last known container ID for this worker. + containerID string + // The last probe result for this worker. + lastResult appsv1alpha1.ProbeState + // How many times in a row the probe has returned the same result. + resultRun int +} + +// Creates and starts a new probe worker. +func newWorker(c *Controller, key probeKey, probe *appsv1alpha1.ContainerProbeSpec) *worker { + + w := &worker{ + stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. + key: key, + spec: probe, + probeController: c, + initialValue: appsv1alpha1.ProbeUnknown, + } + + return w +} + +// run periodically probes the container. +func (w *worker) run() { + periodSecond := w.spec.PeriodSeconds + if periodSecond < 1 { + periodSecond = 1 + } + probeTickerPeriod := time.Duration(periodSecond) * time.Second + probeTicker := time.NewTicker(probeTickerPeriod) + defer func() { + // Clean up. + probeTicker.Stop() + if w.containerID != "" { + w.probeController.result.Remove(w.containerID) + } + + w.probeController.removeWorker(w.key) + }() + +probeLoop: + for w.doProbe() { + // Wait for next probe tick. + select { + case <-w.stopCh: + break probeLoop + case <-probeTicker.C: + } + } +} + +// stop stops the probe worker. The worker handles cleanup and removes itself from its manager. +// It is safe to call stop multiple times. +func (w *worker) stop() { + select { + case w.stopCh <- struct{}{}: + default: // Non-blocking. + } +} + +// doProbe probes the container once and records the result. +// Returns whether the worker should continue. +func (w *worker) doProbe() (keepGoing bool) { + defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging) + defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true }) + + container, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName) + if container == nil { + klog.V(5).Infof("Pod(%s) container(%s) Not Found", w.key.podUID, w.key.containerName) + return true + } + + if w.containerID != container.Id { + if w.containerID != "" { + w.probeController.result.Remove(w.containerID) + } + klog.V(5).Infof("Pod(%s) container(%s) Id changed(%s -> %s)", w.key.podUID, w.key.containerName, w.containerID, container.Id) + w.containerID = container.Id + w.probeController.result.Set(w.containerID, w.key, w.initialValue, "") + } + if container.State != runtimeapi.ContainerState_CONTAINER_RUNNING { + klog.V(5).Infof("Pod(%s) Non-running container(%s) probed", w.key.podUID, w.key.containerName) + w.probeController.result.Set(w.containerID, w.key, appsv1alpha1.ProbeFailed, fmt.Sprintf("Container(%s) is Non-running", w.key.containerName)) + } + + // Probe disabled for InitialDelaySeconds. + initialDelay := w.spec.InitialDelaySeconds + if initialDelay < 1 { + initialDelay = 1 + } + curDelay := int32(time.Since(time.Unix(0, container.CreatedAt)).Seconds()) + if curDelay < initialDelay { + klog.V(5).Infof("Pod(%s) container(%s) probe(%s) initialDelay(%d), but curDelay(%d)", + w.key.podUID, w.key.containerName, w.key.probeName, initialDelay, curDelay) + return true + } + + // the full container environment here, OR we must make a call to the CRI in order to get those environment + // values from the running container. + result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID) + if err != nil { + klog.V(5).Infof("Pod(%s) do container(%s) probe(%s) failed: %s", w.key.podUID, w.key.containerName, w.key.probeName, err.Error()) + // Prober error, throw away the result. + return true + } + if w.lastResult == result { + w.resultRun++ + } else { + w.lastResult = result + w.resultRun = 1 + } + + failureThreshold := w.spec.FailureThreshold + if failureThreshold <= 0 { + failureThreshold = 1 + } + successThreshold := w.spec.SuccessThreshold + if successThreshold <= 0 { + successThreshold = 1 + } + if (result == appsv1alpha1.ProbeFailed && w.resultRun < int(failureThreshold)) || + (result == appsv1alpha1.ProbeSucceeded && w.resultRun < int(successThreshold)) { + // Success or failure is below threshold - leave the probe state unchanged. + return true + } + w.probeController.result.Set(w.containerID, w.key, result, msg) + return true +} + +func (w *worker) getProbeSpec() *appsv1alpha1.ContainerProbeSpec { + return w.spec +} + +func (w *worker) updateProbeSpec(spec *appsv1alpha1.ContainerProbeSpec) { + if !reflect.DeepEqual(w.spec.Handler, spec.Handler) { + if w.containerID != "" { + klog.Infof("Pod(%s) container(%s) probe spec changed", w.key.podUID, w.key.containerName) + w.probeController.result.Set(w.containerID, w.key, w.initialValue, "") + } + } + w.spec = spec +}