Skip to content

Commit

Permalink
add yurthub leader election and coordinator framework
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch committed Oct 24, 2022
1 parent de6703b commit 06e3154
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 11 deletions.
4 changes: 4 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

componentbaseconfig "k8s.io/component-base/config"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -94,6 +96,7 @@ type YurtHubConfiguration struct {
FilterManager *filter.Manager
CertIPs []net.IP
CoordinatorServer *url.URL
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -178,6 +181,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterManager: filterManager,
CertIPs: certIPs,
LeaderElection: options.LeaderElection,
}

return cfg, nil
Expand Down
42 changes: 42 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"time"

"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"

Expand Down Expand Up @@ -73,6 +76,7 @@ type YurtHubOptions struct {
WorkingMode string
KubeletHealthGracePeriod time.Duration
EnableNodePool bool
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -104,6 +108,15 @@ func NewYurtHubOptions() *YurtHubOptions {
WorkingMode: string(util.WorkingModeEdge),
KubeletHealthGracePeriod: time.Second * 40,
EnableNodePool: true,
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceName: projectinfo.GetHubName(),
ResourceNamespace: "kube-system",
},
}
return o
}
Expand Down Expand Up @@ -168,6 +181,35 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).")
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)")
bindFlags(&o.LeaderElection, fs)
}

// bindFlags binds the LeaderElectionConfiguration struct fields to a flagset
func bindFlags(l *componentbaseconfig.LeaderElectionConfiguration, fs *pflag.FlagSet) {
fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
"Start a leader election client and gain leadership based on pool coordinator")
fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+
"The interval between attempts by the acting master to renew a leadership slot "+
"before it stops leading. This must be less than or equal to the lease duration. "+
"This is only applicable if leader election is enabled.")
fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+
"The type of resource object that is used for locking during "+
"leader election. Supported options are `leases` (default), `endpoints` and `configmaps`.")
fs.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, ""+
"The name of resource object that is used for locking during "+
"leader election.")
fs.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, ""+
"The namespace of resource object that is used for locking during "+
"leader election.")
}

// verifyDummyIP verify the specified ip is valid or not and set the default ip if empty
Expand Down
21 changes: 10 additions & 11 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,16 @@ func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url
healthCheckerClientsForCloud[remoteServers[i].String()] = c
}

// comment the following code temporarily
//cfg := &rest.Config{
// Host: coordinatorServer.String(),
// Transport: tp.CurrentTransport(),
// Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second,
//}
//c, err := kubernetes.NewForConfig(cfg)
//if err != nil {
// return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
//}
//healthCheckerClientForCoordinator = c
cfg := &rest.Config{
Host: coordinatorServer.String(),
Transport: tp.CurrentTransport(),
Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second,
}
c, err := kubernetes.NewForConfig(cfg)
if err != nil {
return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
}
healthCheckerClientForCoordinator = c

return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil
}
89 changes: 89 additions & 0 deletions pkg/yurthub/poolcoordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2022 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poolcoordinator

import (
"time"

"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
)

type Coordinator struct {
coordinatorHealthChecker healthchecker.HealthChecker
hubElector *HubElector
informerStarted bool
}

func NewCoordinator(coordinatorHealthChecker healthchecker.HealthChecker, elector *HubElector, stopCh <-chan struct{}) *Coordinator {
return &Coordinator{
coordinatorHealthChecker: coordinatorHealthChecker,
hubElector: elector,
}
}

func (coordinator *Coordinator) Run(stopCh <-chan struct{}) {
intervalTicker := time.NewTicker(5 * time.Second)
defer intervalTicker.Stop()

for {
select {
case <-stopCh:
klog.Infof("exit normally in coordinator loop.")
return
case <-intervalTicker.C:
if !coordinator.coordinatorHealthChecker.IsHealthy() {
coordinator.informerStarted = false
// stop shared informer
break
}

if !coordinator.cacheIsUploaded() {
// upload local cache, and make sure yurthub pod is the last resource uploaded
}

if coordinator.hubElector.Status() != LeaderHub {
if coordinator.informerStarted {
// stop shared informer
//
coordinator.informerStarted = false
}
break
} else {
if !coordinator.informerStarted {
coordinator.informerStarted = true
// start shared informer for pool-scope data
// make sure

// start shared informer for lease delegating
//
}
}
}
}
}

