From 6e6715bf5109783b4bb479df5b4a7ca2abed5ea2 Mon Sep 17 00:00:00 2001 From: hxcGit Date: Sat, 20 Aug 2022 21:00:40 +0800 Subject: [PATCH 01/10] add ota upgrade for daemonset Signed-off-by: hxcGit --- pkg/yurthub/otaupgrade/daemonset_update.go | 92 ++++++++++++++++++++++ pkg/yurthub/otaupgrade/ota.go | 65 +++++++++++++++ pkg/yurthub/otaupgrade/util.go | 32 ++++++++ pkg/yurthub/server/server.go | 11 ++- 4 files changed, 198 insertions(+), 2 deletions(-) create mode 100644 pkg/yurthub/otaupgrade/daemonset_update.go create mode 100644 pkg/yurthub/otaupgrade/ota.go create mode 100644 pkg/yurthub/otaupgrade/util.go diff --git a/pkg/yurthub/otaupgrade/daemonset_update.go b/pkg/yurthub/otaupgrade/daemonset_update.go new file mode 100644 index 00000000000..f2e7045e020 --- /dev/null +++ b/pkg/yurthub/otaupgrade/daemonset_update.go @@ -0,0 +1,92 @@ +package otaupgrade + +import ( + "fmt" + "strconv" + + client "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + upgradeUtil "github.com/openyurtio/openyurt/pkg/controller/podupgrade" +) + +// getDaemonsetPodUpgradeStatus compares spec between all daemonsets and their pods +// to determine whether new version application is availabel +func getDaemonsetPodUpgradeStatus(clientset *client.Clientset) ([]*PodStatus, error) { + pods, err := Pods(clientset, "") + if err != nil { + klog.Errorf("Get all pods in current node failed, %v", err) + return nil, err + } + + podStatusList := make([]*PodStatus, 0) + + for _, pod := range pods.Items { + var upgradable bool + v, ok := pod.Annotations[upgradeUtil.PodUpgradableAnnotation] + + if !ok { + upgradable = false + } else { + upgradable, err = strconv.ParseBool(v) + if err != nil { + klog.Warningf("Pod %v with invalid upgrade annotation %v", pod.Name, v) + continue + } + } + + klog.V(5).Infof("Pod %v with upgrade annotation %v", pod.Name, upgradable) + + if ok && upgradable { + podStatus := &PodStatus{ + Namespace: pod.Namespace, + PodName: pod.Name, + Upgradable: upgradable, + } + + podStatusList = append(podStatusList, podStatus) + } + } + + return podStatusList, nil +} + +// applyUpgrade execute pod upgrade process by deleting pod under OnDelete update strategy +func applyUpgrade(clientset *client.Clientset, namespace, podName string) error { + klog.Infof("Start to upgrade daemonset pod:%v/%v", namespace, podName) + + pod, err := Pod(clientset, namespace, podName) + if err != nil { + klog.Errorf("Get pod %v/%v failed, %v", namespace, podName, err) + return err + } + + // Pod is not upgradable without annotation "apps.openyurt.io/pod-upgradable" + v, ok := pod.Annotations[upgradeUtil.PodUpgradableAnnotation] + if !ok { + klog.Infof("Daemonset pod: %v/%v is not upgradable", namespace, podName) + return fmt.Errorf("Daemonset pod: %v/%v is not upgradable", namespace, podName) + } + + // Pod is not upgradable when annotation "apps.openyurt.io/pod-upgradable" value cannot be parsed + upgradable, err := strconv.ParseBool(v) + if err != nil { + klog.Errorf("Pod %v is not upgradable with invalid upgrade annotation %v", pod.Name, v) + return err + } + + // Pod is not upgradable when annotation "apps.openyurt.io/pod-upgradable" value is false + if !upgradable { + klog.Infof("Daemonset pod: %v/%v is not upgradable", namespace, podName) + return fmt.Errorf("Current pod is not upgradable") + } + + klog.Infof("Daemonset pod: %v/%v is upgradable", namespace, podName) + err = DeletePod(clientset, namespace, podName) + if err != nil { + klog.Errorf("Upgrade pod %v/%v failed when delete pod %v", namespace, pod.Name, err) + return err + } + + klog.Infof("Daemonset pod: %v/%v upgrade success", namespace, podName) + return nil +} diff --git a/pkg/yurthub/otaupgrade/ota.go b/pkg/yurthub/otaupgrade/ota.go new file mode 100644 index 00000000000..af1b22a310d --- /dev/null +++ b/pkg/yurthub/otaupgrade/ota.go @@ -0,0 +1,65 @@ +package otaupgrade + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/gorilla/mux" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +type PodStatus struct { + Namespace string + PodName string + // TODO: whether need to display + // OldImgs []string + // NewImgs []string + Upgradable bool +} + +// GetPods return all daemonset pods' upgrade information by PodStatus +func GetPods(clientset *kubernetes.Clientset) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + psList, err := getDaemonsetPodUpgradeStatus(clientset) + if err != nil { + Err(fmt.Errorf("Get daemonset's pods upgrade status failed"), w, r) + return + } + klog.V(4).Infof("Got pods status list: %v", psList) + + // Successfully get daemonsets/pods upgrade info + w.Header().Set("content-type", "text/json") + data, err := json.Marshal(psList) + if err != nil { + klog.Errorf("Marshal pods status failed: %v", err.Error()) + Err(fmt.Errorf("Get daemonset's pods upgrade status failed: data transfer to json format failed."), w, r) + return + } + + w.WriteHeader(http.StatusOK) + n, err := w.Write(data) + if err != nil || n != len(data) { + klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err) + } + }) +} + +// UpgradePod upgrade a specifc pod(namespace/podname) to the latest version +func UpgradePod(clientset *kubernetes.Clientset) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + params := mux.Vars(r) + namespace := params["ns"] + podName := params["podname"] + + err := applyUpgrade(clientset, namespace, podName) + if err != nil { + Err(fmt.Errorf("Apply upgrade failed"), w, r) + return + } + w.WriteHeader(http.StatusOK) + }) +} diff --git a/pkg/yurthub/otaupgrade/util.go b/pkg/yurthub/otaupgrade/util.go new file mode 100644 index 00000000000..542ebae2205 --- /dev/null +++ b/pkg/yurthub/otaupgrade/util.go @@ -0,0 +1,32 @@ +package otaupgrade + +import ( + "context" + "net/http" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +func Pod(clientset *kubernetes.Clientset, namespace, podName string) (*corev1.Pod, error) { + return clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) +} + +func Pods(clientset *kubernetes.Clientset, namespace string) (*corev1.PodList, error) { + return clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{}) +} + +func DeletePod(clientset *kubernetes.Clientset, namespace, podName string) error { + return clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) +} + +func Err(err error, w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + n := len([]byte(err.Error())) + nw, e := w.Write([]byte(err.Error())) + if e != nil || nw != n { + klog.Errorf("write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) + } +} diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 10f03a8779d..dca358553f7 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -38,6 +38,7 @@ import ( certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" + ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupgrade" ) // Server is an interface for providing http service for yurthub @@ -62,13 +63,13 @@ func NewYurtHubServer(cfg *config.YurtHubConfiguration, proxyHandler http.Handler, rest *rest.RestConfigManager) (Server, error) { hubMux := mux.NewRouter() - registerHandlers(hubMux, cfg, certificateMgr) restCfg := rest.GetRestConfig(false) clientSet, err := kubernetes.NewForConfig(restCfg) if err != nil { klog.Errorf("cannot create the client set: %v", err) return nil, err } + registerHandlers(hubMux, cfg, certificateMgr, clientSet) hubServer := &http.Server{ Addr: cfg.YurtHubServerAddr, Handler: hubMux, @@ -156,7 +157,7 @@ func (s *yurtHubServer) Run() { } // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. -func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager) { +func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, clientset *kubernetes.Clientset) { // register handlers for update join token c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT") @@ -170,6 +171,12 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica // register handler for metrics c.Handle("/metrics", promhttp.Handler()) + + // register handler for ota upgrade + c.Handle("/pods", ota.GetPods(clientset)).Methods("GET") + c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpgradePod(clientset)).Methods("POST") + // c.HandleFunc("/pods", ota.GetPods).Methods("GET") + // c.HandleFunc("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpgradePod).Methods("POST") } // healthz returns ok for healthz request From 1c6c86fd446bd987b4e7b0519bf0386384b1a74d Mon Sep 17 00:00:00 2001 From: Xuecheng Date: Tue, 13 Sep 2022 22:06:58 -0700 Subject: [PATCH 02/10] add pod updatable annotation Signed-off-by: Xuecheng --- pkg/yurthub/otaupdate/ota.go | 168 +++++++++++++++++++++ pkg/yurthub/otaupdate/ota_test.go | 108 +++++++++++++ pkg/yurthub/otaupdate/util.go | 33 ++++ pkg/yurthub/otaupgrade/daemonset_update.go | 92 ----------- pkg/yurthub/otaupgrade/ota.go | 65 -------- pkg/yurthub/otaupgrade/util.go | 32 ---- pkg/yurthub/server/server.go | 6 +- 7 files changed, 312 insertions(+), 192 deletions(-) create mode 100644 pkg/yurthub/otaupdate/ota.go create mode 100644 pkg/yurthub/otaupdate/ota_test.go create mode 100644 pkg/yurthub/otaupdate/util.go delete mode 100644 pkg/yurthub/otaupgrade/daemonset_update.go delete mode 100644 pkg/yurthub/otaupgrade/ota.go delete mode 100644 pkg/yurthub/otaupgrade/util.go diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go new file mode 100644 index 00000000000..cc1ae2e06b2 --- /dev/null +++ b/pkg/yurthub/otaupdate/ota.go @@ -0,0 +1,168 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package otaupdate + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + + "github.com/gorilla/mux" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + client "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +const ( + PodUpdatableAnnotation = "apps.openyurt.io/pod-updatable" +) + +type PodStatus struct { + Namespace string + PodName string + // TODO: whether need to display + // OldImgs []string + // NewImgs []string + Updatable bool +} + +// GetPods return all daemonset pods' update information by PodStatus +func GetPods(clientset kubernetes.Interface) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + podsStatus, err := getPodUpdateStatus(clientset) + if err != nil { + klog.Errorf("Get pods updatable status failed, %v", err) + returnErr(fmt.Errorf("Get daemonset's pods update status failed"), w, http.StatusInternalServerError) + return + } + klog.V(5).Infof("Got pods status list: %v", podsStatus) + + // Successfully get daemonsets/pods update info + w.Header().Set("content-type", "text/json") + data, err := json.Marshal(podsStatus) + if err != nil { + klog.Errorf("Marshal pods status failed: %v", err.Error()) + returnErr(fmt.Errorf("Get daemonset's pods update status failed: data transfer to json format failed."), w, http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + n, err := w.Write(data) + if err != nil || n != len(data) { + klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err) + } + }) +} + +// UpdatePod update a specifc pod(namespace/podname) to the latest version +func UpdatePod(clientset kubernetes.Interface) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + namespace := params["ns"] + podName := params["podname"] + + klog.Info("12", namespace, podName) + err, ok := applyUpdate(clientset, namespace, podName) + if err != nil { + returnErr(fmt.Errorf("Apply update failed"), w, http.StatusInternalServerError) + return + } + if !ok { + returnErr(fmt.Errorf("Pod is not-updatable"), w, http.StatusForbidden) + return + } + w.WriteHeader(http.StatusOK) + }) +} + +// getDaemonsetPodUpdateStatus check pods annotation "apps.openyurt.io/pod-updatable" +// to determine whether new version application is availabel +func getPodUpdateStatus(clientset client.Interface) ([]*PodStatus, error) { + pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + + podsStatus := make([]*PodStatus, 0) + for _, pod := range pods.Items { + var updatable bool + v, ok := pod.Annotations[PodUpdatableAnnotation] + + if !ok { + updatable = false + } else { + updatable, err = strconv.ParseBool(v) + if err != nil { + klog.Warningf("Pod %v with invalid update annotation %v", pod.Name, v) + continue + } + } + + klog.V(5).Infof("Pod %v with update annotation %v", pod.Name, updatable) + podStatus := &PodStatus{ + Namespace: pod.Namespace, + PodName: pod.Name, + Updatable: updatable, + } + podsStatus = append(podsStatus, podStatus) + + } + + return podsStatus, nil +} + +// applyUpdate execute pod update process by deleting pod under OnDelete update strategy +func applyUpdate(clientset client.Interface, namespace, podName string) (error, bool) { + pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("Get pod %v/%v failed, %v", namespace, podName, err) + return err, false + } + + // Pod will not be updated without annotation "apps.openyurt.io/pod-updatable" + v, ok := pod.Annotations[PodUpdatableAnnotation] + if !ok { + klog.Infof("Daemonset pod: %v/%v is not updatable without PodUpdatable annotation", namespace, podName) + return nil, false + } + + // Pod will not be updated when annotation "apps.openyurt.io/pod-updatable" value cannot be parsed + updatable, err := strconv.ParseBool(v) + if err != nil { + klog.Infof("Pod %v/%v is not updatable with invalid update annotation %v", namespace, podName, v) + return nil, false + } + + // Pod will not be updated when annotation "apps.openyurt.io/pod-updatable" value is false + if !updatable { + klog.Infof("Pod %v/%v is not updatable with PodUpdatable annotation is %v", namespace, podName, updatable) + return nil, false + } + + klog.V(5).Infof("Pod: %v/%v is updatable", namespace, podName) + err = clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + if err != nil { + klog.Errorf("Update pod %v/%v failed when delete, %v", namespace, podName, err) + return err, false + } + + klog.Infof("Update pod: %v/%v success", namespace, podName) + return nil, true +} diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go new file mode 100644 index 00000000000..4f49f1d61e1 --- /dev/null +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package otaupdate + +import ( + "net/http" + "net/http/httptest" + "strconv" + "testing" + + "github.com/gorilla/mux" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func newPod(podName string) *corev1.Pod { + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: podName, + Namespace: metav1.NamespaceDefault, + }, + } + pod.Name = podName + return pod +} + +func setPodUpdatableAnnotation(pod *corev1.Pod, ok bool) { + metav1.SetMetaDataAnnotation(&pod.ObjectMeta, PodUpdatableAnnotation, strconv.FormatBool(ok)) +} + +func TestGetPods(t *testing.T) { + updatablePod := newPod("updatable") + setPodUpdatableAnnotation(updatablePod, true) + notUpdatablePod := newPod("not-updatable") + setPodUpdatableAnnotation(notUpdatablePod, false) + + clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod) + + req, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + + rr := httptest.NewRecorder() + + GetPods(clientset).ServeHTTP(rr, req) + + t.Logf("rr code is %+v, rr body is %+v", rr.Code, rr.Body) + // if status := rr.Code; status != http.StatusOK { + // t.Errorf("handler returned wrong status code: got %v want %v", + // status, http.StatusOK) + // } + + expected := `[{"Namespace":"default","PodName":"not-updatable","Updatable":false},{"Namespace":"default","PodName":"updatable","Updatable":true}]` + + // data := json.Unmarshal(rr.Body.Bytes(),) + assert.Equal(t, expected, rr.Body.String()) + // if rr.Body.String() != expected { + // t.Errorf("handler returned unexpected body: got %v want %v", + // rr.Body.String(), expected) + // } + +} + +func TestUpdatePod(t *testing.T) { + + updatablePod := newPod("updatable-pod") + setPodUpdatableAnnotation(updatablePod, true) + notUpdatablePod := newPod("not-updatable-pod") + setPodUpdatableAnnotation(notUpdatablePod, false) + + clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod) + + req, err := http.NewRequest("POST", "/openyurt.io/v1/namespaces/default/pods/updatable-pod/update", nil) + if err != nil { + t.Fatal(err) + } + + rr := httptest.NewRecorder() + + vars := map[string]string{ + "ns": "default", + "podname": "updatable-pod", + } + req = mux.SetURLVars(req, vars) + + UpdatePod(clientset).ServeHTTP(rr, req) + + t.Logf("rr code is %+v, rr body is %+v", rr.Code, rr.Body) + +} diff --git a/pkg/yurthub/otaupdate/util.go b/pkg/yurthub/otaupdate/util.go new file mode 100644 index 00000000000..204c7201e38 --- /dev/null +++ b/pkg/yurthub/otaupdate/util.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package otaupdate + +import ( + "net/http" + + "k8s.io/klog/v2" +) + +// returnErr write the given error to response and set response header to the given error type +func returnErr(err error, w http.ResponseWriter,errType int) { + w.WriteHeader(errType) + n := len([]byte(err.Error())) + nw, e := w.Write([]byte(err.Error())) + if e != nil || nw != n { + klog.Errorf("write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) + } +} diff --git a/pkg/yurthub/otaupgrade/daemonset_update.go b/pkg/yurthub/otaupgrade/daemonset_update.go deleted file mode 100644 index f2e7045e020..00000000000 --- a/pkg/yurthub/otaupgrade/daemonset_update.go +++ /dev/null @@ -1,92 +0,0 @@ -package otaupgrade - -import ( - "fmt" - "strconv" - - client "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" - upgradeUtil "github.com/openyurtio/openyurt/pkg/controller/podupgrade" -) - -// getDaemonsetPodUpgradeStatus compares spec between all daemonsets and their pods -// to determine whether new version application is availabel -func getDaemonsetPodUpgradeStatus(clientset *client.Clientset) ([]*PodStatus, error) { - pods, err := Pods(clientset, "") - if err != nil { - klog.Errorf("Get all pods in current node failed, %v", err) - return nil, err - } - - podStatusList := make([]*PodStatus, 0) - - for _, pod := range pods.Items { - var upgradable bool - v, ok := pod.Annotations[upgradeUtil.PodUpgradableAnnotation] - - if !ok { - upgradable = false - } else { - upgradable, err = strconv.ParseBool(v) - if err != nil { - klog.Warningf("Pod %v with invalid upgrade annotation %v", pod.Name, v) - continue - } - } - - klog.V(5).Infof("Pod %v with upgrade annotation %v", pod.Name, upgradable) - - if ok && upgradable { - podStatus := &PodStatus{ - Namespace: pod.Namespace, - PodName: pod.Name, - Upgradable: upgradable, - } - - podStatusList = append(podStatusList, podStatus) - } - } - - return podStatusList, nil -} - -// applyUpgrade execute pod upgrade process by deleting pod under OnDelete update strategy -func applyUpgrade(clientset *client.Clientset, namespace, podName string) error { - klog.Infof("Start to upgrade daemonset pod:%v/%v", namespace, podName) - - pod, err := Pod(clientset, namespace, podName) - if err != nil { - klog.Errorf("Get pod %v/%v failed, %v", namespace, podName, err) - return err - } - - // Pod is not upgradable without annotation "apps.openyurt.io/pod-upgradable" - v, ok := pod.Annotations[upgradeUtil.PodUpgradableAnnotation] - if !ok { - klog.Infof("Daemonset pod: %v/%v is not upgradable", namespace, podName) - return fmt.Errorf("Daemonset pod: %v/%v is not upgradable", namespace, podName) - } - - // Pod is not upgradable when annotation "apps.openyurt.io/pod-upgradable" value cannot be parsed - upgradable, err := strconv.ParseBool(v) - if err != nil { - klog.Errorf("Pod %v is not upgradable with invalid upgrade annotation %v", pod.Name, v) - return err - } - - // Pod is not upgradable when annotation "apps.openyurt.io/pod-upgradable" value is false - if !upgradable { - klog.Infof("Daemonset pod: %v/%v is not upgradable", namespace, podName) - return fmt.Errorf("Current pod is not upgradable") - } - - klog.Infof("Daemonset pod: %v/%v is upgradable", namespace, podName) - err = DeletePod(clientset, namespace, podName) - if err != nil { - klog.Errorf("Upgrade pod %v/%v failed when delete pod %v", namespace, pod.Name, err) - return err - } - - klog.Infof("Daemonset pod: %v/%v upgrade success", namespace, podName) - return nil -} diff --git a/pkg/yurthub/otaupgrade/ota.go b/pkg/yurthub/otaupgrade/ota.go deleted file mode 100644 index af1b22a310d..00000000000 --- a/pkg/yurthub/otaupgrade/ota.go +++ /dev/null @@ -1,65 +0,0 @@ -package otaupgrade - -import ( - "encoding/json" - "fmt" - "net/http" - - "github.com/gorilla/mux" - "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" -) - -type PodStatus struct { - Namespace string - PodName string - // TODO: whether need to display - // OldImgs []string - // NewImgs []string - Upgradable bool -} - -// GetPods return all daemonset pods' upgrade information by PodStatus -func GetPods(clientset *kubernetes.Clientset) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - psList, err := getDaemonsetPodUpgradeStatus(clientset) - if err != nil { - Err(fmt.Errorf("Get daemonset's pods upgrade status failed"), w, r) - return - } - klog.V(4).Infof("Got pods status list: %v", psList) - - // Successfully get daemonsets/pods upgrade info - w.Header().Set("content-type", "text/json") - data, err := json.Marshal(psList) - if err != nil { - klog.Errorf("Marshal pods status failed: %v", err.Error()) - Err(fmt.Errorf("Get daemonset's pods upgrade status failed: data transfer to json format failed."), w, r) - return - } - - w.WriteHeader(http.StatusOK) - n, err := w.Write(data) - if err != nil || n != len(data) { - klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err) - } - }) -} - -// UpgradePod upgrade a specifc pod(namespace/podname) to the latest version -func UpgradePod(clientset *kubernetes.Clientset) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - params := mux.Vars(r) - namespace := params["ns"] - podName := params["podname"] - - err := applyUpgrade(clientset, namespace, podName) - if err != nil { - Err(fmt.Errorf("Apply upgrade failed"), w, r) - return - } - w.WriteHeader(http.StatusOK) - }) -} diff --git a/pkg/yurthub/otaupgrade/util.go b/pkg/yurthub/otaupgrade/util.go deleted file mode 100644 index 542ebae2205..00000000000 --- a/pkg/yurthub/otaupgrade/util.go +++ /dev/null @@ -1,32 +0,0 @@ -package otaupgrade - -import ( - "context" - "net/http" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/klog/v2" -) - -func Pod(clientset *kubernetes.Clientset, namespace, podName string) (*corev1.Pod, error) { - return clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) -} - -func Pods(clientset *kubernetes.Clientset, namespace string) (*corev1.PodList, error) { - return clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{}) -} - -func DeletePod(clientset *kubernetes.Clientset, namespace, podName string) error { - return clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) -} - -func Err(err error, w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - n := len([]byte(err.Error())) - nw, e := w.Write([]byte(err.Error())) - if e != nil || nw != n { - klog.Errorf("write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) - } -} diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index dca358553f7..321b4a2dda0 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -38,7 +38,7 @@ import ( certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" - ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupgrade" + ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate" ) // Server is an interface for providing http service for yurthub @@ -173,8 +173,8 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.GetPods(clientset)).Methods("GET") - c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpgradePod(clientset)).Methods("POST") + c.Handle("/openyurt.io/v1/pods", ota.GetPods(clientset)).Methods("GET") + c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/update", ota.UpdatePod(clientset)).Methods("POST") // c.HandleFunc("/pods", ota.GetPods).Methods("GET") // c.HandleFunc("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpgradePod).Methods("POST") } From e34d558c4248c857a73a9a945df73409721b55a5 Mon Sep 17 00:00:00 2001 From: Xuecheng Date: Wed, 14 Sep 2022 08:32:35 -0700 Subject: [PATCH 03/10] add unit test Signed-off-by: Xuecheng --- pkg/yurthub/otaupdate/ota.go | 15 ++++- pkg/yurthub/otaupdate/ota_test.go | 105 ++++++++++++++++++------------ pkg/yurthub/otaupdate/util.go | 33 ---------- pkg/yurthub/server/server.go | 2 - 4 files changed, 74 insertions(+), 81 deletions(-) delete mode 100644 pkg/yurthub/otaupdate/util.go diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index cc1ae2e06b2..f8aff7887ea 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -26,7 +26,6 @@ import ( "github.com/gorilla/mux" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - client "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) @@ -94,7 +93,7 @@ func UpdatePod(clientset kubernetes.Interface) http.Handler { // getDaemonsetPodUpdateStatus check pods annotation "apps.openyurt.io/pod-updatable" // to determine whether new version application is availabel -func getPodUpdateStatus(clientset client.Interface) ([]*PodStatus, error) { +func getPodUpdateStatus(clientset kubernetes.Interface) ([]*PodStatus, error) { pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err @@ -129,7 +128,7 @@ func getPodUpdateStatus(clientset client.Interface) ([]*PodStatus, error) { } // applyUpdate execute pod update process by deleting pod under OnDelete update strategy -func applyUpdate(clientset client.Interface, namespace, podName string) (error, bool) { +func applyUpdate(clientset kubernetes.Interface, namespace, podName string) (error, bool) { pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) if err != nil { klog.Errorf("Get pod %v/%v failed, %v", namespace, podName, err) @@ -166,3 +165,13 @@ func applyUpdate(clientset client.Interface, namespace, podName string) (error, klog.Infof("Update pod: %v/%v success", namespace, podName) return nil, true } + +// returnErr write the given error to response and set response header to the given error type +func returnErr(err error, w http.ResponseWriter, errType int) { + w.WriteHeader(errType) + n := len([]byte(err.Error())) + nw, e := w.Write([]byte(err.Error())) + if e != nil || nw != n { + klog.Errorf("write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) + } +} diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 4f49f1d61e1..1fe487917f4 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -41,68 +41,87 @@ func newPod(podName string) *corev1.Pod { return pod } +func newPodWithAnnotation(podName string, ready bool) *corev1.Pod { + pod := newPod(podName) + setPodUpdatableAnnotation(pod, ready) + + return pod +} + func setPodUpdatableAnnotation(pod *corev1.Pod, ok bool) { metav1.SetMetaDataAnnotation(&pod.ObjectMeta, PodUpdatableAnnotation, strconv.FormatBool(ok)) } func TestGetPods(t *testing.T) { - updatablePod := newPod("updatable") - setPodUpdatableAnnotation(updatablePod, true) - notUpdatablePod := newPod("not-updatable") - setPodUpdatableAnnotation(notUpdatablePod, false) + updatablePod := newPodWithAnnotation("updatablePod", true) + notUpdatablePod := newPodWithAnnotation("notUpdatablePod", false) + normalPod := newPod("normalPod") - clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod) + clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod) - req, err := http.NewRequest("POST", "/", nil) + req, err := http.NewRequest("GET", "/openyurt.io/v1/pods", nil) if err != nil { t.Fatal(err) } - rr := httptest.NewRecorder() GetPods(clientset).ServeHTTP(rr, req) - t.Logf("rr code is %+v, rr body is %+v", rr.Code, rr.Body) - // if status := rr.Code; status != http.StatusOK { - // t.Errorf("handler returned wrong status code: got %v want %v", - // status, http.StatusOK) - // } - - expected := `[{"Namespace":"default","PodName":"not-updatable","Updatable":false},{"Namespace":"default","PodName":"updatable","Updatable":true}]` - - // data := json.Unmarshal(rr.Body.Bytes(),) - assert.Equal(t, expected, rr.Body.String()) - // if rr.Body.String() != expected { - // t.Errorf("handler returned unexpected body: got %v want %v", - // rr.Body.String(), expected) - // } + expectedCode := http.StatusOK + expectedData := `[{"Namespace":"default","PodName":"normalPod","Updatable":false},{"Namespace":"default","PodName":"notUpdatablePod","Updatable":false},{"Namespace":"default","PodName":"updatablePod","Updatable":true}]` + assert.Equal(t, expectedCode, rr.Code) + assert.Equal(t, expectedData, rr.Body.String()) } func TestUpdatePod(t *testing.T) { - - updatablePod := newPod("updatable-pod") - setPodUpdatableAnnotation(updatablePod, true) - notUpdatablePod := newPod("not-updatable-pod") - setPodUpdatableAnnotation(notUpdatablePod, false) - - clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod) - - req, err := http.NewRequest("POST", "/openyurt.io/v1/namespaces/default/pods/updatable-pod/update", nil) - if err != nil { - t.Fatal(err) + tests := []struct { + reqURL string + pod *corev1.Pod + podName string + expectedCode int + expectedData string + }{ + { + reqURL: "/openyurt.io/v1/namespaces/default/pods/updatablePod/update", + podName: "updatablePod", + pod: newPodWithAnnotation("updatablePod", true), + expectedCode: http.StatusOK, + expectedData: "", + }, + { + reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update", + podName: "notUpdatablePod", + pod: newPodWithAnnotation("notUpdatablePod", false), + expectedCode: http.StatusForbidden, + expectedData: "Pod is not-updatable", + }, + { + reqURL: "/openyurt.io/v1/namespaces/default/pods/wrongName/update", + podName: "wrongName", + pod: newPodWithAnnotation("trueName", true), + expectedCode: http.StatusInternalServerError, + expectedData: "Apply update failed", + }, } - - rr := httptest.NewRecorder() - - vars := map[string]string{ - "ns": "default", - "podname": "updatable-pod", + for _, test := range tests { + clientset := fake.NewSimpleClientset(test.pod) + + req, err := http.NewRequest("POST", test.reqURL, nil) + if err != nil { + t.Fatal(err) + } + vars := map[string]string{ + "ns": "default", + "podname": test.podName, + } + req = mux.SetURLVars(req, vars) + rr := httptest.NewRecorder() + + UpdatePod(clientset).ServeHTTP(rr, req) + + assert.Equal(t, test.expectedCode, rr.Code) + assert.Equal(t, test.expectedData, rr.Body.String()) } - req = mux.SetURLVars(req, vars) - - UpdatePod(clientset).ServeHTTP(rr, req) - - t.Logf("rr code is %+v, rr body is %+v", rr.Code, rr.Body) } diff --git a/pkg/yurthub/otaupdate/util.go b/pkg/yurthub/otaupdate/util.go deleted file mode 100644 index 204c7201e38..00000000000 --- a/pkg/yurthub/otaupdate/util.go +++ /dev/null @@ -1,33 +0,0 @@ -/* -Copyright 2022 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package otaupdate - -import ( - "net/http" - - "k8s.io/klog/v2" -) - -// returnErr write the given error to response and set response header to the given error type -func returnErr(err error, w http.ResponseWriter,errType int) { - w.WriteHeader(errType) - n := len([]byte(err.Error())) - nw, e := w.Write([]byte(err.Error())) - if e != nil || nw != n { - klog.Errorf("write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) - } -} diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 321b4a2dda0..eb064c32c0d 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -175,8 +175,6 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica // register handler for ota upgrade c.Handle("/openyurt.io/v1/pods", ota.GetPods(clientset)).Methods("GET") c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/update", ota.UpdatePod(clientset)).Methods("POST") - // c.HandleFunc("/pods", ota.GetPods).Methods("GET") - // c.HandleFunc("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpgradePod).Methods("POST") } // healthz returns ok for healthz request From 3e09f57f935e68611cad10d646e63aa809940b94 Mon Sep 17 00:00:00 2001 From: Xuecheng Date: Thu, 15 Sep 2022 01:56:08 -0700 Subject: [PATCH 04/10] use pod condition to specify upgradability Signed-off-by: Xuecheng --- pkg/yurthub/otaupdate/ota.go | 109 ++++++++++-------------------- pkg/yurthub/otaupdate/ota_test.go | 30 ++++---- pkg/yurthub/server/server.go | 8 +-- 3 files changed, 58 insertions(+), 89 deletions(-) diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index f8aff7887ea..18d3048e529 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -21,44 +21,38 @@ import ( "encoding/json" "fmt" "net/http" - "strconv" "github.com/gorilla/mux" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) +// TODO(hxc): should use pkg/controller/daemonpodupdater.PodNeedUpgrade const ( - PodUpdatableAnnotation = "apps.openyurt.io/pod-updatable" + PodNeedUpgrade corev1.PodConditionType = "PodNeedUpgrade" ) -type PodStatus struct { - Namespace string - PodName string - // TODO: whether need to display - // OldImgs []string - // NewImgs []string - Updatable bool -} - -// GetPods return all daemonset pods' update information by PodStatus -func GetPods(clientset kubernetes.Interface) http.Handler { +// GetPods return pod list +func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - podsStatus, err := getPodUpdateStatus(clientset) + podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ + FieldSelector: "spec.nodeName=" + nodeName, + }) if err != nil { - klog.Errorf("Get pods updatable status failed, %v", err) - returnErr(fmt.Errorf("Get daemonset's pods update status failed"), w, http.StatusInternalServerError) + klog.Errorf("Get pod list failed, %v", err) + returnErr(fmt.Errorf("Get pods list failed"), w, http.StatusInternalServerError) return } - klog.V(5).Infof("Got pods status list: %v", podsStatus) + klog.V(5).Infof("Got pod list: %v", podList) - // Successfully get daemonsets/pods update info + // Successfully get pod list w.Header().Set("content-type", "text/json") - data, err := json.Marshal(podsStatus) + data, err := json.Marshal(podList) if err != nil { - klog.Errorf("Marshal pods status failed: %v", err.Error()) - returnErr(fmt.Errorf("Get daemonset's pods update status failed: data transfer to json format failed."), w, http.StatusInternalServerError) + klog.Errorf("Marshal pod list failed: %v", err.Error()) + returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."), w, http.StatusInternalServerError) return } @@ -77,7 +71,6 @@ func UpdatePod(clientset kubernetes.Interface) http.Handler { namespace := params["ns"] podName := params["podname"] - klog.Info("12", namespace, podName) err, ok := applyUpdate(clientset, namespace, podName) if err != nil { returnErr(fmt.Errorf("Apply update failed"), w, http.StatusInternalServerError) @@ -91,42 +84,6 @@ func UpdatePod(clientset kubernetes.Interface) http.Handler { }) } -// getDaemonsetPodUpdateStatus check pods annotation "apps.openyurt.io/pod-updatable" -// to determine whether new version application is availabel -func getPodUpdateStatus(clientset kubernetes.Interface) ([]*PodStatus, error) { - pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return nil, err - } - - podsStatus := make([]*PodStatus, 0) - for _, pod := range pods.Items { - var updatable bool - v, ok := pod.Annotations[PodUpdatableAnnotation] - - if !ok { - updatable = false - } else { - updatable, err = strconv.ParseBool(v) - if err != nil { - klog.Warningf("Pod %v with invalid update annotation %v", pod.Name, v) - continue - } - } - - klog.V(5).Infof("Pod %v with update annotation %v", pod.Name, updatable) - podStatus := &PodStatus{ - Namespace: pod.Namespace, - PodName: pod.Name, - Updatable: updatable, - } - podsStatus = append(podsStatus, podStatus) - - } - - return podsStatus, nil -} - // applyUpdate execute pod update process by deleting pod under OnDelete update strategy func applyUpdate(clientset kubernetes.Interface, namespace, podName string) (error, bool) { pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) @@ -135,23 +92,15 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName string) (err return err, false } - // Pod will not be updated without annotation "apps.openyurt.io/pod-updatable" - v, ok := pod.Annotations[PodUpdatableAnnotation] - if !ok { - klog.Infof("Daemonset pod: %v/%v is not updatable without PodUpdatable annotation", namespace, podName) + // Pod will not be updated while it's being deleted + if pod.DeletionTimestamp != nil { + klog.Infof("Pod %v/%v is deleting, can not update", namespace, podName) return nil, false } - // Pod will not be updated when annotation "apps.openyurt.io/pod-updatable" value cannot be parsed - updatable, err := strconv.ParseBool(v) - if err != nil { - klog.Infof("Pod %v/%v is not updatable with invalid update annotation %v", namespace, podName, v) - return nil, false - } - - // Pod will not be updated when annotation "apps.openyurt.io/pod-updatable" value is false - if !updatable { - klog.Infof("Pod %v/%v is not updatable with PodUpdatable annotation is %v", namespace, podName, updatable) + // Pod will not be updated without pod condition PodNeedUpgrade=true + if !IsPodUpdatable(pod) { + klog.Infof("Pod: %v/%v is not updatable", namespace, podName) return nil, false } @@ -175,3 +124,19 @@ func returnErr(err error, w http.ResponseWriter, errType int) { klog.Errorf("write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) } } + +// TODO(hxc): should use pkg/controller/daemonpodupdater.IsPodUpdatable() +// IsPodUpdatable returns true if a pod is updatable; false otherwise. +func IsPodUpdatable(pod *corev1.Pod) bool { + if &pod.Status == nil || len(pod.Status.Conditions) == 0 { + return false + } + + for i := range pod.Status.Conditions { + if pod.Status.Conditions[i].Type == PodNeedUpgrade && pod.Status.Conditions[i].Status == "true" { + return true + } + } + + return false +} diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 1fe487917f4..74e3a8a29bb 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -36,25 +36,32 @@ func newPod(podName string) *corev1.Pod { GenerateName: podName, Namespace: metav1.NamespaceDefault, }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{}, + }, } pod.Name = podName return pod } -func newPodWithAnnotation(podName string, ready bool) *corev1.Pod { +func newPodWithCondition(podName string, ready bool) *corev1.Pod { pod := newPod(podName) - setPodUpdatableAnnotation(pod, ready) + SetPodUpgradeCondition(pod, ready) return pod } -func setPodUpdatableAnnotation(pod *corev1.Pod, ok bool) { - metav1.SetMetaDataAnnotation(&pod.ObjectMeta, PodUpdatableAnnotation, strconv.FormatBool(ok)) +func SetPodUpgradeCondition(pod *corev1.Pod, ok bool) { + cond := corev1.PodCondition{ + Type: PodNeedUpgrade, + Status: corev1.ConditionStatus(strconv.FormatBool(ok)), + } + pod.Status.Conditions = append(pod.Status.Conditions, cond) } func TestGetPods(t *testing.T) { - updatablePod := newPodWithAnnotation("updatablePod", true) - notUpdatablePod := newPodWithAnnotation("notUpdatablePod", false) + updatablePod := newPodWithCondition("updatablePod", true) + notUpdatablePod := newPodWithCondition("notUpdatablePod", false) normalPod := newPod("normalPod") clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod) @@ -65,13 +72,10 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset).ServeHTTP(rr, req) + GetPods(clientset, "").ServeHTTP(rr, req) expectedCode := http.StatusOK - expectedData := `[{"Namespace":"default","PodName":"normalPod","Updatable":false},{"Namespace":"default","PodName":"notUpdatablePod","Updatable":false},{"Namespace":"default","PodName":"updatablePod","Updatable":true}]` - assert.Equal(t, expectedCode, rr.Code) - assert.Equal(t, expectedData, rr.Body.String()) } func TestUpdatePod(t *testing.T) { @@ -85,21 +89,21 @@ func TestUpdatePod(t *testing.T) { { reqURL: "/openyurt.io/v1/namespaces/default/pods/updatablePod/update", podName: "updatablePod", - pod: newPodWithAnnotation("updatablePod", true), + pod: newPodWithCondition("updatablePod", true), expectedCode: http.StatusOK, expectedData: "", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update", podName: "notUpdatablePod", - pod: newPodWithAnnotation("notUpdatablePod", false), + pod: newPodWithCondition("notUpdatablePod", false), expectedCode: http.StatusForbidden, expectedData: "Pod is not-updatable", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/wrongName/update", podName: "wrongName", - pod: newPodWithAnnotation("trueName", true), + pod: newPodWithCondition("trueName", true), expectedCode: http.StatusInternalServerError, expectedData: "Apply update failed", }, diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index eb064c32c0d..d4ed80fe4a1 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -69,7 +69,7 @@ func NewYurtHubServer(cfg *config.YurtHubConfiguration, klog.Errorf("cannot create the client set: %v", err) return nil, err } - registerHandlers(hubMux, cfg, certificateMgr, clientSet) + registerHandlers(hubMux, cfg, certificateMgr, clientSet, cfg.NodeName) hubServer := &http.Server{ Addr: cfg.YurtHubServerAddr, Handler: hubMux, @@ -157,7 +157,7 @@ func (s *yurtHubServer) Run() { } // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. -func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, clientset *kubernetes.Clientset) { +func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, clientset *kubernetes.Clientset, nodeName string) { // register handlers for update join token c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT") @@ -173,8 +173,8 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/openyurt.io/v1/pods", ota.GetPods(clientset)).Methods("GET") - c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/update", ota.UpdatePod(clientset)).Methods("POST") + c.Handle("/pods", ota.GetPods(clientset, nodeName)).Methods("GET") + c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpdatePod(clientset)).Methods("POST") } // healthz returns ok for healthz request From 5a4677c7b3c2d094eab37d0e6496939d5ff34f80 Mon Sep 17 00:00:00 2001 From: Xuecheng Date: Thu, 15 Sep 2022 07:15:16 -0700 Subject: [PATCH 05/10] add nodeName check before ota update Signed-off-by: Xuecheng --- pkg/yurthub/otaupdate/ota.go | 16 +++++++++++----- pkg/yurthub/otaupdate/ota_test.go | 2 +- pkg/yurthub/server/server.go | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 18d3048e529..3b8f3bcfb96 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -65,13 +65,13 @@ func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { } // UpdatePod update a specifc pod(namespace/podname) to the latest version -func UpdatePod(clientset kubernetes.Interface) http.Handler { +func UpdatePod(clientset kubernetes.Interface, nodeName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) namespace := params["ns"] podName := params["podname"] - err, ok := applyUpdate(clientset, namespace, podName) + err, ok := applyUpdate(clientset, namespace, podName, nodeName) if err != nil { returnErr(fmt.Errorf("Apply update failed"), w, http.StatusInternalServerError) return @@ -85,16 +85,22 @@ func UpdatePod(clientset kubernetes.Interface) http.Handler { } // applyUpdate execute pod update process by deleting pod under OnDelete update strategy -func applyUpdate(clientset kubernetes.Interface, namespace, podName string) (error, bool) { +func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName string) (error, bool) { pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) if err != nil { klog.Errorf("Get pod %v/%v failed, %v", namespace, podName, err) return err, false } - // Pod will not be updated while it's being deleted + // Pod will not be updated when it's being deleted if pod.DeletionTimestamp != nil { - klog.Infof("Pod %v/%v is deleting, can not update", namespace, podName) + klog.Infof("Pod %v/%v is deleting, can not be updated", namespace, podName) + return nil, false + } + + // Pod will not be updated when it's not running on the current node + if pod.Spec.NodeName != nodeName { + klog.Infof("Pod: %v/%v is running on %v, can not be updated", namespace, podName, pod.Spec.NodeName) return nil, false } diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 74e3a8a29bb..d53e4e04a28 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -122,7 +122,7 @@ func TestUpdatePod(t *testing.T) { req = mux.SetURLVars(req, vars) rr := httptest.NewRecorder() - UpdatePod(clientset).ServeHTTP(rr, req) + UpdatePod(clientset, "").ServeHTTP(rr, req) assert.Equal(t, test.expectedCode, rr.Code) assert.Equal(t, test.expectedData, rr.Body.String()) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index d4ed80fe4a1..8f87fa37c15 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -174,7 +174,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica // register handler for ota upgrade c.Handle("/pods", ota.GetPods(clientset, nodeName)).Methods("GET") - c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpdatePod(clientset)).Methods("POST") + c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpdatePod(clientset, nodeName)).Methods("POST") } // healthz returns ok for healthz request From 70107d71ad2aab45967cd3cc5ea33cd410d371e0 Mon Sep 17 00:00:00 2001 From: hxcGit Date: Tue, 27 Sep 2022 16:59:55 +0800 Subject: [PATCH 06/10] forbidden ota upgrade when cloud-edge is disconnected Signed-off-by: hxcGit --- cmd/yurthub/app/start.go | 2 +- pkg/yurthub/otaupdate/ota.go | 50 +++++++++++++++++++------------ pkg/yurthub/otaupdate/ota_test.go | 50 +++++++++++++++++++++++-------- pkg/yurthub/server/server.go | 14 +++++---- 4 files changed, 79 insertions(+), 37 deletions(-) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 13025dfe7f3..3430446ea15 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -195,7 +195,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { cfg.YurtSharedFactory.Start(stopCh) klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr) - s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr) + s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr, healthChecker) if err != nil { return fmt.Errorf("could not create hub server, %w", err) } diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 3b8f3bcfb96..8ed47ee0b02 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -21,22 +21,29 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "github.com/gorilla/mux" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" -) -// TODO(hxc): should use pkg/controller/daemonpodupdater.PodNeedUpgrade -const ( - PodNeedUpgrade corev1.PodConditionType = "PodNeedUpgrade" + "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" ) // GetPods return pod list -func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { +func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, + servers []*url.URL) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Pre-check if edge yurthub node is connected to the cloud + if !isEdgeCloudConnected(checker, servers) { + klog.Errorf("Get pod list is not allowed when edge is disconnected to cloud") + returnErr(fmt.Errorf("Get pod list is not allowed when edge is disconnected to cloud"), + w, http.StatusForbidden) + return + } + podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ FieldSelector: "spec.nodeName=" + nodeName, }) @@ -52,7 +59,8 @@ func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { data, err := json.Marshal(podList) if err != nil { klog.Errorf("Marshal pod list failed: %v", err.Error()) - returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."), w, http.StatusInternalServerError) + returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."), + w, http.StatusInternalServerError) return } @@ -65,8 +73,17 @@ func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { } // UpdatePod update a specifc pod(namespace/podname) to the latest version -func UpdatePod(clientset kubernetes.Interface, nodeName string) http.Handler { +func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, + servers []*url.URL) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Pre-check if edge yurthub node is connected to the cloud + if !isEdgeCloudConnected(checker, servers) { + klog.Errorf("Apply update is not allowed when edge is disconnected to cloud") + returnErr(fmt.Errorf("Apply update is not allowed when edge is disconnected to cloud"), + w, http.StatusForbidden) + return + } + params := mux.Vars(r) namespace := params["ns"] podName := params["podname"] @@ -105,7 +122,7 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st } // Pod will not be updated without pod condition PodNeedUpgrade=true - if !IsPodUpdatable(pod) { + if !daemonpodupdater.IsPodUpdatable(pod) { klog.Infof("Pod: %v/%v is not updatable", namespace, podName) return nil, false } @@ -131,18 +148,13 @@ func returnErr(err error, w http.ResponseWriter, errType int) { } } -// TODO(hxc): should use pkg/controller/daemonpodupdater.IsPodUpdatable() -// IsPodUpdatable returns true if a pod is updatable; false otherwise. -func IsPodUpdatable(pod *corev1.Pod) bool { - if &pod.Status == nil || len(pod.Status.Conditions) == 0 { - return false - } - - for i := range pod.Status.Conditions { - if pod.Status.Conditions[i].Type == PodNeedUpgrade && pod.Status.Conditions[i].Status == "true" { +// isEdgeCloudConnected will check if edge is disconnected to cloud. If there is any remote server is healthy, it is +// regarded as connected. Otherwise, it is regarded as disconnected and return false. +func isEdgeCloudConnected(checker healthchecker.HealthChecker, remoteServers []*url.URL) bool { + for _, server := range remoteServers { + if checker.IsHealthy(server) { return true } } - return false } diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index d53e4e04a28..85ceeaadddc 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -19,7 +19,7 @@ package otaupdate import ( "net/http" "net/http/httptest" - "strconv" + "net/url" "testing" "github.com/gorilla/mux" @@ -27,6 +27,27 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + + "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" +) + +var ( + healthyServers = []*url.URL{ + {Host: "127.0.0.1:18080"}, + } + + unHealthyServers = []*url.URL{ + {Host: "127.0.0.1:18081"}, + } + + healthyFakeChecker = healthchecker.NewFakeChecker(true, map[string]int{ + "http://127.0.0.1:8080": 1, + }) + + unHealthyFakeChecker = healthchecker.NewFakeChecker(false, map[string]int{ + "http://127.0.0.1:8081": 1, + }) ) func newPod(podName string) *corev1.Pod { @@ -44,24 +65,24 @@ func newPod(podName string) *corev1.Pod { return pod } -func newPodWithCondition(podName string, ready bool) *corev1.Pod { +func newPodWithCondition(podName string, ready corev1.ConditionStatus) *corev1.Pod { pod := newPod(podName) SetPodUpgradeCondition(pod, ready) return pod } -func SetPodUpgradeCondition(pod *corev1.Pod, ok bool) { +func SetPodUpgradeCondition(pod *corev1.Pod, ready corev1.ConditionStatus) { cond := corev1.PodCondition{ - Type: PodNeedUpgrade, - Status: corev1.ConditionStatus(strconv.FormatBool(ok)), + Type: daemonpodupdater.PodNeedUpgrade, + Status: ready, } pod.Status.Conditions = append(pod.Status.Conditions, cond) } func TestGetPods(t *testing.T) { - updatablePod := newPodWithCondition("updatablePod", true) - notUpdatablePod := newPodWithCondition("notUpdatablePod", false) + updatablePod := newPodWithCondition("updatablePod", corev1.ConditionTrue) + notUpdatablePod := newPodWithCondition("notUpdatablePod", corev1.ConditionFalse) normalPod := newPod("normalPod") clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod) @@ -72,10 +93,15 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset, "").ServeHTTP(rr, req) + GetPods(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) expectedCode := http.StatusOK assert.Equal(t, expectedCode, rr.Code) + + // Cloud-Edge network disconnected + rr = httptest.NewRecorder() + GetPods(clientset, "", unHealthyFakeChecker, unHealthyServers).ServeHTTP(rr, req) + assert.Equal(t, http.StatusForbidden, rr.Code) } func TestUpdatePod(t *testing.T) { @@ -89,21 +115,21 @@ func TestUpdatePod(t *testing.T) { { reqURL: "/openyurt.io/v1/namespaces/default/pods/updatablePod/update", podName: "updatablePod", - pod: newPodWithCondition("updatablePod", true), + pod: newPodWithCondition("updatablePod", corev1.ConditionTrue), expectedCode: http.StatusOK, expectedData: "", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update", podName: "notUpdatablePod", - pod: newPodWithCondition("notUpdatablePod", false), + pod: newPodWithCondition("notUpdatablePod", corev1.ConditionFalse), expectedCode: http.StatusForbidden, expectedData: "Pod is not-updatable", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/wrongName/update", podName: "wrongName", - pod: newPodWithCondition("trueName", true), + pod: newPodWithCondition("trueName", corev1.ConditionFalse), expectedCode: http.StatusInternalServerError, expectedData: "Apply update failed", }, @@ -122,7 +148,7 @@ func TestUpdatePod(t *testing.T) { req = mux.SetURLVars(req, vars) rr := httptest.NewRecorder() - UpdatePod(clientset, "").ServeHTTP(rr, req) + UpdatePod(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) assert.Equal(t, test.expectedCode, rr.Code) assert.Equal(t, test.expectedData, rr.Body.String()) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 8f87fa37c15..104a1206226 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -37,6 +37,7 @@ import ( "github.com/openyurtio/openyurt/pkg/util/certmanager" certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate" ) @@ -61,7 +62,8 @@ type yurtHubServer struct { func NewYurtHubServer(cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, proxyHandler http.Handler, - rest *rest.RestConfigManager) (Server, error) { + rest *rest.RestConfigManager, + checker healthchecker.HealthChecker) (Server, error) { hubMux := mux.NewRouter() restCfg := rest.GetRestConfig(false) clientSet, err := kubernetes.NewForConfig(restCfg) @@ -69,7 +71,7 @@ func NewYurtHubServer(cfg *config.YurtHubConfiguration, klog.Errorf("cannot create the client set: %v", err) return nil, err } - registerHandlers(hubMux, cfg, certificateMgr, clientSet, cfg.NodeName) + registerHandlers(hubMux, cfg, certificateMgr, clientSet, checker) hubServer := &http.Server{ Addr: cfg.YurtHubServerAddr, Handler: hubMux, @@ -157,7 +159,8 @@ func (s *yurtHubServer) Run() { } // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. -func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, clientset *kubernetes.Clientset, nodeName string) { +func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, + clientset *kubernetes.Clientset, checker healthchecker.HealthChecker) { // register handlers for update join token c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT") @@ -173,8 +176,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.GetPods(clientset, nodeName)).Methods("GET") - c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpdatePod(clientset, nodeName)).Methods("POST") + c.Handle("/pods", ota.GetPods(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("GET") + c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", + ota.UpdatePod(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("POST") } // healthz returns ok for healthz request From 24cc83502ec8c34094169c41c346409efe6f098d Mon Sep 17 00:00:00 2001 From: hxcGit Date: Wed, 28 Sep 2022 21:48:03 +0800 Subject: [PATCH 07/10] adjust response format Signed-off-by: hxcGit --- pkg/yurthub/otaupdate/ota.go | 79 +++++++++++++++++++------------ pkg/yurthub/otaupdate/ota_test.go | 2 +- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 8ed47ee0b02..01ef27a3de7 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -18,14 +18,17 @@ package otaupdate import ( "context" - "encoding/json" "fmt" "net/http" "net/url" "github.com/gorilla/mux" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + runtimescheme "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" @@ -39,8 +42,7 @@ func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchec // Pre-check if edge yurthub node is connected to the cloud if !isEdgeCloudConnected(checker, servers) { klog.Errorf("Get pod list is not allowed when edge is disconnected to cloud") - returnErr(fmt.Errorf("Get pod list is not allowed when edge is disconnected to cloud"), - w, http.StatusForbidden) + WriteErr(w, "Get pod list is not allowed when edge is disconnected to cloud", http.StatusForbidden) return } @@ -49,26 +51,18 @@ func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchec }) if err != nil { klog.Errorf("Get pod list failed, %v", err) - returnErr(fmt.Errorf("Get pods list failed"), w, http.StatusInternalServerError) + WriteErr(w, "Get pod list failed", http.StatusInternalServerError) return } klog.V(5).Infof("Got pod list: %v", podList) - // Successfully get pod list - w.Header().Set("content-type", "text/json") - data, err := json.Marshal(podList) + // Successfully get pod list, response 200 + data, err := encodePodList(podList) if err != nil { - klog.Errorf("Marshal pod list failed: %v", err.Error()) - returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."), - w, http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - n, err := w.Write(data) - if err != nil || n != len(data) { - klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err) + klog.Errorf("Encode pod list failed, %v", err) + WriteErr(w, "Encode pod list failed", http.StatusInternalServerError) } + WriteJSONResponse(w, data) }) } @@ -79,8 +73,7 @@ func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthch // Pre-check if edge yurthub node is connected to the cloud if !isEdgeCloudConnected(checker, servers) { klog.Errorf("Apply update is not allowed when edge is disconnected to cloud") - returnErr(fmt.Errorf("Apply update is not allowed when edge is disconnected to cloud"), - w, http.StatusForbidden) + WriteErr(w, "Apply update is not allowed when edge is disconnected to cloud", http.StatusForbidden) return } @@ -89,15 +82,19 @@ func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthch podName := params["podname"] err, ok := applyUpdate(clientset, namespace, podName, nodeName) + // Pod update failed with error if err != nil { - returnErr(fmt.Errorf("Apply update failed"), w, http.StatusInternalServerError) + WriteErr(w, "Apply update failed", http.StatusInternalServerError) return } + // Pod update is not allowed if !ok { - returnErr(fmt.Errorf("Pod is not-updatable"), w, http.StatusForbidden) + WriteErr(w, "Pod is not-updatable", http.StatusForbidden) return } - w.WriteHeader(http.StatusOK) + + // Successfully apply update, response 200 + WriteJSONResponse(w, []byte(fmt.Sprintf("Start updating pod %q/%q", namespace, podName))) }) } @@ -134,18 +131,14 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st return err, false } - klog.Infof("Update pod: %v/%v success", namespace, podName) + klog.Infof("Start updating pod: %q/%q", namespace, podName) return nil, true } -// returnErr write the given error to response and set response header to the given error type -func returnErr(err error, w http.ResponseWriter, errType int) { - w.WriteHeader(errType) - n := len([]byte(err.Error())) - nw, e := w.Write([]byte(err.Error())) - if e != nil || nw != n { - klog.Errorf("write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) - } +// encodePodList returns the encoded PodList +func encodePodList(podList *corev1.PodList) ([]byte, error) { + codec := scheme.Codecs.LegacyCodec(runtimescheme.GroupVersion{Group: corev1.GroupName, Version: "v1"}) + return runtime.Encode(codec, podList) } // isEdgeCloudConnected will check if edge is disconnected to cloud. If there is any remote server is healthy, it is @@ -158,3 +151,27 @@ func isEdgeCloudConnected(checker healthchecker.HealthChecker, remoteServers []* } return false } + +// WriteErr writes the http status and the error string on the response +func WriteErr(w http.ResponseWriter, errReason string, httpStatus int) { + w.WriteHeader(httpStatus) + n := len([]byte(errReason)) + nw, e := w.Write([]byte(errReason)) + if e != nil || nw != n { + klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", n, nw, e) + } +} + +// Derived from kubelet writeJSONResponse +func WriteJSONResponse(w http.ResponseWriter, data []byte) { + if data == nil { + w.WriteHeader(http.StatusOK) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + n, err := w.Write(data) + if err != nil || n != len(data) { + klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err) + } +} diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 85ceeaadddc..29a6084b3df 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -117,7 +117,7 @@ func TestUpdatePod(t *testing.T) { podName: "updatablePod", pod: newPodWithCondition("updatablePod", corev1.ConditionTrue), expectedCode: http.StatusOK, - expectedData: "", + expectedData: "Start updating pod \"default\"/\"updatablePod\"", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update", From d457d131b0f4d2b33f9dabf66b1e3944a74082ea Mon Sep 17 00:00:00 2001 From: hxcGit Date: Thu, 29 Sep 2022 12:39:16 +0800 Subject: [PATCH 08/10] fix cloud-edge connection judgment Signed-off-by: hxcGit --- cmd/yurthub/app/start.go | 2 +- pkg/yurthub/otaupdate/ota.go | 63 ++++++++++++++----------------- pkg/yurthub/otaupdate/ota_test.go | 31 ++------------- pkg/yurthub/server/server.go | 12 +++--- 4 files changed, 38 insertions(+), 70 deletions(-) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 3430446ea15..13025dfe7f3 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -195,7 +195,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { cfg.YurtSharedFactory.Start(stopCh) klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr) - s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr, healthChecker) + s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr) if err != nil { return fmt.Errorf("could not create hub server, %w", err) } diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 01ef27a3de7..32227738b3e 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - "net/url" "github.com/gorilla/mux" corev1 "k8s.io/api/core/v1" @@ -32,22 +31,16 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ) +type OTAHandler func(kubernetes.Interface, string) http.Handler + // GetPods return pod list -func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, - servers []*url.URL) http.Handler { +func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Pre-check if edge yurthub node is connected to the cloud - if !isEdgeCloudConnected(checker, servers) { - klog.Errorf("Get pod list is not allowed when edge is disconnected to cloud") - WriteErr(w, "Get pod list is not allowed when edge is disconnected to cloud", http.StatusForbidden) - return - } - podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ - FieldSelector: "spec.nodeName=" + nodeName, + FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName), }) if err != nil { klog.Errorf("Get pod list failed, %v", err) @@ -67,16 +60,8 @@ func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchec } // UpdatePod update a specifc pod(namespace/podname) to the latest version -func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, - servers []*url.URL) http.Handler { +func UpdatePod(clientset kubernetes.Interface, nodeName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Pre-check if edge yurthub node is connected to the cloud - if !isEdgeCloudConnected(checker, servers) { - klog.Errorf("Apply update is not allowed when edge is disconnected to cloud") - WriteErr(w, "Apply update is not allowed when edge is disconnected to cloud", http.StatusForbidden) - return - } - params := mux.Vars(r) namespace := params["ns"] podName := params["podname"] @@ -94,7 +79,7 @@ func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthch } // Successfully apply update, response 200 - WriteJSONResponse(w, []byte(fmt.Sprintf("Start updating pod %q/%q", namespace, podName))) + WriteJSONResponse(w, []byte(fmt.Sprintf("Start updating pod %v/%v", namespace, podName))) }) } @@ -131,7 +116,7 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st return err, false } - klog.Infof("Start updating pod: %q/%q", namespace, podName) + klog.Infof("Start updating pod: %v/%v", namespace, podName) return nil, true } @@ -141,17 +126,6 @@ func encodePodList(podList *corev1.PodList) ([]byte, error) { return runtime.Encode(codec, podList) } -// isEdgeCloudConnected will check if edge is disconnected to cloud. If there is any remote server is healthy, it is -// regarded as connected. Otherwise, it is regarded as disconnected and return false. -func isEdgeCloudConnected(checker healthchecker.HealthChecker, remoteServers []*url.URL) bool { - for _, server := range remoteServers { - if checker.IsHealthy(server) { - return true - } - } - return false -} - // WriteErr writes the http status and the error string on the response func WriteErr(w http.ResponseWriter, errReason string, httpStatus int) { w.WriteHeader(httpStatus) @@ -175,3 +149,24 @@ func WriteJSONResponse(w http.ResponseWriter, data []byte) { klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err) } } + +// HealthyCheck checks if cloud-edge is disconnected before ota update handle, ota update is not allowed when disconnected +func HealthyCheck(rest *rest.RestConfigManager, nodeName string, handler OTAHandler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + restCfg := rest.GetRestConfig(true) + if restCfg == nil { + klog.Infof("Get pod list is not allowed when edge is disconnected to cloud") + WriteErr(w, "OTA update is not allowed when edge is disconnected to cloud", http.StatusForbidden) + return + } + + clientSet, err := kubernetes.NewForConfig(restCfg) + if err != nil { + klog.Errorf("Get client set failed: %v", err) + WriteErr(w, "Get client set failed", http.StatusInternalServerError) + return + } + + handler(clientSet, nodeName).ServeHTTP(w, r) + }) +} diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 29a6084b3df..71ddd0ba8f3 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -19,7 +19,6 @@ package otaupdate import ( "net/http" "net/http/httptest" - "net/url" "testing" "github.com/gorilla/mux" @@ -29,25 +28,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" -) - -var ( - healthyServers = []*url.URL{ - {Host: "127.0.0.1:18080"}, - } - - unHealthyServers = []*url.URL{ - {Host: "127.0.0.1:18081"}, - } - - healthyFakeChecker = healthchecker.NewFakeChecker(true, map[string]int{ - "http://127.0.0.1:8080": 1, - }) - - unHealthyFakeChecker = healthchecker.NewFakeChecker(false, map[string]int{ - "http://127.0.0.1:8081": 1, - }) ) func newPod(podName string) *corev1.Pod { @@ -93,15 +73,10 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) + GetPods(clientset, "").ServeHTTP(rr, req) expectedCode := http.StatusOK assert.Equal(t, expectedCode, rr.Code) - - // Cloud-Edge network disconnected - rr = httptest.NewRecorder() - GetPods(clientset, "", unHealthyFakeChecker, unHealthyServers).ServeHTTP(rr, req) - assert.Equal(t, http.StatusForbidden, rr.Code) } func TestUpdatePod(t *testing.T) { @@ -117,7 +92,7 @@ func TestUpdatePod(t *testing.T) { podName: "updatablePod", pod: newPodWithCondition("updatablePod", corev1.ConditionTrue), expectedCode: http.StatusOK, - expectedData: "Start updating pod \"default\"/\"updatablePod\"", + expectedData: "Start updating pod default/updatablePod", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update", @@ -148,7 +123,7 @@ func TestUpdatePod(t *testing.T) { req = mux.SetURLVars(req, vars) rr := httptest.NewRecorder() - UpdatePod(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) + UpdatePod(clientset, "").ServeHTTP(rr, req) assert.Equal(t, test.expectedCode, rr.Code) assert.Equal(t, test.expectedData, rr.Body.String()) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 104a1206226..7da09cbd23a 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -37,7 +37,6 @@ import ( "github.com/openyurtio/openyurt/pkg/util/certmanager" certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate" ) @@ -62,8 +61,7 @@ type yurtHubServer struct { func NewYurtHubServer(cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, proxyHandler http.Handler, - rest *rest.RestConfigManager, - checker healthchecker.HealthChecker) (Server, error) { + rest *rest.RestConfigManager) (Server, error) { hubMux := mux.NewRouter() restCfg := rest.GetRestConfig(false) clientSet, err := kubernetes.NewForConfig(restCfg) @@ -71,7 +69,7 @@ func NewYurtHubServer(cfg *config.YurtHubConfiguration, klog.Errorf("cannot create the client set: %v", err) return nil, err } - registerHandlers(hubMux, cfg, certificateMgr, clientSet, checker) + registerHandlers(hubMux, cfg, certificateMgr, rest) hubServer := &http.Server{ Addr: cfg.YurtHubServerAddr, Handler: hubMux, @@ -160,7 +158,7 @@ func (s *yurtHubServer) Run() { // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, - clientset *kubernetes.Clientset, checker healthchecker.HealthChecker) { + rest *rest.RestConfigManager) { // register handlers for update join token c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT") @@ -176,9 +174,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.GetPods(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("GET") + c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET") c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", - ota.UpdatePod(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("POST") + ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST") } // healthz returns ok for healthz request From a269988ffd3e940aad585b0c7ed039d272965dd3 Mon Sep 17 00:00:00 2001 From: hxcGit Date: Thu, 29 Sep 2022 15:55:13 +0800 Subject: [PATCH 09/10] use local cache to get pods Signed-off-by: hxcGit --- pkg/yurthub/otaupdate/ota.go | 25 +++++++++++++++++-------- pkg/yurthub/otaupdate/ota_test.go | 25 ++++++++++++++++++++++--- pkg/yurthub/server/server.go | 2 +- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 32227738b3e..01089465616 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -31,26 +31,35 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ) type OTAHandler func(kubernetes.Interface, string) http.Handler // GetPods return pod list -func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { +func GetPods(store cachemanager.StorageWrapper) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ - FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName), - }) + objs, err := store.List("kubelet/pods") if err != nil { klog.Errorf("Get pod list failed, %v", err) WriteErr(w, "Get pod list failed", http.StatusInternalServerError) return } - klog.V(5).Infof("Got pod list: %v", podList) + + podList := new(corev1.PodList) + for _, obj := range objs { + pod, ok := obj.(*corev1.Pod) + if !ok { + klog.Errorf("Get pod list failed, %v", err) + WriteErr(w, "Get pod list failed", http.StatusInternalServerError) + return + } + podList.Items = append(podList.Items, *pod) + } // Successfully get pod list, response 200 - data, err := encodePodList(podList) + data, err := encodePods(podList) if err != nil { klog.Errorf("Encode pod list failed, %v", err) WriteErr(w, "Encode pod list failed", http.StatusInternalServerError) @@ -120,8 +129,8 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st return nil, true } -// encodePodList returns the encoded PodList -func encodePodList(podList *corev1.PodList) ([]byte, error) { +// Derived from kubelet encodePods +func encodePods(podList *corev1.PodList) (data []byte, err error) { codec := scheme.Codecs.LegacyCodec(runtimescheme.GroupVersion{Group: corev1.GroupName, Version: "v1"}) return runtime.Encode(codec, podList) } diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 71ddd0ba8f3..25060d4ad32 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -17,6 +17,7 @@ limitations under the License. package otaupdate import ( + "fmt" "net/http" "net/http/httptest" "testing" @@ -28,11 +29,16 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" ) func newPod(podName string) *corev1.Pod { pod := &corev1.Pod{ - TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, ObjectMeta: metav1.ObjectMeta{ GenerateName: podName, Namespace: metav1.NamespaceDefault, @@ -61,11 +67,24 @@ func SetPodUpgradeCondition(pod *corev1.Pod, ready corev1.ConditionStatus) { } func TestGetPods(t *testing.T) { + dir := t.TempDir() + dStorage, err := disk.NewDiskStorage(dir) + if err != nil { + t.Errorf("failed to create disk storage, %v", err) + } + sWrapper := cachemanager.NewStorageWrapper(dStorage) + updatablePod := newPodWithCondition("updatablePod", corev1.ConditionTrue) notUpdatablePod := newPodWithCondition("notUpdatablePod", corev1.ConditionFalse) normalPod := newPod("normalPod") - clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod) + pods := []*corev1.Pod{updatablePod, notUpdatablePod, normalPod} + for _, pod := range pods { + err = sWrapper.Create(fmt.Sprintf("kubelet/pods/default/%s", pod.Name), pod) + if err != nil { + t.Errorf("failed to create obj, %v", err) + } + } req, err := http.NewRequest("GET", "/openyurt.io/v1/pods", nil) if err != nil { @@ -73,7 +92,7 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset, "").ServeHTTP(rr, req) + GetPods(sWrapper).ServeHTTP(rr, req) expectedCode := http.StatusOK assert.Equal(t, expectedCode, rr.Code) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 7da09cbd23a..c086bb8f4a0 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -174,7 +174,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET") + c.Handle("/pods", ota.GetPods(cfg.StorageWrapper)).Methods("GET") c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST") } From 100f17a6e4c04e6b9e361fff9629cc5a1ec337bb Mon Sep 17 00:00:00 2001 From: hxcGit Date: Fri, 30 Sep 2022 11:28:33 +0800 Subject: [PATCH 10/10] add ut for HealthyCheck Signed-off-by: hxcGit --- pkg/yurthub/otaupdate/ota_test.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 25060d4ad32..092d9994425 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "testing" "github.com/gorilla/mux" @@ -28,8 +29,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" ) @@ -149,3 +153,26 @@ func TestUpdatePod(t *testing.T) { } } + +func TestHealthyCheck(t *testing.T) { + u, _ := url.Parse("https://10.10.10.113:6443") + fakeHealthchecker := healthchecker.NewFakeChecker(false, nil) + cfg := &config.YurtHubConfiguration{ + RemoteServers: []*url.URL{u}, + } + + rcm, err := rest.NewRestConfigManager(cfg, nil, fakeHealthchecker) + if err != nil { + t.Fatal(err) + } + + req, err := http.NewRequest("POST", "", nil) + if err != nil { + t.Fatal(err) + } + + rr := httptest.NewRecorder() + + HealthyCheck(rcm, "", UpdatePod).ServeHTTP(rr, req) + assert.Equal(t, http.StatusForbidden, rr.Code) +}