diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 32227738b3e..01089465616 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -31,26 +31,35 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ) type OTAHandler func(kubernetes.Interface, string) http.Handler // GetPods return pod list -func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { +func GetPods(store cachemanager.StorageWrapper) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ - FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName), - }) + objs, err := store.List("kubelet/pods") if err != nil { klog.Errorf("Get pod list failed, %v", err) WriteErr(w, "Get pod list failed", http.StatusInternalServerError) return } - klog.V(5).Infof("Got pod list: %v", podList) + + podList := new(corev1.PodList) + for _, obj := range objs { + pod, ok := obj.(*corev1.Pod) + if !ok { + klog.Errorf("Get pod list failed, %v", err) + WriteErr(w, "Get pod list failed", http.StatusInternalServerError) + return + } + podList.Items = append(podList.Items, *pod) + } // Successfully get pod list, response 200 - data, err := encodePodList(podList) + data, err := encodePods(podList) if err != nil { klog.Errorf("Encode pod list failed, %v", err) WriteErr(w, "Encode pod list failed", http.StatusInternalServerError) @@ -120,8 +129,8 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st return nil, true } -// encodePodList returns the encoded PodList -func encodePodList(podList *corev1.PodList) ([]byte, error) { +// Derived from kubelet encodePods +func encodePods(podList *corev1.PodList) (data []byte, err error) { codec := scheme.Codecs.LegacyCodec(runtimescheme.GroupVersion{Group: corev1.GroupName, Version: "v1"}) return runtime.Encode(codec, podList) } diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 71ddd0ba8f3..25060d4ad32 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -17,6 +17,7 @@ limitations under the License. package otaupdate import ( + "fmt" "net/http" "net/http/httptest" "testing" @@ -28,11 +29,16 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" ) func newPod(podName string) *corev1.Pod { pod := &corev1.Pod{ - TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, ObjectMeta: metav1.ObjectMeta{ GenerateName: podName, Namespace: metav1.NamespaceDefault, @@ -61,11 +67,24 @@ func SetPodUpgradeCondition(pod *corev1.Pod, ready corev1.ConditionStatus) { } func TestGetPods(t *testing.T) { + dir := t.TempDir() + dStorage, err := disk.NewDiskStorage(dir) + if err != nil { + t.Errorf("failed to create disk storage, %v", err) + } + sWrapper := cachemanager.NewStorageWrapper(dStorage) + updatablePod := newPodWithCondition("updatablePod", corev1.ConditionTrue) notUpdatablePod := newPodWithCondition("notUpdatablePod", corev1.ConditionFalse) normalPod := newPod("normalPod") - clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod) + pods := []*corev1.Pod{updatablePod, notUpdatablePod, normalPod} + for _, pod := range pods { + err = sWrapper.Create(fmt.Sprintf("kubelet/pods/default/%s", pod.Name), pod) + if err != nil { + t.Errorf("failed to create obj, %v", err) + } + } req, err := http.NewRequest("GET", "/openyurt.io/v1/pods", nil) if err != nil { @@ -73,7 +92,7 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset, "").ServeHTTP(rr, req) + GetPods(sWrapper).ServeHTTP(rr, req) expectedCode := http.StatusOK assert.Equal(t, expectedCode, rr.Code) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 7da09cbd23a..c086bb8f4a0 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -174,7 +174,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET") + c.Handle("/pods", ota.GetPods(cfg.StorageWrapper)).Methods("GET") c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST") }