Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: apiserver watcher refresh shouldn't crash plugin manager #576

Merged
merged 33 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
06b5fd5
do not return error in case of refresh failure in a watcher
jimassa Jul 31, 2024
dae8dbe
set warn
jimassa Jul 31, 2024
faf7eae
refactor apiserver watcher manager to not return error on resolve host
jimassa Aug 6, 2024
657a683
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 6, 2024
744563b
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 7, 2024
2ccd9c0
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 12, 2024
3a87c51
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 12, 2024
8bcd27e
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 14, 2024
1dea567
improve error handling in ApiServerWatcher resolve host function and …
jimassa Aug 15, 2024
306de47
Merge branch 'jmassa/api_watcher_fix' of ssh://github.com/microsoft/r…
jimassa Aug 15, 2024
94ff619
mock kube config to test host name extraction
jimassa Aug 15, 2024
59a86ef
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 15, 2024
3e1a872
fix return in test
jimassa Aug 15, 2024
72626b9
Merge branch 'jmassa/api_watcher_fix' of ssh://github.com/microsoft/r…
jimassa Aug 15, 2024
34d234b
update apiserver tests
jimassa Aug 15, 2024
bf5fd78
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 15, 2024
3649ef6
update apiserver tests
jimassa Aug 16, 2024
80d6b92
use parseURL instead of parse
jimassa Aug 16, 2024
512820a
wrap errors
jimassa Aug 16, 2024
d36be67
fix dynamic error return
jimassa Aug 16, 2024
eea6e63
fmt
jimassa Aug 16, 2024
560838f
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 16, 2024
a6318ad
add retry to lookup host
jimassa Aug 19, 2024
f2a4d37
Merge branch 'jmassa/api_watcher_fix' of ssh://github.com/microsoft/r…
jimassa Aug 19, 2024
ed31762
fix lint errors
jimassa Aug 19, 2024
5f118d2
fix lint
jimassa Aug 19, 2024
fcd542f
fix lint
jimassa Aug 19, 2024
63c67a0
fix lint error
jimassa Aug 19, 2024
b9e7c79
lint: govet
jimassa Aug 19, 2024
6a95bc0
Merge branch 'main' into jmassa/api_watcher_fix
jimassa Aug 19, 2024
7a563c7
use utils.Retry to retry host lookup
jimassa Aug 19, 2024
668c58b
Merge branch 'jmassa/api_watcher_fix' of ssh://github.com/microsoft/r…
jimassa Aug 19, 2024
8f2501f
fix func name typo
jimassa Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/managers/pluginmanager/pluginmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestNewManagerStart(t *testing.T) {

for _, tt := range tests {
mgr, err := NewPluginManager(tt.cfg, tel, api.PluginName(tt.pluginName))
mgr.watcherManager = setupWatcherManagerMock(gomock.NewController(t))
require.Nil(t, err, "Expected nil but got error:%w", err)
require.NotNil(t, mgr, "Expected mgr to be intialized but found nil")
require.Condition(t, assert.Comparison(func() bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/managers/watchermanager/watchermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (wm *WatcherManager) Start(ctx context.Context) error {

for _, w := range wm.Watchers {
if err := w.Init(ctx); err != nil {
wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)))
wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err))
return err
}
wm.wg.Add(1)
Expand Down
24 changes: 20 additions & 4 deletions pkg/managers/watchermanager/watchermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,28 @@ import (
"golang.org/x/sync/errgroup"
)

var errInitFailed = errors.New("init failed")

func TestStopWatcherManagerGracefully(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
log.SetupZapLogger(log.GetDefaultLogOpts())
mgr := NewWatcherManager()

mockAPIServerWatcher := mock.NewMockIWatcher(ctl)
mockEndpointWatcher := mock.NewMockIWatcher(ctl)

mgr.Watchers = []IWatcher{
mockEndpointWatcher,
mockAPIServerWatcher,
}

mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes()
mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes()

mockEndpointWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes()
mockAPIServerWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes()

ctx, _ := context.WithCancel(context.Background())
g, errctx := errgroup.WithContext(ctx)

Expand All @@ -37,17 +53,17 @@ func TestWatcherInitFailsGracefully(t *testing.T) {
defer ctl.Finish()
log.SetupZapLogger(log.GetDefaultLogOpts())

mockApiServerWatcher := mock.NewMockIWatcher(ctl)
mockAPIServerWatcher := mock.NewMockIWatcher(ctl)
mockEndpointWatcher := mock.NewMockIWatcher(ctl)

mgr := NewWatcherManager()
mgr.Watchers = []IWatcher{
mockApiServerWatcher,
mockAPIServerWatcher,
mockEndpointWatcher,
}

mockApiServerWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes()
mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes()
mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes()
mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes()

err := mgr.Start(context.Background())
require.NotNil(t, err, "Expected error when starting watcher manager")
Expand Down
117 changes: 68 additions & 49 deletions pkg/watchers/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package apiserver

import (
"context"
"errors"
"fmt"
"net"
"net/url"
"strings"
Expand All @@ -16,6 +16,7 @@ import (
fm "github.com/microsoft/retina/pkg/managers/filtermanager"
"github.com/microsoft/retina/pkg/pubsub"
"go.uber.org/zap"
"k8s.io/client-go/rest"
kcfg "sigs.k8s.io/controller-runtime/pkg/client/config"
)

Expand All @@ -24,13 +25,14 @@ const (
)

type ApiServerWatcher struct {
isRunning bool
l *log.ZapLogger
current cache
new cache
apiServerURL string
hostResolver IHostResolver
filterManager fm.IFilterManager
isRunning bool
l *log.ZapLogger
current cache
new cache
apiServerHostName string
hostResolver IHostResolver
filterManager fm.IFilterManager
restConfig *rest.Config
}

var a *ApiServerWatcher
Expand All @@ -42,7 +44,6 @@ func Watcher() *ApiServerWatcher {
isRunning: false,
l: log.Logger().Named("apiserver-watcher"),
current: make(cache),
apiServerURL: getHostURL(),
hostResolver: net.DefaultResolver,
}
}
Expand All @@ -56,11 +57,35 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error {
return nil
}

a.filterManager = a.getFilterManager()
// Get filter manager.
if a.filterManager == nil {
return errors.New("failed to initialize filter manager")
var err error
a.filterManager, err = fm.Init(filterManagerRetries)
if err != nil {
a.l.Error("failed to init filter manager", zap.Error(err))
return fmt.Errorf("failed to init filter manager: %w", err)
}
}

// Get kubeconfig.
if a.restConfig == nil {
config, err := kcfg.GetConfig()
if err != nil {
a.l.Error("failed to get kubeconfig", zap.Error(err))
return fmt.Errorf("failed to get kubeconfig: %w", err)
}
a.restConfig = config
}

hostName, err := a.getHostName()
if err != nil {
a.l.Error("failed to get host name", zap.Error(err))
return fmt.Errorf("failed to get host name: %w", err)
}
a.apiServerHostName = hostName

a.isRunning = true

return nil
}

Expand All @@ -77,8 +102,10 @@ func (a *ApiServerWatcher) Stop(ctx context.Context) error {
func (a *ApiServerWatcher) Refresh(ctx context.Context) error {
err := a.initNewCache(ctx)
if err != nil {
a.l.Error("failed to initialize new cache", zap.Error(err))
return err
}

// Compare the new IPs with the old ones.
created, deleted := a.diffCache()

Expand Down Expand Up @@ -120,7 +147,13 @@ func (a *ApiServerWatcher) Refresh(ctx context.Context) error {
}

func (a *ApiServerWatcher) initNewCache(ctx context.Context) error {
ips := a.getApiServerIPs(ctx)
ips, err := a.resolveIPs(ctx, a.apiServerHostName)
if err != nil {
a.l.Error("failed to resolve IPs", zap.Error(err))
return err
}

// Reset new cache.
a.new = make(cache)
for _, ip := range ips {
a.new[ip] = struct{}{}
Expand All @@ -145,39 +178,25 @@ func (a *ApiServerWatcher) diffCache() (created, deleted []interface{}) {
return
}

func (a *ApiServerWatcher) getApiServerIPs(ctx context.Context) []string {
host := a.retrieveApiServerHostname()
ips := a.resolveIPs(ctx, host)
return ips
}

func (a *ApiServerWatcher) retrieveApiServerHostname() string {
parsedURL, err := url.Parse(a.apiServerURL)
if err != nil {
a.l.Warn("failed to parse URL", zap.String("url", a.apiServerURL), zap.Error(err))
return ""
}

host := strings.TrimPrefix(parsedURL.Host, "www.")
if colonIndex := strings.IndexByte(host, ':'); colonIndex != -1 {
host = host[:colonIndex]
}
return host
}

func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) []string {
func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]string, error) {
// Perform a DNS lookup for the host URL using net.DefaultResolver, which uses the local resolver.
// Possible errors here are:
// - Canceled context: the context was canceled before the lookup completed.
// - DNS server errors, e.g. NXDOMAIN, SERVFAIL.
// - Network errors, e.g. timeout, unreachable DNS server.
// - Other DNS-related errors encapsulated in a DNSError.
jimassa marked this conversation as resolved.
Show resolved Hide resolved
hostIPs, err := a.hostResolver.LookupHost(ctx, host)
jimassa marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
a.l.Warn("failed to resolve IPs for host", zap.String("host", host), zap.Error(err))
return nil
// We chose not to return this error to the caller because we want to retry the DNS lookup in the next refresh.
// If there is an error that can't be resolved by retrying, we will single it out and return it to the caller.
a.l.Warn("APIServer LookupHost failed", zap.Error(err))
}

if len(hostIPs) == 0 {
a.l.Warn("no IPs found for host", zap.String("host", host))
return nil
a.l.Debug("no IPs found for host", zap.String("host", host))
}

return hostIPs
return hostIPs, nil
}

func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) {
Expand All @@ -194,19 +213,19 @@ func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) {
a.l.Debug("Published event", zap.Any("eventType", eventType), zap.Any("netIPs", ipsToPublish))
}

func getHostURL() string {
config, err := kcfg.GetConfig()
func (a *ApiServerWatcher) getHostName() (string, error) {
// Parse the host URL.
hostURL := a.restConfig.Host
parsedURL, err := url.ParseRequestURI(hostURL)
if err != nil {
log.Logger().Error("failed to get config", zap.Error(err))
return ""
log.Logger().Error("failed to parse URL", zap.String("url", hostURL), zap.Error(err))
return "", fmt.Errorf("failed to parse URL: %w", err)
}
return config.Host
}

func (a *ApiServerWatcher) getFilterManager() *fm.FilterManager {
f, err := fm.Init(filterManagerRetries)
if err != nil {
a.l.Error("failed to init filter manager", zap.Error(err))
// Extract the host name from the URL.
host := strings.TrimPrefix(parsedURL.Host, "www.")
if colonIndex := strings.IndexByte(host, ':'); colonIndex != -1 {
host = host[:colonIndex]
}
return f
return host, nil
}
40 changes: 26 additions & 14 deletions pkg/watchers/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
filtermanagermocks "github.com/microsoft/retina/pkg/managers/filtermanager"
"github.com/microsoft/retina/pkg/watchers/apiserver/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"k8s.io/client-go/rest"
)

func TestGetWatcher(t *testing.T) {
Expand All @@ -42,7 +44,8 @@ func TestAPIServerWatcherStop(t *testing.T) {
a := &ApiServerWatcher{
isRunning: false,
l: log.Logger().Named("apiserver-watcher"),
filtermanager: mockedFilterManager,
filterManager: mockedFilterManager,
restConfig: getMockConfig(true),
}
err := a.Stop(ctx)
assert.NoError(t, err, "Expected no error when stopping a stopped apiserver watcher")
Expand Down Expand Up @@ -71,9 +74,8 @@ func TestRefresh(t *testing.T) {

a := &ApiServerWatcher{
l: log.Logger().Named("apiserver-watcher"),
apiServerUrl: "https://kubernetes.default.svc.cluster.local:443",
hostResolver: mockedResolver,
filtermanager: mockedFilterManager,
filterManager: mockedFilterManager,
}

// Return 2 random IPs for the host everytime LookupHost is called.
Expand Down Expand Up @@ -105,7 +107,6 @@ func TestDiffCache(t *testing.T) {

a := &ApiServerWatcher{
l: log.Logger().Named("apiserver-watcher"),
apiServerUrl: "https://kubernetes.default.svc.cluster.local:443",
hostResolver: mockedResolver,
current: old,
new: new,
Expand All @@ -116,7 +117,7 @@ func TestDiffCache(t *testing.T) {
assert.Equal(t, 1, len(deleted), "Expected 1 deleted host")
}

func TestRefreshError(t *testing.T) {
func TestNoRefreshErrorOnLookupHost(t *testing.T) {
log.SetupZapLogger(log.GetDefaultLogOpts())
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand All @@ -128,17 +129,16 @@ func TestRefreshError(t *testing.T) {

a := &ApiServerWatcher{
l: log.Logger().Named("apiserver-watcher"),
apiServerUrl: "https://kubernetes.default.svc.cluster.local:443",
hostResolver: mockedResolver,
}

mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errors.New("Error")).AnyTimes()

a.Refresh(ctx)
assert.Error(t, a.Refresh(context.Background()), "Expected error when refreshing the cache")
require.NoError(t, a.Refresh(context.Background()), "Expected error when refreshing the cache")
}

func TestResolveIPEmpty(t *testing.T) {
func TestInitWithIncorrectURL(t *testing.T) {
log.SetupZapLogger(log.GetDefaultLogOpts())
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand All @@ -147,19 +147,31 @@ func TestResolveIPEmpty(t *testing.T) {
defer cancel()

mockedResolver := mocks.NewMockIHostResolver(ctrl)
mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl)

a := &ApiServerWatcher{
l: log.Logger().Named("apiserver-watcher"),
apiServerUrl: "https://kubernetes.default.svc.cluster.local:443",
hostResolver: mockedResolver,
l: log.Logger().Named("apiserver-watcher"),
hostResolver: mockedResolver,
restConfig: getMockConfig(false),
filterManager: mockedFilterManager,
}

mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return([]string{}, nil).AnyTimes()

a.Refresh(ctx)
assert.Error(t, a.Refresh(context.Background()), "Expected error when refreshing the cache")
require.Error(t, a.Init(ctx), "Expected error during init")
}

// randomIP returns a dotted-quad IPv4 address string whose four octets are
// each drawn independently from rand.Intn(256).
func randomIP() string {
	var octets [4]int
	for i := range octets {
		octets[i] = rand.Intn(256)
	}
	return fmt.Sprintf("%d.%d.%d.%d", octets[0], octets[1], octets[2], octets[3])
}

// getMockConfig builds a stub Kubernetes REST config for tests. When isCorrect
// is true the returned config's Host is a well-formed API server URL;
// otherwise Host is the empty string.
func getMockConfig(isCorrect bool) *rest.Config {
	host := ""
	if isCorrect {
		host = "https://kubernetes.default.svc.cluster.local:443"
	}
	return &rest.Config{Host: host}
}
Loading