Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

intercept kubelet get node request in order to reduce the traffic #2039

Merged
merged 7 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

var (
ErrInMemoryCacheMiss = errors.New("in-memory cache miss")
ErrNotNodeOrLease = errors.New("resource is not node or lease")

nonCacheableResources = map[string]struct{}{
"certificatesigningrequests": {},
Expand Down Expand Up @@ -233,20 +234,19 @@

comp, _ := util.ClientComponentFrom(ctx)
// query in-memory cache first
var isInMemoryCache = isInMemoryCache(ctx)
var isInMemoryCacheMiss bool
if isInMemoryCache {
if obj, err := cm.queryInMemeryCache(info); err != nil {
if err == ErrInMemoryCacheMiss {
isInMemoryCacheMiss = true
klog.V(4).Infof("in-memory cache miss when handling request %s, fall back to storage query", util.ReqString(req))
} else {
klog.Errorf("cannot query in-memory cache for reqInfo %s, %v,", util.ReqInfoString(info), err)
}
if obj, err := cm.queryInMemeryCache(ctx, info); err != nil {
if err == ErrInMemoryCacheMiss {

Check warning on line 239 in pkg/yurthub/cachemanager/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/cache_manager.go#L239

Added line #L239 was not covered by tests
isInMemoryCacheMiss = true
klog.V(4).Infof("in-memory cache miss when handling request %s, fall back to storage query", util.ReqString(req))
} else if err == ErrNotNodeOrLease {
klog.V(4).Infof("resource(%s) is not node or lease, it will be found in the disk not cache", info.Resource)

Check warning on line 243 in pkg/yurthub/cachemanager/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/cache_manager.go#L241-L243

Added lines #L241 - L243 were not covered by tests
} else {
klog.V(4).Infof("in-memory cache hit when handling request %s", util.ReqString(req))
return obj, nil
klog.Errorf("cannot query in-memory cache for reqInfo %s, %v,", util.ReqInfoString(info), err)
}
} else {
klog.V(4).Infof("in-memory cache hit when handling request %s", util.ReqString(req))
return obj, nil
}

// fall back to normal query
Expand Down Expand Up @@ -275,12 +275,7 @@
// While cloud-edge network is broken, the inMemoryCache can only be full filled with data from edge cache,
// such as local disk and yurt-coordinator.
if isInMemoryCacheMiss {
if inMemoryCacheKey, err := inMemoryCacheKeyFunc(info); err != nil {
klog.Errorf("cannot in-memory cache key for req %s, %v", util.ReqString(req), err)
} else {
cm.inMemoryCacheFor(inMemoryCacheKey, obj)
klog.V(4).Infof("use obj from backend storage to update in-memory cache of key %s", inMemoryCacheKey)
}
return obj, cm.updateInMemoryCache(ctx, info, obj)
}
return obj, nil
}
Expand Down Expand Up @@ -433,9 +428,15 @@
} else {
updateObjCnt++
}
errMsg := cm.updateInMemoryCache(ctx, info, obj)
if errMsg != nil {
klog.Errorf("failed to update cache, %v", errMsg)

Check warning on line 433 in pkg/yurthub/cachemanager/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/cache_manager.go#L432-L433

Added lines #L432 - L433 were not covered by tests
}
case watch.Deleted:
err = cm.storage.Delete(key)
delObjCnt++
// for now, If it's a delete request, no need to modify the inmemory cache,
// because currently, there shouldn't be any delete requests for nodes or leases.

Check warning on line 439 in pkg/yurthub/cachemanager/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/cache_manager.go#L438-L439

Added lines #L438 - L439 were not covered by tests
default:
// impossible go to here
}
Expand Down Expand Up @@ -606,6 +607,10 @@
return err
}

return cm.updateInMemoryCache(ctx, info, obj)
}

func (cm *cacheManager) updateInMemoryCache(ctx context.Context, info *apirequest.RequestInfo, obj runtime.Object) error {
// update the in-memory cache with cloud response
if !isInMemoryCache(ctx) {
return nil
Expand Down Expand Up @@ -803,7 +808,11 @@
return cm.restMapperManager.DeleteKindFor(gvr)
}

func (cm *cacheManager) queryInMemeryCache(reqInfo *apirequest.RequestInfo) (runtime.Object, error) {
func (cm *cacheManager) queryInMemeryCache(ctx context.Context, reqInfo *apirequest.RequestInfo) (runtime.Object, error) {
if !isInMemoryCache(ctx) {
return nil, ErrNotNodeOrLease
}

key, err := inMemoryCacheKeyFunc(reqInfo)
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request)
switch {
case util.IsKubeletLeaseReq(req):
p.handleKubeletLease(rw, req)
case util.IsKubeletGetNodeReq(req):
if p.localProxy != nil {
p.localProxy.ServeHTTP(rw, req)
} else {
p.loadBalancer.ServeHTTP(rw, req)
}
case util.IsEventCreateRequest(req):
p.eventHandler(rw, req)
case util.IsPoolScopedResouceListWatchRequest(req):
Expand Down
18 changes: 13 additions & 5 deletions pkg/yurthub/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@
}
} else if info.Verb == "get" {
query := req.URL.Query()
if str, _ := query["timeout"]; len(str) > 0 {
if str := query["timeout"]; len(str) > 0 {
if t, err := time.ParseDuration(str[0]); err == nil {
if t > time.Duration(getAndListTimeoutReduce)*time.Second {
timeout = t - time.Duration(getAndListTimeoutReduce)*time.Second
Expand Down Expand Up @@ -415,6 +415,18 @@
return true
}

// IsKubeletGetNodeReq judge whether the request is a get node request from kubelet
func IsKubeletGetNodeReq(req *http.Request) bool {
ctx := req.Context()
if comp, ok := util.ClientComponentFrom(ctx); !ok || comp != "kubelet" {
return false

Check warning on line 422 in pkg/yurthub/proxy/util/util.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/proxy/util/util.go#L419-L422

Added lines #L419 - L422 were not covered by tests
}
if info, ok := apirequest.RequestInfoFrom(ctx); !ok || info.Resource != "nodes" || info.Verb != "get" {
return false

Check warning on line 425 in pkg/yurthub/proxy/util/util.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/proxy/util/util.go#L424-L425

Added lines #L424 - L425 were not covered by tests
}
return true

Check warning on line 427 in pkg/yurthub/proxy/util/util.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/proxy/util/util.go#L427

Added line #L427 was not covered by tests
}

// WriteObject write object to response writer
func WriteObject(statusCode int, obj runtime.Object, w http.ResponseWriter, req *http.Request) error {
ctx := req.Context()
Expand Down Expand Up @@ -503,10 +515,6 @@
}

streamingEncoder := streaming.NewEncoder(framer.NewFrameWriter(rw), streamingSerializer)
if err != nil {
klog.Errorf("ReListWatchReq %s failed with error = %s", util.ReqString(req), err.Error())
return
}

outEvent := &metav1.WatchEvent{
Type: string(watch.Error),
Expand Down
Loading