Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] fix transport race conditions in yurthub #683

Merged
merged 1 commit into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 38 additions & 30 deletions pkg/yurthub/proxy/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ import (

// RemoteProxy is an reverse proxy for remote server
type RemoteProxy struct {
checker healthchecker.HealthChecker
reverseProxy *httputil.ReverseProxy
cacheMgr cachemanager.CacheManager
remoteServer *url.URL
filterChain filter.Interface
currentTransport http.RoundTripper
bearerTransport http.RoundTripper
upgradeHandler *proxy.UpgradeAwareHandler
stopCh <-chan struct{}
checker healthchecker.HealthChecker
reverseProxy *httputil.ReverseProxy
cacheMgr cachemanager.CacheManager
remoteServer *url.URL
filterChain filter.Interface
currentTransport http.RoundTripper
bearerTransport http.RoundTripper
upgradeHandler *proxy.UpgradeAwareHandler
bearerUpgradeHandler *proxy.UpgradeAwareHandler
stopCh <-chan struct{}
}

type responder struct{}
Expand All @@ -74,22 +75,25 @@ func NewRemoteProxy(remoteServer *url.URL,
return nil, fmt.Errorf("could not get bearer transport when init proxy backend(%s)", remoteServer.String())
}

upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, nil, false, true, &responder{})
upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, currentTransport, false, true, &responder{})
upgradeAwareHandler.UseRequestLocation = true
bearerUpgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, bearerTransport, false, true, &responder{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transport passed to NewUpgradeAwareHandler is assigned to UpgradeAwareHandler.Transport. It's used to handle non-upgrade request. So, it doesn't matter here. I think we should assign currentTransport and bearerTransport to UpgradeAwareHandler.UpgradeTransport.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Congrool From the code: https://github.com/kubernetes/apimachinery/blob/v0.18.8/pkg/util/proxy/upgradeaware.go#L398-L400

when UpgradeTransport is nil, UpgradeAwareHandler will use Transport to handle upgrade request.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can group upgradeAwareHandler and bearerUpgradeAwareHandler into one.

upgradeAwareHandler and bearerUpgradeAwareHandler here are used to distinguish bearer and non-bearer requests and use different transport for them. The RemoteProxy is exactly what we need:

func (rp *RemoteProxy) RoundTrip(req *http.Request) (*http.Response, error) {
	// when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub,
	// Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access
	// yurthub, Authorization header in request will be empty.
	if isBearerRequest(req) {
		return rp.bearerTransport.RoundTrip(req)
	}

	return rp.currentTransport.RoundTrip(req)
}

I think we can remove bearerUpgradeAwareHandler and use proxyBackend as the transport param in NewUpgradeAwareHandler.

The codes should look like:

	proxyBackend := &RemoteProxy{
		checker:          healthChecker,
		reverseProxy:     httputil.NewSingleHostReverseProxy(remoteServer),
		cacheMgr:         cacheMgr,
		remoteServer:     remoteServer,
		filterChain:      filterChain,
		currentTransport: currentTransport,
		bearerTransport:  bearerTransport,
		stopCh:           stopCh,
	}

	upgradeAwareHandler := proxy.NewUpgradeAwareHandler(remoteServer, proxyBackend, false, true, &responder{})
	upgradeAwareHandler.UseRequestLocation = true

	proxyBackend.reverseProxy.Transport = proxyBackend
	proxyBackend.reverseProxy.ModifyResponse = proxyBackend.modifyResponse
	proxyBackend.reverseProxy.FlushInterval = -1
	proxyBackend.reverseProxy.ErrorHandler = proxyBackend.errorHandler
	proxyBackend.upgradeHandler = upgradeAwareHandler

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DrmagicE A good idea, i will modify it.

Copy link
Member Author

@rambohe-ch rambohe-ch Dec 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DrmagicE I have checked the codes of NewUpgradeAwareHandler, and only *http.Transport as http. RoundTripper is supported, so use proxyBackend as http. RoundTripper can not work.
maybe we need to leave the current changed unchanged.

the code link is here: https://github.com/kubernetes/apimachinery/blob/master/pkg/util/net/http.go#L215-L239

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rambohe-ch Oh, what a pity. I got it.

bearerUpgradeAwareHandler.UseRequestLocation = true

proxyBackend := &RemoteProxy{
checker: healthChecker,
reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer),
cacheMgr: cacheMgr,
remoteServer: remoteServer,
filterChain: filterChain,
currentTransport: currentTransport,
bearerTransport: bearerTransport,
upgradeHandler: upgradeAwareHandler,
stopCh: stopCh,
checker: healthChecker,
reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer),
cacheMgr: cacheMgr,
remoteServer: remoteServer,
filterChain: filterChain,
currentTransport: currentTransport,
bearerTransport: bearerTransport,
upgradeHandler: upgradeAwareHandler,
bearerUpgradeHandler: bearerUpgradeAwareHandler,
stopCh: stopCh,
}

