Skip to content

Commit

Permalink
update pool-coordinator from domain to service ip
Browse files Browse the repository at this point in the history
  • Loading branch information
JameKeal committed Mar 31, 2023
1 parent 6972481 commit d9dcae9
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 24 deletions.
73 changes: 60 additions & 13 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
Expand Down Expand Up @@ -95,7 +96,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
trace++

klog.Infof("%d. prepare cloud kube clients", trace)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServerURL, transportManager)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager)
if err != nil {
return fmt.Errorf("failed to create cloud clients, %w", err)
}
Expand Down Expand Up @@ -151,6 +152,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker
var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager
var coordinatorGetter func() poolcoordinator.Coordinator = getFakeCoordinator
var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL

if cfg.EnableCoordinator {
klog.Infof("%d. start to run coordinator", trace)
Expand All @@ -160,11 +162,12 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
// coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// if certs has been got from cloud APIServer. It will close the coordinatorInformerRegistryChan if the secret channel has
// been registered into informer factory.
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, coordinatorServerURLGetter =
coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
// wait for coordinator informer registry
klog.Infof("waiting for coordinator informer registry")
klog.Info("waiting for coordinator informer registry")
<-coordinatorInformerRegistryChan
klog.Infof("coordinator informer registry finished")
klog.Info("coordinator informer registry finished")
}

// Start the informer factory if all informers have been registered
Expand All @@ -181,6 +184,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
coordinatorGetter,
coordinatorTransportManagerGetter,
coordinatorHealthCheckerGetter,
coordinatorServerURLGetter,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
Expand All @@ -196,13 +200,13 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
return fmt.Errorf("could not run hub servers, %w", err)
}
<-ctx.Done()
klog.Infof("hub agent exited")
klog.Info("hub agent exited")
return nil
}

