Skip to content

Commit

Permalink
fix transport race conditions in yurthub (openyurtio#683)
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch authored Dec 16, 2021
1 parent 59810a9 commit bf55c9f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 33 deletions.
68 changes: 38 additions & 30 deletions pkg/yurthub/proxy/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ import (

// RemoteProxy is an reverse proxy for remote server
type RemoteProxy struct {
checker healthchecker.HealthChecker
reverseProxy *httputil.ReverseProxy
cacheMgr cachemanager.CacheManager
remoteServer *url.URL
filterChain filter.Interface
currentTransport http.RoundTripper
bearerTransport http.RoundTripper
upgradeHandler *proxy.UpgradeAwareHandler
stopCh <-chan struct{}
checker healthchecker.HealthChecker
reverseProxy *httputil.ReverseProxy
cacheMgr cachemanager.CacheManager
remoteServer *url.URL
filterChain filter.Interface
currentTransport http.RoundTripper
bearerTransport http.RoundTripper
upgradeHandler *proxy.UpgradeAwareHandler
bearerUpgradeHandler *proxy.UpgradeAwareHandler
stopCh <-chan struct{}
}

type responder struct{}
Expand All @@ -74,22 +75,25 @@ func NewRemoteProxy(remoteServer *url.URL,
return nil, fmt.Errorf("could not get bearer transport when init proxy backend(%s)", remoteServer.String())
}

upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, nil, false, true, &responder{})
upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, currentTransport, false, true, &responder{})
upgradeAwareHandler.UseRequestLocation = true
bearerUpgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, bearerTransport, false, true, &responder{})
bearerUpgradeAwareHandler.UseRequestLocation = true

proxyBackend := &RemoteProxy{
checker: healthChecker,
reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer),
cacheMgr: cacheMgr,
remoteServer: remoteServer,
filterChain: filterChain,
currentTransport: currentTransport,
bearerTransport: bearerTransport,
upgradeHandler: upgradeAwareHandler,
stopCh: stopCh,
checker: healthChecker,
reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer),
cacheMgr: cacheMgr,
remoteServer: remoteServer,
filterChain: filterChain,
currentTransport: currentTransport,
bearerTransport: bearerTransport,
upgradeHandler: upgradeAwareHandler,
bearerUpgradeHandler: bearerUpgradeAwareHandler,
stopCh: stopCh,
}

proxyBackend.reverseProxy.Transport = currentTransport
proxyBackend.reverseProxy.Transport = proxyBackend
proxyBackend.reverseProxy.ModifyResponse = proxyBackend.modifyResponse
proxyBackend.reverseProxy.FlushInterval = -1
proxyBackend.reverseProxy.ErrorHandler = proxyBackend.errorHandler
Expand All @@ -106,21 +110,13 @@ func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if httpstream.IsUpgradeRequest(req) {
klog.V(5).Infof("get upgrade request %s", req.URL)
if isBearerRequest(req) {
rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.bearerTransport, proxy.MirrorRequest)
rp.bearerUpgradeHandler.ServeHTTP(rw, req)
} else {
rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.currentTransport, proxy.MirrorRequest)
rp.upgradeHandler.ServeHTTP(rw, req)
}
rp.upgradeHandler.ServeHTTP(rw, req)
return
}

rp.reverseProxy.Transport = rp.currentTransport
// when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub,
// Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access
// yurthub, Authorization header in request will be empty.
if isBearerRequest(req) {
rp.reverseProxy.Transport = rp.bearerTransport
}
rp.reverseProxy.ServeHTTP(rw, req)
}

Expand Down Expand Up @@ -223,6 +219,18 @@ func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, e
rw.WriteHeader(http.StatusBadGateway)
}

// RoundTrip is used to implement http.RoundTripper for RemoteProxy.
func (rp *RemoteProxy) RoundTrip(req *http.Request) (*http.Response, error) {
// when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub,
// Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access
// yurthub, Authorization header in request will be empty.
if isBearerRequest(req) {
return rp.bearerTransport.RoundTrip(req)
}

return rp.currentTransport.RoundTrip(req)
}

func isBearerRequest(req *http.Request) bool {
auth := strings.TrimSpace(req.Header.Get("Authorization"))
if auth != "" {
Expand Down
12 changes: 9 additions & 3 deletions pkg/yurthub/util/connrotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *closableConn) Close() error {
remain := len(c.dialer.addrConns[c.addr])
if remain >= 1 {
delete(c.dialer.addrConns[c.addr], c)
remain--
remain = len(c.dialer.addrConns[c.addr])
}
c.dialer.mu.Unlock()
klog.Infof("close connection from %s to %s for %s dialer, remain %d connections", c.Conn.LocalAddr().String(), c.addr, c.dialer.name, remain)
Expand Down Expand Up @@ -124,7 +124,12 @@ func (d *Dialer) Dial(network, address string) (net.Conn, error) {
func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
conn, err := d.dial(ctx, network, address)
if err != nil {
klog.V(3).Infof("%s dialer failed to dial: %v, and total connections: %d", d.name, err, len(d.addrConns[address]))
if klog.V(3).Enabled() {
d.mu.Lock()
size := len(d.addrConns[address])
d.mu.Unlock()
klog.Infof("%s dialer failed to dial: %v, and total connections: %d", d.name, err, size)
}
return nil, err
}

Expand All @@ -140,9 +145,10 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
d.addrConns[address] = make(map[*closableConn]struct{})
}
d.addrConns[address][closable] = struct{}{}
size := len(d.addrConns[address])
d.mu.Unlock()

klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, len(d.addrConns[address]), d.name)
klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, size, d.name)
metrics.Metrics.IncClosableConns(address)
return closable, nil
}

0 comments on commit bf55c9f

Please sign in to comment.