diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 82db73f8489..dbb426c5db5 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -91,6 +91,7 @@ type YurtHubConfiguration struct { FilterManager *manager.Manager CertIPs []net.IP CoordinatorServer *url.URL + MinRequestTimeout time.Duration } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -174,6 +175,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { KubeletHealthGracePeriod: options.KubeletHealthGracePeriod, FilterManager: filterManager, CertIPs: certIPs, + MinRequestTimeout: options.MinRequestTimeout, } return cfg, nil diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index 7670f77c1c2..171989c91f9 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -73,6 +73,7 @@ type YurtHubOptions struct { WorkingMode string KubeletHealthGracePeriod time.Duration EnableNodePool bool + MinRequestTimeout time.Duration } // NewYurtHubOptions creates a new YurtHubOptions with a default config. @@ -104,6 +105,7 @@ func NewYurtHubOptions() *YurtHubOptions { WorkingMode: string(util.WorkingModeEdge), KubeletHealthGracePeriod: time.Second * 40, EnableNodePool: true, + MinRequestTimeout: time.Second * 1800, } return o } @@ -168,6 +170,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.WorkingMode, "working-mode", o.WorkingMode, "the working mode of yurthub(edge, cloud).") fs.DurationVar(&o.KubeletHealthGracePeriod, "kubelet-health-grace-period", o.KubeletHealthGracePeriod, "the amount of time which we allow kubelet to be unresponsive before stop renew node lease") fs.BoolVar(&o.EnableNodePool, "enable-node-pool", o.EnableNodePool, "enable list/watch nodepools resource or not for filters(only used for testing)") + fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.") } // verifyDummyIP verify the specified ip is valid or not and set the default ip if empty diff --git a/pkg/yurthub/proxy/local/local.go b/pkg/yurthub/proxy/local/local.go index ebb82be7789..b322150b034 100644 --- a/pkg/yurthub/proxy/local/local.go +++ b/pkg/yurthub/proxy/local/local.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net/http" "strconv" "time" @@ -48,15 +49,17 @@ type IsHealthy func() bool // LocalProxy is responsible for handling requests when remote servers are unhealthy type LocalProxy struct { - cacheMgr manager.CacheManager - isHealthy IsHealthy + cacheMgr manager.CacheManager + isHealthy IsHealthy + minRequestTimeout time.Duration } // NewLocalProxy creates a *LocalProxy -func NewLocalProxy(cacheMgr manager.CacheManager, isHealthy IsHealthy) *LocalProxy { +func NewLocalProxy(cacheMgr manager.CacheManager, isHealthy IsHealthy, minRequestTimeout time.Duration) *LocalProxy { return &LocalProxy{ - cacheMgr: cacheMgr, - isHealthy: isHealthy, + cacheMgr: cacheMgr, + isHealthy: isHealthy, + minRequestTimeout: minRequestTimeout, } } @@ -173,8 +176,9 @@ func (lp *LocalProxy) localWatch(w http.ResponseWriter, req *http.Request) error timeout := time.Duration(0) if opts.TimeoutSeconds != nil { timeout = time.Duration(*opts.TimeoutSeconds) * time.Second - } else { - return nil + } + if timeout == 0 && lp.minRequestTimeout > 0 { + timeout = time.Duration(float64(lp.minRequestTimeout) * (rand.Float64() + 1.0)) } watchTimer := time.NewTimer(timeout) diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index 6d08ce372e6..a1274de6a4a 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -66,7 +66,7 @@ func TestServeHTTPForWatch(t *testing.T) { return false } - lp := NewLocalProxy(cacheM, fn) + lp := NewLocalProxy(cacheM, fn, 0) testcases := map[string]struct { userAgent string @@ -160,7 +160,7 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) { return cnt > 2 // after 6 seconds, become healthy } - lp := NewLocalProxy(cacheM, fn) + lp := NewLocalProxy(cacheM, fn, 0) testcases := map[string]struct { userAgent string @@ -230,7 +230,98 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) { t.Errorf("Got error %v, unable to remove path %s", err, rootDir) } } +func TestServeHTTPForWatchWithMinRequestTimeout(t *testing.T) { + dStorage, err := disk.NewDiskStorage(rootDir) + if err != nil { + t.Errorf("failed to create disk storage, %v", err) + } + sWrapper := cachemanager.NewStorageWrapper(dStorage) + serializerM := serializer.NewSerializerManager() + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil, nil) + + fn := func() bool { + return false + } + + lp := NewLocalProxy(cacheM, fn, 10*time.Second) + + testcases := map[string]struct { + userAgent string + accept string + verb string + path string + code int + floor time.Duration + ceil time.Duration + }{ + "watch request": { + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/nodes?watch=true&timeoutSeconds=5", + code: http.StatusOK, + floor: 5 * time.Second, + ceil: 6 * time.Second, + }, + "watch request without timeout": { + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/nodes?watch=true", + code: http.StatusOK, + floor: 10 * time.Second, + ceil: 20 * time.Second, + }, + } + resolver := newTestRequestInfoResolver() + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + req, _ := http.NewRequest(tt.verb, tt.path, nil) + if len(tt.accept) != 0 { + req.Header.Set("Accept", tt.accept) + } + + if len(tt.userAgent) != 0 { + req.Header.Set("User-Agent", tt.userAgent) + } + req.RemoteAddr = "127.0.0.1" + + var start, end time.Time + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + start = time.Now() + lp.ServeHTTP(w, req) + end = time.Now() + }) + + handler = proxyutil.WithRequestClientComponent(handler) + handler = proxyutil.WithRequestContentType(handler) + handler = filters.WithRequestInfo(handler, resolver) + + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + result := resp.Result() + if result.StatusCode != tt.code { + t.Errorf("got status code %d, but expect %d", result.StatusCode, tt.code) + } + + if tt.floor.Seconds() != 0 { + if start.Add(tt.floor).After(end) { + t.Errorf("exec time is less than floor time %v", tt.floor) + } + } + + if start.Add(tt.ceil).Before(end) { + t.Errorf("exec time is more than ceil time %v", tt.ceil) + } + }) + } + + if err = os.RemoveAll(rootDir); err != nil { + t.Errorf("Got error %v, unable to remove path %s", err, rootDir) + } +} func TestServeHTTPForPost(t *testing.T) { dStorage, err := disk.NewDiskStorage(rootDir) if err != nil { @@ -244,7 +335,7 @@ func TestServeHTTPForPost(t *testing.T) { return false } - lp := NewLocalProxy(cacheM, fn) + lp := NewLocalProxy(cacheM, fn, 0) testcases := map[string]struct { userAgent string @@ -324,7 +415,7 @@ func TestServeHTTPForDelete(t *testing.T) { return false } - lp := NewLocalProxy(cacheM, fn) + lp := NewLocalProxy(cacheM, fn, 0) testcases := map[string]struct { userAgent string @@ -391,7 +482,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { return false } - lp := NewLocalProxy(cacheM, fn) + lp := NewLocalProxy(cacheM, fn, 0) testcases := map[string]struct { userAgent string @@ -531,7 +622,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { return false } - lp := NewLocalProxy(cacheM, fn) + lp := NewLocalProxy(cacheM, fn, 0) testcases := map[string]struct { userAgent string diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 203bffd7024..7c75115fef1 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -77,7 +77,7 @@ func NewYurtReverseProxyHandler( // When yurthub is working in cloud mode, cacheMgr will be set to nil which means the local cache is disabled, // so we don't need to create a LocalProxy. if cacheMgr != nil { - localProxy = local.NewLocalProxy(cacheMgr, healthChecker.IsHealthy) + localProxy = local.NewLocalProxy(cacheMgr, healthChecker.IsHealthy, yurtHubCfg.MinRequestTimeout) localProxy = local.WithFakeTokenInject(localProxy, yurtHubCfg.SerializerManager) }