diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index ebb93d56438..8c8dcd6a6a6 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" @@ -95,7 +96,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { trace++ klog.Infof("%d. prepare cloud kube clients", trace) - cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServerURL, transportManager) + cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager) if err != nil { return fmt.Errorf("failed to create cloud clients, %w", err) } @@ -151,6 +152,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager var coordinatorGetter func() poolcoordinator.Coordinator = getFakeCoordinator + var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL if cfg.EnableCoordinator { klog.Infof("%d. start to run coordinator", trace) @@ -160,11 +162,12 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { // coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check // if certs has been got from cloud APIServer. It will close the coordinatorInformerRegistryChan if the secret channel has // been registered into informer factory. - coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan) + coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, coordinatorServerURLGetter = + coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan) // wait for coordinator informer registry - klog.Infof("waiting for coordinator informer registry") + klog.Info("waiting for coordinator informer registry") <-coordinatorInformerRegistryChan - klog.Infof("coordinator informer registry finished") + klog.Info("coordinator informer registry finished") } // Start the informer factory if all informers have been registered @@ -181,6 +184,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { coordinatorGetter, coordinatorTransportManagerGetter, coordinatorHealthCheckerGetter, + coordinatorServerURLGetter, ctx.Done()) if err != nil { return fmt.Errorf("could not create reverse proxy handler, %w", err) @@ -196,13 +200,13 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { return fmt.Errorf("could not run hub servers, %w", err) } <-ctx.Done() - klog.Infof("hub agent exited") + klog.Info("hub agent exited") return nil } -// createClients will create clients for all cloud APIServer and client for pool coordinator -// It will return a map, mapping cloud APIServer URL to its client, and a pool coordinator client -func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) { +// createClients will create clients for all cloud APIServer +// It will return a map, mapping cloud APIServer URL to its client +func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) { cloudClients := make(map[string]kubernetes.Interface) for i := range remoteServers { restConf := &rest.Config{ @@ -226,10 +230,16 @@ func coordinatorRun(ctx context.Context, cfg *config.YurtHubConfiguration, restConfigMgr *hubrest.RestConfigManager, cloudHealthChecker healthchecker.MultipleBackendsHealthChecker, - coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator) { + coordinatorInformerRegistryChan chan struct{}) ( + func() healthchecker.HealthChecker, + func() transport.Interface, + func() poolcoordinator.Coordinator, + func() *url.URL) { + var coordinatorHealthChecker healthchecker.HealthChecker var coordinatorTransportMgr transport.Interface var coordinator poolcoordinator.Coordinator + var coordinatorServiceUrl *url.URL go func() { coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.ProxiedClient, cfg.SharedFactory) @@ -238,9 +248,39 @@ func coordinatorRun(ctx context.Context, klog.Errorf("coordinator failed to create coordinator cert manager, %v", err) return } - klog.Infof("coordinator new certManager success") + klog.Info("coordinator new certManager success") + + // waiting for service sync complete + if !cache.WaitForCacheSync(ctx.Done(), cfg.SharedFactory.Core().V1().Services().Informer().HasSynced) { + klog.Error("coordinatorRun sync service timeout") + return + } + klog.Info("coordinatorRun sync service complete") + + // resolve pool-coordinator-apiserver and etcd from domain to ips + serviceList := cfg.SharedFactory.Core().V1().Services().Lister() + apiServerService, err := serviceList.Services(util.YurtHubNamespace).Get(util.DefaultPoolCoordinatorAPIServerSvcName) + if err != nil { + klog.Errorf("coordinator failed to get apiServer service, %v", err) + return + } + etcdService, err := serviceList.Services(util.YurtHubNamespace).Get(util.DefaultPoolCoordinatorEtcdSvcName) + if err != nil { + klog.Errorf("coordinator failed to get etcd service, %v", err) + return + } - coorTransportMgr, err := poolCoordinatorTransportMgrGetter(cfg.HeartbeatTimeoutSeconds, cfg.CoordinatorServerURL, coorCertManager, ctx.Done()) + // rewrite coordinator service info for cfg + coordinatorServerURL, err := + url.Parse(fmt.Sprintf("https://%s:%s", apiServerService.Spec.ClusterIP, util.DefaultPoolCoordinatorAPIServerSvcPort)) + if err != nil { + klog.Errorf("coordinator failed to parse apiServer service, %v", err) + return + } + cfg.CoordinatorServerURL = coordinatorServerURL + cfg.CoordinatorStorageAddr = fmt.Sprintf("https://%s:%s", etcdService.Spec.ClusterIP, util.DefaultPoolCoordinatorEtcdSvcPort) + + coorTransportMgr, err := poolCoordinatorTransportMgrGetter(coorCertManager, ctx.Done()) if err != nil { klog.Errorf("coordinator failed to create coordinator transport manager, %v", err) return @@ -280,6 +320,7 @@ func coordinatorRun(ctx context.Context, coordinatorTransportMgr = coorTransportMgr coordinatorHealthChecker = coorHealthChecker coordinator = coor + coordinatorServiceUrl = coordinatorServerURL }() return func() healthchecker.HealthChecker { @@ -288,12 +329,14 @@ func coordinatorRun(ctx context.Context, return coordinatorTransportMgr }, func() poolcoordinator.Coordinator { return coordinator + }, func() *url.URL { + return coordinatorServiceUrl } } -func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) { +func poolCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) { err := wait.PollImmediate(5*time.Second, 4*time.Minute, func() (done bool, err error) { - klog.Infof("waiting for preparing certificates for coordinator client and node lease proxy client") + klog.Info("waiting for preparing certificates for coordinator client and node lease proxy client") if coordinatorCertMgr.GetAPIServerClientCert() == nil { return false, nil } @@ -324,3 +367,7 @@ func getFakeCoordinatorHealthChecker() healthchecker.HealthChecker { func getFakeCoordinatorTransportManager() transport.Interface { return nil } + +func getFakeCoordinatorServerURL() *url.URL { + return nil +} diff --git a/config/setup/yurthub.yaml b/config/setup/yurthub.yaml index 743b58203ff..f6fb35da228 100644 --- a/config/setup/yurthub.yaml +++ b/config/setup/yurthub.yaml @@ -6,7 +6,6 @@ metadata: name: yurt-hub namespace: kube-system spec: - dnsPolicy: ClusterFirstWithHostNet volumes: - name: hub-dir hostPath: diff --git a/pkg/controller/poolcoordinator/cert/util.go b/pkg/controller/poolcoordinator/cert/util.go index 26f52d0a444..901a3ac99b3 100644 --- a/pkg/controller/poolcoordinator/cert/util.go +++ b/pkg/controller/poolcoordinator/cert/util.go @@ -69,7 +69,7 @@ func waitUntilSVCReady(clientSet client.Interface, serviceName string, stopCh <- } // prepare certmanager - ips = ip.ParseIPList(serverSVC.Spec.ClusterIPs) + ips = ip.ParseIPList([]string{serverSVC.Spec.ClusterIP}) dnsnames = serveraddr.GetDefaultDomainsForSvc(PoolcoordinatorNS, serviceName) return ips, dnsnames, nil diff --git a/pkg/yurtadm/constants/constants.go b/pkg/yurtadm/constants/constants.go index 151bc5efc41..1b73f798229 100644 --- a/pkg/yurtadm/constants/constants.go +++ b/pkg/yurtadm/constants/constants.go @@ -192,7 +192,6 @@ metadata: name: yurt-hub namespace: kube-system spec: - dnsPolicy: ClusterFirstWithHostNet volumes: - name: hub-dir hostPath: diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index 4fd555d5dff..eee8a7f7db3 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -49,15 +49,12 @@ type PoolCoordinatorProxy struct { } func NewPoolCoordinatorProxy( - poolCoordinatorAddr *url.URL, localCacheMgr cachemanager.CacheManager, transportMgrGetter func() transport.Interface, + coordinatorServerURLGetter func() *url.URL, filterMgr *manager.Manager, isCoordinatorReady func() bool, stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) { - if poolCoordinatorAddr == nil { - return nil, fmt.Errorf("pool-coordinator addr cannot be nil") - } pp := &PoolCoordinatorProxy{ localCacheMgr: localCacheMgr, @@ -71,12 +68,18 @@ func NewPoolCoordinatorProxy( for { select { case <-ticker.C: + // waiting for coordinator init finish transportMgr := transportMgrGetter() if transportMgr == nil { break } + coordinatorServerURL := coordinatorServerURLGetter() + if coordinatorServerURL == nil { + break + } + proxy, err := util.NewRemoteProxy( - poolCoordinatorAddr, + coordinatorServerURL, pp.modifyResponse, pp.errorHandler, transportMgr, @@ -87,7 +90,7 @@ func NewPoolCoordinatorProxy( } pp.poolCoordinatorProxy = proxy - klog.Infof("create remote proxy for pool-coordinator success") + klog.Infof("create remote proxy for pool-coordinator success, coordinatorServerURL: %s", coordinatorServerURL.String()) return } } @@ -199,7 +202,7 @@ func (pp *PoolCoordinatorProxy) errorHandler(rw http.ResponseWriter, req *http.R func (pp *PoolCoordinatorProxy) modifyResponse(resp *http.Response) error { if resp == nil || resp.Request == nil { - klog.Infof("no request info in response, skip cache response") + klog.Info("no request info in response, skip cache response") return nil } diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index b9c9896765d..1de3f9a8108 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -21,6 +21,7 @@ import ( "errors" "io" "net/http" + "net/url" "strings" v1 "k8s.io/api/authorization/v1" @@ -72,6 +73,7 @@ func NewYurtReverseProxyHandler( coordinatorGetter func() poolcoordinator.Coordinator, coordinatorTransportMgrGetter func() transport.Interface, coordinatorHealthCheckerGetter func() healthchecker.HealthChecker, + coordinatorServerURLGetter func() *url.URL, stopCh <-chan struct{}) (http.Handler, error) { cfg := &server.Config{ LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), @@ -122,9 +124,9 @@ func NewYurtReverseProxyHandler( if yurtHubCfg.EnableCoordinator { poolProxy, err = pool.NewPoolCoordinatorProxy( - yurtHubCfg.CoordinatorServerURL, localCacheMgr, coordinatorTransportMgrGetter, + coordinatorServerURLGetter, yurtHubCfg.FilterManager, isCoordinatorReady, stopCh)