func (coordinator *Coordinator) cacheIsUploaded() bool {
// check yurthub pod is uploaded
return true
}

func (coordinator *Coordinator) IsReady() bool {

return true
}
141 changes: 141 additions & 0 deletions pkg/yurthub/poolcoordinator/leader_election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2022 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poolcoordinator

import (
"context"
"sync/atomic"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
)

const (
InitHub int32 = iota // 0
LeaderHub
FollowerHub
PendingHub
)

type HubElector struct {
coordinatorClient kubernetes.Interface
coordinatorHealthChecker healthchecker.HealthChecker
cloudAPIServerHealthChecker healthchecker.HealthChecker
electorStatus int32
le *leaderelection.LeaderElector
inElecting bool
}

func NewHubElector(cfg *config.YurtHubConfiguration, coordinatorClient kubernetes.Interface, cloudAPIServerHealthyChecker healthchecker.HealthChecker, stopCh <-chan struct{}) (*HubElector, error) {
coordinatorHealthyChecker, err := healthchecker.NewCoordinatorHealthChecker(cfg, coordinatorClient, cloudAPIServerHealthyChecker, stopCh)
if err != nil {
return nil, err
}

he := &HubElector{
coordinatorClient: coordinatorClient,
coordinatorHealthChecker: coordinatorHealthyChecker,
cloudAPIServerHealthChecker: cloudAPIServerHealthyChecker,
electorStatus: InitHub,
}

rl, err := resourcelock.New(cfg.LeaderElection.ResourceLock,
cfg.LeaderElection.ResourceNamespace,
cfg.LeaderElection.ResourceName,
coordinatorClient.CoreV1(),
coordinatorClient.CoordinationV1(),
resourcelock.ResourceLockConfig{Identity: cfg.NodeName})
if err != nil {
return nil, err
}

le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: cfg.LeaderElection.LeaseDuration.Duration,
RenewDeadline: cfg.LeaderElection.RenewDeadline.Duration,
RetryPeriod: cfg.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("yurthub of %s became lease", cfg.NodeName)
atomic.StoreInt32(&he.electorStatus, LeaderHub)
},
OnStoppedLeading: func() {

},
},
})
if err != nil {
return nil, err
}
he.le = le

return he, nil
}

func (he *HubElector) Run(stopCh <-chan struct{}) {
intervalTicker := time.NewTicker(5 * time.Second)
defer intervalTicker.Stop()

var ctx context.Context
var cancel context.CancelFunc
for {
select {
case <-stopCh:
klog.Infof("exit normally in leader election loop.")

if cancel != nil {
cancel()
he.inElecting = false
}
return
case <-intervalTicker.C:
if !he.coordinatorHealthChecker.IsHealthy() {
atomic.StoreInt32(&he.electorStatus, PendingHub)
if he.inElecting && cancel != nil {
cancel()
he.inElecting = false
}
break
}

if !he.cloudAPIServerHealthChecker.IsHealthy() {
atomic.StoreInt32(&he.electorStatus, FollowerHub)
if he.inElecting && cancel != nil {
cancel()
he.inElecting = false
}
break
}

if !he.inElecting {
ctx, cancel = context.WithCancel(context.TODO())
go he.le.Run(ctx)
he.inElecting = true
}
}
}
}

func (he *HubElector) Status() int32 {
return atomic.LoadInt32(&he.electorStatus)
}

0 comments on commit 06e3154

Please sign in to comment.