Skip to content

Commit

Permalink
Merge pull request openyurtio#482 from SataQiu/fix-health-check
Browse files Browse the repository at this point in the history
yurthub: stop renew node lease when kubelet's heartbeat is stopped
  • Loading branch information
rambohe-ch authored Sep 24, 2021
2 parents 4953d62 + 29369d2 commit 1be5351
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 13 deletions.
2 changes: 2 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type YurtHubConfiguration struct {
SharedFactory informers.SharedInformerFactory
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -158,6 +159,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
Filters: filters,
SharedFactory: sharedFactory,
YurtSharedFactory: yurtSharedFactory,
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
}

return cfg, nil
Expand Down
4 changes: 4 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"path/filepath"
"time"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
Expand Down Expand Up @@ -63,6 +64,7 @@ type YurtHubOptions struct {
EnableResourceFilter bool
DisabledResourceFilters []string
WorkingMode string
KubeletHealthGracePeriod time.Duration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -92,6 +94,7 @@ func NewYurtHubOptions() *YurtHubOptions {
EnableResourceFilter: true,
DisabledResourceFilters: make([]string, 0),
WorkingMode: string(util.WorkingModeEdge),
KubeletHealthGracePeriod: time.Second * 40,
}
return o
}
Expand Down Expand Up @@ -156,6 +159,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&o.DisabledResourceFilters, "disabled-resource-filters", o.DisabledResourceFilters, "disable resource filters to handle response")
fs.StringVar(&o.NodePoolName, "nodepool-name", o.NodePoolName, "the name of node pool that runs hub agent")
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).")
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
}

// verifyDummyIP verifies whether the specified IP is valid
Expand Down
5 changes: 5 additions & 0 deletions pkg/yurthub/healthchecker/fake_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package healthchecker

import (
"net/url"
"time"
)

