Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add yurthub leader election and coordinator framework #1035

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
improve health checker for adapting coordinator
rambohe-ch committed Oct 24, 2022

Verified

This commit was signed with the committer’s verified signature.
webknjaz 🇺🇦 Sviatoslav Sydorenko (Святослав Сидоренко)
commit de6703b9a26f33f108b0690d9094efc1468a4aad
3 changes: 3 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@ type YurtHubConfiguration struct {
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
@@ -92,6 +93,7 @@ type YurtHubConfiguration struct {
KubeletHealthGracePeriod time.Duration
FilterManager *filter.Manager
CertIPs []net.IP
CoordinatorServer *url.URL
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
@@ -159,6 +161,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
HeartbeatFailedRetry: options.HeartbeatFailedRetry,
HeartbeatHealthyThreshold: options.HeartbeatHealthyThreshold,
HeartbeatTimeoutSeconds: options.HeartbeatTimeoutSeconds,
HeartbeatIntervalSeconds: options.HeartbeatIntervalSeconds,
MaxRequestInFlight: options.MaxRequestInFlight,
JoinToken: options.JoinToken,
RootDir: options.RootDir,
3 changes: 3 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ type YurtHubOptions struct {
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
@@ -89,6 +90,7 @@ func NewYurtHubOptions() *YurtHubOptions {
HeartbeatFailedRetry: 3,
HeartbeatHealthyThreshold: 2,
HeartbeatTimeoutSeconds: 2,
HeartbeatIntervalSeconds: 10,
MaxRequestInFlight: 250,
RootDir: filepath.Join("/var/lib/", projectinfo.GetHubName()),
EnableProfiling: true,
@@ -148,6 +150,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.")
fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.")
fs.IntVar(&o.HeartbeatTimeoutSeconds, "heartbeat-timeout-seconds", o.HeartbeatTimeoutSeconds, " number of seconds after which the heartbeat times out.")
fs.IntVar(&o.HeartbeatIntervalSeconds, "heartbeat-interval-seconds", o.HeartbeatIntervalSeconds, " number of seconds for omitting one time heartbeat to remote server.")
fs.IntVar(&o.MaxRequestInFlight, "max-requests-in-flight", o.MaxRequestInFlight, "the maximum number of parallel requests.")
fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.")
fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).")
52 changes: 46 additions & 6 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
@@ -18,12 +18,15 @@ package app

import (
"fmt"
"net/url"
"path/filepath"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
@@ -33,7 +36,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
@@ -112,10 +115,17 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

var healthChecker healthchecker.HealthChecker
klog.Infof("%d. prepare for health checker clients", trace)
healthCheckerClientsForCloud, _, err := createHealthCheckerClient(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServer, transportManager)
if err != nil {
return fmt.Errorf("failed to create health checker clients, %w", err)
}
trace++

var healthChecker healthchecker.MultipleBackendsHealthChecker
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. create health checker for remote servers ", trace)
healthChecker, err = healthchecker.NewHealthChecker(cfg, transportManager, stopCh)
healthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, healthCheckerClientsForCloud, stopCh)
if err != nil {
return fmt.Errorf("could not new health checker, %w", err)
}
@@ -125,11 +135,10 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
// This fake checker will always report that the remote server is healthy.
healthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
}
healthChecker.Run()
trace++

klog.Infof("%d. new restConfig manager for %s mode", trace, cfg.CertMgrMode)
restConfigMgr, err := rest.NewRestConfigManager(cfg, certManager, healthChecker)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg, certManager, healthChecker)
if err != nil {
return fmt.Errorf("could not new restConfig manager, %w", err)
}
@@ -172,7 +181,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, tenantMgr, stopCh)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, tenantMgr, stopCh)

if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
@@ -203,3 +212,34 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
klog.Infof("hub agent exited")
return nil
}

func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, kubernetes.Interface, error) {
var healthCheckerClientForCoordinator kubernetes.Interface
healthCheckerClientsForCloud := make(map[string]kubernetes.Interface)
for i := range remoteServers {
restConf := &rest.Config{
Host: remoteServers[i].String(),
Transport: tp.CurrentTransport(),
Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second,
}
c, err := kubernetes.NewForConfig(restConf)
if err != nil {
return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
}
healthCheckerClientsForCloud[remoteServers[i].String()] = c
}

// comment the following code temporarily
//cfg := &rest.Config{
// Host: coordinatorServer.String(),
// Transport: tp.CurrentTransport(),
// Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second,
//}
//c, err := kubernetes.NewForConfig(cfg)
//if err != nil {
// return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
//}
//healthCheckerClientForCoordinator = c

return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil
}
13 changes: 6 additions & 7 deletions pkg/yurthub/healthchecker/fake_checker.go
Original file line number Diff line number Diff line change
@@ -18,16 +18,15 @@ package healthchecker

import (
"net/url"
"time"
)

type fakeChecker struct {
healthy bool
settings map[string]int
}

// IsHealthy returns healthy status of server
func (fc *fakeChecker) IsHealthy(server *url.URL) bool {
// BackendHealthyStatus returns healthy status of server
func (fc *fakeChecker) BackendHealthyStatus(server *url.URL) bool {
s := server.String()
if _, ok := fc.settings[s]; !ok {
return fc.healthy
@@ -45,16 +44,16 @@ func (fc *fakeChecker) IsHealthy(server *url.URL) bool {
return fc.healthy
}

func (fc *fakeChecker) Run() {
return
func (fc *fakeChecker) IsHealthy() bool {
return fc.healthy
}

func (fc *fakeChecker) UpdateLastKubeletLeaseReqTime(time.Time) {
func (fc *fakeChecker) RenewKubeletLeaseTime() {
return
}

// NewFakeChecker creates a fake checker
func NewFakeChecker(healthy bool, settings map[string]int) HealthChecker {
func NewFakeChecker(healthy bool, settings map[string]int) MultipleBackendsHealthChecker {
return &fakeChecker{
settings: settings,
healthy: healthy,
Loading