diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go
index ebb93d56438..e680683bc6a 100644
--- a/cmd/yurthub/app/start.go
+++ b/cmd/yurthub/app/start.go
@@ -19,6 +19,7 @@ package app
 import (
 	"context"
 	"fmt"
+	"net"
 	"net/url"
 	"time"
 
@@ -27,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
 
 	"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
@@ -95,7 +97,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
 
 	trace++
 	klog.Infof("%d. prepare cloud kube clients", trace)
-	cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServerURL, transportManager)
+	cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager)
 	if err != nil {
 		return fmt.Errorf("failed to create cloud clients, %w", err)
 	}
@@ -151,6 +153,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
 	var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker
 	var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager
 	var coordinatorGetter func() poolcoordinator.Coordinator = getFakeCoordinator
+	var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL
 
 	if cfg.EnableCoordinator {
 		klog.Infof("%d. start to run coordinator", trace)
@@ -160,11 +163,12 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
 		// coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check
 		// if certs has been got from cloud APIServer. It will close the coordinatorInformerRegistryChan if the secret channel has
 		// been registered into informer factory.
-		coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
+		coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, coordinatorServerURLGetter =
+			coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
 		// wait for coordinator informer registry
-		klog.Infof("waiting for coordinator informer registry")
+		klog.Info("waiting for coordinator informer registry")
 		<-coordinatorInformerRegistryChan
-		klog.Infof("coordinator informer registry finished")
+		klog.Info("coordinator informer registry finished")
 	}
 
 	// Start the informer factory if all informers have been registered
@@ -181,6 +185,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
 		coordinatorGetter,
 		coordinatorTransportManagerGetter,
 		coordinatorHealthCheckerGetter,
+		coordinatorServerURLGetter,
 		ctx.Done())
 	if err != nil {
 		return fmt.Errorf("could not create reverse proxy handler, %w", err)
 	}
@@ -196,13 +201,13 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
 		return fmt.Errorf("could not run hub servers, %w", err)
 	}
 	<-ctx.Done()
-	klog.Infof("hub agent exited")
+	klog.Info("hub agent exited")
 	return nil
 }
 
-// createClients will create clients for all cloud APIServer and client for pool coordinator
-// It will return a map, mapping cloud APIServer URL to its client, and a pool coordinator client
-func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
+// createClients will create clients for all cloud APIServer
+// It will return a map, mapping cloud APIServer URL to its client
+func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
 	cloudClients := make(map[string]kubernetes.Interface)
 	for i := range remoteServers {
 		restConf := &rest.Config{
@@ -220,16 +225,22 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi
 }
 
 // coordinatorRun will initialize and start all coordinator-related components in an async way.
-// It returns Getter function for coordinator, coordinator health checker and coordinator transport manager,
+// It returns Getter function for coordinator, coordinator health checker, coordinator transport manager and coordinator service url,
 // which will return the relative component if it has been initialized, otherwise it will return nil.
 func coordinatorRun(ctx context.Context,
 	cfg *config.YurtHubConfiguration,
 	restConfigMgr *hubrest.RestConfigManager,
 	cloudHealthChecker healthchecker.MultipleBackendsHealthChecker,
-	coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator) {
+	coordinatorInformerRegistryChan chan struct{}) (
+	func() healthchecker.HealthChecker,
+	func() transport.Interface,
+	func() poolcoordinator.Coordinator,
+	func() *url.URL) {
+
 	var coordinatorHealthChecker healthchecker.HealthChecker
 	var coordinatorTransportMgr transport.Interface
 	var coordinator poolcoordinator.Coordinator
+	var coordinatorServiceUrl *url.URL
 
 	go func() {
 		coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.ProxiedClient, cfg.SharedFactory)
@@ -238,9 +249,50 @@ func coordinatorRun(ctx context.Context,
 			klog.Errorf("coordinator failed to create coordinator cert manager, %v", err)
 			return
 		}
-		klog.Infof("coordinator new certManager success")
+		klog.Info("coordinator new certManager success")
+
+		// waiting for service sync complete
+		if !cache.WaitForCacheSync(ctx.Done(), cfg.SharedFactory.Core().V1().Services().Informer().HasSynced) {
+			klog.Error("coordinatorRun sync service shutdown")
+			return
+		}
+		klog.Info("coordinatorRun sync service complete")
+
+		// resolve pool-coordinator-apiserver and etcd from domain to ips
+		serviceList := cfg.SharedFactory.Core().V1().Services().Lister()
+		// if pool-coordinator-apiserver and pool-coordinator-etcd address is ip, don't need to resolve
+		apiServerIP := net.ParseIP(cfg.CoordinatorServerURL.Hostname())
+		etcdUrl, err := url.Parse(cfg.CoordinatorStorageAddr)
+		if err != nil {
+			klog.Errorf("coordinator parse etcd address failed: %+v", err)
+			return
+		}
+		etcdIP := net.ParseIP(etcdUrl.Hostname())
+		if apiServerIP == nil {
+			apiServerService, err := serviceList.Services(util.YurtHubNamespace).Get(cfg.CoordinatorServerURL.Hostname())
+			if err != nil {
+				klog.Errorf("coordinator failed to get apiServer service, %v", err)
+				return
+			}
+			// rewrite coordinator service info for cfg
+			coordinatorServerURL, err :=
+				url.Parse(fmt.Sprintf("https://%s:%s", apiServerService.Spec.ClusterIP, cfg.CoordinatorServerURL.Port()))
+			if err != nil {
+				klog.Errorf("coordinator failed to parse apiServer service, %v", err)
+				return
+			}
+			cfg.CoordinatorServerURL = coordinatorServerURL
+		}
+		if etcdIP == nil {
+			etcdService, err := serviceList.Services(util.YurtHubNamespace).Get(etcdUrl.Hostname())
+			if err != nil {
+				klog.Errorf("coordinator failed to get etcd service, %v", err)
+				return
+			}
+			cfg.CoordinatorStorageAddr = fmt.Sprintf("https://%s:%s", etcdService.Spec.ClusterIP, etcdUrl.Port())
+		}
 
-		coorTransportMgr, err := poolCoordinatorTransportMgrGetter(cfg.HeartbeatTimeoutSeconds, cfg.CoordinatorServerURL, coorCertManager, ctx.Done())
+		coorTransportMgr, err := poolCoordinatorTransportMgrGetter(coorCertManager, ctx.Done())
 		if err != nil {
 			klog.Errorf("coordinator failed to create coordinator transport manager, %v", err)
 			return
@@ -280,6 +332,7 @@ func coordinatorRun(ctx context.Context,
 		coordinatorTransportMgr = coorTransportMgr
 		coordinatorHealthChecker = coorHealthChecker
 		coordinator = coor
+		coordinatorServiceUrl = cfg.CoordinatorServerURL
 	}()
 
 	return func() healthchecker.HealthChecker {
@@ -288,12 +341,14 @@ func coordinatorRun(ctx context.Context,
 		return coordinatorTransportMgr
 	}, func() poolcoordinator.Coordinator {
 		return coordinator
+	}, func() *url.URL {
+		return coordinatorServiceUrl
 	}
 }
 
-func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
+func poolCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
 	err := wait.PollImmediate(5*time.Second, 4*time.Minute, func() (done bool, err error) {
-		klog.Infof("waiting for preparing certificates for coordinator client and node lease proxy client")
+		klog.Info("waiting for preparing certificates for coordinator client and node lease proxy client")
 		if coordinatorCertMgr.GetAPIServerClientCert() == nil {
 			return false, nil
 		}
@@ -324,3 +379,7 @@ func getFakeCoordinatorHealthChecker() healthchecker.HealthChecker {
 func getFakeCoordinatorTransportManager() transport.Interface {
 	return nil
 }
+
+func getFakeCoordinatorServerURL() *url.URL {
+	return nil
+}
diff --git a/pkg/controller/poolcoordinator/cert/util.go b/pkg/controller/poolcoordinator/cert/util.go
index d7c39abc42f..7c158e6a516 100644
--- a/pkg/controller/poolcoordinator/cert/util.go
+++ b/pkg/controller/poolcoordinator/cert/util.go
@@ -69,7 +69,7 @@ func waitUntilSVCReady(clientSet client.Interface, serviceName string, stopCh <-
 	}
 
 	// prepare certmanager
-	ips = ip.ParseIPList(serverSVC.Spec.ClusterIPs)
+	ips = ip.ParseIPList([]string{serverSVC.Spec.ClusterIP})
 	dnsnames = serveraddr.GetDefaultDomainsForSvc(PoolcoordinatorNS, serviceName)
 
 	return ips, dnsnames, nil
diff --git a/pkg/controller/poolcoordinator/cert/util_test.go b/pkg/controller/poolcoordinator/cert/util_test.go
index ea95c1c7078..87805c7ded9 100644
--- a/pkg/controller/poolcoordinator/cert/util_test.go
+++ b/pkg/controller/poolcoordinator/cert/util_test.go
@@ -25,6 +25,8 @@ import (
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/fake"
+
+	"github.com/openyurtio/openyurt/pkg/util/ip"
 )
 
 const (
@@ -111,3 +113,27 @@ func TestGetAPIServerSVCURL(t *testing.T) {
 	assert.Equal(t, nil, err)
 	assert.Equal(t, "https://xxxx:644", url)
 }
+
+func TestWaitUntilSVCReady(t *testing.T) {
+	stop := make(chan struct{})
+	defer close(stop)
+
+	normalClient := fake.NewSimpleClientset(&corev1.Service{
+		ObjectMeta: v1.ObjectMeta{
+			Namespace: PoolcoordinatorNS,
+			Name:      PoolcoordinatorAPIServerSVC,
+		},
+		Spec: corev1.ServiceSpec{
+			ClusterIP: "xxxx",
+			Ports: []corev1.ServicePort{
+				{
+					Port: 644,
+				},
+			},
+		},
+	})
+	ips, _, err := waitUntilSVCReady(normalClient, PoolcoordinatorAPIServerSVC, stop)
+	assert.Equal(t, nil, err)
+	expectIPS := ip.ParseIPList([]string{"xxxx"})
+	assert.Equal(t, expectIPS, ips)
+}
diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go
index 4fd555d5dff..eee8a7f7db3 100644
--- a/pkg/yurthub/proxy/pool/pool.go
+++ b/pkg/yurthub/proxy/pool/pool.go
@@ -49,15 +49,12 @@ type PoolCoordinatorProxy struct {
 }
 
 func NewPoolCoordinatorProxy(
-	poolCoordinatorAddr *url.URL,
 	localCacheMgr cachemanager.CacheManager,
 	transportMgrGetter func() transport.Interface,
+	coordinatorServerURLGetter func() *url.URL,
 	filterMgr *manager.Manager,
 	isCoordinatorReady func() bool,
 	stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) {
-	if poolCoordinatorAddr == nil {
-		return nil, fmt.Errorf("pool-coordinator addr cannot be nil")
-	}
 
 	pp := &PoolCoordinatorProxy{
 		localCacheMgr: localCacheMgr,
@@ -71,12 +68,18 @@ func NewPoolCoordinatorProxy(
 		for {
 			select {
 			case <-ticker.C:
+				// waiting for coordinator init finish
 				transportMgr := transportMgrGetter()
 				if transportMgr == nil {
 					break
 				}
+				coordinatorServerURL := coordinatorServerURLGetter()
+				if coordinatorServerURL == nil {
+					break
+				}
+
 				proxy, err := util.NewRemoteProxy(
-					poolCoordinatorAddr,
+					coordinatorServerURL,
 					pp.modifyResponse,
 					pp.errorHandler,
 					transportMgr,
@@ -87,7 +90,7 @@ func NewPoolCoordinatorProxy(
 				}
 
 				pp.poolCoordinatorProxy = proxy
-				klog.Infof("create remote proxy for pool-coordinator success")
+				klog.Infof("create remote proxy for pool-coordinator success, coordinatorServerURL: %s", coordinatorServerURL.String())
 				return
 			}
 		}
@@ -199,7 +202,7 @@ func (pp *PoolCoordinatorProxy) errorHandler(rw http.ResponseWriter, req *http.R
 
 func (pp *PoolCoordinatorProxy) modifyResponse(resp *http.Response) error {
 	if resp == nil || resp.Request == nil {
-		klog.Infof("no request info in response, skip cache response")
+		klog.Info("no request info in response, skip cache response")
 		return nil
 	}
 
diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go
index b9c9896765d..1de3f9a8108 100644
--- a/pkg/yurthub/proxy/proxy.go
+++ b/pkg/yurthub/proxy/proxy.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"io"
 	"net/http"
+	"net/url"
 	"strings"
 
 	v1 "k8s.io/api/authorization/v1"
@@ -72,6 +73,7 @@ func NewYurtReverseProxyHandler(
 	coordinatorGetter func() poolcoordinator.Coordinator,
 	coordinatorTransportMgrGetter func() transport.Interface,
 	coordinatorHealthCheckerGetter func() healthchecker.HealthChecker,
+	coordinatorServerURLGetter func() *url.URL,
 	stopCh <-chan struct{}) (http.Handler, error) {
 	cfg := &server.Config{
 		LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix),
@@ -122,9 +124,9 @@ func NewYurtReverseProxyHandler(
 
 	if yurtHubCfg.EnableCoordinator {
 		poolProxy, err = pool.NewPoolCoordinatorProxy(
-			yurtHubCfg.CoordinatorServerURL,
 			localCacheMgr,
 			coordinatorTransportMgrGetter,
+			coordinatorServerURLGetter,
 			yurtHubCfg.FilterManager,
 			isCoordinatorReady,
 			stopCh)
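
Note (not part of the diff): the change above replaces the eagerly injected pool-coordinator address with a coordinatorServerURLGetter func() *url.URL that keeps returning nil until coordinatorRun has synced the service informer and resolved the coordinator service to a ClusterIP; consumers such as NewPoolCoordinatorProxy poll the getter on a ticker and only build their remote proxy once it yields a URL. The sketch below illustrates that getter-plus-polling pattern in isolation. It is a minimal standalone example under stated assumptions, not code from the PR; the mutex and the hardcoded ClusterIP are illustrative only.

package main

import (
	"fmt"
	"net/url"
	"sync"
	"time"
)

func main() {
	var (
		mu             sync.Mutex
		coordinatorURL *url.URL
	)

	// Mirrors coordinatorServerURLGetter: returns nil until initialization has finished.
	getter := func() *url.URL {
		mu.Lock()
		defer mu.Unlock()
		return coordinatorURL
	}

	// Stands in for coordinatorRun: resolve the coordinator address asynchronously,
	// then publish it through the shared variable the getter reads.
	go func() {
		time.Sleep(50 * time.Millisecond)             // pretend: cache sync + service lookup
		u, err := url.Parse("https://10.96.0.10:443") // hypothetical ClusterIP, not from the PR
		if err != nil {
			return
		}
		mu.Lock()
		coordinatorURL = u
		mu.Unlock()
	}()

	// Stands in for the ticker loop in NewPoolCoordinatorProxy: keep polling the
	// getter and only proceed once a non-nil URL is available.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if u := getter(); u != nil {
			fmt.Println("coordinator server URL ready:", u)
			return
		}
	}
}

In the diff itself, coordinatorServiceUrl is assigned at the end of the goroutine in coordinatorRun, after the transport manager, health checker, and coordinator have been created, so a non-nil URL from the getter also implies those components are ready.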