Skip to content

Commit

Permalink
fix yurthub dnsPolicy when using pool-coordinator and update pool-coo…
Browse files Browse the repository at this point in the history
…rdinator from domain to service ip
  • Loading branch information
JameKeal committed Apr 4, 2023
1 parent 056ba74 commit 1314e29
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 23 deletions.
87 changes: 73 additions & 14 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package app
import (
"context"
"fmt"
"net"
"net/url"
"time"

Expand All @@ -27,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
Expand Down Expand Up @@ -95,7 +97,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
trace++

klog.Infof("%d. prepare cloud kube clients", trace)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServerURL, transportManager)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager)
if err != nil {
return fmt.Errorf("failed to create cloud clients, %w", err)
}
Expand Down Expand Up @@ -151,6 +153,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker
var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager
var coordinatorGetter func() poolcoordinator.Coordinator = getFakeCoordinator
var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL

if cfg.EnableCoordinator {
klog.Infof("%d. start to run coordinator", trace)
Expand All @@ -160,11 +163,12 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
// coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// if certs has been got from cloud APIServer. It will close the coordinatorInformerRegistryChan if the secret channel has
// been registered into informer factory.
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, coordinatorServerURLGetter =
coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
// wait for coordinator informer registry
klog.Infof("waiting for coordinator informer registry")
klog.Info("waiting for coordinator informer registry")
<-coordinatorInformerRegistryChan
klog.Infof("coordinator informer registry finished")
klog.Info("coordinator informer registry finished")
}

// Start the informer factory if all informers have been registered
Expand All @@ -181,6 +185,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
coordinatorGetter,
coordinatorTransportManagerGetter,
coordinatorHealthCheckerGetter,
coordinatorServerURLGetter,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
Expand All @@ -196,13 +201,13 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
return fmt.Errorf("could not run hub servers, %w", err)
}
<-ctx.Done()
klog.Infof("hub agent exited")
klog.Info("hub agent exited")
return nil
}

