From 18b6383a5c4f73dd01f0d514f725746bb6c1ebaa Mon Sep 17 00:00:00 2001 From: rambohe-ch Date: Mon, 15 Jan 2024 17:38:16 +0800 Subject: [PATCH 1/7] intercept kubelet get node request in order to reduce the traffic --- pkg/yurthub/proxy/proxy.go | 6 ++++++ pkg/yurthub/proxy/util/util.go | 12 ++++++++++++ 2 files changed, 18 insertions(+) diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 58dbfa9362b..cce4df2c349 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -191,6 +191,12 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) switch { case util.IsKubeletLeaseReq(req): p.handleKubeletLease(rw, req) + case util.IsKubeletGetNodeReq(req): + if p.localProxy != nil { + p.localProxy.ServeHTTP(rw, req) + } else { + p.loadBalancer.ServeHTTP(rw, req) + } case util.IsEventCreateRequest(req): p.eventHandler(rw, req) case util.IsPoolScopedResouceListWatchRequest(req): diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index ab2ae0d3e7a..ede5bf41f12 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -415,6 +415,18 @@ func IsKubeletLeaseReq(req *http.Request) bool { return true } +// IsKubeletGetNodeReq judge whether the request is a get node request from kubelet +func IsKubeletGetNodeReq(req *http.Request) bool { + ctx := req.Context() + if comp, ok := util.ClientComponentFrom(ctx); !ok || comp != "kubelet" { + return false + } + if info, ok := apirequest.RequestInfoFrom(ctx); !ok || info.Resource != "nodes" || info.Verb != "get" { + return false + } + return true +} + // WriteObject write object to response writer func WriteObject(statusCode int, obj runtime.Object, w http.ResponseWriter, req *http.Request) error { ctx := req.Context() From 7e82a8a2227d84d157e934d18f69ad519dc1a1b3 Mon Sep 17 00:00:00 2001 From: vie-serendipity <2733147505@qq.com> Date: Sat, 18 May 2024 11:19:20 +0800 Subject: [PATCH 2/7] fix: node ready immediate --- pkg/yurthub/proxy/proxy.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index cce4df2c349..df885fbbafd 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -60,6 +60,7 @@ type yurtReverseProxy struct { isCoordinatorReady func() bool workingMode hubutil.WorkingMode enableYurtCoordinator bool + lastConnectionStatus bool } // NewYurtReverseProxyHandler creates a http handler for proxying @@ -192,7 +193,7 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) case util.IsKubeletLeaseReq(req): p.handleKubeletLease(rw, req) case util.IsKubeletGetNodeReq(req): - if p.localProxy != nil { + if p.localProxy != nil && !p.isKubeletFirstReq() { p.localProxy.ServeHTTP(rw, req) } else { p.loadBalancer.ServeHTTP(rw, req) @@ -214,6 +215,18 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) } } +func (p *yurtReverseProxy) isKubeletFirstReq() bool { + if p.cloudHealthChecker.IsHealthy() && !p.lastConnectionStatus { + p.lastConnectionStatus = true + return true + } else if p.cloudHealthChecker.IsHealthy() && p.lastConnectionStatus { + return false + } else { + p.lastConnectionStatus = false + return false + } +} + func (p *yurtReverseProxy) handleKubeletLease(rw http.ResponseWriter, req *http.Request) { p.cloudHealthChecker.RenewKubeletLeaseTime() coordinatorHealtChecker := p.coordinatorHealtCheckerGetter() From 4a62aee75c63ec77543b49654de0c15b1b246ee8 Mon Sep 17 00:00:00 2001 From: vie-serendipity <2733147505@qq.com> Date: Wed, 22 May 2024 13:59:16 +0800 Subject: [PATCH 3/7] fix: cache object returned by watch req --- pkg/yurthub/cachemanager/cache_manager.go | 41 +++++++++++++---------- pkg/yurthub/proxy/util/util.go | 6 +--- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index ac16db535ab..792a38d12b6 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -52,6 +52,7 @@ import ( var ( ErrInMemoryCacheMiss = errors.New("in-memory cache miss") + ErrNotNodeOrLease = errors.New("resource is not node or lease") nonCacheableResources = map[string]struct{}{ "certificatesigningrequests": {}, @@ -233,20 +234,19 @@ func (cm *cacheManager) queryOneObject(req *http.Request) (runtime.Object, error comp, _ := util.ClientComponentFrom(ctx) // query in-memory cache first - var isInMemoryCache = isInMemoryCache(ctx) var isInMemoryCacheMiss bool - if isInMemoryCache { - if obj, err := cm.queryInMemeryCache(info); err != nil { - if err == ErrInMemoryCacheMiss { - isInMemoryCacheMiss = true - klog.V(4).Infof("in-memory cache miss when handling request %s, fall back to storage query", util.ReqString(req)) - } else { - klog.Errorf("cannot query in-memory cache for reqInfo %s, %v,", util.ReqInfoString(info), err) - } + if obj, err := cm.queryInMemeryCache(ctx, info); err != nil { + if err == ErrInMemoryCacheMiss { + isInMemoryCacheMiss = true + klog.V(4).Infof("in-memory cache miss when handling request %s, fall back to storage query", util.ReqString(req)) + } else if err == ErrNotNodeOrLease { + klog.V(4).Infof("resource(%s) is not node or lease, it will be found in the disk not cache", info.Resource) } else { - klog.V(4).Infof("in-memory cache hit when handling request %s", util.ReqString(req)) - return obj, nil + klog.Errorf("cannot query in-memory cache for reqInfo %s, %v,", util.ReqInfoString(info), err) } + } else { + klog.V(4).Infof("in-memory cache hit when handling request %s", util.ReqString(req)) + return obj, nil } // fall back to normal query @@ -275,12 +275,7 @@ func (cm *cacheManager) queryOneObject(req *http.Request) (runtime.Object, error // While cloud-edge network is broken, the inMemoryCache can only be full filled with data from edge cache, // such as local disk and yurt-coordinator. if isInMemoryCacheMiss { - if inMemoryCacheKey, err := inMemoryCacheKeyFunc(info); err != nil { - klog.Errorf("cannot in-memory cache key for req %s, %v", util.ReqString(req), err) - } else { - cm.inMemoryCacheFor(inMemoryCacheKey, obj) - klog.V(4).Infof("use obj from backend storage to update in-memory cache of key %s", inMemoryCacheKey) - } + return obj, cm.updateInMemoryCache(ctx, info, obj) } return obj, nil } @@ -453,6 +448,8 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re case watch.Error: klog.Infof("unable to understand watch event %#v", obj) } + err = cm.updateInMemoryCache(ctx, info, obj) + klog.ErrorS(err, "failed to update cache") } } @@ -606,6 +603,10 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ return err } + return cm.updateInMemoryCache(ctx, info, obj) +} + +func (cm *cacheManager) updateInMemoryCache(ctx context.Context, info *apirequest.RequestInfo, obj runtime.Object) error { // update the in-memory cache with cloud response if !isInMemoryCache(ctx) { return nil @@ -803,7 +804,11 @@ func (cm *cacheManager) DeleteKindFor(gvr schema.GroupVersionResource) error { return cm.restMapperManager.DeleteKindFor(gvr) } -func (cm *cacheManager) queryInMemeryCache(reqInfo *apirequest.RequestInfo) (runtime.Object, error) { +func (cm *cacheManager) queryInMemeryCache(ctx context.Context, reqInfo *apirequest.RequestInfo) (runtime.Object, error) { + if !isInMemoryCache(ctx) { + return nil, ErrNotNodeOrLease + } + key, err := inMemoryCacheKeyFunc(reqInfo) if err != nil { return nil, err diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index ede5bf41f12..51b94ae4339 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -331,7 +331,7 @@ func WithRequestTimeout(handler http.Handler) http.Handler { } } else if info.Verb == "get" { query := req.URL.Query() - if str, _ := query["timeout"]; len(str) > 0 { + if str := query["timeout"]; len(str) > 0 { if t, err := time.ParseDuration(str[0]); err == nil { if t > time.Duration(getAndListTimeoutReduce)*time.Second { timeout = t - time.Duration(getAndListTimeoutReduce)*time.Second @@ -515,10 +515,6 @@ func ReListWatchReq(rw http.ResponseWriter, req *http.Request) { } streamingEncoder := streaming.NewEncoder(framer.NewFrameWriter(rw), streamingSerializer) - if err != nil { - klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error()) - return - } outEvent := &metav1.WatchEvent{ Type: string(watch.Error), From 7bdb75fceb047ef37db2a35d6d0e97e3bf9141cc Mon Sep 17 00:00:00 2001 From: vie-serendipity <2733147505@qq.com> Date: Wed, 22 May 2024 14:21:25 +0800 Subject: [PATCH 4/7] fix: remove first req check --- pkg/yurthub/proxy/proxy.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index df885fbbafd..a7069519f09 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -193,7 +193,7 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) case util.IsKubeletLeaseReq(req): p.handleKubeletLease(rw, req) case util.IsKubeletGetNodeReq(req): - if p.localProxy != nil && !p.isKubeletFirstReq() { + if p.localProxy != nil { p.localProxy.ServeHTTP(rw, req) } else { p.loadBalancer.ServeHTTP(rw, req) @@ -215,18 +215,6 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) } } -func (p *yurtReverseProxy) isKubeletFirstReq() bool { - if p.cloudHealthChecker.IsHealthy() && !p.lastConnectionStatus { - p.lastConnectionStatus = true - return true - } else if p.cloudHealthChecker.IsHealthy() && p.lastConnectionStatus { - return false - } else { - p.lastConnectionStatus = false - return false - } -} - func (p *yurtReverseProxy) handleKubeletLease(rw http.ResponseWriter, req *http.Request) { p.cloudHealthChecker.RenewKubeletLeaseTime() coordinatorHealtChecker := p.coordinatorHealtCheckerGetter() From 48b269ebeee996e45833e5b68b50cd85be6b99fb Mon Sep 17 00:00:00 2001 From: vie-serendipity <2733147505@qq.com> Date: Wed, 22 May 2024 14:23:27 +0800 Subject: [PATCH 5/7] fix: remove lastConnectionStatus --- pkg/yurthub/proxy/proxy.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index a7069519f09..cce4df2c349 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -60,7 +60,6 @@ type yurtReverseProxy struct { isCoordinatorReady func() bool workingMode hubutil.WorkingMode enableYurtCoordinator bool - lastConnectionStatus bool } // NewYurtReverseProxyHandler creates a http handler for proxying From 957197580ff459bc79bc423081125628d4cc4d29 Mon Sep 17 00:00:00 2001 From: vie-serendipity <2733147505@qq.com> Date: Wed, 22 May 2024 15:32:56 +0800 Subject: [PATCH 6/7] fix: only cache object when type is add or modify --- pkg/yurthub/cachemanager/cache_manager.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 792a38d12b6..53df4e59300 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -428,9 +428,13 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re } else { updateObjCnt++ } + errMsg := cm.updateInMemoryCache(ctx, info, obj) + klog.ErrorS(errMsg, "failed to update cache") case watch.Deleted: err = cm.storage.Delete(key) delObjCnt++ + // for now, If it's a delete request, no need to modify the inmemory cache, + // because currently, there shouldn't be any delete requests for nodes or leases. default: // impossible go to here } @@ -448,8 +452,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re case watch.Error: klog.Infof("unable to understand watch event %#v", obj) } - err = cm.updateInMemoryCache(ctx, info, obj) - klog.ErrorS(err, "failed to update cache") } } From a8cc010842c64748f84ee2deb06451acfd8aa620 Mon Sep 17 00:00:00 2001 From: vie-serendipity <2733147505@qq.com> Date: Thu, 23 May 2024 11:34:54 +0800 Subject: [PATCH 7/7] chore: modify log content --- pkg/yurthub/cachemanager/cache_manager.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 53df4e59300..d4dc99e4cd2 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -429,7 +429,9 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re updateObjCnt++ } errMsg := cm.updateInMemoryCache(ctx, info, obj) - klog.ErrorS(errMsg, "failed to update cache") + if errMsg != nil { + klog.Errorf("failed to update cache, %v", errMsg) + } case watch.Deleted: err = cm.storage.Delete(key) delObjCnt++