From bf55c9f12c4f4a7f9d47f77ef849fddcff9733e5 Mon Sep 17 00:00:00 2001 From: rambohe Date: Thu, 16 Dec 2021 13:55:20 +0800 Subject: [PATCH] fix transport race conditions in yurthub (#683) --- pkg/yurthub/proxy/remote/remote.go | 68 +++++++++++++++++------------- pkg/yurthub/util/connrotation.go | 12 ++++-- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index 0cb33d598bb..a7ba3fbe7e6 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -40,15 +40,16 @@ import ( // RemoteProxy is an reverse proxy for remote server type RemoteProxy struct { - checker healthchecker.HealthChecker - reverseProxy *httputil.ReverseProxy - cacheMgr cachemanager.CacheManager - remoteServer *url.URL - filterChain filter.Interface - currentTransport http.RoundTripper - bearerTransport http.RoundTripper - upgradeHandler *proxy.UpgradeAwareHandler - stopCh <-chan struct{} + checker healthchecker.HealthChecker + reverseProxy *httputil.ReverseProxy + cacheMgr cachemanager.CacheManager + remoteServer *url.URL + filterChain filter.Interface + currentTransport http.RoundTripper + bearerTransport http.RoundTripper + upgradeHandler *proxy.UpgradeAwareHandler + bearerUpgradeHandler *proxy.UpgradeAwareHandler + stopCh <-chan struct{} } type responder struct{} @@ -74,22 +75,25 @@ func NewRemoteProxy(remoteServer *url.URL, return nil, fmt.Errorf("could not get bearer transport when init proxy backend(%s)", remoteServer.String()) } - upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, nil, false, true, &responder{}) + upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, currentTransport, false, true, &responder{}) upgradeAwareHandler.UseRequestLocation = true + bearerUpgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, bearerTransport, false, true, &responder{}) + bearerUpgradeAwareHandler.UseRequestLocation = true proxyBackend := &RemoteProxy{ - checker: healthChecker, - reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer), - cacheMgr: cacheMgr, - remoteServer: remoteServer, - filterChain: filterChain, - currentTransport: currentTransport, - bearerTransport: bearerTransport, - upgradeHandler: upgradeAwareHandler, - stopCh: stopCh, + checker: healthChecker, + reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer), + cacheMgr: cacheMgr, + remoteServer: remoteServer, + filterChain: filterChain, + currentTransport: currentTransport, + bearerTransport: bearerTransport, + upgradeHandler: upgradeAwareHandler, + bearerUpgradeHandler: bearerUpgradeAwareHandler, + stopCh: stopCh, } - proxyBackend.reverseProxy.Transport = currentTransport + proxyBackend.reverseProxy.Transport = proxyBackend proxyBackend.reverseProxy.ModifyResponse = proxyBackend.modifyResponse proxyBackend.reverseProxy.FlushInterval = -1 proxyBackend.reverseProxy.ErrorHandler = proxyBackend.errorHandler @@ -106,21 +110,13 @@ func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if httpstream.IsUpgradeRequest(req) { klog.V(5).Infof("get upgrade request %s", req.URL) if isBearerRequest(req) { - rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.bearerTransport, proxy.MirrorRequest) + rp.bearerUpgradeHandler.ServeHTTP(rw, req) } else { - rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.currentTransport, proxy.MirrorRequest) + rp.upgradeHandler.ServeHTTP(rw, req) } - rp.upgradeHandler.ServeHTTP(rw, req) return } - rp.reverseProxy.Transport = rp.currentTransport - // when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub, - // Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access - // yurthub, Authorization header in request will be empty. - if isBearerRequest(req) { - rp.reverseProxy.Transport = rp.bearerTransport - } rp.reverseProxy.ServeHTTP(rw, req) } @@ -223,6 +219,18 @@ func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, e rw.WriteHeader(http.StatusBadGateway) } +// RoundTrip is used to implement http.RoundTripper for RemoteProxy. +func (rp *RemoteProxy) RoundTrip(req *http.Request) (*http.Response, error) { + // when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub, + // Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access + // yurthub, Authorization header in request will be empty. + if isBearerRequest(req) { + return rp.bearerTransport.RoundTrip(req) + } + + return rp.currentTransport.RoundTrip(req) +} + func isBearerRequest(req *http.Request) bool { auth := strings.TrimSpace(req.Header.Get("Authorization")) if auth != "" { diff --git a/pkg/yurthub/util/connrotation.go b/pkg/yurthub/util/connrotation.go index b688713a833..eef766fddaf 100644 --- a/pkg/yurthub/util/connrotation.go +++ b/pkg/yurthub/util/connrotation.go @@ -42,7 +42,7 @@ func (c *closableConn) Close() error { remain := len(c.dialer.addrConns[c.addr]) if remain >= 1 { delete(c.dialer.addrConns[c.addr], c) - remain-- + remain = len(c.dialer.addrConns[c.addr]) } c.dialer.mu.Unlock() klog.Infof("close connection from %s to %s for %s dialer, remain %d connections", c.Conn.LocalAddr().String(), c.addr, c.dialer.name, remain) @@ -124,7 +124,12 @@ func (d *Dialer) Dial(network, address string) (net.Conn, error) { func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { conn, err := d.dial(ctx, network, address) if err != nil { - klog.V(3).Infof("%s dialer failed to dial: %v, and total connections: %d", d.name, err, len(d.addrConns[address])) + if klog.V(3).Enabled() { + d.mu.Lock() + size := len(d.addrConns[address]) + d.mu.Unlock() + klog.Infof("%s dialer failed to dial: %v, and total connections: %d", d.name, err, size) + } return nil, err } @@ -140,9 +145,10 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net. d.addrConns[address] = make(map[*closableConn]struct{}) } d.addrConns[address][closable] = struct{}{} + size := len(d.addrConns[address]) d.mu.Unlock() - klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, len(d.addrConns[address]), d.name) + klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, size, d.name) metrics.Metrics.IncClosableConns(address) return closable, nil }