Skip to content

Commit

Permalink
fix: memory leak in the HTTP tunnel module under extreme scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
nange committed Nov 1, 2024
1 parent 99cce33 commit a5778ad
Showing 1 changed file with 84 additions and 15 deletions.
99 changes: 84 additions & 15 deletions httptunnel/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ import (
)

const RelayBufferSize = cipherstream.MaxCipherRelaySize
const DefaultConnCount = 256

type Server struct {
addr string
timeout time.Duration

sync.Mutex
connMap map[string][]net.Conn
sync.RWMutex
connMap map[string]*struct {
conns []net.Conn
ch chan struct{}
timer *time.Timer
isPushCloseRunning bool
}
connCh chan net.Conn
closing chan struct{}
pullWaiting map[string]chan struct{}
Expand All @@ -44,12 +50,17 @@ func NewServer(addr string, timeout time.Duration, tlsConfig *tls.Config) *Serve
}

return &Server{
addr: addr,
timeout: timeout,
connMap: make(map[string][]net.Conn, 256),
addr: addr,
timeout: timeout,
connMap: make(map[string]*struct {
conns []net.Conn
ch chan struct{}
timer *time.Timer
isPushCloseRunning bool
}, DefaultConnCount),
connCh: make(chan net.Conn, 1),
closing: make(chan struct{}, 1),
pullWaiting: make(map[string]chan struct{}, 256),
pullWaiting: make(map[string]chan struct{}, DefaultConnCount),
tlsConfig: tlsConfig,
server: server,
}
Expand Down Expand Up @@ -131,9 +142,9 @@ func (s *Server) pull(w http.ResponseWriter, r *http.Request) {
return
}

s.Lock()
conns := s.connMap[reqID]
s.Unlock()
s.RLock()
conns := s.connMap[reqID].conns
s.RUnlock()
log.Debug("[HTTP_TUNNEL_SERVER] pull", "uuid", reqID)

w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -172,7 +183,7 @@ func (s *Server) pull(w http.ResponseWriter, r *http.Request) {
log.Warn("[HTTP_TUNNEL_SERVER] read from conn", "err", err)
}

s.pullClose(reqID)
s.pullCloseConn(reqID)
log.Info("[HTTP_TUNNEL_SERVER] Pull completed...", "uuid", reqID)
}

Expand Down Expand Up @@ -222,16 +233,43 @@ func (s *Server) push(w http.ResponseWriter, r *http.Request) {
conns, ok := s.connMap[reqID]
if !ok {
conn1, conn2 := netpipe.Pipe(2*cipherstream.MaxPayloadSize, addr)
conns = []net.Conn{conn1, conn2}
conns = &struct {
conns []net.Conn
ch chan struct{}
timer *time.Timer
isPushCloseRunning bool
}{
conns: []net.Conn{conn1, conn2},
ch: make(chan struct{}, 1),
timer: time.NewTimer(s.timeout),
}
s.connMap[reqID] = conns
s.connCh <- conn2
}
s.notifyPull(reqID)
s.Unlock()

defer func() {
s.Lock()
defer s.Unlock()

conns, ok := s.connMap[reqID]
if !ok {
return
}
timer := conns.timer
timer.Reset(s.timeout)
if conns.isPushCloseRunning {
return
}
conns.isPushCloseRunning = true

go s.pushCloseConn(reqID)
}()

if p.Payload == "" {
// client end push
_ = conns[0].(interface{ CloseWrite() error }).CloseWrite()
_ = conns.conns[0].(interface{ CloseWrite() error }).CloseWrite()
return
}
cipher, err := base64.StdEncoding.DecodeString(p.Payload)
Expand All @@ -241,7 +279,7 @@ func (s *Server) push(w http.ResponseWriter, r *http.Request) {
return
}

if _, err = conns[0].Write(cipher); err != nil {
if _, err = conns.conns[0].Write(cipher); err != nil {
log.Warn("[HTTP_TUNNEL_SERVER] write local", "err", err)
writeServiceUnavailableError(w, "write local:"+err.Error())
return
Expand All @@ -250,12 +288,43 @@ func (s *Server) push(w http.ResponseWriter, r *http.Request) {
writeSuccess(w)
}

func (s *Server) pullClose(reqID string) {
func (s *Server) pushCloseConn(reqID string) {
s.RLock()
conns, ok := s.connMap[reqID]
if !ok {
s.RUnlock()
return
}

timer := conns.timer
ch := conns.ch
s.RUnlock()

select {
case <-s.closing:
case <-timer.C:
case <-ch:
}

s.Lock()
defer s.Unlock()
s.closeConn(reqID)
}

func (s *Server) pullCloseConn(reqID string) {
s.Lock()
defer s.Unlock()

if conns, ok := s.connMap[reqID]; ok {
_ = conns[0].Close()
close(conns.ch)
}

s.closeConn(reqID)
}

func (s *Server) closeConn(reqID string) {
if conns, ok := s.connMap[reqID]; ok {
_ = conns.conns[0].Close()
}

s.connMap[reqID] = nil
Expand Down

0 comments on commit a5778ad

Please sign in to comment.