type fakeChecker struct {
Expand Down Expand Up @@ -48,6 +49,10 @@ func (fc *fakeChecker) Run() {
return
}

// UpdateLastKubeletLeaseReqTime is a no-op: the fake checker ignores the
// kubelet lease request time it is given.
func (fc *fakeChecker) UpdateLastKubeletLeaseReqTime(_ time.Time) {}

// NewFakeChecker creates a fake checker
func NewFakeChecker(healthy bool, settings map[string]int) HealthChecker {
return &fakeChecker{
Expand Down
58 changes: 46 additions & 12 deletions pkg/yurthub/healthchecker/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,23 @@ const (
// HealthChecker describes a component that checks the health status of
// remote servers and is told when kubelet last sent a lease request.
type HealthChecker interface {
	// Run starts the checker.
	// NOTE(review): whether Run blocks is not visible here; confirm against
	// the concrete implementations.
	Run()
	// IsHealthy reports whether the given remote server is considered healthy.
	IsHealthy(server *url.URL) bool
	// UpdateLastKubeletLeaseReqTime records the time of the most recent
	// kubelet lease request.
	UpdateLastKubeletLeaseReqTime(time.Time)
}

type (
	// setNodeLease is the function type for handing over a node lease; it
	// returns an error on failure.
	setNodeLease func(*coordinationv1.Lease) error
	// getNodeLease is the function type for obtaining a node lease.
	getNodeLease func() *coordinationv1.Lease
)

// healthCheckerManager holds the state used to health-check the configured
// remote servers and to track when kubelet last sent a lease request.
//
// NOTE(review): the six fields listed before sync.RWMutex are repeated after
// it. This looks like the removed and added sides of a diff hunk rendered
// together rather than real duplicate fields; confirm against the actual file.
type healthCheckerManager struct {
remoteServers []*url.URL
checkers map[string]*checker
latestLease *coordinationv1.Lease
sw cachemanager.StorageWrapper
remoteServerIndex int
stopCh <-chan struct{}
// Embedded lock; UpdateLastKubeletLeaseReqTime and getLastKubeletLeaseReqTime
// take it around accesses to lastKubeletLeaseReqTime.
sync.RWMutex
// remoteServers is populated from cfg.RemoteServers by NewHealthChecker.
remoteServers []*url.URL
// checkers maps a string key to the checker for one remote server.
// NOTE(review): the key is presumably the server host; confirm in NewHealthChecker.
checkers map[string]*checker
latestLease *coordinationv1.Lease
// sw is populated from cfg.StorageWrapper by NewHealthChecker.
sw cachemanager.StorageWrapper
// remoteServerIndex is initialised to 0 by NewHealthChecker.
remoteServerIndex int
// lastKubeletLeaseReqTime is the time of the most recent kubelet lease
// request; the zero value means none has been recorded, and sync skips the
// grace-period check in that case.
lastKubeletLeaseReqTime time.Time
// healthCheckGracePeriod comes from cfg.KubeletHealthGracePeriod. sync only
// applies the kubelet-stopped check when it is greater than zero.
healthCheckGracePeriod time.Duration
stopCh <-chan struct{}
}

type checker struct {
Expand All @@ -76,11 +80,12 @@ func NewHealthChecker(cfg *config.YurtHubConfiguration, tp transport.Interface,
}

hcm := &healthCheckerManager{
checkers: make(map[string]*checker),
remoteServers: cfg.RemoteServers,
remoteServerIndex: 0,
sw: cfg.StorageWrapper,
stopCh: stopCh,
checkers: make(map[string]*checker),
remoteServers: cfg.RemoteServers,
remoteServerIndex: 0,
sw: cfg.StorageWrapper,
healthCheckGracePeriod: cfg.KubeletHealthGracePeriod,
stopCh: stopCh,
}

for _, remoteServer := range cfg.RemoteServers {
Expand Down Expand Up @@ -120,10 +125,23 @@ func (hcm *healthCheckerManager) healthzCheckLoop(stopCh <-chan struct{}) {
}

func (hcm *healthCheckerManager) sync() {
isKubeletStoped := false
lastKubeletLeaseReqTime := hcm.getLastKubeletLeaseReqTime()
if !lastKubeletLeaseReqTime.IsZero() && hcm.healthCheckGracePeriod > 0 {
isKubeletStoped = time.Now().After(lastKubeletLeaseReqTime.Add(hcm.healthCheckGracePeriod))
}

// Ensure that the node heartbeat can be reported when there is a healthy remote server.
// Try each remote server in turn; as soon as one of them can update the node lease, exit the loop.
for i := 0; i < len(hcm.remoteServers); i++ {
c := hcm.getChecker()

if isKubeletStoped {
klog.Warningf("kubelet does not post lease request for more than %v, stop renew node lease and assume remote server is unhealthy", hcm.healthCheckGracePeriod)
c.markAsUnhealthy()
continue
}

if c.check() {
break
}
Expand Down Expand Up @@ -162,6 +180,18 @@ func (hcm *healthCheckerManager) IsHealthy(server *url.URL) bool {
return false
}

// UpdateLastKubeletLeaseReqTime stores lastKubeletLeaseReqTime as the time of
// the most recent kubelet lease request. The write is performed while holding
// the manager's write lock.
func (hcm *healthCheckerManager) UpdateLastKubeletLeaseReqTime(lastKubeletLeaseReqTime time.Time) {
	hcm.Lock()
	hcm.lastKubeletLeaseReqTime = lastKubeletLeaseReqTime
	hcm.Unlock()
}

// getLastKubeletLeaseReqTime returns the stored time of the most recent
// kubelet lease request. The value is read while holding the manager's read
// lock; it is the zero time if nothing has been recorded.
func (hcm *healthCheckerManager) getLastKubeletLeaseReqTime() time.Time {
	hcm.RLock()
	lastReq := hcm.lastKubeletLeaseReqTime
	hcm.RUnlock()
	return lastReq
}

func newChecker(
cfg *config.YurtHubConfiguration,
tp transport.Interface,
Expand Down Expand Up @@ -213,6 +243,11 @@ func (c *checker) check() bool {
}

klog.Infof("failed to update lease: %v, remote server %s", err, c.remoteServer.String())
c.markAsUnhealthy()
return false
}

func (c *checker) markAsUnhealthy() {
if c.onFailureFunc != nil {
c.onFailureFunc(c.remoteServer.Host)
}
Expand All @@ -224,7 +259,6 @@ func (c *checker) check() bool {
c.lastTime = now
metrics.Metrics.ObserveServerHealthy(c.remoteServer.Host, 0)
}
return false
}

func (c *checker) isHealthy() bool {
Expand Down
9 changes: 8 additions & 1 deletion pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package proxy

import (
"net/http"
"time"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
Expand All @@ -39,6 +40,7 @@ import (
type yurtReverseProxy struct {
resolver apirequest.RequestInfoResolver
loadBalancer remote.LoadBalancer
checker healthchecker.HealthChecker
localProxy *local.LocalProxy
cacheMgr cachemanager.CacheManager
maxRequestsInFlight int
Expand Down Expand Up @@ -83,6 +85,7 @@ func NewYurtReverseProxyHandler(
yurtProxy := &yurtReverseProxy{
resolver: resolver,
loadBalancer: lb,
checker: healthChecker,
localProxy: localProxy,
cacheMgr: cacheMgr,
maxRequestsInFlight: yurtHubCfg.MaxRequestInFlight,
Expand All @@ -109,9 +112,13 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler
}

// ServeHTTP dispatches req to either the remote load balancer or the local
// proxy, and reports kubelet lease requests to the health checker.
//
// NOTE(review): two consecutive `if` condition lines appear below. The first
// looks like the removed side of a diff hunk and the next two lines like its
// replacement; confirm against the actual file before relying on this text.
func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if !hubutil.IsKubeletLeaseReq(req) && p.loadBalancer.IsHealthy() || p.localProxy == nil {
// Evaluate the lease check once so both the routing decision and the
// timestamp update below use the same result.
isKubeletLeaseReq := hubutil.IsKubeletLeaseReq(req)
// && binds tighter than ||: the load balancer serves the request when it is
// not a kubelet lease request and the load balancer is healthy, or whenever
// there is no local proxy at all.
if !isKubeletLeaseReq && p.loadBalancer.IsHealthy() || p.localProxy == nil {
p.loadBalancer.ServeHTTP(rw, req)
} else {
// Only lease requests that reach the local proxy refresh the checker's
// last-lease timestamp; the checker's sync uses it to decide whether
// kubelet has stopped posting leases.
if isKubeletLeaseReq {
p.checker.UpdateLastKubeletLeaseReqTime(time.Now())
}
p.localProxy.ServeHTTP(rw, req)
}
}

0 comments on commit 1be5351

Please sign in to comment.