Skip to content

Commit

Permalink
use local cache to get pods
Browse files Browse the repository at this point in the history
Signed-off-by: hxcGit <[email protected]>
  • Loading branch information
xavier-hou committed Sep 29, 2022
1 parent d457d13 commit 493ac4f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
27 changes: 19 additions & 8 deletions pkg/yurthub/otaupdate/ota.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/gorilla/mux"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
runtimescheme "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
Expand All @@ -37,20 +39,24 @@ import (
type OTAHandler func(kubernetes.Interface, string) http.Handler

// GetPods return pod list
func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler {
func GetPods(podLister corelisters.PodLister, nodeName string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
})
pods, err := podLister.Pods("").List(labels.Everything())
if err != nil {
klog.Errorf("Get pod list failed, %v", err)
WriteErr(w, "Get pod list failed", http.StatusInternalServerError)
return
}
klog.V(5).Infof("Got pod list: %v", podList)

var otaPods []*corev1.Pod
for _, pod := range pods {
if pod.Spec.NodeName == nodeName {
otaPods = append(otaPods, pod)
}
}

// Successfully get pod list, response 200
data, err := encodePodList(podList)
data, err := encodePods(otaPods)
if err != nil {
klog.Errorf("Encode pod list failed, %v", err)
WriteErr(w, "Encode pod list failed", http.StatusInternalServerError)
Expand Down Expand Up @@ -120,8 +126,13 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st
return nil, true
}

// encodePodList returns the encoded PodList
func encodePodList(podList *corev1.PodList) ([]byte, error) {
// Derived from kubelet encodePods
func encodePods(pods []*corev1.Pod) (data []byte, err error) {
podList := new(corev1.PodList)
for _, pod := range pods {
podList.Items = append(podList.Items, *pod)
}

codec := scheme.Codecs.LegacyCodec(runtimescheme.GroupVersion{Group: corev1.GroupName, Version: "v1"})
return runtime.Encode(codec, podList)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/yurthub/otaupdate/ota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"

"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
Expand Down Expand Up @@ -66,14 +67,19 @@ func TestGetPods(t *testing.T) {
normalPod := newPod("normalPod")

clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod)
podInformer := informers.NewSharedInformerFactory(clientset, 0).Core().V1().Pods()

podInformer.Informer().GetStore().Add(updatablePod)
podInformer.Informer().GetStore().Add(notUpdatablePod)
podInformer.Informer().GetStore().Add(normalPod)

req, err := http.NewRequest("GET", "/openyurt.io/v1/pods", nil)
if err != nil {
t.Fatal(err)
}
rr := httptest.NewRecorder()

GetPods(clientset, "").ServeHTTP(rr, req)
GetPods(podInformer.Lister(), "").ServeHTTP(rr, req)

expectedCode := http.StatusOK
assert.Equal(t, expectedCode, rr.Code)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica
c.Handle("/metrics", promhttp.Handler())

// register handler for ota upgrade
c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET")
c.Handle("/pods", ota.GetPods(cfg.SharedFactory.Core().V1().Pods().Lister(), cfg.NodeName)).Methods("GET")
c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade",
ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST")
}
Expand Down

0 comments on commit 493ac4f

Please sign in to comment.