From 56363590e1e87dd901567c062e25f76526b45a8f Mon Sep 17 00:00:00 2001 From: "lizhixin.lzx" Date: Thu, 9 Feb 2023 14:33:12 +0800 Subject: [PATCH] fix: pool-coordinator ask client to re-list-watch when proxy target change(cloud to pool-coor or pool-coor to cloud) --- pkg/yurthub/proxy/pool/pool.go | 1 + pkg/yurthub/proxy/remote/loadbalancer.go | 13 ++++----- pkg/yurthub/proxy/util/util.go | 36 ++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index 67b512556b8..4fd555d5dff 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -164,6 +164,7 @@ func (pp *PoolCoordinatorProxy) poolWatch(rw http.ResponseWriter, req *http.Requ case <-t.C: if !pp.isCoordinatorReady() { klog.Infof("notified the pool coordinator is not ready for handling request, cancel watch %s", hubutil.ReqString(req)) + util.ReListWatchReq(rw, req) poolServeCancel() return } diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 3d3465b1def..923ab0bd7cf 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -178,10 +178,10 @@ func NewLoadBalancer( algo = &rrLoadBalancerAlgo{backends: backends, checker: healthChecker} } - return &loadBalancer{ - backends: backends, - algo: algo, - }, nil + lb.backends = backends + lb.algo = algo + + return lb, nil } func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { @@ -211,19 +211,18 @@ func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { for { select { case <-t.C: - if lb.coordinatorGetter == nil { - continue - } coordinator := lb.coordinatorGetter() if coordinator == nil { continue } if _, isReady := coordinator.IsReady(); isReady { klog.Infof("notified the pool coordinator is ready, cancel the req %s making it handled by pool coordinator", hubutil.ReqString(req)) + util.ReListWatchReq(rw, req) cloudServeCancel() return } case <-clientReqCtx.Done(): + klog.Infof("watch req %s is canceled by client, when pool coordinator is not ready", hubutil.ReqString(req)) return } } diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index a86486620b0..df577f15ab7 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -19,6 +19,7 @@ package util import ( "context" "fmt" + "mime" "net/http" "strings" "time" @@ -32,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/authentication/serviceaccount" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" @@ -490,3 +493,36 @@ func IsEventCreateRequest(req *http.Request) bool { info.Resource == "events" && info.Verb == "create" } + +func ReListWatchReq(rw http.ResponseWriter, req *http.Request) { + agent, _ := util.ClientComponentFrom(req.Context()) + klog.Infof("component %s request urL %s with rv = %s is rejected, expect re-list", + agent, util.ReqString(req), req.URL.Query().Get("resourceVersion")) + + serializerManager := serializer.NewSerializerManager() + mediaType, params, _ := mime.ParseMediaType(runtime.ContentTypeProtobuf) + + _, streamingSerializer, framer, err := serializerManager.WatchEventClientNegotiator.StreamDecoder(mediaType, params) + if err != nil { + klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error()) + return + } + + streamingEncoder := streaming.NewEncoder(framer.NewFrameWriter(rw), streamingSerializer) + if err != nil { + klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error()) + return + } + + outEvent := &metav1.WatchEvent{ + Type: string(watch.Error), + } + + if err := streamingEncoder.Encode(outEvent); err != nil { + klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error()) + return + } + + klog.Infof("this request write error event back finished.") + rw.(http.Flusher).Flush() +}