// createClients will create clients for all cloud APIServer and client for pool coordinator
// It will return a map, mapping cloud APIServer URL to its client, and a pool coordinator client
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
// createClients will create clients for all cloud APIServer
// It will return a map, mapping cloud APIServer URL to its client
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
cloudClients := make(map[string]kubernetes.Interface)
for i := range remoteServers {
restConf := &rest.Config{
Expand All @@ -220,16 +225,22 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi
}

// coordinatorRun will initialize and start all coordinator-related components in an async way.
// It returns Getter function for coordinator, coordinator health checker and coordinator transport manager,
// It returns Getter function for coordinator, coordinator health checker, coordinator transport manager and coordinator service url,
// which will return the relative component if it has been initialized, otherwise it will return nil.
func coordinatorRun(ctx context.Context,
cfg *config.YurtHubConfiguration,
restConfigMgr *hubrest.RestConfigManager,
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker,
coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator) {
coordinatorInformerRegistryChan chan struct{}) (
func() healthchecker.HealthChecker,
func() transport.Interface,
func() poolcoordinator.Coordinator,
func() *url.URL) {

var coordinatorHealthChecker healthchecker.HealthChecker
var coordinatorTransportMgr transport.Interface
var coordinator poolcoordinator.Coordinator
var coordinatorServiceUrl *url.URL

go func() {
coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.ProxiedClient, cfg.SharedFactory)
Expand All @@ -238,9 +249,50 @@ func coordinatorRun(ctx context.Context,
klog.Errorf("coordinator failed to create coordinator cert manager, %v", err)
return
}
klog.Infof("coordinator new certManager success")
klog.Info("coordinator new certManager success")

// waiting for service sync complete
if !cache.WaitForCacheSync(ctx.Done(), cfg.SharedFactory.Core().V1().Services().Informer().HasSynced) {
klog.Error("coordinatorRun sync service shutdown")
return
}
klog.Info("coordinatorRun sync service complete")

// resolve pool-coordinator-apiserver and etcd from domain to ips
serviceList := cfg.SharedFactory.Core().V1().Services().Lister()
// if pool-coordinator-apiserver and pool-coordinator-etcd address is ip, don't need to resolve
apiServerIP := net.ParseIP(cfg.CoordinatorServerURL.Hostname())
etcdUrl, err := url.Parse(cfg.CoordinatorStorageAddr)
if err != nil {
klog.Errorf("coordinator parse etcd address failed: %+v", err)
return
}
etcdIP := net.ParseIP(etcdUrl.Hostname())
if apiServerIP == nil {
apiServerService, err := serviceList.Services(util.YurtHubNamespace).Get(cfg.CoordinatorServerURL.Hostname())
if err != nil {
klog.Errorf("coordinator failed to get apiServer service, %v", err)
return
}
// rewrite coordinator service info for cfg
coordinatorServerURL, err :=
url.Parse(fmt.Sprintf("https://%s:%s", apiServerService.Spec.ClusterIP, cfg.CoordinatorServerURL.Port()))
if err != nil {
klog.Errorf("coordinator failed to parse apiServer service, %v", err)
return
}
cfg.CoordinatorServerURL = coordinatorServerURL
}
if etcdIP == nil {
etcdService, err := serviceList.Services(util.YurtHubNamespace).Get(etcdUrl.Hostname())
if err != nil {
klog.Errorf("coordinator failed to get etcd service, %v", err)
return
}
cfg.CoordinatorStorageAddr = fmt.Sprintf("https://%s:%s", etcdService.Spec.ClusterIP, etcdUrl.Port())
}

coorTransportMgr, err := poolCoordinatorTransportMgrGetter(cfg.HeartbeatTimeoutSeconds, cfg.CoordinatorServerURL, coorCertManager, ctx.Done())
coorTransportMgr, err := poolCoordinatorTransportMgrGetter(coorCertManager, ctx.Done())
if err != nil {
klog.Errorf("coordinator failed to create coordinator transport manager, %v", err)
return
Expand Down Expand Up @@ -280,6 +332,7 @@ func coordinatorRun(ctx context.Context,
coordinatorTransportMgr = coorTransportMgr
coordinatorHealthChecker = coorHealthChecker
coordinator = coor
coordinatorServiceUrl = cfg.CoordinatorServerURL
}()

return func() healthchecker.HealthChecker {
Expand All @@ -288,12 +341,14 @@ func coordinatorRun(ctx context.Context,
return coordinatorTransportMgr
}, func() poolcoordinator.Coordinator {
return coordinator
}, func() *url.URL {
return coordinatorServiceUrl
}
}

func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
func poolCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
err := wait.PollImmediate(5*time.Second, 4*time.Minute, func() (done bool, err error) {
klog.Infof("waiting for preparing certificates for coordinator client and node lease proxy client")
klog.Info("waiting for preparing certificates for coordinator client and node lease proxy client")
if coordinatorCertMgr.GetAPIServerClientCert() == nil {
return false, nil
}
Expand Down Expand Up @@ -324,3 +379,7 @@ func getFakeCoordinatorHealthChecker() healthchecker.HealthChecker {
func getFakeCoordinatorTransportManager() transport.Interface {
return nil
}

func getFakeCoordinatorServerURL() *url.URL {
return nil
}
2 changes: 1 addition & 1 deletion pkg/controller/poolcoordinator/cert/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func waitUntilSVCReady(clientSet client.Interface, serviceName string, stopCh <-
}

// prepare certmanager
ips = ip.ParseIPList(serverSVC.Spec.ClusterIPs)
ips = ip.ParseIPList([]string{serverSVC.Spec.ClusterIP})
dnsnames = serveraddr.GetDefaultDomainsForSvc(PoolcoordinatorNS, serviceName)

return ips, dnsnames, nil
Expand Down
26 changes: 26 additions & 0 deletions pkg/controller/poolcoordinator/cert/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

"github.com/openyurtio/openyurt/pkg/util/ip"
)

const (
Expand Down Expand Up @@ -111,3 +113,27 @@ func TestGetAPIServerSVCURL(t *testing.T) {
assert.Equal(t, nil, err)
assert.Equal(t, "https://xxxx:644", url)
}

func TestWaitUntilSVCReady(t *testing.T) {
stop := make(chan struct{})
defer close(stop)

normalClient := fake.NewSimpleClientset(&corev1.Service{
ObjectMeta: v1.ObjectMeta{
Namespace: PoolcoordinatorNS,
Name: PoolcoordinatorAPIServerSVC,
},
Spec: corev1.ServiceSpec{
ClusterIP: "xxxx",
Ports: []corev1.ServicePort{
{
Port: 644,
},
},
},
})
ips, _, err := waitUntilSVCReady(normalClient, PoolcoordinatorAPIServerSVC, stop)
assert.Equal(t, nil, err)
expectIPS := ip.ParseIPList([]string{"xxxx"})
assert.Equal(t, expectIPS, ips)
}
17 changes: 10 additions & 7 deletions pkg/yurthub/proxy/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,12 @@ type PoolCoordinatorProxy struct {
}

func NewPoolCoordinatorProxy(
poolCoordinatorAddr *url.URL,
localCacheMgr cachemanager.CacheManager,
transportMgrGetter func() transport.Interface,
coordinatorServerURLGetter func() *url.URL,
filterMgr *manager.Manager,
isCoordinatorReady func() bool,
stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) {
if poolCoordinatorAddr == nil {
return nil, fmt.Errorf("pool-coordinator addr cannot be nil")
}

pp := &PoolCoordinatorProxy{
localCacheMgr: localCacheMgr,
Expand All @@ -71,12 +68,18 @@ func NewPoolCoordinatorProxy(
for {
select {
case <-ticker.C:
// waiting for coordinator init finish
transportMgr := transportMgrGetter()
if transportMgr == nil {
break
}
coordinatorServerURL := coordinatorServerURLGetter()
if coordinatorServerURL == nil {
break
}

proxy, err := util.NewRemoteProxy(
poolCoordinatorAddr,
coordinatorServerURL,
pp.modifyResponse,
pp.errorHandler,
transportMgr,
Expand All @@ -87,7 +90,7 @@ func NewPoolCoordinatorProxy(
}

pp.poolCoordinatorProxy = proxy
klog.Infof("create remote proxy for pool-coordinator success")
klog.Infof("create remote proxy for pool-coordinator success, coordinatorServerURL: %s", coordinatorServerURL.String())
return
}
}
Expand Down Expand Up @@ -199,7 +202,7 @@ func (pp *PoolCoordinatorProxy) errorHandler(rw http.ResponseWriter, req *http.R

func (pp *PoolCoordinatorProxy) modifyResponse(resp *http.Response) error {
if resp == nil || resp.Request == nil {
klog.Infof("no request info in response, skip cache response")
klog.Info("no request info in response, skip cache response")
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"io"
"net/http"
"net/url"
"strings"

v1 "k8s.io/api/authorization/v1"
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewYurtReverseProxyHandler(
coordinatorGetter func() poolcoordinator.Coordinator,
coordinatorTransportMgrGetter func() transport.Interface,
coordinatorHealthCheckerGetter func() healthchecker.HealthChecker,
coordinatorServerURLGetter func() *url.URL,
stopCh <-chan struct{}) (http.Handler, error) {
cfg := &server.Config{
LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix),
Expand Down Expand Up @@ -122,9 +124,9 @@ func NewYurtReverseProxyHandler(

if yurtHubCfg.EnableCoordinator {
poolProxy, err = pool.NewPoolCoordinatorProxy(
yurtHubCfg.CoordinatorServerURL,
localCacheMgr,
coordinatorTransportMgrGetter,
coordinatorServerURLGetter,
yurtHubCfg.FilterManager,
isCoordinatorReady,
stopCh)
Expand Down

0 comments on commit 1314e29

Please sign in to comment.