Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【FIX】fix yurthub dnsPolicy when using pool-coordinator #1321

Merged
merged 1 commit into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 73 additions & 14 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package app
import (
"context"
"fmt"
"net"
"net/url"
"time"

Expand All @@ -27,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
Expand Down Expand Up @@ -95,7 +97,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
trace++

klog.Infof("%d. prepare cloud kube clients", trace)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServerURL, transportManager)
cloudClients, err := createClients(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, transportManager)
if err != nil {
return fmt.Errorf("failed to create cloud clients, %w", err)
}
Expand Down Expand Up @@ -151,6 +153,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker
var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager
var coordinatorGetter func() poolcoordinator.Coordinator = getFakeCoordinator
var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL

if cfg.EnableCoordinator {
klog.Infof("%d. start to run coordinator", trace)
Expand All @@ -160,11 +163,12 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
// coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// if certs has been got from cloud APIServer. It will close the coordinatorInformerRegistryChan if the secret channel has
// been registered into informer factory.
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, coordinatorServerURLGetter =
coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
// wait for coordinator informer registry
klog.Infof("waiting for coordinator informer registry")
klog.Info("waiting for coordinator informer registry")
<-coordinatorInformerRegistryChan
klog.Infof("coordinator informer registry finished")
klog.Info("coordinator informer registry finished")
}

// Start the informer factory if all informers have been registered
Expand All @@ -181,6 +185,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
coordinatorGetter,
coordinatorTransportManagerGetter,
coordinatorHealthCheckerGetter,
coordinatorServerURLGetter,
ctx.Done())
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
Expand All @@ -196,13 +201,13 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
return fmt.Errorf("could not run hub servers, %w", err)
}
<-ctx.Done()
klog.Infof("hub agent exited")
klog.Info("hub agent exited")
return nil
}

// createClients will create clients for all cloud APIServer and client for pool coordinator
// It will return a map, mapping cloud APIServer URL to its client, and a pool coordinator client
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
// createClients will create clients for all cloud APIServer
// It will return a map, mapping cloud APIServer URL to its client
func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, tp transport.Interface) (map[string]kubernetes.Interface, error) {
cloudClients := make(map[string]kubernetes.Interface)
for i := range remoteServers {
restConf := &rest.Config{
Expand All @@ -220,16 +225,22 @@ func createClients(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordi
}

// coordinatorRun will initialize and start all coordinator-related components in an async way.
// It returns Getter function for coordinator, coordinator health checker and coordinator transport manager,
// It returns getter functions for the coordinator health checker, the coordinator transport manager, the coordinator and the coordinator server URL;
// each getter returns the corresponding component once it has been initialized, and nil otherwise.
func coordinatorRun(ctx context.Context,
cfg *config.YurtHubConfiguration,
JameKeal marked this conversation as resolved.
Show resolved Hide resolved
restConfigMgr *hubrest.RestConfigManager,
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker,
coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator) {
coordinatorInformerRegistryChan chan struct{}) (
func() healthchecker.HealthChecker,
func() transport.Interface,
func() poolcoordinator.Coordinator,
func() *url.URL) {

var coordinatorHealthChecker healthchecker.HealthChecker
var coordinatorTransportMgr transport.Interface
var coordinator poolcoordinator.Coordinator
var coordinatorServiceUrl *url.URL

go func() {
coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.ProxiedClient, cfg.SharedFactory)
Expand All @@ -238,9 +249,50 @@ func coordinatorRun(ctx context.Context,
klog.Errorf("coordinator failed to create coordinator cert manager, %v", err)
return
}
klog.Infof("coordinator new certManager success")
klog.Info("coordinator new certManager success")

// waiting for service sync complete
if !cache.WaitForCacheSync(ctx.Done(), cfg.SharedFactory.Core().V1().Services().Informer().HasSynced) {
klog.Error("coordinatorRun sync service shutdown")
return
}
klog.Info("coordinatorRun sync service complete")

// resolve pool-coordinator-apiserver and etcd from domain to ips
serviceList := cfg.SharedFactory.Core().V1().Services().Lister()
// if pool-coordinator-apiserver and pool-coordinator-etcd address is ip, don't need to resolve
apiServerIP := net.ParseIP(cfg.CoordinatorServerURL.Hostname())
etcdUrl, err := url.Parse(cfg.CoordinatorStorageAddr)
if err != nil {
klog.Errorf("coordinator parse etcd address failed: %+v", err)
return
}
etcdIP := net.ParseIP(etcdUrl.Hostname())
if apiServerIP == nil {
apiServerService, err := serviceList.Services(util.YurtHubNamespace).Get(cfg.CoordinatorServerURL.Hostname())
if err != nil {
klog.Errorf("coordinator failed to get apiServer service, %v", err)
return
}
// rewrite coordinator service info for cfg
coordinatorServerURL, err :=
url.Parse(fmt.Sprintf("https://%s:%s", apiServerService.Spec.ClusterIP, cfg.CoordinatorServerURL.Port()))
if err != nil {
klog.Errorf("coordinator failed to parse apiServer service, %v", err)
return
}
cfg.CoordinatorServerURL = coordinatorServerURL
}
if etcdIP == nil {
etcdService, err := serviceList.Services(util.YurtHubNamespace).Get(etcdUrl.Hostname())
if err != nil {
klog.Errorf("coordinator failed to get etcd service, %v", err)
return
}
cfg.CoordinatorStorageAddr = fmt.Sprintf("https://%s:%s", etcdService.Spec.ClusterIP, etcdUrl.Port())
}

coorTransportMgr, err := poolCoordinatorTransportMgrGetter(cfg.HeartbeatTimeoutSeconds, cfg.CoordinatorServerURL, coorCertManager, ctx.Done())
coorTransportMgr, err := poolCoordinatorTransportMgrGetter(coorCertManager, ctx.Done())
if err != nil {
klog.Errorf("coordinator failed to create coordinator transport manager, %v", err)
return
Expand Down Expand Up @@ -280,6 +332,7 @@ func coordinatorRun(ctx context.Context,
coordinatorTransportMgr = coorTransportMgr
coordinatorHealthChecker = coorHealthChecker
coordinator = coor
coordinatorServiceUrl = cfg.CoordinatorServerURL
}()

return func() healthchecker.HealthChecker {
Expand All @@ -288,12 +341,14 @@ func coordinatorRun(ctx context.Context,
return coordinatorTransportMgr
}, func() poolcoordinator.Coordinator {
return coordinator
}, func() *url.URL {
return coordinatorServiceUrl
}
}

func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
func poolCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
err := wait.PollImmediate(5*time.Second, 4*time.Minute, func() (done bool, err error) {
klog.Infof("waiting for preparing certificates for coordinator client and node lease proxy client")
klog.Info("waiting for preparing certificates for coordinator client and node lease proxy client")
if coordinatorCertMgr.GetAPIServerClientCert() == nil {
return false, nil
}
Expand Down Expand Up @@ -324,3 +379,7 @@ func getFakeCoordinatorHealthChecker() healthchecker.HealthChecker {
// getFakeCoordinatorTransportManager is a placeholder transport-manager getter
// that always returns nil. Run uses it as the initial value of
// coordinatorTransportManagerGetter, which is only replaced when the
// coordinator is enabled.
func getFakeCoordinatorTransportManager() transport.Interface {
	// The zero value of an interface type is the nil interface.
	var none transport.Interface
	return none
}

// getFakeCoordinatorServerURL is a placeholder coordinator-server-URL getter
// that always returns nil. Run uses it as the initial value of
// coordinatorServerURLGetter, which is only replaced when the coordinator is
// enabled.
func getFakeCoordinatorServerURL() *url.URL {
	// The zero value of a pointer type is nil.
	var unset *url.URL
	return unset
}
2 changes: 1 addition & 1 deletion pkg/controller/poolcoordinator/cert/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func waitUntilSVCReady(clientSet client.Interface, serviceName string, stopCh <-
}

// prepare certmanager
ips = ip.ParseIPList(serverSVC.Spec.ClusterIPs)
ips = ip.ParseIPList([]string{serverSVC.Spec.ClusterIP})
JameKeal marked this conversation as resolved.
Show resolved Hide resolved
dnsnames = serveraddr.GetDefaultDomainsForSvc(PoolcoordinatorNS, serviceName)

return ips, dnsnames, nil
Expand Down
26 changes: 26 additions & 0 deletions pkg/controller/poolcoordinator/cert/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

"github.com/openyurtio/openyurt/pkg/util/ip"
)

const (
Expand Down Expand Up @@ -111,3 +113,27 @@ func TestGetAPIServerSVCURL(t *testing.T) {
assert.Equal(t, nil, err)
assert.Equal(t, "https://xxxx:644", url)
}

// TestWaitUntilSVCReady checks that waitUntilSVCReady, given a fake clientset
// that already contains the pool-coordinator APIServer service, returns no
// error and yields the IP list parsed from the service's single ClusterIP field.
func TestWaitUntilSVCReady(t *testing.T) {
	// Placeholder value stored in the service's ClusterIP field; the expected
	// result is derived from the same string through ip.ParseIPList.
	const clusterIP = "xxxx"

	stopCh := make(chan struct{})
	defer close(stopCh)

	svc := &corev1.Service{
		ObjectMeta: v1.ObjectMeta{
			Name:      PoolcoordinatorAPIServerSVC,
			Namespace: PoolcoordinatorNS,
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: clusterIP,
			Ports:     []corev1.ServicePort{{Port: 644}},
		},
	}
	client := fake.NewSimpleClientset(svc)

	gotIPs, _, err := waitUntilSVCReady(client, PoolcoordinatorAPIServerSVC, stopCh)
	assert.Equal(t, nil, err)

	wantIPs := ip.ParseIPList([]string{clusterIP})
	assert.Equal(t, wantIPs, gotIPs)
}
17 changes: 10 additions & 7 deletions pkg/yurthub/proxy/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,12 @@ type PoolCoordinatorProxy struct {
}

func NewPoolCoordinatorProxy(
poolCoordinatorAddr *url.URL,
localCacheMgr cachemanager.CacheManager,
transportMgrGetter func() transport.Interface,
coordinatorServerURLGetter func() *url.URL,
filterMgr *manager.Manager,
isCoordinatorReady func() bool,
stopCh <-chan struct{}) (*PoolCoordinatorProxy, error) {
if poolCoordinatorAddr == nil {
return nil, fmt.Errorf("pool-coordinator addr cannot be nil")
}

pp := &PoolCoordinatorProxy{
localCacheMgr: localCacheMgr,
Expand All @@ -71,12 +68,18 @@ func NewPoolCoordinatorProxy(
for {
select {
case <-ticker.C:
// waiting for coordinator init finish
transportMgr := transportMgrGetter()
if transportMgr == nil {
break
}
coordinatorServerURL := coordinatorServerURLGetter()
if coordinatorServerURL == nil {
break
}

proxy, err := util.NewRemoteProxy(
poolCoordinatorAddr,
coordinatorServerURL,
pp.modifyResponse,
pp.errorHandler,
transportMgr,
Expand All @@ -87,7 +90,7 @@ func NewPoolCoordinatorProxy(
}

pp.poolCoordinatorProxy = proxy
klog.Infof("create remote proxy for pool-coordinator success")
klog.Infof("create remote proxy for pool-coordinator success, coordinatorServerURL: %s", coordinatorServerURL.String())
return
}
}
Expand Down Expand Up @@ -199,7 +202,7 @@ func (pp *PoolCoordinatorProxy) errorHandler(rw http.ResponseWriter, req *http.R

func (pp *PoolCoordinatorProxy) modifyResponse(resp *http.Response) error {
if resp == nil || resp.Request == nil {
klog.Infof("no request info in response, skip cache response")
klog.Info("no request info in response, skip cache response")
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"io"
"net/http"
"net/url"
"strings"

v1 "k8s.io/api/authorization/v1"
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewYurtReverseProxyHandler(
coordinatorGetter func() poolcoordinator.Coordinator,
coordinatorTransportMgrGetter func() transport.Interface,
coordinatorHealthCheckerGetter func() healthchecker.HealthChecker,
coordinatorServerURLGetter func() *url.URL,
stopCh <-chan struct{}) (http.Handler, error) {
cfg := &server.Config{
LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix),
Expand Down Expand Up @@ -122,9 +124,9 @@ func NewYurtReverseProxyHandler(

if yurtHubCfg.EnableCoordinator {
poolProxy, err = pool.NewPoolCoordinatorProxy(
yurtHubCfg.CoordinatorServerURL,
localCacheMgr,
coordinatorTransportMgrGetter,
coordinatorServerURLGetter,
yurtHubCfg.FilterManager,
isCoordinatorReady,
stopCh)
Expand Down