From 493ac4f65e0924be669cc0b986f92902129686f4 Mon Sep 17 00:00:00 2001 From: hxcGit Date: Thu, 29 Sep 2022 15:55:13 +0800 Subject: [PATCH] use local cache to get pods Signed-off-by: hxcGit --- pkg/yurthub/otaupdate/ota.go | 27 +++++++++++++++++++-------- pkg/yurthub/otaupdate/ota_test.go | 8 +++++++- pkg/yurthub/server/server.go | 2 +- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 32227738b3e..679e04fa8d9 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -24,10 +24,12 @@ import ( "github.com/gorilla/mux" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" runtimescheme "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" @@ -37,20 +39,24 @@ import ( type OTAHandler func(kubernetes.Interface, string) http.Handler // GetPods return pod list -func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { +func GetPods(podLister corelisters.PodLister, nodeName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ - FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName), - }) + pods, err := podLister.Pods("").List(labels.Everything()) if err != nil { klog.Errorf("Get pod list failed, %v", err) WriteErr(w, "Get pod list failed", http.StatusInternalServerError) return } - klog.V(5).Infof("Got pod list: %v", podList) + + var otaPods []*corev1.Pod + for _, pod := range pods { + if pod.Spec.NodeName == nodeName { + otaPods = append(otaPods, pod) + } + } // Successfully get pod list, response 200 - data, err := encodePodList(podList) + data, err := encodePods(otaPods) if err != nil { klog.Errorf("Encode pod list failed, %v", err) WriteErr(w, "Encode pod list failed", http.StatusInternalServerError) @@ -120,8 +126,13 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st return nil, true } -// encodePodList returns the encoded PodList -func encodePodList(podList *corev1.PodList) ([]byte, error) { +// Derived from kubelet encodePods +func encodePods(pods []*corev1.Pod) (data []byte, err error) { + podList := new(corev1.PodList) + for _, pod := range pods { + podList.Items = append(podList.Items, *pod) + } + codec := scheme.Codecs.LegacyCodec(runtimescheme.GroupVersion{Group: corev1.GroupName, Version: "v1"}) return runtime.Encode(codec, podList) } diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 71ddd0ba8f3..38cd92762bf 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" @@ -66,6 +67,11 @@ func TestGetPods(t *testing.T) { normalPod := newPod("normalPod") clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod) + podInformer := informers.NewSharedInformerFactory(clientset, 0).Core().V1().Pods() + + podInformer.Informer().GetStore().Add(updatablePod) + podInformer.Informer().GetStore().Add(notUpdatablePod) + podInformer.Informer().GetStore().Add(normalPod) req, err := http.NewRequest("GET", "/openyurt.io/v1/pods", nil) if err != nil { @@ -73,7 +79,7 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset, "").ServeHTTP(rr, req) + GetPods(podInformer.Lister(), "").ServeHTTP(rr, req) expectedCode := http.StatusOK assert.Equal(t, expectedCode, rr.Code) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 7da09cbd23a..1530815bfcf 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -174,7 +174,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET") + c.Handle("/pods", ota.GetPods(cfg.SharedFactory.Core().V1().Pods().Lister(), cfg.NodeName)).Methods("GET") c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST") }