Skip to content

Commit

Permalink
add timeout config in yurthub to handle those watch requests without …
Browse files Browse the repository at this point in the history
…timeoutSeconds parameter like apiserver. (#1056)
  • Loading branch information
AndyEWang authored Nov 15, 2022
1 parent 92ae4b1 commit 45266e0
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 14 deletions.
2 changes: 2 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type YurtHubConfiguration struct {
FilterManager *manager.Manager
CertIPs []net.IP
CoordinatorServer *url.URL
MinRequestTimeout time.Duration
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -174,6 +175,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterManager: filterManager,
CertIPs: certIPs,
MinRequestTimeout: options.MinRequestTimeout,
}

return cfg, nil
Expand Down
3 changes: 3 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type YurtHubOptions struct {
WorkingMode string
KubeletHealthGracePeriod time.Duration
EnableNodePool bool
MinRequestTimeout time.Duration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -104,6 +105,7 @@ func NewYurtHubOptions() *YurtHubOptions {
WorkingMode: string(util.WorkingModeEdge),
KubeletHealthGracePeriod: time.Second * 40,
EnableNodePool: true,
MinRequestTimeout: time.Second * 1800,
}
return o
}
Expand Down Expand Up @@ -168,6 +170,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).")
fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease")
fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)")
fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.")
}

// verifyDummyIP verify the specified ip is valid or not and set the default ip if empty
Expand Down
18 changes: 11 additions & 7 deletions pkg/yurthub/proxy/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"strconv"
"time"
Expand Down Expand Up @@ -48,15 +49,17 @@ type IsHealthy func() bool

// LocalProxy is responsible for handling requests when remote servers are unhealthy
type LocalProxy struct {
cacheMgr manager.CacheManager
isHealthy IsHealthy
cacheMgr manager.CacheManager
isHealthy IsHealthy
minRequestTimeout time.Duration
}

// NewLocalProxy creates a *LocalProxy
func NewLocalProxy(cacheMgr manager.CacheManager, isHealthy IsHealthy) *LocalProxy {
func NewLocalProxy(cacheMgr manager.CacheManager, isHealthy IsHealthy, minRequestTimeout time.Duration) *LocalProxy {
return &LocalProxy{
cacheMgr: cacheMgr,
isHealthy: isHealthy,
cacheMgr: cacheMgr,
isHealthy: isHealthy,
minRequestTimeout: minRequestTimeout,
}
}

Expand Down Expand Up @@ -173,8 +176,9 @@ func (lp *LocalProxy) localWatch(w http.ResponseWriter, req *http.Request) error
timeout := time.Duration(0)
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
} else {
return nil
}
if timeout == 0 && lp.minRequestTimeout > 0 {
timeout = time.Duration(float64(lp.minRequestTimeout) * (rand.Float64() + 1.0))
}

watchTimer := time.NewTimer(timeout)
Expand Down
103 changes: 97 additions & 6 deletions pkg/yurthub/proxy/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestServeHTTPForWatch(t *testing.T) {
return false
}

lp := NewLocalProxy(cacheM, fn)
lp := NewLocalProxy(cacheM, fn, 0)

testcases := map[string]struct {
userAgent string
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) {
return cnt > 2 // after 6 seconds, become healthy
}

lp := NewLocalProxy(cacheM, fn)
lp := NewLocalProxy(cacheM, fn, 0)

testcases := map[string]struct {
userAgent string
Expand Down Expand Up @@ -230,7 +230,98 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) {
t.Errorf("Got error %v, unable to remove path %s", err, rootDir)
}
}
func TestServeHTTPForWatchWithMinRequestTimeout(t *testing.T) {
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
sWrapper := cachemanager.NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil, nil)

fn := func() bool {
return false
}

lp := NewLocalProxy(cacheM, fn, 10*time.Second)

testcases := map[string]struct {
userAgent string
accept string
verb string
path string
code int
floor time.Duration
ceil time.Duration
}{
"watch request": {
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/api/v1/nodes?watch=true&timeoutSeconds=5",
code: http.StatusOK,
floor: 5 * time.Second,
ceil: 6 * time.Second,
},
"watch request without timeout": {
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/api/v1/nodes?watch=true",
code: http.StatusOK,
floor: 10 * time.Second,
ceil: 20 * time.Second,
},
}

resolver := newTestRequestInfoResolver()

for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
req, _ := http.NewRequest(tt.verb, tt.path, nil)
if len(tt.accept) != 0 {
req.Header.Set("Accept", tt.accept)
}

if len(tt.userAgent) != 0 {
req.Header.Set("User-Agent", tt.userAgent)
}
req.RemoteAddr = "127.0.0.1"

var start, end time.Time
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
start = time.Now()
lp.ServeHTTP(w, req)
end = time.Now()
})

handler = proxyutil.WithRequestClientComponent(handler)
handler = proxyutil.WithRequestContentType(handler)
handler = filters.WithRequestInfo(handler, resolver)

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
result := resp.Result()
if result.StatusCode != tt.code {
t.Errorf("got status code %d, but expect %d", result.StatusCode, tt.code)
}

if tt.floor.Seconds() != 0 {
if start.Add(tt.floor).After(end) {
t.Errorf("exec time is less than floor time %v", tt.floor)
}
}

if start.Add(tt.ceil).Before(end) {
t.Errorf("exec time is more than ceil time %v", tt.ceil)
}
})
}

if err = os.RemoveAll(rootDir); err != nil {
t.Errorf("Got error %v, unable to remove path %s", err, rootDir)
}
}
func TestServeHTTPForPost(t *testing.T) {
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
Expand All @@ -244,7 +335,7 @@ func TestServeHTTPForPost(t *testing.T) {
return false
}

lp := NewLocalProxy(cacheM, fn)
lp := NewLocalProxy(cacheM, fn, 0)

testcases := map[string]struct {
userAgent string
Expand Down Expand Up @@ -324,7 +415,7 @@ func TestServeHTTPForDelete(t *testing.T) {
return false
}

lp := NewLocalProxy(cacheM, fn)
lp := NewLocalProxy(cacheM, fn, 0)

testcases := map[string]struct {
userAgent string
Expand Down Expand Up @@ -391,7 +482,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) {
return false
}

lp := NewLocalProxy(cacheM, fn)
lp := NewLocalProxy(cacheM, fn, 0)

testcases := map[string]struct {
userAgent string
Expand Down Expand Up @@ -531,7 +622,7 @@ func TestServeHTTPForListReqCache(t *testing.T) {
return false
}

lp := NewLocalProxy(cacheM, fn)
lp := NewLocalProxy(cacheM, fn, 0)

testcases := map[string]struct {
userAgent string
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewYurtReverseProxyHandler(
// When yurthub is working in cloud mode, cacheMgr will be set to nil which means the local cache is disabled,
// so we don't need to create a LocalProxy.
if cacheMgr != nil {
localProxy = local.NewLocalProxy(cacheMgr, healthChecker.IsHealthy)
localProxy = local.NewLocalProxy(cacheMgr, healthChecker.IsHealthy, yurtHubCfg.MinRequestTimeout)
localProxy = local.WithFakeTokenInject(localProxy, yurtHubCfg.SerializerManager)
}

Expand Down

0 comments on commit 45266e0

Please sign in to comment.