add yurthub leader election and coordinator framework (#1035)
* improve health checker for adapting coordinator

* add yurthub leader election and coordinator framework
rambohe-ch committed Nov 23, 2022
1 parent 08aa8e3 commit 0db2b87
Showing 5 changed files with 288 additions and 11 deletions.
4 changes: 4 additions & 0 deletions cmd/yurthub/app/config/config.go
@@ -24,6 +24,8 @@ import (
"strings"
"time"

componentbaseconfig "k8s.io/component-base/config"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -92,6 +94,7 @@ type YurtHubConfiguration struct {
CertIPs []net.IP
CoordinatorServer *url.URL
MinRequestTimeout time.Duration
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
@@ -180,6 +183,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
FilterManager: filterManager,
CertIPs: certIPs,
MinRequestTimeout: options.MinRequestTimeout,
LeaderElection: options.LeaderElection,
}

return cfg, nil
42 changes: 42 additions & 0 deletions cmd/yurthub/app/options/options.go
@@ -23,6 +23,9 @@ import (
"time"

"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"

@@ -74,6 +77,7 @@ type YurtHubOptions struct {
KubeletHealthGracePeriod time.Duration
EnableNodePool bool
MinRequestTimeout time.Duration
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
@@ -106,6 +110,15 @@ func NewYurtHubOptions() *YurtHubOptions {
KubeletHealthGracePeriod: time.Second * 40,
EnableNodePool: true,
MinRequestTimeout: time.Second * 1800,
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceName: projectinfo.GetHubName(),
ResourceNamespace: "kube-system",
},
}
return o
}
@@ -171,6 +184,35 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)")
fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.")
bindFlags(&o.LeaderElection, fs)
}

// bindFlags binds the LeaderElectionConfiguration struct fields to a flagset
func bindFlags(l *componentbaseconfig.LeaderElectionConfiguration, fs *pflag.FlagSet) {
fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
"Start a leader election client and gain leadership based on pool coordinator")
fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+
"The interval between attempts by the acting master to renew a leadership slot "+
"before it stops leading. This must be less than or equal to the lease duration. "+
"This is only applicable if leader election is enabled.")
fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+
"The type of resource object that is used for locking during "+
"leader election. Supported options are `leases` (default), `endpoints` and `configmaps`.")
fs.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, ""+
"The name of resource object that is used for locking during "+
"leader election.")
fs.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, ""+
"The namespace of resource object that is used for locking during "+
"leader election.")
}

// verifyDummyIP verify the specified ip is valid or not and set the default ip if empty
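As an illustration only (not part of this commit), the new flags can be exercised through the options added above; the flag-set name and the argument values below are arbitrary examples:

fs := pflag.NewFlagSet("yurthub", pflag.ContinueOnError)
o := options.NewYurtHubOptions()
o.AddFlags(fs)
if err := fs.Parse([]string{
	"--leader-elect=true",
	"--leader-elect-lease-duration=15s",
	"--leader-elect-renew-deadline=10s",
	"--leader-elect-retry-period=2s",
	"--leader-elect-resource-namespace=kube-system",
}); err != nil {
	klog.Fatalf("could not parse leader election flags, %v", err)
}
// o.LeaderElection now holds the parsed values; config.Complete (config.go above)
// copies it into YurtHubConfiguration.LeaderElection.
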
21 changes: 10 additions & 11 deletions cmd/yurthub/app/start.go
@@ -226,17 +226,16 @@ func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url
healthCheckerClientsForCloud[remoteServers[i].String()] = c
}

-// comment the following code temporarily
-//cfg := &rest.Config{
-//	Host:      coordinatorServer.String(),
-//	Transport: tp.CurrentTransport(),
-//	Timeout:   time.Duration(heartbeatTimeoutSeconds) * time.Second,
-//}
-//c, err := kubernetes.NewForConfig(cfg)
-//if err != nil {
-//	return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
-//}
-//healthCheckerClientForCoordinator = c
+cfg := &rest.Config{
+	Host:      coordinatorServer.String(),
+	Transport: tp.CurrentTransport(),
+	Timeout:   time.Duration(heartbeatTimeoutSeconds) * time.Second,
+}
+c, err := kubernetes.NewForConfig(cfg)
+if err != nil {
+	return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
+}
+healthCheckerClientForCoordinator = c

return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil
}
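For context, a hedged sketch of how the coordinator client built above might be probed by the coordinator health checker; the /healthz path and this call site are assumptions, not something this commit defines:

// assumed usage, not part of this commit
body, err := c.Discovery().RESTClient().Get().AbsPath("/healthz").DoRaw(context.TODO())
if err != nil {
	klog.Errorf("pool coordinator healthz check failed, %v", err)
} else {
	klog.Infof("pool coordinator healthz returned: %s", string(body))
}
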
89 changes: 89 additions & 0 deletions pkg/yurthub/poolcoordinator/coordinator.go
@@ -0,0 +1,89 @@
/*
Copyright 2022 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poolcoordinator

import (
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
)

type Coordinator struct {
coordinatorHealthChecker healthchecker.HealthChecker
hubElector *HubElector
informerStarted bool
}

func NewCoordinator(coordinatorHealthChecker healthchecker.HealthChecker, elector *HubElector, stopCh <-chan struct{}) *Coordinator {
return &Coordinator{
coordinatorHealthChecker: coordinatorHealthChecker,
hubElector: elector,
}
}

func (coordinator *Coordinator) Run(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
klog.Infof("exit normally in coordinator loop.")
if coordinator.informerStarted {
// TODO: stop the shared informer
coordinator.informerStarted = false
}
return
case electorStatus, ok := <-coordinator.hubElector.StatusChan():
if !ok {
return
}

if electorStatus != PendingHub && !coordinator.cacheIsUploaded() {
// TODO: upload the local cache, and make sure the yurthub pod is the last resource uploaded
}

if electorStatus == LeaderHub {
if !coordinator.informerStarted {
coordinator.informerStarted = true
// TODO: start the shared informer for pool-scope data

// TODO: start the shared informer for lease delegation
}
break
}

if electorStatus == FollowerHub {
if coordinator.informerStarted {
// TODO: stop the shared informer
coordinator.informerStarted = false
}
}
}
}
}

func (coordinator *Coordinator) cacheIsUploaded() bool {
// TODO: check whether the yurthub pod has been uploaded
return true
}

func (coordinator *Coordinator) IsReady() bool {
return true
}
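A minimal sketch of how this coordinator might be wired up during yurthub startup, assuming a HubElector (defined in leader_election.go below) and a coordinator health checker are already available; this call site is not part of the commit:

// assumed wiring, not part of this commit
coordinator := poolcoordinator.NewCoordinator(coordinatorHealthChecker, elector, stopCh)
go coordinator.Run(stopCh)
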
143 changes: 143 additions & 0 deletions pkg/yurthub/poolcoordinator/leader_election.go
@@ -0,0 +1,143 @@
/*
Copyright 2022 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poolcoordinator

import (
"context"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
)

const (
InitHub int32 = iota // 0
LeaderHub
FollowerHub
PendingHub
)

type HubElector struct {
coordinatorClient kubernetes.Interface
coordinatorHealthChecker healthchecker.HealthChecker
cloudAPIServerHealthChecker healthchecker.HealthChecker
electorStatus chan int32
le *leaderelection.LeaderElector
inElecting bool
}

func NewHubElector(cfg *config.YurtHubConfiguration, coordinatorClient kubernetes.Interface, cloudAPIServerHealthyChecker healthchecker.HealthChecker, stopCh <-chan struct{}) (*HubElector, error) {
coordinatorHealthyChecker, err := healthchecker.NewCoordinatorHealthChecker(cfg, coordinatorClient, cloudAPIServerHealthyChecker, stopCh)
if err != nil {
return nil, err
}

he := &HubElector{
coordinatorClient: coordinatorClient,
coordinatorHealthChecker: coordinatorHealthyChecker,
cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker,
electorStatus: make(chan int32, 1), // buffered so the initial PendingHub status sent below does not block
}

rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock,
cfg.LeaderElection.ResourceNamespace,
cfg.LeaderElection.ResourceName,
coordinatorClient.CoreV1(),
coordinatorClient.CoordinationV1(),
resourcelock.ResourceLockConfig{Identity: cfg.NodeName})
if err != nil {
return nil, err
}

le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: cfg.LeaderElection.LeaseDuration.Duration,
RenewDeadline: cfg.LeaderElection.RenewDeadline.Duration,
RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("yurthub of %s became lease", cfg.NodeName)
he.electorStatus <- LeaderHub
},
OnStoppedLeading: func() {
// TODO: handle the loss of leadership
},
},
})
if err != nil {
return nil, err
}
he.le = le
he.electorStatus <- PendingHub

return he, nil
}

func (he *HubElector) Run(stopCh <-chan struct{}) {
intervalTicker := time.NewTicker(5 * time.Second)
defer intervalTicker.Stop()
defer close(he.electorStatus)

var ctx context.Context
var cancel context.CancelFunc
for {
select {
case <-stopCh:
klog.Infof("exit normally in leader election loop.")

if cancel != nil {
cancel()
he.inElecting = false
}
return
case <-intervalTicker.C:
if !he.coordinatorHealthChecker.IsHealthy() {
if he.inElecting && cancel != nil {
cancel()
he.inElecting = false
he.electorStatus <- PendingHub
}
break
}

if !he.cloudAPIServerHealthChecker.IsHealthy() {
if he.inElecting && cancel != nil {
cancel()
he.inElecting = false
he.electorStatus <- FollowerHub
}
break
}

if !he.inElecting {
he.electorStatus <- FollowerHub
ctx, cancel = context.WithCancel(context.TODO())
go he.le.Run(ctx)
he.inElecting = true
}
}
}
}

func (he *HubElector) StatusChan() chan int32 {
return he.electorStatus
}
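And a hedged sketch of constructing and running the elector itself; coordinatorClient and cloudHealthChecker are placeholder names for values produced elsewhere during yurthub startup:

// assumed wiring, not part of this commit
elector, err := poolcoordinator.NewHubElector(cfg, coordinatorClient, cloudHealthChecker, stopCh)
if err != nil {
	klog.Errorf("could not create hub elector, %v", err)
	return
}
go elector.Run(stopCh)
// Coordinator.Run (coordinator.go above) consumes elector.StatusChan() and reacts to
// LeaderHub/FollowerHub/PendingHub transitions.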
