From 06b5fd55bf6df53985c961457bf25b210d5b50b4 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Wed, 31 Jul 2024 12:21:48 -0400 Subject: [PATCH 01/20] do not return error in case of refresh failure in a watcher --- pkg/managers/watchermanager/watchermanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/managers/watchermanager/watchermanager.go b/pkg/managers/watchermanager/watchermanager.go index af6a69c88e..bae1fed9b9 100644 --- a/pkg/managers/watchermanager/watchermanager.go +++ b/pkg/managers/watchermanager/watchermanager.go @@ -74,7 +74,7 @@ func (wm *WatcherManager) runWatcher(ctx context.Context, w IWatcher) error { err := w.Refresh(ctx) if err != nil { wm.l.Error("refresh failed", zap.Error(err)) - return err + continue } } } From dae8dbec29481620276a97f2226bb589077b877f Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Wed, 31 Jul 2024 13:58:01 -0400 Subject: [PATCH 02/20] set warn --- pkg/managers/watchermanager/watchermanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/managers/watchermanager/watchermanager.go b/pkg/managers/watchermanager/watchermanager.go index bae1fed9b9..cba12f8287 100644 --- a/pkg/managers/watchermanager/watchermanager.go +++ b/pkg/managers/watchermanager/watchermanager.go @@ -73,7 +73,7 @@ func (wm *WatcherManager) runWatcher(ctx context.Context, w IWatcher) error { case <-ticker.C: err := w.Refresh(ctx) if err != nil { - wm.l.Error("refresh failed", zap.Error(err)) + wm.l.Warn("refresh failed", zap.Error(err)) continue } } From faf7eae1affd9cf607dd3a606ca67aeab3ca2ead Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Tue, 6 Aug 2024 08:37:37 -0400 Subject: [PATCH 03/20] refactor apiserver watcher manager to not return error on resolve host --- pkg/managers/watchermanager/watchermanager.go | 4 +- pkg/watchers/apiserver/apiserver.go | 127 +++++++----------- 2 files changed, 51 insertions(+), 80 deletions(-) diff --git a/pkg/managers/watchermanager/watchermanager.go b/pkg/managers/watchermanager/watchermanager.go index cba12f8287..af6a69c88e 100644 --- a/pkg/managers/watchermanager/watchermanager.go +++ b/pkg/managers/watchermanager/watchermanager.go @@ -73,8 +73,8 @@ func (wm *WatcherManager) runWatcher(ctx context.Context, w IWatcher) error { case <-ticker.C: err := w.Refresh(ctx) if err != nil { - wm.l.Warn("refresh failed", zap.Error(err)) - continue + wm.l.Error("refresh failed", zap.Error(err)) + return err } } } diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index c96cd93cab..4d195e2c64 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -5,7 +5,7 @@ package apiserver import ( "context" - "fmt" + "errors" "net" "net/url" "strings" @@ -28,21 +28,21 @@ type ApiServerWatcher struct { l *log.ZapLogger current cache new cache - apiServerUrl string + apiServerURL string hostResolver IHostResolver - filtermanager fm.IFilterManager + filterManager fm.IFilterManager } var a *ApiServerWatcher -// Watcher creates a new apiserver watcher. +// Watcher creates a new ApiServerWatcher instance. func Watcher() *ApiServerWatcher { if a == nil { a = &ApiServerWatcher{ isRunning: false, l: log.Logger().Named("apiserver-watcher"), current: make(cache), - apiServerUrl: getHostURL(), + apiServerURL: getHostURL(), hostResolver: net.DefaultResolver, } } @@ -56,12 +56,15 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error { return nil } - a.filtermanager = getFilterManager() + a.filterManager = a.getFilterManager() + if a.filterManager == nil { + return errors.New("failed to initialize filter manager") + } a.isRunning = true return nil } -// Stop the apiserver watcher. +// Stop stops the ApiServerWatcher. func (a *ApiServerWatcher) Stop(ctx context.Context) error { if !a.isRunning { a.l.Info("apiserver watcher is not running") @@ -76,46 +79,40 @@ func (a *ApiServerWatcher) Refresh(ctx context.Context) error { if err != nil { return err } - // Compare the new ips with the old ones. + // Compare the new IPs with the old ones. created, deleted := a.diffCache() - // Publish the new ips. - createdIps := []net.IP{} - deletedIps := []net.IP{} + createdIPs := []net.IP{} + deletedIPs := []net.IP{} for _, v := range created { - a.l.Info("New Apiserver ips:", zap.Any("ip", v)) + a.l.Info("New Apiserver IPs:", zap.Any("ip", v)) ip := net.ParseIP(v.(string)).To4() - createdIps = append(createdIps, ip) + createdIPs = append(createdIPs, ip) } for _, v := range deleted { - a.l.Info("Deleted Apiserver ips:", zap.Any("ip", v)) + a.l.Info("Deleted Apiserver IPs:", zap.Any("ip", v)) ip := net.ParseIP(v.(string)).To4() - deletedIps = append(deletedIps, ip) + deletedIPs = append(deletedIPs, ip) } - if len(createdIps) > 0 { - // Publish the new ips. - a.publish(createdIps, cc.EventTypeAddAPIServerIPs) - // Add ips to filter manager if any. - err := a.filtermanager.AddIPs(createdIps, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) + if len(createdIPs) > 0 { + a.publish(createdIPs, cc.EventTypeAddAPIServerIPs) + err := a.filterManager.AddIPs(createdIPs, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) if err != nil { - a.l.Error("Failed to add ips to filter manager", zap.Error(err)) + a.l.Error("Failed to add IPs to filter manager", zap.Error(err)) } } - if len(deletedIps) > 0 { - // Publish the deleted ips. - a.publish(deletedIps, cc.EventTypeDeleteAPIServerIPs) - // Delete ips from filter manager if any. - err := a.filtermanager.DeleteIPs(deletedIps, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) + if len(deletedIPs) > 0 { + a.publish(deletedIPs, cc.EventTypeDeleteAPIServerIPs) + err := a.filterManager.DeleteIPs(deletedIPs, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) if err != nil { - a.l.Error("Failed to delete ips from filter manager", zap.Error(err)) + a.l.Error("Failed to delete IPs from filter manager", zap.Error(err)) } } - // update the current cache and reset the new cache a.current = a.new.deepcopy() a.new = nil @@ -123,12 +120,7 @@ func (a *ApiServerWatcher) Refresh(ctx context.Context) error { } func (a *ApiServerWatcher) initNewCache(ctx context.Context) error { - ips, err := a.getApiServerIPs(ctx) - if err != nil { - return err - } - - // Reset the new cache. + ips := a.getApiServerIPs(ctx) a.new = make(cache) for _, ip := range ips { a.new[ip] = struct{}{} @@ -137,14 +129,14 @@ func (a *ApiServerWatcher) initNewCache(ctx context.Context) error { } func (a *ApiServerWatcher) diffCache() (created, deleted []interface{}) { - // check if there are new ips + // Check if there are any new IPs. for k := range a.new { if _, ok := a.current[k]; !ok { created = append(created, k) } } - // check if there are deleted ips + // Check if there are any deleted IPs. for k := range a.current { if _, ok := a.new[k]; !ok { deleted = append(deleted, k) @@ -153,53 +145,39 @@ func (a *ApiServerWatcher) diffCache() (created, deleted []interface{}) { return } -func (a *ApiServerWatcher) getApiServerIPs(ctx context.Context) ([]string, error) { - // Parse the URL - host, err := a.retrieveApiServerHostname() - if err != nil { - return nil, err - } - - // Get the ips for the host - ips, err := a.resolveIPs(ctx, host) - if err != nil { - return nil, err - } - - return ips, nil +func (a *ApiServerWatcher) getApiServerIPs(ctx context.Context) []string { + host := a.retrieveApiServerHostname() + ips := a.resolveIPs(ctx, host) + return ips } -// parse url to extract hostname -func (a *ApiServerWatcher) retrieveApiServerHostname() (string, error) { - // Parse the URL - url, err := url.Parse(a.apiServerUrl) +func (a *ApiServerWatcher) retrieveApiServerHostname() string { + parsedURL, err := url.Parse(a.apiServerURL) if err != nil { - fmt.Println("Failed to parse URL:", err) - return "", err + a.l.Warn("failed to parse URL", zap.String("url", a.apiServerURL), zap.Error(err)) + return "" } - // Remove the scheme (http:// or https://) and port from the host - host := strings.TrimPrefix(url.Host, "www.") - colonIndex := strings.IndexByte(host, ':') - if colonIndex != -1 { + host := strings.TrimPrefix(parsedURL.Host, "www.") + if colonIndex := strings.IndexByte(host, ':'); colonIndex != -1 { host = host[:colonIndex] } - return host, nil + return host } -// Resolve the list of ips for the given host -func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]string, error) { - hostIps, err := a.hostResolver.LookupHost(ctx, host) +func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) []string { + hostIPs, err := a.hostResolver.LookupHost(ctx, host) if err != nil { - return nil, err + a.l.Warn("failed to resolve IPs for host", zap.String("host", host), zap.Error(err)) + return nil } - if len(hostIps) == 0 { - a.l.Error("no ips found for host", zap.String("host", host)) - return nil, fmt.Errorf("no ips found for host %s", host) + if len(hostIPs) == 0 { + a.l.Warn("no IPs found for host", zap.String("host", host)) + return nil } - return hostIps, nil + return hostIPs } func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { @@ -212,16 +190,10 @@ func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { ipsToPublish = append(ipsToPublish, ip.String()) } ps := pubsub.New() - ps.Publish(common.PubSubAPIServer, - cc.NewCacheEvent( - eventType, - common.NewAPIServerObject(ipsToPublish), - ), - ) + ps.Publish(common.PubSubAPIServer, cc.NewCacheEvent(eventType, common.NewAPIServerObject(ipsToPublish))) a.l.Debug("Published event", zap.Any("eventType", eventType), zap.Any("netIPs", ipsToPublish)) } -// getHostURL returns the host url from the config. func getHostURL() string { config, err := kcfg.GetConfig() if err != nil { @@ -231,8 +203,7 @@ func getHostURL() string { return config.Host } -// Get FilterManager -func getFilterManager() *fm.FilterManager { +func (a *ApiServerWatcher) getFilterManager() *fm.FilterManager { f, err := fm.Init(filterManagerRetries) if err != nil { a.l.Error("failed to init filter manager", zap.Error(err)) From 1dea567b48f497ebe6d2ecfc5b9035133e7af044 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Thu, 15 Aug 2024 16:01:53 -0400 Subject: [PATCH 04/20] improve error handling in ApiServerWatcher resolve host function and move getHostname into Init --- pkg/managers/watchermanager/watchermanager.go | 2 +- pkg/watchers/apiserver/apiserver.go | 97 +++++++++++-------- 2 files changed, 57 insertions(+), 42 deletions(-) diff --git a/pkg/managers/watchermanager/watchermanager.go b/pkg/managers/watchermanager/watchermanager.go index af6a69c88e..55617db286 100644 --- a/pkg/managers/watchermanager/watchermanager.go +++ b/pkg/managers/watchermanager/watchermanager.go @@ -36,7 +36,7 @@ func (wm *WatcherManager) Start(ctx context.Context) error { for _, w := range wm.Watchers { if err := w.Init(ctx); err != nil { - wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w))) + wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err)) return err } wm.wg.Add(1) diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index 4d195e2c64..08109ce551 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -24,13 +24,13 @@ const ( ) type ApiServerWatcher struct { - isRunning bool - l *log.ZapLogger - current cache - new cache - apiServerURL string - hostResolver IHostResolver - filterManager fm.IFilterManager + isRunning bool + l *log.ZapLogger + current cache + new cache + apiServerHostName string + hostResolver IHostResolver + filterManager fm.IFilterManager } var a *ApiServerWatcher @@ -42,7 +42,6 @@ func Watcher() *ApiServerWatcher { isRunning: false, l: log.Logger().Named("apiserver-watcher"), current: make(cache), - apiServerURL: getHostURL(), hostResolver: net.DefaultResolver, } } @@ -60,7 +59,16 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error { if a.filterManager == nil { return errors.New("failed to initialize filter manager") } + + hostName, error := getHostName() + if error != nil { + a.l.Error("APIServer watcher failed to get host name", zap.Error(error)) + return error + } + a.apiServerHostName = hostName + a.isRunning = true + return nil } @@ -75,10 +83,12 @@ func (a *ApiServerWatcher) Stop(ctx context.Context) error { } func (a *ApiServerWatcher) Refresh(ctx context.Context) error { - err := a.initNewCache(ctx) - if err != nil { - return err + error := a.initNewCache(ctx) + if error != nil { + a.l.Error("failed to initialize new cache", zap.Error(error)) + return error } + // Compare the new IPs with the old ones. created, deleted := a.diffCache() @@ -120,7 +130,13 @@ func (a *ApiServerWatcher) Refresh(ctx context.Context) error { } func (a *ApiServerWatcher) initNewCache(ctx context.Context) error { - ips := a.getApiServerIPs(ctx) + ips, error := a.resolveIPs(ctx, a.apiServerHostName) + if error != nil { + a.l.Error("failed to resolve IPs", zap.Error(error)) + return error + } + + // Reset new cache. a.new = make(cache) for _, ip := range ips { a.new[ip] = struct{}{} @@ -145,39 +161,25 @@ func (a *ApiServerWatcher) diffCache() (created, deleted []interface{}) { return } -func (a *ApiServerWatcher) getApiServerIPs(ctx context.Context) []string { - host := a.retrieveApiServerHostname() - ips := a.resolveIPs(ctx, host) - return ips -} - -func (a *ApiServerWatcher) retrieveApiServerHostname() string { - parsedURL, err := url.Parse(a.apiServerURL) - if err != nil { - a.l.Warn("failed to parse URL", zap.String("url", a.apiServerURL), zap.Error(err)) - return "" - } - - host := strings.TrimPrefix(parsedURL.Host, "www.") - if colonIndex := strings.IndexByte(host, ':'); colonIndex != -1 { - host = host[:colonIndex] - } - return host -} - -func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) []string { +func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]string, error) { + // perform a DNS lookup for the host URL using the net.DefaultResolver which uses the local resolver. + // Possible errors here are: + // - Canceled context: The context was canceled before the lookup completed. + // -DNS server errors ie NXDOMAIN, SERVFAIL. + // - Network errors ie timeout, unreachable DNS server. + // -Other DNS-related errors encapsulated in a DNSError. hostIPs, err := a.hostResolver.LookupHost(ctx, host) if err != nil { - a.l.Warn("failed to resolve IPs for host", zap.String("host", host), zap.Error(err)) - return nil + // We chose not to return this error to the caller because we want to rety the DNS lookup in the next refresh. + // If there is an error that can't be resolved by retrying, we will single it out and return it to the caller. + a.l.Warn("APIServer LookupHost failed", zap.Error(err)) } if len(hostIPs) == 0 { - a.l.Warn("no IPs found for host", zap.String("host", host)) - return nil + a.l.Debug("no IPs found for host", zap.String("host", host)) } - return hostIPs + return hostIPs, nil } func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { @@ -194,13 +196,26 @@ func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { a.l.Debug("Published event", zap.Any("eventType", eventType), zap.Any("netIPs", ipsToPublish)) } -func getHostURL() string { +func getHostName() (string, error) { + // Get the host name from the kubeconfig. config, err := kcfg.GetConfig() if err != nil { log.Logger().Error("failed to get config", zap.Error(err)) - return "" + return "", err + } + // Parse the host URL. + parsedURL, err := url.Parse(config.Host) + if err != nil { + log.Logger().Error("failed to parse URL", zap.String("url", config.Host), zap.Error(err)) + return "", err + } + + // Extract the host name from the URL. + host := strings.TrimPrefix(parsedURL.Host, "www.") + if colonIndex := strings.IndexByte(host, ':'); colonIndex != -1 { + host = host[:colonIndex] } - return config.Host + return host, nil } func (a *ApiServerWatcher) getFilterManager() *fm.FilterManager { From 94ff619769ea27d9d5bec832bcea1afe2b966ede Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Thu, 15 Aug 2024 17:19:56 -0400 Subject: [PATCH 05/20] mock kube config to test host name extraction --- pkg/watchers/apiserver/apiserver.go | 49 ++++++++++++++---------- pkg/watchers/apiserver/apiserver_test.go | 17 +++++--- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index 08109ce551..b127e46ed1 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -16,6 +16,7 @@ import ( fm "github.com/microsoft/retina/pkg/managers/filtermanager" "github.com/microsoft/retina/pkg/pubsub" "go.uber.org/zap" + "k8s.io/client-go/rest" kcfg "sigs.k8s.io/controller-runtime/pkg/client/config" ) @@ -31,6 +32,7 @@ type ApiServerWatcher struct { apiServerHostName string hostResolver IHostResolver filterManager fm.IFilterManager + restConfig *rest.Config } var a *ApiServerWatcher @@ -60,10 +62,20 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error { return errors.New("failed to initialize filter manager") } - hostName, error := getHostName() - if error != nil { - a.l.Error("APIServer watcher failed to get host name", zap.Error(error)) - return error + // Get kubeconfig. + if a.restConfig == nil { + config, err := kcfg.GetConfig() + if err != nil { + a.l.Error("failed to get kubeconfig", zap.Error(err)) + return err + } + a.restConfig = config + } + + hostName, err := a.getHostName() + if err != nil { + a.l.Error("APIServer watcher failed to get host name", zap.Error(err)) + return err } a.apiServerHostName = hostName @@ -83,10 +95,10 @@ func (a *ApiServerWatcher) Stop(ctx context.Context) error { } func (a *ApiServerWatcher) Refresh(ctx context.Context) error { - error := a.initNewCache(ctx) - if error != nil { - a.l.Error("failed to initialize new cache", zap.Error(error)) - return error + err := a.initNewCache(ctx) + if err != nil { + a.l.Error("failed to initialize new cache", zap.Error(err)) + return err } // Compare the new IPs with the old ones. @@ -130,10 +142,10 @@ func (a *ApiServerWatcher) Refresh(ctx context.Context) error { } func (a *ApiServerWatcher) initNewCache(ctx context.Context) error { - ips, error := a.resolveIPs(ctx, a.apiServerHostName) - if error != nil { - a.l.Error("failed to resolve IPs", zap.Error(error)) - return error + ips, err := a.resolveIPs(ctx, a.apiServerHostName) + if err != nil { + a.l.Error("failed to resolve IPs", zap.Error(err)) + return err } // Reset new cache. @@ -196,17 +208,12 @@ func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { a.l.Debug("Published event", zap.Any("eventType", eventType), zap.Any("netIPs", ipsToPublish)) } -func getHostName() (string, error) { - // Get the host name from the kubeconfig. - config, err := kcfg.GetConfig() - if err != nil { - log.Logger().Error("failed to get config", zap.Error(err)) - return "", err - } +func (a *ApiServerWatcher) getHostName() (string, error) { // Parse the host URL. - parsedURL, err := url.Parse(config.Host) + hostURL := a.restConfig.Host + parsedURL, err := url.Parse(hostURL) if err != nil { - log.Logger().Error("failed to parse URL", zap.String("url", config.Host), zap.Error(err)) + log.Logger().Error("failed to parse URL", zap.String("url", hostURL), zap.Error(err)) return "", err } diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 6ae3ac0f3a..2d4b924499 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -17,6 +17,7 @@ import ( "github.com/microsoft/retina/pkg/watchers/apiserver/mocks" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" + "k8s.io/client-go/rest" ) func TestGetWatcher(t *testing.T) { @@ -42,7 +43,8 @@ func TestAPIServerWatcherStop(t *testing.T) { a := &ApiServerWatcher{ isRunning: false, l: log.Logger().Named("apiserver-watcher"), - filtermanager: mockedFilterManager, + filterManager: mockedFilterManager, + restConfig: getMockConfig(), } err := a.Stop(ctx) assert.NoError(t, err, "Expected no error when stopping a stopped apiserver watcher") @@ -71,9 +73,8 @@ func TestRefresh(t *testing.T) { a := &ApiServerWatcher{ l: log.Logger().Named("apiserver-watcher"), - apiServerUrl: "https://kubernetes.default.svc.cluster.local:443", hostResolver: mockedResolver, - filtermanager: mockedFilterManager, + filterManager: mockedFilterManager, } // Return 2 random IPs for the host everytime LookupHost is called. @@ -105,7 +106,6 @@ func TestDiffCache(t *testing.T) { a := &ApiServerWatcher{ l: log.Logger().Named("apiserver-watcher"), - apiServerUrl: "https://kubernetes.default.svc.cluster.local:443", hostResolver: mockedResolver, current: old, new: new, @@ -128,7 +128,6 @@ func TestRefreshError(t *testing.T) { a := &ApiServerWatcher{ l: log.Logger().Named("apiserver-watcher"), - apiServerUrl: "https://kubernetes.default.svc.cluster.local:443", hostResolver: mockedResolver, } @@ -150,7 +149,6 @@ func TestResolveIPEmpty(t *testing.T) { a := &ApiServerWatcher{ l: log.Logger().Named("apiserver-watcher"), - apiServerUrl: "https://kubernetes.default.svc.cluster.local:443", hostResolver: mockedResolver, } @@ -163,3 +161,10 @@ func TestResolveIPEmpty(t *testing.T) { func randomIP() string { return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256)) } + +// Mock function to simulate getting a Kubernetes config +func getMockConfig() (*rest.Config, error) { + return &rest.Config{ + Host: "https://kubernetes.default.svc.cluster.local:443", + }, nil +} From 3e1a872656b3543d041801cef3f588a32908ea92 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Thu, 15 Aug 2024 17:42:12 -0400 Subject: [PATCH 06/20] fix return in test --- pkg/watchers/apiserver/apiserver_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 2d4b924499..ef3407ccea 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -163,8 +163,8 @@ func randomIP() string { } // Mock function to simulate getting a Kubernetes config -func getMockConfig() (*rest.Config, error) { +func getMockConfig() *rest.Config { return &rest.Config{ Host: "https://kubernetes.default.svc.cluster.local:443", - }, nil + } } From 34d234bd03d5e660a5366dc095ec1686d5cda2cd Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Thu, 15 Aug 2024 19:34:20 -0400 Subject: [PATCH 07/20] update apiserver tests --- .../watchermanager/watchermanager_test.go | 11 ++++++++++ pkg/watchers/apiserver/apiserver_test.go | 22 ++++++++++++------- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/pkg/managers/watchermanager/watchermanager_test.go b/pkg/managers/watchermanager/watchermanager_test.go index 4719d629f5..a09dad27d7 100644 --- a/pkg/managers/watchermanager/watchermanager_test.go +++ b/pkg/managers/watchermanager/watchermanager_test.go @@ -20,6 +20,17 @@ func TestStopWatcherManagerGracefully(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) mgr := NewWatcherManager() + mockApiServerWatcher := mock.NewMockIWatcher(ctl) + mockEndpointWatcher := mock.NewMockIWatcher(ctl) + + mgr.Watchers = []IWatcher{ + mockApiServerWatcher, + mockEndpointWatcher, + } + + mockApiServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() + mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() + ctx, _ := context.WithCancel(context.Background()) g, errctx := errgroup.WithContext(ctx) diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index ef3407ccea..e748acf1ac 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -44,7 +44,7 @@ func TestAPIServerWatcherStop(t *testing.T) { isRunning: false, l: log.Logger().Named("apiserver-watcher"), filterManager: mockedFilterManager, - restConfig: getMockConfig(), + restConfig: getMockConfig(true), } err := a.Stop(ctx) assert.NoError(t, err, "Expected no error when stopping a stopped apiserver watcher") @@ -116,7 +116,7 @@ func TestDiffCache(t *testing.T) { assert.Equal(t, 1, len(deleted), "Expected 1 deleted host") } -func TestRefreshError(t *testing.T) { +func TestNoRefreshErrorOnLookupHost(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -134,10 +134,10 @@ func TestRefreshError(t *testing.T) { mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errors.New("Error")).AnyTimes() a.Refresh(ctx) - assert.Error(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") + assert.NoError(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") } -func TestResolveIPEmpty(t *testing.T) { +func TestInitWithIncorrectURL(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -150,12 +150,13 @@ func TestResolveIPEmpty(t *testing.T) { a := &ApiServerWatcher{ l: log.Logger().Named("apiserver-watcher"), hostResolver: mockedResolver, + restConfig: getMockConfig(false), } mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return([]string{}, nil).AnyTimes() - a.Refresh(ctx) - assert.Error(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") + a.Init(ctx) + assert.Error(t, a.Init(context.Background()), "Expected error during init") } func randomIP() string { @@ -163,8 +164,13 @@ func randomIP() string { } // Mock function to simulate getting a Kubernetes config -func getMockConfig() *rest.Config { +func getMockConfig(isCorrect bool) *rest.Config { + if isCorrect { + return &rest.Config{ + Host: "https://kubernetes.default.svc.cluster.local:443", + } + } return &rest.Config{ - Host: "https://kubernetes.default.svc.cluster.local:443", + Host: "", } } From 3649ef661b5cb7b6f7e658fc1dd222aec760f27d Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Fri, 16 Aug 2024 11:34:51 -0400 Subject: [PATCH 08/20] update apiserver tests --- .../watchermanager/watchermanager_test.go | 6 +++--- pkg/watchers/apiserver/apiserver.go | 17 ++++++----------- pkg/watchers/apiserver/apiserver_test.go | 12 ++++++------ 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/pkg/managers/watchermanager/watchermanager_test.go b/pkg/managers/watchermanager/watchermanager_test.go index a09dad27d7..38b58e09b8 100644 --- a/pkg/managers/watchermanager/watchermanager_test.go +++ b/pkg/managers/watchermanager/watchermanager_test.go @@ -9,6 +9,7 @@ import ( "github.com/microsoft/retina/pkg/log" mock "github.com/microsoft/retina/pkg/managers/watchermanager/mocks" + "github.com/microsoft/retina/pkg/watchers/endpoint" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "golang.org/x/sync/errgroup" @@ -21,15 +22,14 @@ func TestStopWatcherManagerGracefully(t *testing.T) { mgr := NewWatcherManager() mockApiServerWatcher := mock.NewMockIWatcher(ctl) - mockEndpointWatcher := mock.NewMockIWatcher(ctl) mgr.Watchers = []IWatcher{ + endpoint.Watcher(), mockApiServerWatcher, - mockEndpointWatcher, } mockApiServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() - mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() + mockApiServerWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() ctx, _ := context.WithCancel(context.Background()) g, errctx := errgroup.WithContext(ctx) diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index b127e46ed1..c4d07879dc 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -5,7 +5,6 @@ package apiserver import ( "context" - "errors" "net" "net/url" "strings" @@ -57,9 +56,13 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error { return nil } - a.filterManager = a.getFilterManager() + // Get filter manager. if a.filterManager == nil { - return errors.New("failed to initialize filter manager") + a.filterManager, err := fm.Init(filterManagerRetries) + if err != nil { + a.l.Error("failed to init filter manager", zap.Error(err)) + return err + } } // Get kubeconfig. @@ -224,11 +227,3 @@ func (a *ApiServerWatcher) getHostName() (string, error) { } return host, nil } - -func (a *ApiServerWatcher) getFilterManager() *fm.FilterManager { - f, err := fm.Init(filterManagerRetries) - if err != nil { - a.l.Error("failed to init filter manager", zap.Error(err)) - } - return f -} diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index e748acf1ac..6e06061138 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -146,17 +146,17 @@ func TestInitWithIncorrectURL(t *testing.T) { defer cancel() mockedResolver := mocks.NewMockIHostResolver(ctrl) + mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), - hostResolver: mockedResolver, - restConfig: getMockConfig(false), + l: log.Logger().Named("apiserver-watcher"), + hostResolver: mockedResolver, + restConfig: getMockConfig(false), + filterManager: mockedFilterManager, } mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return([]string{}, nil).AnyTimes() - - a.Init(ctx) - assert.Error(t, a.Init(context.Background()), "Expected error during init") + assert.Error(t, a.Init(ctx), "Expected error during init") } func randomIP() string { From 80d6b92f8e239c53d13e62838ce232189f9a8467 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Fri, 16 Aug 2024 14:13:20 -0400 Subject: [PATCH 09/20] use parseURL instead of parse --- pkg/managers/pluginmanager/pluginmanager_test.go | 1 + pkg/watchers/apiserver/apiserver.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/managers/pluginmanager/pluginmanager_test.go b/pkg/managers/pluginmanager/pluginmanager_test.go index fbbbb0dca2..b0beb55cb8 100644 --- a/pkg/managers/pluginmanager/pluginmanager_test.go +++ b/pkg/managers/pluginmanager/pluginmanager_test.go @@ -120,6 +120,7 @@ func TestNewManagerStart(t *testing.T) { for _, tt := range tests { mgr, err := NewPluginManager(tt.cfg, tel, api.PluginName(tt.pluginName)) + mgr.watcherManager = setupWatcherManagerMock(gomock.NewController(t)) require.Nil(t, err, "Expected nil but got error:%w", err) require.NotNil(t, mgr, "Expected mgr to be intialized but found nil") require.Condition(t, assert.Comparison(func() bool { diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index c4d07879dc..c62c805f4f 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -58,7 +58,8 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error { // Get filter manager. if a.filterManager == nil { - a.filterManager, err := fm.Init(filterManagerRetries) + var err error + a.filterManager, err = fm.Init(filterManagerRetries) if err != nil { a.l.Error("failed to init filter manager", zap.Error(err)) return err @@ -214,7 +215,7 @@ func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { func (a *ApiServerWatcher) getHostName() (string, error) { // Parse the host URL. hostURL := a.restConfig.Host - parsedURL, err := url.Parse(hostURL) + parsedURL, err := url.ParseRequestURI(hostURL) if err != nil { log.Logger().Error("failed to parse URL", zap.String("url", hostURL), zap.Error(err)) return "", err From 512820adce6e68818305c89933aaf4f14f4a228c Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Fri, 16 Aug 2024 14:33:17 -0400 Subject: [PATCH 10/20] wrap errors --- pkg/managers/watchermanager/watchermanager_test.go | 14 +++++++------- pkg/watchers/apiserver/apiserver.go | 11 ++++++----- pkg/watchers/apiserver/apiserver_test.go | 5 +++-- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pkg/managers/watchermanager/watchermanager_test.go b/pkg/managers/watchermanager/watchermanager_test.go index 38b58e09b8..9a5b7998bc 100644 --- a/pkg/managers/watchermanager/watchermanager_test.go +++ b/pkg/managers/watchermanager/watchermanager_test.go @@ -21,15 +21,15 @@ func TestStopWatcherManagerGracefully(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) mgr := NewWatcherManager() - mockApiServerWatcher := mock.NewMockIWatcher(ctl) + mockAPIServerWatcher := mock.NewMockIWatcher(ctl) mgr.Watchers = []IWatcher{ endpoint.Watcher(), - mockApiServerWatcher, + mockAPIServerWatcher, } - mockApiServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() - mockApiServerWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() + mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() + mockAPIServerWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() ctx, _ := context.WithCancel(context.Background()) g, errctx := errgroup.WithContext(ctx) @@ -48,16 +48,16 @@ func TestWatcherInitFailsGracefully(t *testing.T) { defer ctl.Finish() log.SetupZapLogger(log.GetDefaultLogOpts()) - mockApiServerWatcher := mock.NewMockIWatcher(ctl) + mockAPIServerWatcher := mock.NewMockIWatcher(ctl) mockEndpointWatcher := mock.NewMockIWatcher(ctl) mgr := NewWatcherManager() mgr.Watchers = []IWatcher{ - mockApiServerWatcher, + mockAPIServerWatcher, mockEndpointWatcher, } - mockApiServerWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes() + mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes() mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes() err := mgr.Start(context.Background()) diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index c62c805f4f..11726c9712 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -5,6 +5,7 @@ package apiserver import ( "context" + "fmt" "net" "net/url" "strings" @@ -62,7 +63,7 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error { a.filterManager, err = fm.Init(filterManagerRetries) if err != nil { a.l.Error("failed to init filter manager", zap.Error(err)) - return err + return fmt.Errorf("failed to init filter manager: %w", err) } } @@ -71,15 +72,15 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error { config, err := kcfg.GetConfig() if err != nil { a.l.Error("failed to get kubeconfig", zap.Error(err)) - return err + return fmt.Errorf("failed to get kubeconfig: %w", err) } a.restConfig = config } hostName, err := a.getHostName() if err != nil { - a.l.Error("APIServer watcher failed to get host name", zap.Error(err)) - return err + a.l.Error("failed to get host name", zap.Error(err)) + return fmt.Errorf("failed to get host name: %w", err) } a.apiServerHostName = hostName @@ -218,7 +219,7 @@ func (a *ApiServerWatcher) getHostName() (string, error) { parsedURL, err := url.ParseRequestURI(hostURL) if err != nil { log.Logger().Error("failed to parse URL", zap.String("url", hostURL), zap.Error(err)) - return "", err + return "", fmt.Errorf("failed to parse URL: %w", err) } // Extract the host name from the URL. diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 6e06061138..9946f159ce 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -16,6 +16,7 @@ import ( filtermanagermocks "github.com/microsoft/retina/pkg/managers/filtermanager" "github.com/microsoft/retina/pkg/watchers/apiserver/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "k8s.io/client-go/rest" ) @@ -134,7 +135,7 @@ func TestNoRefreshErrorOnLookupHost(t *testing.T) { mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errors.New("Error")).AnyTimes() a.Refresh(ctx) - assert.NoError(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") + require.NoError(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") } func TestInitWithIncorrectURL(t *testing.T) { @@ -156,7 +157,7 @@ func TestInitWithIncorrectURL(t *testing.T) { } mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return([]string{}, nil).AnyTimes() - assert.Error(t, a.Init(ctx), "Expected error during init") + require.Error(t, a.Init(ctx), "Expected error during init") } func randomIP() string { From d36be67e35a013537c669469a317cc1957e56a63 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Fri, 16 Aug 2024 14:45:04 -0400 Subject: [PATCH 11/20] fix dynamic error return --- .../watchermanager/watchermanager_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/managers/watchermanager/watchermanager_test.go b/pkg/managers/watchermanager/watchermanager_test.go index 9a5b7998bc..d935ffeb18 100644 --- a/pkg/managers/watchermanager/watchermanager_test.go +++ b/pkg/managers/watchermanager/watchermanager_test.go @@ -9,12 +9,15 @@ import ( "github.com/microsoft/retina/pkg/log" mock "github.com/microsoft/retina/pkg/managers/watchermanager/mocks" - "github.com/microsoft/retina/pkg/watchers/endpoint" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "golang.org/x/sync/errgroup" ) +var ( + errInitFailed = errors.New("init failed") +) + func TestStopWatcherManagerGracefully(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() @@ -22,13 +25,17 @@ func TestStopWatcherManagerGracefully(t *testing.T) { mgr := NewWatcherManager() mockAPIServerWatcher := mock.NewMockIWatcher(ctl) + mockEndpointWatcher := mock.NewMockIWatcher(ctl) mgr.Watchers = []IWatcher{ - endpoint.Watcher(), + mockEndpointWatcher, mockAPIServerWatcher, } mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() + mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() + + mockEndpointWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() mockAPIServerWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() ctx, _ := context.WithCancel(context.Background()) @@ -57,8 +64,8 @@ func TestWatcherInitFailsGracefully(t *testing.T) { mockEndpointWatcher, } - mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes() - mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes() + mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes() + mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes() err := mgr.Start(context.Background()) require.NotNil(t, err, "Expected error when starting watcher manager") From eea6e63795aa84e44aef49d5427064f7f6dae6e5 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Fri, 16 Aug 2024 14:51:43 -0400 Subject: [PATCH 12/20] fmt --- pkg/managers/watchermanager/watchermanager_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/managers/watchermanager/watchermanager_test.go b/pkg/managers/watchermanager/watchermanager_test.go index d935ffeb18..37dcf4809f 100644 --- a/pkg/managers/watchermanager/watchermanager_test.go +++ b/pkg/managers/watchermanager/watchermanager_test.go @@ -14,9 +14,7 @@ import ( "golang.org/x/sync/errgroup" ) -var ( - errInitFailed = errors.New("init failed") -) +var errInitFailed = errors.New("init failed") func TestStopWatcherManagerGracefully(t *testing.T) { ctl := gomock.NewController(t) From a6318add1d479eba7442eefe51b192895e894eec Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 11:56:23 -0400 Subject: [PATCH 13/20] add retry to lookup host --- pkg/watchers/apiserver/apiserver.go | 28 ++++++++++++----- pkg/watchers/apiserver/apiserver_test.go | 40 ++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index 11726c9712..a859afc3fc 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -22,6 +22,7 @@ import ( const ( filterManagerRetries = 3 + hostLookupRetries = 3 ) type ApiServerWatcher struct { @@ -33,6 +34,7 @@ type ApiServerWatcher struct { hostResolver IHostResolver filterManager fm.IFilterManager restConfig *rest.Config + remainingRetries int } var a *ApiServerWatcher @@ -41,10 +43,11 @@ var a *ApiServerWatcher func Watcher() *ApiServerWatcher { if a == nil { a = &ApiServerWatcher{ - isRunning: false, - l: log.Logger().Named("apiserver-watcher"), - current: make(cache), - hostResolver: net.DefaultResolver, + isRunning: false, + l: log.Logger().Named("apiserver-watcher"), + current: make(cache), + hostResolver: net.DefaultResolver, + remainingRetries: hostLookupRetries, } } @@ -187,15 +190,26 @@ func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]strin // -Other DNS-related errors encapsulated in a DNSError. hostIPs, err := a.hostResolver.LookupHost(ctx, host) if err != nil { - // We chose not to return this error to the caller because we want to rety the DNS lookup in the next refresh. - // If there is an error that can't be resolved by retrying, we will single it out and return it to the caller. - a.l.Warn("APIServer LookupHost failed", zap.Error(err)) + // Decrement the remaining retries counter. + a.remainingRetries-- + a.l.Debug("APIServer LookupHost failed", zap.Error(err), zap.Int("remainingRetries", a.remainingRetries)) + + // If the remaining retries counter is zero, return an error. + if a.remainingRetries < 0 { + return nil, fmt.Errorf("failed to lookup host: %w", err) + } + + // do not return an error, instead return nil IPs so that on the next refresh, the lookup is retried. + return nil, nil } if len(hostIPs) == 0 { a.l.Debug("no IPs found for host", zap.String("host", host)) } + // Reset the retry counter. + a.remainingRetries = hostLookupRetries + return hostIPs, nil } diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 9946f159ce..02078e2e91 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -128,8 +128,9 @@ func TestNoRefreshErrorOnLookupHost(t *testing.T) { mockedResolver := mocks.NewMockIHostResolver(ctrl) a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), - hostResolver: mockedResolver, + l: log.Logger().Named("apiserver-watcher"), + hostResolver: mockedResolver, + remainingRetries: 3, } mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errors.New("Error")).AnyTimes() @@ -175,3 +176,38 @@ func getMockConfig(isCorrect bool) *rest.Config { Host: "", } } + +func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { + log.SetupZapLogger(log.GetDefaultLogOpts()) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + mockedResolver := mocks.NewMockIHostResolver(ctrl) + mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) + + a := &ApiServerWatcher{ + l: log.Logger().Named("apiserver-watcher"), + hostResolver: mockedResolver, + filterManager: mockedFilterManager, + remainingRetries: 3, // Set the initial retry count to 3 + } + + // Simulate LookupHost failing for all attempts. + mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("simulated DNS error")).AnyTimes() + + mockedFilterManager.EXPECT().AddIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockedFilterManager.EXPECT().DeleteIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + // Call Refresh three times and expect it to succeed (no error) + for i := 0; i < 3; i++ { + err := a.Refresh(ctx) + require.NoError(t, err, "Expected no error on attempt %d", i+1) + } + + // Call Refresh the fourth time and expect it to fail + err := a.Refresh(ctx) + require.Error(t, err, "Expected error on the fourth attempt") +} From ed31762e262bf747f9781b10ef0f2585e4a9debf Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 13:17:50 -0400 Subject: [PATCH 14/20] fix lint errors --- pkg/watchers/apiserver/apiserver_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 02078e2e91..51bad1345d 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -140,7 +140,8 @@ func TestNoRefreshErrorOnLookupHost(t *testing.T) { } func TestInitWithIncorrectURL(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) + err := log.SetupZapLogger(log.GetDefaultLogOpts()) + require.NoError(t, err, "Expected no error during logger setup") ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -185,6 +186,8 @@ func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() + var simulatedDNSError = errors.New("simulated DNS error") + mockedResolver := mocks.NewMockIHostResolver(ctrl) mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) @@ -196,7 +199,7 @@ func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { } // Simulate LookupHost failing for all attempts. - mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("simulated DNS error")).AnyTimes() + mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, simulatedDNSError).AnyTimes() mockedFilterManager.EXPECT().AddIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockedFilterManager.EXPECT().DeleteIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() From 5f118d28e885795b19689fa4075b91a57450674f Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 13:35:59 -0400 Subject: [PATCH 15/20] fix lint --- pkg/watchers/apiserver/apiserver_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 51bad1345d..e98b752054 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -140,7 +140,7 @@ func TestNoRefreshErrorOnLookupHost(t *testing.T) { } func TestInitWithIncorrectURL(t *testing.T) { - err := log.SetupZapLogger(log.GetDefaultLogOpts()) + _, err := log.SetupZapLogger(log.GetDefaultLogOpts()) require.NoError(t, err, "Expected no error during logger setup") ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -186,7 +186,7 @@ func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - var simulatedDNSError = errors.New("simulated DNS error") + simulatedDNSError := errors.New("simulated DNS error") mockedResolver := mocks.NewMockIHostResolver(ctrl) mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) From fcd542f826e9ff2d02571497fb9edf5cb8d75bd1 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 13:48:34 -0400 Subject: [PATCH 16/20] fix lint --- pkg/watchers/apiserver/apiserver_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index e98b752054..cf69d013cf 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -21,6 +21,8 @@ import ( "k8s.io/client-go/rest" ) +var errDNS = errors.New("DNS error") + func TestGetWatcher(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) @@ -140,8 +142,7 @@ func TestNoRefreshErrorOnLookupHost(t *testing.T) { } func TestInitWithIncorrectURL(t *testing.T) { - _, err := log.SetupZapLogger(log.GetDefaultLogOpts()) - require.NoError(t, err, "Expected no error during logger setup") + log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -179,15 +180,14 @@ func getMockConfig(isCorrect bool) *rest.Config { } func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) + _, err := log.SetupZapLogger(log.GetDefaultLogOpts()) + require.NoError(t, err) ctrl := gomock.NewController(t) defer ctrl.Finish() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - simulatedDNSError := errors.New("simulated DNS error") - mockedResolver := mocks.NewMockIHostResolver(ctrl) mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) @@ -199,7 +199,7 @@ func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { } // Simulate LookupHost failing for all attempts. - mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, simulatedDNSError).AnyTimes() + mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errDNS).AnyTimes() mockedFilterManager.EXPECT().AddIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockedFilterManager.EXPECT().DeleteIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() From 63c67a01b8cbfa444666020cb77743d86312f429 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 14:08:49 -0400 Subject: [PATCH 17/20] fix lint error --- pkg/watchers/apiserver/apiserver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index cf69d013cf..6fde786313 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -211,6 +211,6 @@ func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { } // Call Refresh the fourth time and expect it to fail - err := a.Refresh(ctx) + err = a.Refresh(ctx) require.Error(t, err, "Expected error on the fourth attempt") } From b9e7c79a388eb85a085335995855612be07ab57a Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 14:16:56 -0400 Subject: [PATCH 18/20] lint: govet --- pkg/watchers/apiserver/apiserver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 6fde786313..879b75cb66 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -206,7 +206,7 @@ func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { // Call Refresh three times and expect it to succeed (no error) for i := 0; i < 3; i++ { - err := a.Refresh(ctx) + err = a.Refresh(ctx) require.NoError(t, err, "Expected no error on attempt %d", i+1) } From 7a563c70c7c9dbaa4678eade85f6911561e7108a Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 17:37:22 -0400 Subject: [PATCH 19/20] use utils.Retry to retry host lookup --- pkg/watchers/apiserver/apiserver.go | 40 ++++++++++++------------ pkg/watchers/apiserver/apiserver_test.go | 34 +++++++++----------- 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index a859afc3fc..269049c8f4 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -15,6 +15,7 @@ import ( "github.com/microsoft/retina/pkg/log" fm "github.com/microsoft/retina/pkg/managers/filtermanager" "github.com/microsoft/retina/pkg/pubsub" + "github.com/microsoft/retina/pkg/utils" "go.uber.org/zap" "k8s.io/client-go/rest" kcfg "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -22,7 +23,7 @@ import ( const ( filterManagerRetries = 3 - hostLookupRetries = 3 + hostLookupRetries = 6 // 6 retries for a total of 63 seconds. ) type ApiServerWatcher struct { @@ -34,7 +35,6 @@ type ApiServerWatcher struct { hostResolver IHostResolver filterManager fm.IFilterManager restConfig *rest.Config - remainingRetries int } var a *ApiServerWatcher @@ -43,11 +43,10 @@ var a *ApiServerWatcher func Watcher() *ApiServerWatcher { if a == nil { a = &ApiServerWatcher{ - isRunning: false, - l: log.Logger().Named("apiserver-watcher"), - current: make(cache), - hostResolver: net.DefaultResolver, - remainingRetries: hostLookupRetries, + isRunning: false, + l: log.Logger().Named("apiserver-watcher"), + current: make(cache), + hostResolver: net.DefaultResolver, } } @@ -188,28 +187,29 @@ func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]strin // -DNS server errors ie NXDOMAIN, SERVFAIL. // - Network errors ie timeout, unreachable DNS server. // -Other DNS-related errors encapsulated in a DNSError. - hostIPs, err := a.hostResolver.LookupHost(ctx, host) - if err != nil { - // Decrement the remaining retries counter. - a.remainingRetries-- - a.l.Debug("APIServer LookupHost failed", zap.Error(err), zap.Int("remainingRetries", a.remainingRetries)) + var hostIPs []string + var err error - // If the remaining retries counter is zero, return an error. - if a.remainingRetries < 0 { - return nil, fmt.Errorf("failed to lookup host: %w", err) + retryFunc := func() error { + hostIPs, err = a.hostResolver.LookupHost(ctx, host) + if err != nil { + a.l.Debug("APIServer LookupHost failed", zap.Error(err)) + return err } + return nil + } - // do not return an error, instead return nil IPs so that on the next refresh, the lookup is retried. - return nil, nil + // Retry the lookup for hostIPs in case of failure. + err = utils.Retry(retryFunc, hostLookupRetries) + if err != nil { + a.l.Error("failed to resolve IPs", zap.Error(err)) + return nil, err } if len(hostIPs) == 0 { a.l.Debug("no IPs found for host", zap.String("host", host)) } - // Reset the retry counter. - a.remainingRetries = hostLookupRetries - return hostIPs, nil } diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 879b75cb66..64be5f0a0b 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -119,7 +119,7 @@ func TestDiffCache(t *testing.T) { assert.Equal(t, 1, len(deleted), "Expected 1 deleted host") } -func TestNoRefreshErrorOnLookupHost(t *testing.T) { +func TesRefreshLookUpAlwaysFail(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -130,15 +130,14 @@ func TestNoRefreshErrorOnLookupHost(t *testing.T) { mockedResolver := mocks.NewMockIHostResolver(ctrl) a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), - hostResolver: mockedResolver, - remainingRetries: 3, + l: log.Logger().Named("apiserver-watcher"), + hostResolver: mockedResolver, } mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errors.New("Error")).AnyTimes() a.Refresh(ctx) - require.NoError(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") + require.Error(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") } func TestInitWithIncorrectURL(t *testing.T) { @@ -179,7 +178,7 @@ func getMockConfig(isCorrect bool) *rest.Config { } } -func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { +func TestRefreshFailsFirstFourAttemptsSucceedsOnFifth(t *testing.T) { _, err := log.SetupZapLogger(log.GetDefaultLogOpts()) require.NoError(t, err) ctrl := gomock.NewController(t) @@ -192,25 +191,20 @@ func TestRefreshFailsOnlyOnFourthAttempt(t *testing.T) { mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), - hostResolver: mockedResolver, - filterManager: mockedFilterManager, - remainingRetries: 3, // Set the initial retry count to 3 + l: log.Logger().Named("apiserver-watcher"), + hostResolver: mockedResolver, + filterManager: mockedFilterManager, } - // Simulate LookupHost failing for all attempts. - mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errDNS).AnyTimes() + // Simulate LookupHost failing the first four times and succeeding on the fifth. + gomock.InOrder( + mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errDNS).Times(4), + mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return([]string{"127.0.0.1"}, nil).Times(1), + ) mockedFilterManager.EXPECT().AddIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockedFilterManager.EXPECT().DeleteIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - // Call Refresh three times and expect it to succeed (no error) - for i := 0; i < 3; i++ { - err = a.Refresh(ctx) - require.NoError(t, err, "Expected no error on attempt %d", i+1) - } - - // Call Refresh the fourth time and expect it to fail err = a.Refresh(ctx) - require.Error(t, err, "Expected error on the fourth attempt") + require.NoError(t, err, "Expected no error when refreshing the cache") } From 8f2501f1d89208e0c87e6cd76032f03f9fc0e345 Mon Sep 17 00:00:00 2001 From: Jacques Massa Date: Mon, 19 Aug 2024 17:51:49 -0400 Subject: [PATCH 20/20] fix func name typo --- pkg/watchers/apiserver/apiserver.go | 7 ++----- pkg/watchers/apiserver/apiserver_test.go | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index 269049c8f4..606c2cde74 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -151,8 +151,7 @@ func (a *ApiServerWatcher) Refresh(ctx context.Context) error { func (a *ApiServerWatcher) initNewCache(ctx context.Context) error { ips, err := a.resolveIPs(ctx, a.apiServerHostName) if err != nil { - a.l.Error("failed to resolve IPs", zap.Error(err)) - return err + return fmt.Errorf("failed to resolve IPs: %w", err) } // Reset new cache. @@ -193,8 +192,7 @@ func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]strin retryFunc := func() error { hostIPs, err = a.hostResolver.LookupHost(ctx, host) if err != nil { - a.l.Debug("APIServer LookupHost failed", zap.Error(err)) - return err + return fmt.Errorf("APIServer LookupHost failed: %w", err) } return nil } @@ -202,7 +200,6 @@ func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]strin // Retry the lookup for hostIPs in case of failure. err = utils.Retry(retryFunc, hostLookupRetries) if err != nil { - a.l.Error("failed to resolve IPs", zap.Error(err)) return nil, err } diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 64be5f0a0b..04105e85c1 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -119,7 +119,7 @@ func TestDiffCache(t *testing.T) { assert.Equal(t, 1, len(deleted), "Expected 1 deleted host") } -func TesRefreshLookUpAlwaysFail(t *testing.T) { +func TestRefreshLookUpAlwaysFail(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish()