From a898dea4d4b87937972d343e38be9ae00fe75226 Mon Sep 17 00:00:00 2001 From: rambohe Date: Mon, 16 Jan 2023 09:49:25 +0800 Subject: [PATCH] bugfix: yurthub can not exit when SIGINT/SIGTERM happened (#1143) --- cmd/yurthub/app/config/config.go | 257 +++++++++++++------- cmd/yurthub/app/config/config_test.go | 47 ++++ cmd/yurthub/app/options/options.go | 28 ++- cmd/yurthub/app/options/options_test.go | 215 ++++++++++++++++ cmd/yurthub/app/start.go | 59 +---- pkg/yurthub/certificate/token/token.go | 24 +- pkg/yurthub/certificate/token/token_test.go | 33 ++- pkg/yurthub/filter/manager/manager.go | 3 +- pkg/yurthub/filter/manager/manager_test.go | 2 +- pkg/yurthub/gc/gc.go | 8 +- pkg/yurthub/kubernetes/rest/config_test.go | 7 +- pkg/yurthub/network/iptables.go | 8 +- pkg/yurthub/network/network.go | 30 +-- pkg/yurthub/proxy/util/util_test.go | 6 +- pkg/yurthub/server/certificate_test.go | 7 +- pkg/yurthub/server/server.go | 148 +++-------- pkg/yurthub/tenant/tenant.go | 6 +- pkg/yurthub/util/util.go | 6 +- 18 files changed, 550 insertions(+), 344 deletions(-) create mode 100644 cmd/yurthub/app/config/config_test.go create mode 100644 cmd/yurthub/app/options/options_test.go diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index d25e995002e..f333c411c7d 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -17,7 +17,6 @@ limitations under the License. package config import ( - "crypto/tls" "fmt" "net" "net/url" @@ -27,7 +26,11 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + apiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/dynamiccertificates" + apiserveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -41,10 +44,13 @@ import ( "github.com/openyurtio/openyurt/pkg/projectinfo" ipUtils "github.com/openyurtio/openyurt/pkg/util/ip" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + "github.com/openyurtio/openyurt/pkg/yurthub/certificate" + "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/network" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" "github.com/openyurtio/openyurt/pkg/yurthub/util" yurtcorev1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1" @@ -56,40 +62,33 @@ import ( // YurtHubConfiguration represents configuration of yurthub type YurtHubConfiguration struct { - LBMode string - RemoteServers []*url.URL - YurtHubServerAddr string - YurtHubCertOrganizations []string - YurtHubProxyServerAddr string - YurtHubProxyServerSecureAddr string - YurtHubProxyServerDummyAddr string - YurtHubProxyServerSecureDummyAddr string - GCFrequency int - NodeName string - HeartbeatFailedRetry int - HeartbeatHealthyThreshold int - HeartbeatTimeoutSeconds int - HeartbeatIntervalSeconds int - MaxRequestInFlight int - JoinToken string - RootDir string - EnableProfiling bool - EnableDummyIf bool - EnableIptables bool - HubAgentDummyIfName string - StorageWrapper cachemanager.StorageWrapper - SerializerManager *serializer.SerializerManager - RESTMapperManager *meta.RESTMapperManager - TLSConfig *tls.Config - SharedFactory informers.SharedInformerFactory - YurtSharedFactory yurtinformers.SharedInformerFactory - WorkingMode util.WorkingMode - KubeletHealthGracePeriod time.Duration - FilterManager *manager.Manager - CertIPs []net.IP - CoordinatorServer *url.URL - MinRequestTimeout time.Duration - CaCertHashes []string + LBMode string + RemoteServers []*url.URL + GCFrequency int + NodeName string + HeartbeatFailedRetry int + HeartbeatHealthyThreshold int + HeartbeatTimeoutSeconds int + HeartbeatIntervalSeconds int + MaxRequestInFlight int + EnableProfiling bool + StorageWrapper cachemanager.StorageWrapper + SerializerManager *serializer.SerializerManager + RESTMapperManager *meta.RESTMapperManager + SharedFactory informers.SharedInformerFactory + YurtSharedFactory yurtinformers.SharedInformerFactory + WorkingMode util.WorkingMode + KubeletHealthGracePeriod time.Duration + FilterManager *manager.Manager + CoordinatorServer *url.URL + MinRequestTimeout time.Duration + TenantNs string + NetworkMgr *network.NetworkManager + CertManager certificate.YurtCertificateManager + YurtHubServerServing *apiserver.DeprecatedInsecureServingInfo + YurtHubProxyServerServing *apiserver.DeprecatedInsecureServingInfo + YurtHubDummyProxyServerServing *apiserver.DeprecatedInsecureServingInfo + YurtHubSecureProxyServerServing *apiserver.SecureServingInfo } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -99,13 +98,6 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { return nil, err } - hubCertOrgs := make([]string, 0) - if options.YurtHubCertOrganizations != "" { - for _, orgStr := range strings.Split(options.YurtHubCertOrganizations, ",") { - hubCertOrgs = append(hubCertOrgs, orgStr) - } - } - storageManager, err := disk.NewDiskStorage(options.DiskCachePath) if err != nil { klog.Errorf("could not create storage manager, %v", err) @@ -119,17 +111,12 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { return nil, err } - hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort) - proxyServerAddr := net.JoinHostPort(options.YurtHubProxyHost, options.YurtHubProxyPort) - proxySecureServerAddr := net.JoinHostPort(options.YurtHubProxyHost, options.YurtHubProxySecurePort) - proxyServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxyPort) - proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort) workingMode := util.WorkingMode(options.WorkingMode) - sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr), options.EnableNodePool) + sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), options.EnableNodePool) if err != nil { return nil, err } - tenantNs := util.ParseTenantNs(options.YurtHubCertOrganizations) + tenantNs := util.ParseTenantNsFromOrgs(options.YurtHubCertOrganizations) registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled(options), options.NodePoolName, options.NodeName, tenantNs) filterManager, err := manager.NewFilterManager(options, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, us[0].Host) if err != nil { @@ -137,46 +124,46 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { return nil, err } - // use dummy ip and bind ip as cert IP SANs - certIPs := ipUtils.RemoveDupIPs([]net.IP{ - net.ParseIP(options.HubAgentDummyIfIP), - net.ParseIP(options.YurtHubHost), - net.ParseIP(options.YurtHubProxyHost), - }) - cfg := &YurtHubConfiguration{ - LBMode: options.LBMode, - RemoteServers: us, - YurtHubServerAddr: hubServerAddr, - YurtHubCertOrganizations: hubCertOrgs, - YurtHubProxyServerAddr: proxyServerAddr, - YurtHubProxyServerSecureAddr: proxySecureServerAddr, - YurtHubProxyServerDummyAddr: proxyServerDummyAddr, - YurtHubProxyServerSecureDummyAddr: proxySecureServerDummyAddr, - GCFrequency: options.GCFrequency, - NodeName: options.NodeName, - HeartbeatFailedRetry: options.HeartbeatFailedRetry, - HeartbeatHealthyThreshold: options.HeartbeatHealthyThreshold, - HeartbeatTimeoutSeconds: options.HeartbeatTimeoutSeconds, - HeartbeatIntervalSeconds: options.HeartbeatIntervalSeconds, - MaxRequestInFlight: options.MaxRequestInFlight, - JoinToken: options.JoinToken, - RootDir: options.RootDir, - EnableProfiling: options.EnableProfiling, - EnableDummyIf: options.EnableDummyIf, - EnableIptables: options.EnableIptables, - HubAgentDummyIfName: options.HubAgentDummyIfName, - WorkingMode: workingMode, - StorageWrapper: storageWrapper, - SerializerManager: serializerManager, - RESTMapperManager: restMapperManager, - SharedFactory: sharedFactory, - YurtSharedFactory: yurtSharedFactory, - KubeletHealthGracePeriod: options.KubeletHealthGracePeriod, - FilterManager: filterManager, - CertIPs: certIPs, - MinRequestTimeout: options.MinRequestTimeout, - CaCertHashes: options.CACertHashes, + LBMode: options.LBMode, + RemoteServers: us, + GCFrequency: options.GCFrequency, + NodeName: options.NodeName, + HeartbeatFailedRetry: options.HeartbeatFailedRetry, + HeartbeatHealthyThreshold: options.HeartbeatHealthyThreshold, + HeartbeatTimeoutSeconds: options.HeartbeatTimeoutSeconds, + HeartbeatIntervalSeconds: options.HeartbeatIntervalSeconds, + MaxRequestInFlight: options.MaxRequestInFlight, + EnableProfiling: options.EnableProfiling, + WorkingMode: workingMode, + StorageWrapper: storageWrapper, + SerializerManager: serializerManager, + RESTMapperManager: restMapperManager, + SharedFactory: sharedFactory, + YurtSharedFactory: yurtSharedFactory, + KubeletHealthGracePeriod: options.KubeletHealthGracePeriod, + FilterManager: filterManager, + MinRequestTimeout: options.MinRequestTimeout, + TenantNs: tenantNs, + } + + certMgr, err := createCertManager(options, us) + if err != nil { + return nil, err + } + cfg.CertManager = certMgr + + if options.EnableDummyIf { + klog.V(2).Infof("create dummy network interface %s(%s) and init iptables manager", options.HubAgentDummyIfName, options.HubAgentDummyIfIP) + networkMgr, err := network.NewNetworkManager(options) + if err != nil { + return nil, fmt.Errorf("could not create network manager, %w", err) + } + cfg.NetworkMgr = networkMgr + } + + if err = prepareServerServing(options, certMgr, cfg); err != nil { + return nil, err } return cfg, nil @@ -314,3 +301,97 @@ func serviceTopologyFilterEnabled(options *options.YurtHubOptions) bool { return true } + +func createCertManager(options *options.YurtHubOptions, remoteServers []*url.URL) (certificate.YurtCertificateManager, error) { + // use dummy ip and bind ip as cert IP SANs + certIPs := ipUtils.RemoveDupIPs([]net.IP{ + net.ParseIP(options.HubAgentDummyIfIP), + net.ParseIP(options.YurtHubHost), + net.ParseIP(options.YurtHubProxyHost), + }) + + cfg := &token.CertificateManagerConfiguration{ + RootDir: options.RootDir, + NodeName: options.NodeName, + JoinToken: options.JoinToken, + CaCertHashes: options.CACertHashes, + YurtHubCertOrganizations: options.YurtHubCertOrganizations, + CertIPs: certIPs, + RemoteServers: remoteServers, + Client: options.ClientForTest, + } + certManager, err := token.NewYurtHubCertManager(cfg) + if err != nil { + return nil, fmt.Errorf("failed to create cert manager for yurthub, %v", err) + } + + certManager.Start() + err = wait.PollImmediate(5*time.Second, 4*time.Minute, func() (bool, error) { + isReady := certManager.Ready() + if isReady { + return true, nil + } + return false, nil + }) + if err != nil { + return nil, fmt.Errorf("hub certificates preparation failed, %v", err) + } + + return certManager, nil +} + +func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.YurtCertificateManager, cfg *YurtHubConfiguration) error { + if err := (&apiserveroptions.DeprecatedInsecureServingOptions{ + BindAddress: net.ParseIP(options.YurtHubHost), + BindPort: options.YurtHubPort, + BindNetwork: "tcp", + }).ApplyTo(&cfg.YurtHubServerServing); err != nil { + return err + } + + if err := (&apiserveroptions.DeprecatedInsecureServingOptions{ + BindAddress: net.ParseIP(options.YurtHubProxyHost), + BindPort: options.YurtHubProxyPort, + BindNetwork: "tcp", + }).ApplyTo(&cfg.YurtHubProxyServerServing); err != nil { + return err + } + + yurtHubSecureProxyHost := options.YurtHubProxyHost + if options.EnableDummyIf { + yurtHubSecureProxyHost = options.HubAgentDummyIfIP + if err := (&apiserveroptions.DeprecatedInsecureServingOptions{ + BindAddress: net.ParseIP(options.HubAgentDummyIfIP), + BindPort: options.YurtHubProxyPort, + BindNetwork: "tcp", + }).ApplyTo(&cfg.YurtHubDummyProxyServerServing); err != nil { + return err + } + } + + serverCertPath := certMgr.GetHubServerCertFile() + serverCaPath := certMgr.GetCaFile() + klog.V(2).Infof("server cert path is: %s, ca path is: %s", serverCertPath, serverCaPath) + caBundleProvider, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca-bundle", serverCaPath) + if err != nil { + return err + } + + if err := (&apiserveroptions.SecureServingOptions{ + BindAddress: net.ParseIP(yurtHubSecureProxyHost), + BindPort: options.YurtHubProxySecurePort, + BindNetwork: "tcp", + ServerCert: apiserveroptions.GeneratableKeyCert{ + CertKey: apiserveroptions.CertKey{ + CertFile: serverCertPath, + KeyFile: serverCertPath, + }, + }, + }).ApplyTo(&cfg.YurtHubSecureProxyServerServing); err != nil { + return err + } + cfg.YurtHubSecureProxyServerServing.ClientCA = caBundleProvider + cfg.YurtHubSecureProxyServerServing.DisableHTTP2 = true + + return nil +} diff --git a/cmd/yurthub/app/config/config_test.go b/cmd/yurthub/app/config/config_test.go new file mode 100644 index 00000000000..cd657071e51 --- /dev/null +++ b/cmd/yurthub/app/config/config_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "testing" + + "github.com/openyurtio/openyurt/cmd/yurthub/app/options" + "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token/testdata" +) + +func TestComplete(t *testing.T) { + options := options.NewYurtHubOptions() + client, err := testdata.CreateCertFakeClient("../../../../pkg/yurthub/certificate/token/testdata") + if err != nil { + t.Errorf("failed to create cert fake client, %v", err) + return + } + options.ClientForTest = client + options.ServerAddr = "https://127.0.0.1:6443" + options.JoinToken = "123456.abcdef1234567890" + options.DiskCachePath = "/tmp/cache" + options.RootDir = "/tmp/cert" + options.NodeName = "foo" + options.EnableDummyIf = false + options.HubAgentDummyIfIP = "169.254.2.1" + cfg, err := Complete(options) + if err != nil { + t.Errorf("expect no err, but got %v", err) + } else if cfg == nil { + t.Errorf("expect cfg not nil, but got nil") + } +} diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index ded7e48b767..94075e70d6b 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -23,6 +23,7 @@ import ( "time" "github.com/spf13/pflag" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -43,11 +44,11 @@ type YurtHubOptions struct { ServerAddr string YurtHubHost string // YurtHub server host (e.g.: expose metrics API) YurtHubProxyHost string // YurtHub proxy server host - YurtHubPort string - YurtHubProxyPort string - YurtHubProxySecurePort string + YurtHubPort int + YurtHubProxyPort int + YurtHubProxySecurePort int GCFrequency int - YurtHubCertOrganizations string + YurtHubCertOrganizations []string NodeName string NodePoolName string LBMode string @@ -74,6 +75,7 @@ type YurtHubOptions struct { MinRequestTimeout time.Duration CACertHashes []string UnsafeSkipCAVerification bool + ClientForTest kubernetes.Interface } // NewYurtHubOptions creates a new YurtHubOptions with a default config. @@ -85,6 +87,7 @@ func NewYurtHubOptions() *YurtHubOptions { YurtHubPort: util.YurtHubPort, YurtHubProxySecurePort: util.YurtHubProxySecurePort, GCFrequency: 120, + YurtHubCertOrganizations: make([]string, 0), LBMode: "rr", HeartbeatFailedRetry: 3, HeartbeatHealthyThreshold: 2, @@ -104,6 +107,7 @@ func NewYurtHubOptions() *YurtHubOptions { KubeletHealthGracePeriod: time.Second * 40, EnableNodePool: true, MinRequestTimeout: time.Second * 1800, + CACertHashes: make([]string, 0), UnsafeSkipCAVerification: true, } return o @@ -119,6 +123,10 @@ func (options *YurtHubOptions) Validate() error { return fmt.Errorf("server-address is empty") } + if len(options.JoinToken) == 0 { + return fmt.Errorf("bootstrap token is empty") + } + if !util.IsSupportedLBMode(options.LBMode) { return fmt.Errorf("lb mode(%s) is not supported", options.LBMode) } @@ -132,7 +140,7 @@ func (options *YurtHubOptions) Validate() error { } if len(options.CACertHashes) == 0 && !options.UnsafeSkipCAVerification { - return fmt.Errorf("Set --discovery-token-unsafe-skip-ca-verification flag as true or pass CACertHashes to continue") + return fmt.Errorf("set --discovery-token-unsafe-skip-ca-verification flag as true or pass CACertHashes to continue") } return nil @@ -141,12 +149,12 @@ func (options *YurtHubOptions) Validate() error { // AddFlags returns flags for a specific yurthub by section name func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.YurtHubHost, "bind-address", o.YurtHubHost, "the IP address of YurtHub Server") - fs.StringVar(&o.YurtHubPort, "serve-port", o.YurtHubPort, "the port on which to serve HTTP requests(like profiling, metrics) for hub agent.") + fs.IntVar(&o.YurtHubPort, "serve-port", o.YurtHubPort, "the port on which to serve HTTP requests(like profiling, metrics) for hub agent.") fs.StringVar(&o.YurtHubProxyHost, "bind-proxy-address", o.YurtHubProxyHost, "the IP address of YurtHub Proxy Server") - fs.StringVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver") - fs.StringVar(&o.YurtHubProxySecurePort, "proxy-secure-port", o.YurtHubProxySecurePort, "the port on which to proxy HTTPS requests to kube-apiserver") + fs.IntVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver") + fs.IntVar(&o.YurtHubProxySecurePort, "proxy-secure-port", o.YurtHubProxySecurePort, "the port on which to proxy HTTPS requests to kube-apiserver") fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver,the format is: \"server1,server2,...\"") - fs.StringVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's client certificate in hubself cert-mgr-mode, the format is: certOrg1,certOrg1,...") + fs.StringSliceVar(&o.YurtHubCertOrganizations, "hub-cert-organizations", o.YurtHubCertOrganizations, "Organizations that will be added into hub's apiserver client certificate, the format is: certOrg1,certOrg2,...") fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).") fs.StringVar(&o.NodeName, "node-name", o.NodeName, "the name of node that runs hub agent") fs.StringVar(&o.LBMode, "lb-mode", o.LBMode, "the mode of load balancer to connect remote servers(rr, priority)") @@ -172,7 +180,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease") fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)") fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.") - fs.StringSliceVar(&o.CACertHashes, "discovery-token-ca-cert-hash", []string{}, "For token-based discovery, validate that the root CA public key matches this hash (format: \":\").") + fs.StringSliceVar(&o.CACertHashes, "discovery-token-ca-cert-hash", o.CACertHashes, "For token-based discovery, validate that the root CA public key matches this hash (format: \":\").") fs.BoolVar(&o.UnsafeSkipCAVerification, "discovery-token-unsafe-skip-ca-verification", o.UnsafeSkipCAVerification, "For token-based discovery, allow joining without --discovery-token-ca-cert-hash pinning.") } diff --git a/cmd/yurthub/app/options/options_test.go b/cmd/yurthub/app/options/options_test.go new file mode 100644 index 00000000000..5a005a2467c --- /dev/null +++ b/cmd/yurthub/app/options/options_test.go @@ -0,0 +1,215 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "fmt" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/spf13/cobra" + + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +func TestNewYurtHubOptions(t *testing.T) { + expectOptions := YurtHubOptions{ + YurtHubHost: "127.0.0.1", + YurtHubProxyHost: "127.0.0.1", + YurtHubProxyPort: util.YurtHubProxyPort, + YurtHubPort: util.YurtHubPort, + YurtHubProxySecurePort: util.YurtHubProxySecurePort, + GCFrequency: 120, + YurtHubCertOrganizations: make([]string, 0), + LBMode: "rr", + HeartbeatFailedRetry: 3, + HeartbeatHealthyThreshold: 2, + HeartbeatTimeoutSeconds: 2, + HeartbeatIntervalSeconds: 10, + MaxRequestInFlight: 250, + RootDir: filepath.Join("/var/lib/", projectinfo.GetHubName()), + EnableProfiling: true, + EnableDummyIf: true, + EnableIptables: true, + HubAgentDummyIfName: fmt.Sprintf("%s-dummy0", projectinfo.GetHubName()), + DiskCachePath: disk.CacheBaseDir, + AccessServerThroughHub: true, + EnableResourceFilter: true, + DisabledResourceFilters: make([]string, 0), + WorkingMode: string(util.WorkingModeEdge), + KubeletHealthGracePeriod: time.Second * 40, + EnableNodePool: true, + MinRequestTimeout: time.Second * 1800, + CACertHashes: make([]string, 0), + UnsafeSkipCAVerification: true, + } + + options := NewYurtHubOptions() + options.AddFlags((&cobra.Command{}).Flags()) + if !reflect.DeepEqual(expectOptions, *options) { + t.Errorf("expect options: %#+v, but got %#+v", expectOptions, *options) + } +} + +func TestValidate(t *testing.T) { + testcases := map[string]struct { + options *YurtHubOptions + isErr bool + }{ + "don't set node name": { + options: NewYurtHubOptions(), + isErr: true, + }, + "don't set server addr": { + options: &YurtHubOptions{ + NodeName: "foo", + }, + isErr: true, + }, + "don't set join token": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + }, + isErr: true, + }, + "invalid lb mode": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "invalid mode", + }, + isErr: true, + }, + "invalid working mode": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "invalid mode", + }, + isErr: true, + }, + "invalid dummy ip": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + HubAgentDummyIfIP: "invalid ip", + }, + isErr: true, + }, + "not in dummy cidr": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + HubAgentDummyIfIP: "169.250.0.0", + }, + isErr: true, + }, + "in exclusive cidr": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + HubAgentDummyIfIP: "169.254.31.1", + }, + isErr: true, + }, + "ip is 169.254.1.1": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + HubAgentDummyIfIP: "169.254.1.1", + }, + isErr: true, + }, + "no ca cert hashes": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + UnsafeSkipCAVerification: false, + }, + isErr: true, + }, + "normal options": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + UnsafeSkipCAVerification: true, + }, + isErr: false, + }, + "normal options with ipv4": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + UnsafeSkipCAVerification: true, + HubAgentDummyIfIP: "fd00::2:1", + }, + isErr: false, + }, + "normal options with ipv6": { + options: &YurtHubOptions{ + NodeName: "foo", + ServerAddr: "1.2.3.4:56", + JoinToken: "xxxx", + LBMode: "rr", + WorkingMode: "cloud", + UnsafeSkipCAVerification: true, + HubAgentDummyIfIP: "169.254.2.1", + }, + isErr: false, + }, + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + err := tc.options.Validate() + if tc.isErr && err == nil { + t.Errorf("expect return err, but got nil") + } else if !tc.isErr && err != nil { + t.Errorf("expect return nil, but got %v", err) + } + }) + } +} diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 018cb1986d8..ccfb7d02155 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -23,7 +23,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -32,11 +31,9 @@ import ( "github.com/openyurtio/openyurt/cmd/yurthub/app/options" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" - "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token" "github.com/openyurtio/openyurt/pkg/yurthub/gc" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" - "github.com/openyurtio/openyurt/pkg/yurthub/network" "github.com/openyurtio/openyurt/pkg/yurthub/proxy" "github.com/openyurtio/openyurt/pkg/yurthub/server" "github.com/openyurtio/openyurt/pkg/yurthub/tenant" @@ -84,30 +81,10 @@ func NewCmdStartYurtHub(stopCh <-chan struct{}) *cobra.Command { // Run runs the YurtHubConfiguration. This should never exit func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { + defer cfg.CertManager.Stop() trace := 1 - klog.Infof("%d. register cert managers", trace) - certManager, err := token.NewYurtHubCertManager(nil, cfg, stopCh) - if err != nil { - return fmt.Errorf("failed to create cert manager for yurthub, %v", err) - } - trace++ - - certManager.Start() - defer certManager.Stop() - err = wait.PollImmediate(5*time.Second, 4*time.Minute, func() (bool, error) { - isReady := certManager.Ready() - if isReady { - return true, nil - } - return false, nil - }) - if err != nil { - return fmt.Errorf("hub certificates preparation failed, %v", err) - } - trace++ - klog.Infof("%d. new transport manager", trace) - transportManager, err := transport.NewTransportManager(certManager, stopCh) + transportManager, err := transport.NewTransportManager(cfg.CertManager, stopCh) if err != nil { return fmt.Errorf("could not new transport manager, %w", err) } @@ -136,19 +113,12 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { trace++ klog.Infof("%d. new restConfig manager", trace) - restConfigMgr, err := hubrest.NewRestConfigManager(certManager, healthChecker) + restConfigMgr, err := hubrest.NewRestConfigManager(cfg.CertManager, healthChecker) if err != nil { return fmt.Errorf("could not new restConfig manager, %w", err) } trace++ - klog.Infof("%d. create tls config for secure servers ", trace) - cfg.TLSConfig, err = server.GenUseCertMgrAndTLSConfig(certManager) - if err != nil { - return fmt.Errorf("could not create tls config, %w", err) - } - trace++ - var cacheMgr cachemanager.CacheManager if cfg.WorkingMode == util.WorkingModeEdge { klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace) @@ -171,38 +141,29 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { trace++ klog.Infof("%d. new tenant sa manager", trace) - tenantMgr := tenant.New(cfg.YurtHubCertOrganizations, cfg.SharedFactory, stopCh) + tenantMgr := tenant.New(cfg.TenantNs, cfg.SharedFactory, stopCh) trace++ klog.Infof("%d. new reverse proxy handler for remote servers", trace) yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, tenantMgr, stopCh) - if err != nil { return fmt.Errorf("could not create reverse proxy handler, %w", err) } trace++ - if cfg.EnableDummyIf { - klog.Infof("%d. create dummy network interface %s and init iptables manager", trace, cfg.HubAgentDummyIfName) - networkMgr, err := network.NewNetworkManager(cfg) - if err != nil { - return fmt.Errorf("could not create network manager, %w", err) - } - networkMgr.Run(stopCh) - trace++ - klog.Infof("%d. new %s server and begin to serve, dummy proxy server: %s, secure dummy proxy server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerDummyAddr, cfg.YurtHubProxyServerSecureDummyAddr) + if cfg.NetworkMgr != nil { + cfg.NetworkMgr.Run(stopCh) } // start shared informers before start hub server cfg.SharedFactory.Start(stopCh) cfg.YurtSharedFactory.Start(stopCh) - klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr) - s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr) - if err != nil { - return fmt.Errorf("could not create hub server, %w", err) + klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName()) + if err := server.RunYurtHubServers(cfg, yurtProxyHandler, restConfigMgr, stopCh); err != nil { + return fmt.Errorf("could not run hub servers, %w", err) } - s.Run() + <-stopCh klog.Infof("hub agent exited") return nil } diff --git a/pkg/yurthub/certificate/token/token.go b/pkg/yurthub/certificate/token/token.go index 18c008bf6e9..dab61eda222 100644 --- a/pkg/yurthub/certificate/token/token.go +++ b/pkg/yurthub/certificate/token/token.go @@ -39,7 +39,6 @@ import ( "k8s.io/client-go/util/certificate" "k8s.io/klog/v2" - "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/projectinfo" yurtutil "github.com/openyurtio/openyurt/pkg/util" certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" @@ -65,6 +64,17 @@ var ( caCertIsNotReadyError = errors.New("ca.crt file") ) +type CertificateManagerConfiguration struct { + RootDir string + NodeName string + JoinToken string + CaCertHashes []string + YurtHubCertOrganizations []string + CertIPs []net.IP + RemoteServers []*url.URL + Client clientset.Interface +} + type yurtHubCertManager struct { client clientset.Interface remoteServers []*url.URL @@ -72,7 +82,7 @@ type yurtHubCertManager struct { apiServerClientCertManager certificate.Manager hubServerCertManager certificate.Manager apiServerClientCertStore certificate.FileStore - serverCertStore certificate.FileStore + hubServerCertStore certificate.FileStore hubRunDir string hubName string joinToken string @@ -80,7 +90,7 @@ type yurtHubCertManager struct { } // NewYurtHubCertManager new a YurtCertificateManager instance -func NewYurtHubCertManager(client clientset.Interface, cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) (hubCert.YurtCertificateManager, error) { +func NewYurtHubCertManager(cfg *CertificateManagerConfiguration) (hubCert.YurtCertificateManager, error) { var err error hubRunDir := cfg.RootDir @@ -89,7 +99,7 @@ func NewYurtHubCertManager(client clientset.Interface, cfg *config.YurtHubConfig } ycm := &yurtHubCertManager{ - client: client, + client: cfg.Client, remoteServers: cfg.RemoteServers, hubRunDir: hubRunDir, hubName: projectinfo.GetHubName(), @@ -112,12 +122,12 @@ func NewYurtHubCertManager(client clientset.Interface, cfg *config.YurtHubConfig } // 3. prepare yurthub server certificate manager - ycm.serverCertStore, err = store.NewFileStoreWrapper(fmt.Sprintf("%s-server", ycm.hubName), ycm.getPkiDir(), ycm.getPkiDir(), "", "") + ycm.hubServerCertStore, err = store.NewFileStoreWrapper(fmt.Sprintf("%s-server", ycm.hubName), ycm.getPkiDir(), ycm.getPkiDir(), "", "") if err != nil { return ycm, errors.Wrap(err, "couldn't new hub server cert store") } - ycm.hubServerCertManager, err = ycm.newHubServerCertificateManager(ycm.serverCertStore, cfg.NodeName, cfg.CertIPs) + ycm.hubServerCertManager, err = ycm.newHubServerCertificateManager(ycm.hubServerCertStore, cfg.NodeName, cfg.CertIPs) if err != nil { return ycm, errors.Wrap(err, "couldn't new hub server certificate manager") } @@ -286,7 +296,7 @@ func (ycm *yurtHubCertManager) GetHubServerCert() *tls.Certificate { } func (ycm *yurtHubCertManager) GetHubServerCertFile() string { - return ycm.serverCertStore.CurrentPath() + return ycm.hubServerCertStore.CurrentPath() } // newAPIServerClientCertificateManager create a certificate manager for yurthub component to prepare client certificate diff --git a/pkg/yurthub/certificate/token/token_test.go b/pkg/yurthub/certificate/token/token_test.go index bb9895821b7..d958c10c982 100644 --- a/pkg/yurthub/certificate/token/token_test.go +++ b/pkg/yurthub/certificate/token/token_test.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" - "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token/testdata" ) @@ -81,7 +80,6 @@ func TestGetHubConfFile(t *testing.T) { u, _ := url.Parse("http://127.0.0.1") remoteServers := []*url.URL{u} certIPs := []net.IP{net.ParseIP("127.0.0.1")} - stopCh := make(chan struct{}) testcases := map[string]struct { rootDir string path string @@ -98,14 +96,14 @@ func TestGetHubConfFile(t *testing.T) { for k, tc := range testcases { t.Run(k, func(t *testing.T) { - cfg := &config.YurtHubConfiguration{ + cfg := &CertificateManagerConfiguration{ NodeName: nodeName, RemoteServers: remoteServers, CertIPs: certIPs, RootDir: tc.rootDir, } - mgr, err := NewYurtHubCertManager(nil, cfg, stopCh) + mgr, err := NewYurtHubCertManager(cfg) if err != nil { t.Errorf("failed to new cert manager, %v", err) } @@ -122,7 +120,6 @@ func TestGetCaFile(t *testing.T) { u, _ := url.Parse("http://127.0.0.1") remoteServers := []*url.URL{u} certIPs := []net.IP{net.ParseIP("127.0.0.1")} - stopCh := make(chan struct{}) testcases := map[string]struct { rootDir string path string @@ -139,14 +136,14 @@ func TestGetCaFile(t *testing.T) { for k, tc := range testcases { t.Run(k, func(t *testing.T) { - cfg := &config.YurtHubConfiguration{ + cfg := &CertificateManagerConfiguration{ NodeName: nodeName, RemoteServers: remoteServers, CertIPs: certIPs, RootDir: tc.rootDir, } - mgr, err := NewYurtHubCertManager(nil, cfg, stopCh) + mgr, err := NewYurtHubCertManager(cfg) if err != nil { t.Errorf("failed to new cert manager, %v", err) } @@ -163,7 +160,6 @@ func TestGetHubServerCertFile(t *testing.T) { u, _ := url.Parse("http://127.0.0.1") remoteServers := []*url.URL{u} certIPs := []net.IP{net.ParseIP("127.0.0.1")} - stopCh := make(chan struct{}) testcases := map[string]struct { rootDir string path string @@ -180,14 +176,14 @@ func TestGetHubServerCertFile(t *testing.T) { for k, tc := range testcases { t.Run(k, func(t *testing.T) { - cfg := &config.YurtHubConfiguration{ + cfg := &CertificateManagerConfiguration{ NodeName: nodeName, RemoteServers: remoteServers, CertIPs: certIPs, RootDir: tc.rootDir, } - mgr, err := NewYurtHubCertManager(nil, cfg, stopCh) + mgr, err := NewYurtHubCertManager(cfg) if err != nil { t.Errorf("failed to new cert manager, %v", err) } @@ -209,7 +205,6 @@ func TestUpdateBootstrapConf(t *testing.T) { u, _ := url.Parse("http://127.0.0.1") remoteServers := []*url.URL{u} certIPs := []net.IP{net.ParseIP("127.0.0.1")} - stopCh := make(chan struct{}) testcases := map[string]struct { joinToken string err error @@ -228,13 +223,14 @@ func TestUpdateBootstrapConf(t *testing.T) { return } - mgr, err := NewYurtHubCertManager(client, &config.YurtHubConfiguration{ + mgr, err := NewYurtHubCertManager(&CertificateManagerConfiguration{ NodeName: nodeName, RemoteServers: remoteServers, CertIPs: certIPs, RootDir: rootDir, JoinToken: tc.joinToken, - }, stopCh) + Client: client, + }) if err != nil { t.Errorf("failed to new yurt cert manager, %v", err) return @@ -255,7 +251,6 @@ func TestReady(t *testing.T) { u, _ := url.Parse("http://127.0.0.1") remoteServers := []*url.URL{u} certIPs := []net.IP{net.ParseIP("127.0.0.1")} - stopCh := make(chan struct{}) client, err := testdata.CreateCertFakeClient("./testdata") if err != nil { @@ -263,14 +258,15 @@ func TestReady(t *testing.T) { return } - mgr, err := NewYurtHubCertManager(client, &config.YurtHubConfiguration{ + mgr, err := NewYurtHubCertManager(&CertificateManagerConfiguration{ NodeName: nodeName, RemoteServers: remoteServers, CertIPs: certIPs, RootDir: rootDir, JoinToken: joinToken, YurtHubCertOrganizations: []string{"yurthub:tenant:foo"}, - }, stopCh) + Client: client, + }) if err != nil { t.Errorf("failed to new yurt cert manager, %v", err) return @@ -292,14 +288,15 @@ func TestReady(t *testing.T) { // reuse the config and ca file t.Logf("go to check the reuse of config and ca file") - newMgr, err := NewYurtHubCertManager(client, &config.YurtHubConfiguration{ + newMgr, err := NewYurtHubCertManager(&CertificateManagerConfiguration{ NodeName: nodeName, RemoteServers: remoteServers, CertIPs: certIPs, RootDir: rootDir, JoinToken: joinToken, YurtHubCertOrganizations: []string{"yurthub:tenant:foo"}, - }, stopCh) + Client: client, + }) if err != nil { t.Errorf("failed to new another yurt cert manager, %v", err) return diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index 823071b256b..b243fea519c 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -19,6 +19,7 @@ package manager import ( "net" "net/http" + "strconv" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" @@ -58,7 +59,7 @@ func NewFilterManager(options *options.YurtHubOptions, mutatedMasterServiceHost, mutatedMasterServicePort, _ := net.SplitHostPort(apiserverAddr) if options.AccessServerThroughHub { - mutatedMasterServicePort = options.YurtHubProxySecurePort + mutatedMasterServicePort = strconv.Itoa(options.YurtHubProxySecurePort) if options.EnableDummyIf { mutatedMasterServiceHost = options.HubAgentDummyIfIP } else { diff --git a/pkg/yurthub/filter/manager/manager_test.go b/pkg/yurthub/filter/manager/manager_test.go index adf1e32c92f..1110491d721 100644 --- a/pkg/yurthub/filter/manager/manager_test.go +++ b/pkg/yurthub/filter/manager/manager_test.go @@ -146,7 +146,7 @@ func TestFindRunner(t *testing.T) { AccessServerThroughHub: tt.accessServerThroughHub, EnableDummyIf: tt.enableDummyIf, NodeName: "test", - YurtHubProxySecurePort: "10268", + YurtHubProxySecurePort: 10268, HubAgentDummyIfIP: "127.0.0.1", YurtHubProxyHost: "127.0.0.1", } diff --git a/pkg/yurthub/gc/gc.go b/pkg/yurthub/gc/gc.go index d7d5f89044e..344d2287a79 100644 --- a/pkg/yurthub/gc/gc.go +++ b/pkg/yurthub/gc/gc.go @@ -190,7 +190,7 @@ func (m *GCManager) gcEvents(kubeClient clientset.Interface, component string) { for _, key := range localEventKeys { _, _, ns, name := util.SplitKey(key.Key()) if len(ns) == 0 || len(name) == 0 { - klog.Infof("could not get namespace or name for event %s", key) + klog.Infof("could not get namespace or name for event %s", key.Key()) continue } @@ -198,16 +198,16 @@ func (m *GCManager) gcEvents(kubeClient clientset.Interface, component string) { if apierrors.IsNotFound(err) { deletedEvents = append(deletedEvents, key) } else if err != nil { - klog.Errorf("could not get %s %s event for node(%s), %v", component, key, m.nodeName, err) + klog.Errorf("could not get %s %s event for node(%s), %v", component, key.Key(), m.nodeName, err) break } } for _, key := range deletedEvents { if err := m.store.Delete(key); err != nil { - klog.Errorf("failed to gc events %s, %v", key, err) + klog.Errorf("failed to gc events %s, %v", key.Key(), err) } else { - klog.Infof("gc events %s successfully", key) + klog.Infof("gc events %s successfully", key.Key()) } } } diff --git a/pkg/yurthub/kubernetes/rest/config_test.go b/pkg/yurthub/kubernetes/rest/config_test.go index 2affa94effc..ee5c450dfe0 100644 --- a/pkg/yurthub/kubernetes/rest/config_test.go +++ b/pkg/yurthub/kubernetes/rest/config_test.go @@ -25,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" - "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token/testdata" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" @@ -36,7 +35,6 @@ var ( ) func TestGetRestConfig(t *testing.T) { - stopCh := make(chan struct{}) nodeName := "foo" servers := map[string]int{"https://10.10.10.113:6443": 2} u, _ := url.Parse("https://10.10.10.113:6443") @@ -49,13 +47,14 @@ func TestGetRestConfig(t *testing.T) { t.Errorf("failed to create cert fake client, %v", err) return } - certManager, err := token.NewYurtHubCertManager(client, &config.YurtHubConfiguration{ + certManager, err := token.NewYurtHubCertManager(&token.CertificateManagerConfiguration{ NodeName: nodeName, RemoteServers: remoteServers, CertIPs: certIPs, RootDir: testDir, JoinToken: "123456.abcdef1234567890", - }, stopCh) + Client: client, + }) if err != nil { t.Errorf("failed to create certManager, %v", err) return diff --git a/pkg/yurthub/network/iptables.go b/pkg/yurthub/network/iptables.go index c0e42d01e6c..d21f2da84f3 100644 --- a/pkg/yurthub/network/iptables.go +++ b/pkg/yurthub/network/iptables.go @@ -57,14 +57,10 @@ func NewIptablesManager(dummyIfIP, dummyIfPort string) *IptablesManager { func makeupIptablesRules(ifIP, ifPort string) []iptablesRule { return []iptablesRule{ - // accept traffic to 169.254.2.1:10261 + // accept traffic to 169.254.2.1:10261/169.254.2.1:10268 {iptables.Prepend, iptables.TableFilter, iptables.ChainInput, []string{"-p", "tcp", "-m", "comment", "--comment", "for container access hub agent", "--dport", ifPort, "--destination", ifIP, "-j", "ACCEPT"}}, - // accept traffic from 169.254.2.1:10261 + // accept traffic from 169.254.2.1:10261/169.254.2.1:10268 {iptables.Prepend, iptables.TableFilter, iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", ifIP, "-j", "ACCEPT"}}, - // accept traffic to localhost:10261 - {iptables.Prepend, iptables.TableFilter, iptables.ChainInput, []string{"-p", "tcp", "--dport", ifPort, "--destination", "localhost", "-j", "ACCEPT"}}, - // accept traffic from localhost:10261 - {iptables.Prepend, iptables.TableFilter, iptables.ChainOutput, []string{"-p", "tcp", "--sport", ifPort, "-s", "localhost", "-j", "ACCEPT"}}, } } diff --git a/pkg/yurthub/network/network.go b/pkg/yurthub/network/network.go index 895fea491c8..2e7801a5f0d 100644 --- a/pkg/yurthub/network/network.go +++ b/pkg/yurthub/network/network.go @@ -17,13 +17,13 @@ limitations under the License. package network import ( - "fmt" "net" + "strconv" "time" "k8s.io/klog/v2" - "github.com/openyurtio/openyurt/cmd/yurthub/app/config" + "github.com/openyurtio/openyurt/cmd/yurthub/app/options" ) const ( @@ -38,29 +38,17 @@ type NetworkManager struct { enableIptables bool } -func NewNetworkManager(cfg *config.YurtHubConfiguration) (*NetworkManager, error) { - if cfg == nil { - return nil, fmt.Errorf("configuration for hub agent is nil") - } - - ip, port, err := net.SplitHostPort(cfg.YurtHubProxyServerDummyAddr) - if err != nil { - return nil, err - } +func NewNetworkManager(options *options.YurtHubOptions) (*NetworkManager, error) { m := &NetworkManager{ ifController: NewDummyInterfaceController(), - iptablesManager: NewIptablesManager(ip, port), - dummyIfIP: net.ParseIP(ip), - dummyIfName: cfg.HubAgentDummyIfName, - enableIptables: cfg.EnableIptables, + iptablesManager: NewIptablesManager(options.HubAgentDummyIfIP, strconv.Itoa(options.YurtHubProxyPort)), + dummyIfIP: net.ParseIP(options.HubAgentDummyIfIP), + dummyIfName: options.HubAgentDummyIfName, + enableIptables: options.EnableIptables, } // secure port - _, securePort, err := net.SplitHostPort(cfg.YurtHubProxyServerSecureDummyAddr) - if err != nil { - return nil, err - } - m.iptablesManager.rules = append(m.iptablesManager.rules, makeupIptablesRules(ip, securePort)...) - if err = m.configureNetwork(); err != nil { + m.iptablesManager.rules = append(m.iptablesManager.rules, makeupIptablesRules(options.HubAgentDummyIfIP, strconv.Itoa(options.YurtHubProxySecurePort))...) + if err := m.configureNetwork(); err != nil { return nil, err } diff --git a/pkg/yurthub/proxy/util/util_test.go b/pkg/yurthub/proxy/util/util_test.go index 13f929c93ff..5a9fc7b328d 100644 --- a/pkg/yurthub/proxy/util/util_test.go +++ b/pkg/yurthub/proxy/util/util_test.go @@ -439,10 +439,9 @@ func TestWithSaTokenSubsitute(t *testing.T) { } resolver := newTestRequestInfoResolver() - orgs := []string{"system:bootstrappers:openyurt:tenant:myspace"} stopCh := make(<-chan struct{}) - tenantMgr := tenant.New(orgs, nil, stopCh) + tenantMgr := tenant.New("myspace", nil, stopCh) data := make(map[string][]byte) data["token"] = []byte(tenantToken) @@ -530,10 +529,9 @@ func TestWithSaTokenSubsituteTenantTokenEmpty(t *testing.T) { } resolver := newTestRequestInfoResolver() - orgs := []string{"system:bootstrappers:openyurt:tenant:myspace"} stopCh := make(<-chan struct{}) - tenantMgr := tenant.New(orgs, nil, stopCh) + tenantMgr := tenant.New("myspace", nil, stopCh) data := make(map[string][]byte) data["token"] = []byte(tenantToken) diff --git a/pkg/yurthub/server/certificate_test.go b/pkg/yurthub/server/certificate_test.go index aca35250e41..fe2a7a30445 100644 --- a/pkg/yurthub/server/certificate_test.go +++ b/pkg/yurthub/server/certificate_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" - "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/token/testdata" ) @@ -39,7 +38,6 @@ var ( ) func TestUpdateTokenHandler(t *testing.T) { - stopCh := make(chan struct{}) u, _ := url.Parse("https://10.10.10.113:6443") remoteServers := []*url.URL{u} certIPs := []net.IP{net.ParseIP("127.0.0.1")} @@ -48,13 +46,14 @@ func TestUpdateTokenHandler(t *testing.T) { t.Errorf("failed to create cert fake client, %v", err) return } - certManager, err := token.NewYurtHubCertManager(client, &config.YurtHubConfiguration{ + certManager, err := token.NewYurtHubCertManager(&token.CertificateManagerConfiguration{ NodeName: "foo", RemoteServers: remoteServers, CertIPs: certIPs, RootDir: testDir, JoinToken: "123456.abcdef1234567890", - }, stopCh) + Client: client, + }) if err != nil { t.Errorf("failed to create certManager, %v", err) return diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index fafe8b7c38c..d7b9b6e98ed 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -17,137 +17,63 @@ limitations under the License. package server import ( - "crypto/tls" "fmt" - "net" "net/http" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" - "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/profile" - "github.com/openyurtio/openyurt/pkg/util/certmanager" - "github.com/openyurtio/openyurt/pkg/yurthub/certificate" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate" + "github.com/openyurtio/openyurt/pkg/yurthub/util" ) -// Server is an interface for providing http service for yurthub -type Server interface { - Run() -} - -// yutHubServer includes hubServer and proxyServer, -// and hubServer handles requests by hub agent itself, like profiling, metrics, healthz -// and proxyServer does not handle requests locally and proxy requests to kube-apiserver -type yurtHubServer struct { - hubServer *http.Server - proxyServer *http.Server - secureProxyServer *http.Server - dummyProxyServer *http.Server - dummySecureProxyServer *http.Server -} - -// NewYurtHubServer creates a Server object -func NewYurtHubServer(cfg *config.YurtHubConfiguration, - certificateMgr certificate.YurtCertificateManager, +// RunYurtHubServers is used to start up all servers for yurthub +func RunYurtHubServers(cfg *config.YurtHubConfiguration, proxyHandler http.Handler, - rest *rest.RestConfigManager) (Server, error) { - hubMux := mux.NewRouter() - registerHandlers(hubMux, cfg, certificateMgr, rest) - hubServer := &http.Server{ - Addr: cfg.YurtHubServerAddr, - Handler: hubMux, - MaxHeaderBytes: 1 << 20, - } - - proxyHandler = wrapNonResourceHandler(proxyHandler, cfg, rest) - proxyServer := &http.Server{ - Addr: cfg.YurtHubProxyServerAddr, - Handler: proxyHandler, + rest *rest.RestConfigManager, + stopCh <-chan struct{}) error { + hubServerHandler := mux.NewRouter() + registerHandlers(hubServerHandler, cfg, rest) + + // start yurthub http server for serving metrics, pprof. + if cfg.YurtHubServerServing != nil { + if err := cfg.YurtHubServerServing.Serve(hubServerHandler, 0, stopCh); err != nil { + return err + } } - secureProxyServer := &http.Server{ - Addr: cfg.YurtHubProxyServerSecureAddr, - Handler: proxyHandler, - TLSConfig: cfg.TLSConfig, - TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), - MaxHeaderBytes: 1 << 20, + // start yurthub proxy servers for forwarding requests to cloud kube-apiserver + if cfg.WorkingMode == util.WorkingModeEdge { + proxyHandler = wrapNonResourceHandler(proxyHandler, cfg, rest) } - - var dummyProxyServer, secureDummyProxyServer *http.Server - if cfg.EnableDummyIf { - if _, err := net.InterfaceByName(cfg.HubAgentDummyIfName); err != nil { - return nil, err - } - - dummyProxyServer = &http.Server{ - Addr: cfg.YurtHubProxyServerDummyAddr, - Handler: proxyHandler, - MaxHeaderBytes: 1 << 20, - } - - secureDummyProxyServer = &http.Server{ - Addr: cfg.YurtHubProxyServerSecureDummyAddr, - Handler: proxyHandler, - TLSConfig: cfg.TLSConfig, - TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), - MaxHeaderBytes: 1 << 20, + if cfg.YurtHubProxyServerServing != nil { + if err := cfg.YurtHubProxyServerServing.Serve(proxyHandler, 0, stopCh); err != nil { + return err } } - return &yurtHubServer{ - hubServer: hubServer, - proxyServer: proxyServer, - secureProxyServer: secureProxyServer, - dummyProxyServer: dummyProxyServer, - dummySecureProxyServer: secureDummyProxyServer, - }, nil -} - -// Run will start hub server and proxy server -func (s *yurtHubServer) Run() { - go func() { - err := s.hubServer.ListenAndServe() - if err != nil { - panic(err) + if cfg.YurtHubDummyProxyServerServing != nil { + if err := cfg.YurtHubDummyProxyServerServing.Serve(proxyHandler, 0, stopCh); err != nil { + return err } - }() - - if s.dummyProxyServer != nil { - go func() { - err := s.dummyProxyServer.ListenAndServe() - if err != nil { - panic(err) - } - }() - go func() { - err := s.dummySecureProxyServer.ListenAndServeTLS("", "") - if err != nil { - panic(err) - } - }() } - go func() { - err := s.secureProxyServer.ListenAndServeTLS("", "") - if err != nil { - panic(err) + if cfg.YurtHubSecureProxyServerServing != nil { + if _, err := cfg.YurtHubSecureProxyServerServing.Serve(proxyHandler, 0, stopCh); err != nil { + return err } - }() - - err := s.proxyServer.ListenAndServe() - if err != nil { - panic(err) } + + return nil } // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. -func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr certificate.YurtCertificateManager, rest *rest.RestConfigManager) { +func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, rest *rest.RestConfigManager) { // register handlers for update join token - c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT") + c.Handle("/v1/token", updateTokenHandler(cfg.CertManager)).Methods("POST", "PUT") // register handler for health check c.HandleFunc("/v1/healthz", healthz).Methods("GET") @@ -171,19 +97,3 @@ func healthz(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "OK") } - -// GenUseCertMgrAndTLSConfig create a certificate manager for the yurthub server and generate a TLS configuration -func GenUseCertMgrAndTLSConfig(certificateMgr certificate.YurtCertificateManager) (*tls.Config, error) { - // generate the TLS configuration based on the latest certificate - rootCert, err := certmanager.GenCertPoolUseCA(certificateMgr.GetCaFile()) - if err != nil { - klog.Errorf("could not generate a x509 CertPool based on the given CA file, %v", err) - return nil, err - } - tlsCfg, err := certmanager.GenTLSConfigUseCurrentCertAndCertPool(certificateMgr.GetHubServerCert, rootCert, "server") - if err != nil { - return nil, err - } - - return tlsCfg, nil -} diff --git a/pkg/yurthub/tenant/tenant.go b/pkg/yurthub/tenant/tenant.go index 431ca08e4f5..477a6207480 100644 --- a/pkg/yurthub/tenant/tenant.go +++ b/pkg/yurthub/tenant/tenant.go @@ -22,8 +22,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - - "github.com/openyurtio/openyurt/pkg/yurthub/util" ) type Interface interface { @@ -64,9 +62,7 @@ func (mgr *tenantManager) WaitForCacheSync() bool { return mgr.IsSynced } -func New(orgs []string, factory informers.SharedInformerFactory, stopCh <-chan struct{}) Interface { - - tenantNs := util.ParseTenantNsFromOrgs(orgs) +func New(tenantNs string, factory informers.SharedInformerFactory, stopCh <-chan struct{}) Interface { klog.Infof("parse tenant ns: %s", tenantNs) if tenantNs == "" { return nil diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index b7617b07233..e99af4d9f75 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -62,9 +62,9 @@ const ( YurtHubNamespace = "kube-system" CacheUserAgentsKey = "cache_agents" - YurtHubProxyPort = "10261" - YurtHubPort = "10267" - YurtHubProxySecurePort = "10268" + YurtHubProxyPort = 10261 + YurtHubPort = 10267 + YurtHubProxySecurePort = 10268 ) var (