From adbf5723c546f2189ba13e24fbd282fbaa8a2852 Mon Sep 17 00:00:00 2001 From: rambohe Date: Thu, 10 Nov 2022 10:51:13 +0800 Subject: [PATCH] add yurthub leader election and coordinator framework (#1035) * improve health checker for adapting coordinator * add yurthub leader election and coordinator framework --- cmd/yurthub/app/config/config.go | 4 + cmd/yurthub/app/options/options.go | 42 +++++ cmd/yurthub/app/start.go | 21 ++- pkg/yurthub/poolcoordinator/coordinator.go | 89 +++++++++++ .../poolcoordinator/leader_election.go | 143 ++++++++++++++++++ 5 files changed, 288 insertions(+), 11 deletions(-) create mode 100644 pkg/yurthub/poolcoordinator/coordinator.go create mode 100644 pkg/yurthub/poolcoordinator/leader_election.go diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index c2e2e6ece7a..263eec39692 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -24,6 +24,8 @@ import ( "strings" "time" + componentbaseconfig "k8s.io/component-base/config" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -92,6 +94,7 @@ type YurtHubConfiguration struct { CertIPs []net.IP CoordinatorServer *url.URL MinRequestTimeout time.Duration + LeaderElection componentbaseconfig.LeaderElectionConfiguration } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -180,6 +183,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { FilterManager: filterManager, CertIPs: certIPs, MinRequestTimeout: options.MinRequestTimeout, + LeaderElection: options.LeaderElection, } return cfg, nil diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index 171989c91f9..8be6094f773 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -23,6 +23,9 @@ import ( "time" "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/leaderelection/resourcelock" + componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" @@ -74,6 +77,7 @@ type YurtHubOptions struct { KubeletHealthGracePeriod time.Duration EnableNodePool bool MinRequestTimeout time.Duration + LeaderElection componentbaseconfig.LeaderElectionConfiguration } // NewYurtHubOptions creates a new YurtHubOptions with a default config. @@ -106,6 +110,15 @@ func NewYurtHubOptions() *YurtHubOptions { KubeletHealthGracePeriod: time.Second * 40, EnableNodePool: true, MinRequestTimeout: time.Second * 1800, + LeaderElection: componentbaseconfig.LeaderElectionConfiguration{ + LeaderElect: true, + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceLock: resourcelock.LeasesResourceLock, + ResourceName: projectinfo.GetHubName(), + ResourceNamespace: "kube-system", + }, } return o } @@ -171,6 +184,35 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease") fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)") fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.") + bindFlags(&o.LeaderElection, fs) +} + +// bindFlags binds the LeaderElectionConfiguration struct fields to a flagset +func bindFlags(l *componentbaseconfig.LeaderElectionConfiguration, fs *pflag.FlagSet) { + fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+ + "Start a leader election client and gain leadership based on pool coordinator") + fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+ + "The duration that non-leader candidates will wait after observing a leadership "+ + "renewal until attempting to acquire leadership of a led but unrenewed leader "+ + "slot. This is effectively the maximum duration that a leader can be stopped "+ + "before it is replaced by another candidate. This is only applicable if leader "+ + "election is enabled.") + fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+ + "The interval between attempts by the acting master to renew a leadership slot "+ + "before it stops leading. This must be less than or equal to the lease duration. "+ + "This is only applicable if leader election is enabled.") + fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+ + "The duration the clients should wait between attempting acquisition and renewal "+ + "of a leadership. This is only applicable if leader election is enabled.") + fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+ + "The type of resource object that is used for locking during "+ + "leader election. Supported options are `leases` (default), `endpoints` and `configmaps`.") + fs.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, ""+ + "The name of resource object that is used for locking during "+ + "leader election.") + fs.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, ""+ + "The namespace of resource object that is used for locking during "+ + "leader election.") } // verifyDummyIP verify the specified ip is valid or not and set the default ip if empty diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 5b9a7308354..08983809218 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -226,17 +226,16 @@ func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url healthCheckerClientsForCloud[remoteServers[i].String()] = c } - // comment the following code temporarily - //cfg := &rest.Config{ - // Host: coordinatorServer.String(), - // Transport: tp.CurrentTransport(), - // Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second, - //} - //c, err := kubernetes.NewForConfig(cfg) - //if err != nil { - // return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err - //} - //healthCheckerClientForCoordinator = c + cfg := &rest.Config{ + Host: coordinatorServer.String(), + Transport: tp.CurrentTransport(), + Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second, + } + c, err := kubernetes.NewForConfig(cfg) + if err != nil { + return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err + } + healthCheckerClientForCoordinator = c return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil } diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go new file mode 100644 index 00000000000..6afdc8ffd84 --- /dev/null +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -0,0 +1,89 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poolcoordinator + +import ( + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" +) + +type Coordinator struct { + coordinatorHealthChecker healthchecker.HealthChecker + hubElector *HubElector + informerStarted bool +} + +func NewCoordinator(coordinatorHealthChecker healthchecker.HealthChecker, elector *HubElector, stopCh <-chan struct{}) *Coordinator { + return &Coordinator{ + coordinatorHealthChecker: coordinatorHealthChecker, + hubElector: elector, + } +} + +func (coordinator *Coordinator) Run(stopCh <-chan struct{}) { + for { + select { + case <-stopCh: + klog.Infof("exit normally in coordinator loop.") + if coordinator.informerStarted { + // stop shared informer + // + coordinator.informerStarted = false + } + return + case electorStatus, ok := <-coordinator.hubElector.StatusChan(): + if !ok { + return + } + + if electorStatus != PendingHub && !coordinator.cacheIsUploaded() { + // upload local cache, and make sure yurthub pod is the last resource uploaded + } + + if electorStatus == LeaderHub { + if !coordinator.informerStarted { + coordinator.informerStarted = true + // start shared informer for pool-scope data + // make sure + + // start shared informer for lease delegating + // + } + break + } + + if electorStatus == FollowerHub { + if coordinator.informerStarted { + // stop shared informer + // + coordinator.informerStarted = false + } + } + } + } +} + +func (coordinator *Coordinator) cacheIsUploaded() bool { + // check yurthub pod is uploaded + return true +} + +func (coordinator *Coordinator) IsReady() bool { + + return true +} diff --git a/pkg/yurthub/poolcoordinator/leader_election.go b/pkg/yurthub/poolcoordinator/leader_election.go new file mode 100644 index 00000000000..ed9eae6e620 --- /dev/null +++ b/pkg/yurthub/poolcoordinator/leader_election.go @@ -0,0 +1,143 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poolcoordinator + +import ( + "context" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" +) + +const ( + InitHub int32 = iota // 0 + LeaderHub + FollowerHub + PendingHub +) + +type HubElector struct { + coordinatorClient kubernetes.Interface + coordinatorHealthChecker healthchecker.HealthChecker + cloudAPIServerHealthChecker healthchecker.HealthChecker + electorStatus chan int32 + le *leaderelection.LeaderElector + inElecting bool +} + +func NewHubElector(cfg *config.YurtHubConfiguration, coordinatorClient kubernetes.Interface, cloudAPIServerHealthyChecker healthchecker.HealthChecker, stopCh <-chan struct{}) (*HubElector, error) { + coordinatorHealthyChecker, err := healthchecker.NewCoordinatorHealthChecker(cfg, coordinatorClient, cloudAPIServerHealthyChecker, stopCh) + if err != nil { + return nil, err + } + + he := &HubElector{ + coordinatorClient: coordinatorClient, + coordinatorHealthChecker: coordinatorHealthyChecker, + cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker, + electorStatus: make(chan int32), + } + + rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock, + cfg.LeaderElection.ResourceNamespace, + cfg.LeaderElection.ResourceName, + coordinatorClient.CoreV1(), + coordinatorClient.CoordinationV1(), + resourcelock.ResourceLockConfig{Identity: cfg.NodeName}) + if err != nil { + return nil, err + } + + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: cfg.LeaderElection.LeaseDuration.Duration, + RenewDeadline: cfg.LeaderElection.RenewDeadline.Duration, + RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.Infof("yurthub of %s became lease", cfg.NodeName) + he.electorStatus <- LeaderHub + }, + OnStoppedLeading: func() { + + }, + }, + }) + if err != nil { + return nil, err + } + he.le = le + he.electorStatus <- PendingHub + + return he, nil +} + +func (he *HubElector) Run(stopCh <-chan struct{}) { + intervalTicker := time.NewTicker(5 * time.Second) + defer intervalTicker.Stop() + defer close(he.electorStatus) + + var ctx context.Context + var cancel context.CancelFunc + for { + select { + case <-stopCh: + klog.Infof("exit normally in leader election loop.") + + if cancel != nil { + cancel() + he.inElecting = false + } + return + case <-intervalTicker.C: + if !he.coordinatorHealthChecker.IsHealthy() { + if he.inElecting && cancel != nil { + cancel() + he.inElecting = false + he.electorStatus <- PendingHub + } + break + } + + if !he.cloudAPIServerHealthChecker.IsHealthy() { + if he.inElecting && cancel != nil { + cancel() + he.inElecting = false + he.electorStatus <- FollowerHub + } + break + } + + if !he.inElecting { + he.electorStatus <- FollowerHub + ctx, cancel = context.WithCancel(context.TODO()) + go he.le.Run(ctx) + he.inElecting = true + } + } + } +} + +func (he *HubElector) StatusChan() chan int32 { + return he.electorStatus +}