From 06e3154b7f500fc3700e2b373fbda3f9cb7dfe17 Mon Sep 17 00:00:00 2001 From: rambohe-ch Date: Mon, 24 Oct 2022 18:36:04 +0800 Subject: [PATCH] add yurthub leader election and coordinator framework --- cmd/yurthub/app/config/config.go | 4 + cmd/yurthub/app/options/options.go | 42 ++++++ cmd/yurthub/app/start.go | 21 ++- pkg/yurthub/poolcoordinator/coordinator.go | 89 +++++++++++ .../poolcoordinator/leader_election.go | 141 ++++++++++++++++++ 5 files changed, 286 insertions(+), 11 deletions(-) create mode 100644 pkg/yurthub/poolcoordinator/coordinator.go create mode 100644 pkg/yurthub/poolcoordinator/leader_election.go diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 79f05ce0541..ce6c4620c5c 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -24,6 +24,8 @@ import ( "strings" "time" + componentbaseconfig "k8s.io/component-base/config" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -94,6 +96,7 @@ type YurtHubConfiguration struct { FilterManager *filter.Manager CertIPs []net.IP CoordinatorServer *url.URL + LeaderElection componentbaseconfig.LeaderElectionConfiguration } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -178,6 +181,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { KubeletHealthGracePeriod: options.KubeletHealthGracePeriod, FilterManager: filterManager, CertIPs: certIPs, + LeaderElection: options.LeaderElection, } return cfg, nil diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index 7670f77c1c2..7bb713e29e2 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -23,6 +23,9 @@ import ( "time" "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/leaderelection/resourcelock" + componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" 
utilnet "k8s.io/utils/net" @@ -73,6 +76,7 @@ type YurtHubOptions struct { WorkingMode string KubeletHealthGracePeriod time.Duration EnableNodePool bool + LeaderElection componentbaseconfig.LeaderElectionConfiguration } // NewYurtHubOptions creates a new YurtHubOptions with a default config. @@ -104,6 +108,15 @@ func NewYurtHubOptions() *YurtHubOptions { WorkingMode: string(util.WorkingModeEdge), KubeletHealthGracePeriod: time.Second * 40, EnableNodePool: true, + LeaderElection: componentbaseconfig.LeaderElectionConfiguration{ + LeaderElect: true, + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceLock: resourcelock.LeasesResourceLock, + ResourceName: projectinfo.GetHubName(), + ResourceNamespace: "kube-system", + }, } return o } @@ -168,6 +181,35 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).") fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease") fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)") + bindFlags(&o.LeaderElection, fs) +} + +// bindFlags registers the leader-election flags on fs, using the current values of l as the flag defaults. +func bindFlags(l *componentbaseconfig.LeaderElectionConfiguration, fs *pflag.FlagSet) { + fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+ + "Start a leader election client and gain leadership based on pool coordinator") + fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+ + "The duration that non-leader candidates will wait after observing a leadership "+ + "renewal until attempting to acquire 
leadership of a led but unrenewed leader "+ + "slot. This is effectively the maximum duration that a leader can be stopped "+ + "before it is replaced by another candidate. This is only applicable if leader "+ + "election is enabled.") + fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+ + "The interval between attempts by the acting master to renew a leadership slot "+ + "before it stops leading. This must be less than the lease duration. "+ + "This is only applicable if leader election is enabled.") + fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+ + "The duration the clients should wait between attempting acquisition and renewal "+ + "of a leadership. This is only applicable if leader election is enabled.") + fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+ + "The type of resource object that is used for locking during "+ + "leader election. Supported options are `leases` (default), `endpoints` and `configmaps`.") + fs.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, ""+ + "The name of resource object that is used for locking during "+ + "leader election.") + fs.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, ""+ + "The namespace of resource object that is used for locking during "+ + "leader election.") } // verifyDummyIP verify the specified ip is valid or not and set the default ip if empty diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 06163ac5bad..64798f14669 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -229,17 +229,16 @@ func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url healthCheckerClientsForCloud[remoteServers[i].String()] = c } - // comment the following code temporarily - //cfg := &rest.Config{ - // Host: coordinatorServer.String(), - // Transport: 
tp.CurrentTransport(), - // Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second, - //} - //c, err := kubernetes.NewForConfig(cfg) - //if err != nil { - // return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err - //} - //healthCheckerClientForCoordinator = c + cfg := &rest.Config{ + Host: coordinatorServer.String(), + Transport: tp.CurrentTransport(), + Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second, + } + c, err := kubernetes.NewForConfig(cfg) + if err != nil { + return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err + } + healthCheckerClientForCoordinator = c return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil } diff --git a/pkg/yurthub/poolcoordinator/coordinator.go b/pkg/yurthub/poolcoordinator/coordinator.go new file mode 100644 index 00000000000..d5ca84a07c2 --- /dev/null +++ b/pkg/yurthub/poolcoordinator/coordinator.go @@ -0,0 +1,98 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package poolcoordinator + +import ( + "time" + + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" +) + +// Coordinator decides, from pool-coordinator health and the election result, +// whether this yurthub should be running the leader-only shared informers. +type Coordinator struct { + coordinatorHealthChecker healthchecker.HealthChecker + hubElector *HubElector + informerStarted bool +} + +// NewCoordinator builds a Coordinator from the given health checker and elector. +// stopCh is accepted but not used here; the stop channel is passed to Run. +func NewCoordinator(coordinatorHealthChecker healthchecker.HealthChecker, elector *HubElector, stopCh <-chan struct{}) *Coordinator { + return &Coordinator{ + coordinatorHealthChecker: coordinatorHealthChecker, + hubElector: elector, + } +} + +// Run re-evaluates coordinator health and the election status every 5 seconds +// until stopCh is signalled. It only maintains the informerStarted flag for now; +// starting and stopping the informers is still to be implemented. +func (coordinator *Coordinator) Run(stopCh <-chan struct{}) { + intervalTicker := time.NewTicker(5 * time.Second) + defer intervalTicker.Stop() + + for { + select { + case <-stopCh: + klog.Infof("exit normally in coordinator loop.") + return + case <-intervalTicker.C: + if !coordinator.coordinatorHealthChecker.IsHealthy() { + coordinator.informerStarted = false + // stop shared informer + break // leaves the select only; the loop continues with the next tick + } + + if !coordinator.cacheIsUploaded() { + // upload local cache, and make sure yurthub pod is the last resource uploaded + } + + if coordinator.hubElector.Status() != LeaderHub { + if coordinator.informerStarted { + // stop shared informer + // + coordinator.informerStarted = false + } + break + } else { + if !coordinator.informerStarted { + coordinator.informerStarted = true + // start shared informer for pool-scope data + // make sure + + // start shared informer for lease delegating + // + } + } + } + } +} + +// cacheIsUploaded is a placeholder that currently always returns true. +func (coordinator *Coordinator) cacheIsUploaded() bool { + // check yurthub pod is uploaded + return true +} + +// IsReady is a placeholder that currently always returns true. +func (coordinator *Coordinator) IsReady() bool { + + return true +} diff --git a/pkg/yurthub/poolcoordinator/leader_election.go b/pkg/yurthub/poolcoordinator/leader_election.go new file mode 100644 index 00000000000..614eba31c14 --- /dev/null +++ b/pkg/yurthub/poolcoordinator/leader_election.go @@ -0,0 +1,165 @@ +/* +Copyright 2022 The OpenYurt Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poolcoordinator + +import ( + "context" + "sync/atomic" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" +) + +// Election states reported by HubElector.Status. +const ( + InitHub int32 = iota // 0: nothing has been recorded yet + LeaderHub // this hub won the election + FollowerHub // cloud API server health check failed, or leadership was lost + PendingHub // pool coordinator health check failed +) + +// HubElector runs leader election against the pool coordinator and exposes the +// resulting state through Status. +type HubElector struct { + coordinatorClient kubernetes.Interface + coordinatorHealthChecker healthchecker.HealthChecker + cloudAPIServerHealthChecker healthchecker.HealthChecker + electorStatus int32 + le *leaderelection.LeaderElector + inElecting bool // only read and written by the Run loop goroutine +} + +// NewHubElector creates the coordinator health checker and a leader elector whose +// lock is accessed through coordinatorClient, with cfg.NodeName as the identity. +// It returns an error if the health checker, resource lock or elector cannot be built. +func NewHubElector(cfg *config.YurtHubConfiguration, coordinatorClient kubernetes.Interface, cloudAPIServerHealthyChecker healthchecker.HealthChecker, stopCh <-chan struct{}) (*HubElector, error) { + coordinatorHealthyChecker, err := healthchecker.NewCoordinatorHealthChecker(cfg, coordinatorClient, cloudAPIServerHealthyChecker, stopCh) + if err != nil { + return nil, err + } + + he := &HubElector{ + coordinatorClient: coordinatorClient, + coordinatorHealthChecker: coordinatorHealthyChecker, + cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker, + electorStatus: InitHub, + } + + rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock, + cfg.LeaderElection.ResourceNamespace, + cfg.LeaderElection.ResourceName, + coordinatorClient.CoreV1(), 
+ coordinatorClient.CoordinationV1(), + resourcelock.ResourceLockConfig{Identity: cfg.NodeName}) + if err != nil { + return nil, err + } + + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: cfg.LeaderElection.LeaseDuration.Duration, + RenewDeadline: cfg.LeaderElection.RenewDeadline.Duration, + RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.Infof("yurthub of %s became leader", cfg.NodeName) + atomic.StoreInt32(&he.electorStatus, LeaderHub) + }, + OnStoppedLeading: func() { + // Invoked whenever the elector's Run returns, even if leadership was + // never acquired, so only demote a hub that was recorded as leader. + if atomic.CompareAndSwapInt32(&he.electorStatus, LeaderHub, FollowerHub) { + klog.Infof("yurthub of %s is no longer the leader", cfg.NodeName) + } + }, + }, + }) + if err != nil { + return nil, err + } + he.le = le + + return he, nil +} + +// Run checks health every 5 seconds until stopCh is signalled. Election only runs while +// both the pool coordinator and the cloud API server are healthy; otherwise any running +// election is cancelled and the status is set to PendingHub or FollowerHub respectively. +func (he *HubElector) Run(stopCh <-chan struct{}) { + intervalTicker := time.NewTicker(5 * time.Second) + defer intervalTicker.Stop() + + var ctx context.Context + var cancel context.CancelFunc + for { + select { + case <-stopCh: + klog.Infof("exit normally in leader election loop.") + + if cancel != nil { + cancel() + he.inElecting = false + } + return + case <-intervalTicker.C: + if !he.coordinatorHealthChecker.IsHealthy() { + atomic.StoreInt32(&he.electorStatus, PendingHub) + if he.inElecting && cancel != nil { + cancel() + he.inElecting = false + } + break // leaves the select only; the loop continues with the next tick + } + + if !he.cloudAPIServerHealthChecker.IsHealthy() { + atomic.StoreInt32(&he.electorStatus, FollowerHub) + if he.inElecting && cancel != nil { + cancel() + he.inElecting = false + } + break + } + + if he.inElecting && ctx.Err() != nil { + // The previous election run returned on its own (for example after a + // failed lease renewal); clear the flag so a new run can be started. + he.inElecting = false + } + + if !he.inElecting { + ctx, cancel = context.WithCancel(context.TODO()) + go func(ctx context.Context, cancel context.CancelFunc) { + // Cancelling here is how the loop learns that this run has ended. + defer cancel() + he.le.Run(ctx) + }(ctx, cancel) + he.inElecting = true + } + } + } +} + +// Status returns the current election state (InitHub, LeaderHub, FollowerHub or PendingHub). +func (he *HubElector) Status() int32 { + return atomic.LoadInt32(&he.electorStatus) +}