Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync.Pool to cache *bufio.Reader in tunnel server #381

Merged
merged 3 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ The major OpenYurt components consist of:

## Getting started

OpenYurt supports Kubernetes versions up to 1.16. Using higher Kubernetes versions may cause
OpenYurt supports Kubernetes versions up to 1.18. Using higher Kubernetes versions may cause
compatibility issues.

You can setup the OpenYurt cluster [manually](docs/tutorial/manually-setup.md), but we recommend to start
Expand Down
2 changes: 1 addition & 1 deletion README.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ OpenYurt 的主要组件包括:
[资源和系统要求](./docs/resource-and-system-requirements-cn.md)

## 开始使用
OpenYurt 支持最高版本为1.16的 Kubernetes 。使用更高版本的 Kubernetes 可能会导致兼容性问题。
OpenYurt 支持最高版本为1.18的 Kubernetes 。使用更高版本的 Kubernetes 可能会导致兼容性问题。
您可以[手动](docs/tutorial/manually-setup.md)设置 OpenYurt 集群,但是我们建议使用 `yurtctl` 命令行工具启动 OpenYurt 。要快速构建和安装设置 `yurtctl` ,在编译系统已安装了 golang 1.13+ 和 bash 的前提下你可以执行以下命令来完成安装:

```bash
Expand Down
30 changes: 27 additions & 3 deletions pkg/yurttunnel/server/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"

"github.com/openyurtio/openyurt/pkg/yurttunnel/constants"
Expand All @@ -39,8 +40,26 @@ var (
supportedHeaders = []string{constants.ProxyHostHeaderKey, "User-Agent"}
HeaderTransferEncoding = "Transfer-Encoding"
HeaderChunked = "chunked"
bufioReaderPool sync.Pool
)

// newBufioReader retrieves a cached Reader from the pool if the pool is not empty,
// otherwise creates a new one
func newBufioReader(r io.Reader) *bufio.Reader {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comments for newBufioReader func.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

if v := bufioReaderPool.Get(); v != nil {
br := v.(*bufio.Reader)
br.Reset(r)
return br
}
return bufio.NewReader(r)
}

// putBufioReader puts the Reader to the pool.
func putBufioReader(br *bufio.Reader) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comments for putBufioReader func.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

br.Reset(nil)
bufioReaderPool.Put(br)
}

// ReqRequestInterceptor intercepts http/https requests sent from the master,
// prometheus and metric server, setup proxy tunnel to kubelet, sends requests
// through the tunnel and sends responses back to the master
Expand Down Expand Up @@ -70,7 +89,8 @@ func NewRequestInterceptor(udsSockFile string, cfg *tls.Config) *RequestIntercep
}

fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: %s%s\r\n\r\n", addr, "127.0.0.1", connectHeaders)
br := bufio.NewReader(proxyConn)
br := newBufioReader(proxyConn)
defer putBufioReader(br)
res, err := http.ReadResponse(br, nil)
if err != nil {
proxyConn.Close()
Expand Down Expand Up @@ -151,7 +171,9 @@ func (ri *RequestInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request)
func getResponse(r io.Reader) (*http.Response, []byte, error) {
rawResponse := bytes.NewBuffer(make([]byte, 0, 256))
// Save the bytes read while reading the response headers into the rawResponse buffer
resp, err := http.ReadResponse(bufio.NewReader(io.TeeReader(r, rawResponse)), nil)
br := newBufioReader(io.TeeReader(r, rawResponse))
defer putBufioReader(br)
resp, err := http.ReadResponse(br, nil)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -250,7 +272,9 @@ func isChunked(response *http.Response) bool {

// serverRequest serves the normal requests, e.g., kubectl logs
func serveRequest(tunnelConn net.Conn, w http.ResponseWriter, r *http.Request) {
tunnelHTTPResp, err := http.ReadResponse(bufio.NewReader(tunnelConn), r)
br := newBufioReader(tunnelConn)
defer putBufioReader(br)
tunnelHTTPResp, err := http.ReadResponse(br, r)
if err != nil {
klogAndHTTPError(w, http.StatusServiceUnavailable, "fail to read response from the tunnel: %v", err)
return
Expand Down