// createClients will create clients for all cloud APIServer and client for pool coordinator
// It will return a map, mapping cloud APIServer URL to its client, and a pool coordinator client
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
// createClients will create clients for all cloud APIServer
// It will return a map, mapping cloud APIServer URL to its client
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
cloudClients := make(map[string]kubernetes.Interface)
for i := range remoteServers {
restConf := &rest.Config{
Expand All @@ -226,10 +230,16 @@ func coordinatorRun(ctx context.Context,
cfg *config.YurtHubConfiguration,
restConfigMgr *hubrest.RestConfigManager,
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker,
coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator) {
coordinatorInformerRegistryChan chan struct{}) (
func() healthchecker.HealthChecker,
func() transport.Interface,
func() poolcoordinator.Coordinator,
func() *url.URL) {

var coordinatorHealthChecker healthchecker.HealthChecker
var coordinatorTransportMgr transport.Interface
var coordinator poolcoordinator.Coordinator
var coordinatorServiceUrl *url.URL

go func() {
coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.ProxiedClient, cfg.SharedFactory)
Expand All @@ -238,9 +248,39 @@ func coordinatorRun(ctx context.Context,
klog.Errorf("coordinator failed to create coordinator cert manager, %v", err)
return
}
klog.Infof("coordinator new certManager success")
klog.Info("coordinator new certManager success")

// waiting for service sync complete
if !cache.WaitForCacheSync(ctx.Done(), cfg.SharedFactory.Core().V1().Services().Informer().HasSynced) {
klog.Error("coordinatorRun sync service timeout")
return
}
klog.Info("coordinatorRun sync service complete")

// resolve pool-coordinator-apiserver and etcd from domain to ips
serviceList := cfg.SharedFactory.Core().V1().Services().Lister()
apiServerService, err := serviceList.Services(util.YurtHubNamespace).Get(util.DefaultPoolCoordinatorAPIServerSvcName)
if err != nil {
klog.Errorf("coordinator failed to get apiServer service, %v", err)
return
}
etcdService, err := serviceList.Services(util.YurtHubNamespace).Get(util.DefaultPoolCoordinatorEtcdSvcName)
if err != nil {
klog.Errorf("coordinator failed to get etcd service, %v", err)
return
}

coorTransportMgr, err := poolCoordinatorTransportMgrGetter(cfg.HeartbeatTimeoutSeconds, cfg.CoordinatorServerURL, coorCertManager, ctx.Done())
// rewrite coordinator service info for cfg
coordinatorServerURL, err :=
url.Parse(fmt.Sprintf("https://%s:%s", apiServerService.Spec.ClusterIP, util.DefaultPoolCoordinatorAPIServerSvcPort))
if err != nil {
klog.Errorf("coordinator failed to parse apiServer service, %v", err)
return
}
cfg.CoordinatorServerURL = coordinatorServerURL
cfg.CoordinatorStorageAddr = fmt.Sprintf("https://%s:%s", etcdService.Spec.ClusterIP, util.DefaultPoolCoordinatorEtcdSvcPort)

coorTransportMgr, err := poolCoordinatorTransportMgrGetter(coorCertManager, ctx.Done())
if err != nil {
klog.Errorf("coordinator failed to create coordinator transport manager, %v", err)
return
Expand Down Expand Up @@ -280,6 +320,7 @@ func coordinatorRun(ctx context.Context,
coordinatorTransportMgr = coorTransportMgr
coordinatorHealthChecker = coorHealthChecker
coordinator = coor
coordinatorServiceUrl = coordinatorServerURL
}()

return func() healthchecker.HealthChecker {
Expand All @@ -288,12 +329,14 @@ func coordinatorRun(ctx context.Context,
return coordinatorTransportMgr
}, func() poolcoordinator.Coordinator {
return coordinator
}, func() *url.URL {
return coordinatorServiceUrl
}
}

func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
func poolCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
err := wait.PollImmediate(5*time.Second, 4*time.Minute, func() (done bool, err error) {
klog.Infof("waiting for preparing certificates for coordinator client and node lease proxy client")
klog.Info("waiting for preparing certificates for coordinator client and node lease proxy client")
if coordinatorCertMgr.GetAPIServerClientCert() == nil {
return false, nil
}
Expand Down Expand Up @@ -324,3 +367,7 @@ func getFakeCoordinatorHealthChecker() healthchecker.HealthChecker {
func getFakeCoordinatorTransportManager() transport.Interface {
return nil
}

func getFakeCoordinatorServerURL() *url.URL {
return nil
}
1 change: 0 additions & 1 deletion config/setup/yurthub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ metadata:
name: yurt-hub
namespace: kube-system
spec:
dnsPolicy: ClusterFirstWithHostNet
volumes:
- name: hub-dir
hostPath:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/poolcoordinator/cert/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func waitUntilSVCReady(clientSet client.Interface, serviceName string, stopCh <-
}

// prepare certmanager
ips = ip.ParseIPList(serverSVC.Spec.ClusterIPs)
ips = ip.ParseIPList([]string{serverSVC.Spec.ClusterIP})
dnsnames = serveraddr.GetDefaultDomainsForSvc(PoolcoordinatorNS, serviceName)

return ips, dnsnames, nil
Expand Down
1 change: 0 additions & 1 deletion pkg/yurtadm/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ metadata:
name: yurt-hub
namespace: kube-system
spec:
dnsPolicy: ClusterFirstWithHostNet
volumes:
- name: hub-dir
hostPath:
Expand Down
17 changes: 10 additions & 7 deletions pkg/yurthub/proxy/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,12 @@ type PoolCoordinatorProxy struct {
}

func NewPoolCoordinatorProxy(
poolCoordinatorAddr *url.URL,
localCacheMgr cachemanager.CacheManager,
transportMgrGetter func() transport.Interface,
coordinatorServerURLGetter func() *url.URL,
filterMgr *manager.Manager,
isCoordinatorReady func() bool,
stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) {
if poolCoordinatorAddr == nil {
return nil, fmt.Errorf("pool-coordinator addr cannot be nil")
}

pp := &PoolCoordinatorProxy{
localCacheMgr: localCacheMgr,
Expand All @@ -71,12 +68,18 @@ func NewPoolCoordinatorProxy(
for {
select {
case <-ticker.C:
// waiting for coordinator init finish
transportMgr := transportMgrGetter()
if transportMgr == nil {
break
}
coordinatorServerURL := coordinatorServerURLGetter()
if coordinatorServerURL == nil {
break
}

proxy, err := util.NewRemoteProxy(
poolCoordinatorAddr,
coordinatorServerURL,
pp.modifyResponse,
pp.errorHandler,
transportMgr,
Expand All @@ -87,7 +90,7 @@ func NewPoolCoordinatorProxy(
}

pp.poolCoordinatorProxy = proxy
klog.Infof("create remote proxy for pool-coordinator success")
klog.Infof("create remote proxy for pool-coordinator success, coordinatorServerURL: %s", coordinatorServerURL.String())
return
}
}
Expand Down Expand Up @@ -199,7 +202,7 @@ func (pp *PoolCoordinatorProxy) errorHandler(rw http.ResponseWriter, req *http.R

func (pp *PoolCoordinatorProxy) modifyResponse(resp *http.Response) error {
if resp == nil || resp.Request == nil {
klog.Infof("no request info in response, skip cache response")
klog.Info("no request info in response, skip cache response")
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"io"
"net/http"
"net/url"
"strings"

v1 "k8s.io/api/authorization/v1"
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewYurtReverseProxyHandler(
coordinatorGetter func() poolcoordinator.Coordinator,
coordinatorTransportMgrGetter func() transport.Interface,
coordinatorHealthCheckerGetter func() healthchecker.HealthChecker,
coordinatorServerURLGetter func() *url.URL,
stopCh <-chan struct{}) (http.Handler, error) {
cfg := &server.Config{
LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix),
Expand Down Expand Up @@ -122,9 +124,9 @@ func NewYurtReverseProxyHandler(

if yurtHubCfg.EnableCoordinator {
poolProxy, err = pool.NewPoolCoordinatorProxy(
yurtHubCfg.CoordinatorServerURL,
localCacheMgr,
coordinatorTransportMgrGetter,
coordinatorServerURLGetter,
yurtHubCfg.FilterManager,
isCoordinatorReady,
stopCh)
Expand Down

0 comments on commit d9dcae9

Please sign in to comment.