proxyBackend.reverseProxy.Transport = currentTransport
proxyBackend.reverseProxy.Transport = proxyBackend
proxyBackend.reverseProxy.ModifyResponse = proxyBackend.modifyResponse
proxyBackend.reverseProxy.FlushInterval = -1
proxyBackend.reverseProxy.ErrorHandler = proxyBackend.errorHandler
Expand All @@ -106,21 +110,13 @@ func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if httpstream.IsUpgradeRequest(req) {
klog.V(5).Infof("get upgrade request %s", req.URL)
if isBearerRequest(req) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in https://github.com/openyurtio/openyurt/pull/683/files#r769771572, if we group upgradeAwareHandler and bearerUpgradeAwareHandler into one, the codes here can be simplified as follow:

	if httpstream.IsUpgradeRequest(req) {
		klog.V(5).Infof("get upgrade request %s", req.URL)
		rp.upgradeHandler.ServeHTTP(rw, req)
		return
	}

	rp.reverseProxy.ServeHTTP(rw, req)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As replied above

rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.bearerTransport, proxy.MirrorRequest)
rp.bearerUpgradeHandler.ServeHTTP(rw, req)
} else {
rp.upgradeHandler.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(rp.currentTransport, proxy.MirrorRequest)
rp.upgradeHandler.ServeHTTP(rw, req)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

}
rp.upgradeHandler.ServeHTTP(rw, req)
return
}

rp.reverseProxy.Transport = rp.currentTransport
// when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub,
// Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access
// yurthub, Authorization header in request will be empty.
if isBearerRequest(req) {
rp.reverseProxy.Transport = rp.bearerTransport
}
rp.reverseProxy.ServeHTTP(rw, req)
}

Expand Down Expand Up @@ -223,6 +219,18 @@ func (rp *RemoteProxy) errorHandler(rw http.ResponseWriter, req *http.Request, e
rw.WriteHeader(http.StatusBadGateway)
}

// RoundTrip is used to implement http.RoundTripper for RemoteProxy.
func (rp *RemoteProxy) RoundTrip(req *http.Request) (*http.Response, error) {
// when edge client(like kube-proxy, flannel, etc) use service account(default InClusterConfig) to access yurthub,
// Authorization header will be set in request. and when edge client(like kubelet) use x509 certificate to access
// yurthub, Authorization header in request will be empty.
if isBearerRequest(req) {
return rp.bearerTransport.RoundTrip(req)
}

return rp.currentTransport.RoundTrip(req)
}

func isBearerRequest(req *http.Request) bool {
auth := strings.TrimSpace(req.Header.Get("Authorization"))
if auth != "" {
Expand Down
12 changes: 9 additions & 3 deletions pkg/yurthub/util/connrotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *closableConn) Close() error {
remain := len(c.dialer.addrConns[c.addr])
if remain >= 1 {
delete(c.dialer.addrConns[c.addr], c)
remain--
remain = len(c.dialer.addrConns[c.addr])
}
c.dialer.mu.Unlock()
klog.Infof("close connection from %s to %s for %s dialer, remain %d connections", c.Conn.LocalAddr().String(), c.addr, c.dialer.name, remain)
Expand Down Expand Up @@ -124,7 +124,12 @@ func (d *Dialer) Dial(network, address string) (net.Conn, error) {
func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
conn, err := d.dial(ctx, network, address)
if err != nil {
klog.V(3).Infof("%s dialer failed to dial: %v, and total connections: %d", d.name, err, len(d.addrConns[address]))
if klog.V(3).Enabled() {
d.mu.Lock()
size := len(d.addrConns[address])
d.mu.Unlock()
klog.Infof("%s dialer failed to dial: %v, and total connections: %d", d.name, err, size)
}
return nil, err
}

Expand All @@ -140,9 +145,10 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
d.addrConns[address] = make(map[*closableConn]struct{})
}
d.addrConns[address][closable] = struct{}{}
size := len(d.addrConns[address])
d.mu.Unlock()

klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, len(d.addrConns[address]), d.name)
klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, size, d.name)
metrics.Metrics.IncClosableConns(address)
return closable, nil
}