From 0c6489f20031641a33c3869f8d1aa781d5ba0502 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 21 Jun 2023 09:52:13 +0800 Subject: [PATCH] fix yurthub memory leak (#1562) (cherry picked from commit 381b5f6caa06669985a9f72523e7eaa36e8d4426) Co-authored-by: JameKeal <413621396@qq.com> --- pkg/yurthub/poolcoordinator/coordinator.go | 4 +++- pkg/yurthub/storage/etcd/keycache.go | 7 +++++-- pkg/yurthub/storage/etcd/storage.go | 6 ++++++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go index cf85e8be299..194f9933260 100644 --- a/pkg/yurthub/poolcoordinator/coordinator.go +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -288,9 +288,9 @@ func (coordinator *coordinator) Run() { continue } - klog.Infof("coordinator newCloudLeaseClient success.") if err := coordinator.poolCacheSyncManager.EnsureStart(); err != nil { klog.Errorf("failed to sync pool-scoped resource, %v", err) + cancelEtcdStorage() coordinator.statusInfoChan <- electorStatusInfo continue } @@ -299,9 +299,11 @@ func (coordinator *coordinator) Run() { nodeLeaseProxyClient, err := coordinator.newNodeLeaseProxyClient() if err != nil { klog.Errorf("cloud not get cloud lease client when becoming leader yurthub, %v", err) + cancelEtcdStorage() coordinator.statusInfoChan <- electorStatusInfo continue } + klog.Infof("coordinator newCloudLeaseClient success.") coordinator.delegateNodeLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{ FilterFunc: ifDelegateHeartBeat, Handler: cache.ResourceEventHandlerFuncs{ diff --git a/pkg/yurthub/storage/etcd/keycache.go b/pkg/yurthub/storage/etcd/keycache.go index 43357ef7170..28a76515fe3 100644 --- a/pkg/yurthub/storage/etcd/keycache.go +++ b/pkg/yurthub/storage/etcd/keycache.go @@ -108,9 +108,12 @@ func (c *componentKeyCache) Recover() error { func (c *componentKeyCache) getPoolScopedKeyset() (*keyCache, error) { keys := &keyCache{m: make(map[schema.GroupVersionResource]storageKeySet)} - for _, gvr := range c.poolScopedResourcesGetter() { + getFunc := func(key string) (*clientv3.GetResponse, error) { getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout) defer cancel() + return c.etcdClient.Get(getCtx, key, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + } + for _, gvr := range c.poolScopedResourcesGetter() { rootKey, err := c.keyFunc(storage.KeyBuildInfo{ Component: coordinatorconstants.DefaultPoolScopedUserAgent, Group: gvr.Group, @@ -120,7 +123,7 @@ func (c *componentKeyCache) getPoolScopedKeyset() (*keyCache, error) { if err != nil { return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err) } - getResp, err := c.etcdClient.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly()) + getResp, err := getFunc(rootKey.Key()) if err != nil { return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err) } diff --git a/pkg/yurthub/storage/etcd/storage.go b/pkg/yurthub/storage/etcd/storage.go index 2897ed31fdb..61fc10a4fb7 100644 --- a/pkg/yurthub/storage/etcd/storage.go +++ b/pkg/yurthub/storage/etcd/storage.go @@ -141,6 +141,9 @@ func NewStorage(ctx context.Context, cfg *EtcdStorageConfig) (storage.Store, err poolScopedResourcesGetter: resources.GetPoolScopeResources, } if err := cache.Recover(); err != nil { + if err := client.Close(); err != nil { + return nil, fmt.Errorf("failed to close etcd client, %v", err) + } return nil, fmt.Errorf("failed to recover component key cache from %s, %v", cacheFilePath, err) } s.localComponentKeyCache = cache @@ -182,6 +185,9 @@ func (s *etcdStorage) clientLifeCycleManagement() { for { select { case <-s.ctx.Done(): + if err := s.client.Close(); err != nil { + klog.Errorf("failed to close etcd client, %v", err) + } klog.Info("etcdstorage lifecycle routine exited") return default: