Skip to content

Commit

Permalink
fix: resource leak in http tunnel
Browse files Browse the repository at this point in the history
  • Loading branch information
nange committed Jun 20, 2024
1 parent 0bf25d0 commit fec64d5
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 74 deletions.
8 changes: 4 additions & 4 deletions remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (es *EasyServer) handleConn(conn net.Conn, tryReuse bool) {
if err != nil {
if errors.Is(err, io.EOF) {
log.Debug("[REMOTE] got EOF error when handshake with client-server, maybe the connection pool closed the idle conn")
} else if !errors.Is(err, netpipe.ErrDeadline) {
} else if !errors.Is(err, netpipe.ErrReadDeadline) {
log.Warn("[REMOTE] handshake with client", "err", err)
}
return
Expand Down Expand Up @@ -190,9 +190,9 @@ func (es *EasyServer) handShakeWithClient(conn net.Conn) (hsRes, error) {
}
cs := csStream.(*cipherstream.CipherStream)

_ = csStream.SetDeadline(time.Now().Add(es.MaxConnWaitTimeout()))
_ = csStream.SetReadDeadline(time.Now().Add(es.MaxConnWaitTimeout()))
defer func() {
_ = csStream.SetDeadline(time.Time{})
_ = csStream.SetReadDeadline(time.Time{})
cs.Release()
}()

Expand All @@ -203,7 +203,7 @@ func (es *EasyServer) handShakeWithClient(conn net.Conn) (hsRes, error) {
return res, err
}

_ = csStream.SetDeadline(time.Now().Add(es.MaxConnWaitTimeout()))
_ = csStream.SetReadDeadline(time.Now().Add(es.MaxConnWaitTimeout()))

if frame.IsPingFrame() {
log.Debug("[REMOTE] got ping message",
Expand Down
2 changes: 1 addition & 1 deletion remote_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (es *EasyServer) remoteUDPHandle(conn net.Conn, addrStr, method string, isD
if errors.Is(err, cipherstream.ErrFINRSTStream) {
_tryReuse = true
log.Debug("[REMOTE_UDP] received FIN when reading data from client, try to reuse the connection")
} else if !errors.Is(err, io.EOF) && !errors.Is(err, netpipe.ErrDeadline) {
} else if !errors.Is(err, io.EOF) && !errors.Is(err, netpipe.ErrReadDeadline) {
log.Warn("[REMOTE_UDP] read data from client connection", "err", err)
}

Expand Down
2 changes: 1 addition & 1 deletion util/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestIP(t *testing.T) {
}

func TestLookupIPV4From(t *testing.T) {
ips, err := LookupIPV4From("8.8.8.8:53", "dnspod.cn")
ips, err := LookupIPV4From("119.29.29.29:53", "dnspod.cn")
assert.Nil(t, err)
assert.Greater(t, len(ips), 0)
}
4 changes: 4 additions & 0 deletions util/netpipe/dup_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func Pipe(maxSize int, addrs ...net.Addr) (net.Conn, net.Conn) {
sp := &pipe{
buf: ringbuffer.NewBuffer(buf1),
back: buf1,
rdChan: make(chan struct{}),
wdChan: make(chan struct{}),
maxSize: maxSize,
remoteAddr: remoteAddr,
localAddr: localAddr,
Expand All @@ -80,6 +82,8 @@ func Pipe(maxSize int, addrs ...net.Addr) (net.Conn, net.Conn) {
rp := &pipe{
buf: ringbuffer.NewBuffer(buf2),
back: buf2,
rdChan: make(chan struct{}),
wdChan: make(chan struct{}),
maxSize: maxSize,
remoteAddr: remoteAddr,
localAddr: localAddr,
Expand Down
18 changes: 15 additions & 3 deletions util/netpipe/dup_pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ func TestDupPipe_Timeout(t *testing.T) {
b := make([]byte, 10)
n, err := p1.Read(b)
assert.Equal(t, 0, n)
assert.Equal(t, ErrDeadline, err)
assert.Equal(t, ErrReadDeadline, err)
n, err = p2.Read(b)
assert.Equal(t, 0, n)
assert.Equal(t, ErrDeadline, err)
assert.Equal(t, ErrReadDeadline, err)

err = p1.SetReadDeadline(time.Time{})
assert.Nil(t, err)
Expand All @@ -134,7 +134,7 @@ func TestDupPipe_Timeout(t *testing.T) {
assert.Nil(t, err)
n, err = p1.Write([]byte("hello2"))
assert.Equal(t, 0, n)
assert.Equal(t, ErrDeadline, err)
assert.Equal(t, ErrWriteDeadline, err)

err = p1.SetWriteDeadline(time.Time{})
assert.Nil(t, err)
Expand All @@ -149,3 +149,15 @@ func TestDupPipe_Timeout(t *testing.T) {
assert.Equal(t, 6, n)
assert.Nil(t, err)
}

func TestDupPipe_Read_Then_SetDeadline(t *testing.T) {
p1, _ := Pipe(10)
go func() {
time.Sleep(time.Second)
err := p1.SetDeadline(time.Now().Add(time.Second))
assert.Nil(t, err)
}()
n, err := p1.Read(make([]byte, 10))
assert.Equal(t, 0, n)
assert.Equal(t, ErrReadDeadline, err)
}
144 changes: 79 additions & 65 deletions util/netpipe/pipe.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package netpipe

import (
"errors"
"fmt"
"net"
"sync"
Expand All @@ -15,10 +16,12 @@ var _ net.Conn = (*pipe)(nil) // ensure to implements net.Conn
// Pipe is buffered version of net.Pipe. Reads
// will block until data is available.
type pipe struct {
buf *ringbuffer.RingBuffer
back []byte
cond sync.Cond
mu sync.Mutex
buf *ringbuffer.RingBuffer
back []byte
rdChan chan struct{}
wdChan chan struct{}
cond sync.Cond
mu sync.Mutex

maxSize int
rLate bool
Expand All @@ -30,7 +33,8 @@ type pipe struct {
localAddr net.Addr
}

var ErrDeadline = fmt.Errorf("pipe deadline exceeded")
var ErrReadDeadline = fmt.Errorf("pipe read deadline exceeded")
var ErrWriteDeadline = fmt.Errorf("pipe write deadline exceeded")
var ErrPipeClosed = fmt.Errorf("pipe closed")
var ErrExceedMaxSize = fmt.Errorf("exceed max size")

Expand All @@ -41,30 +45,7 @@ func (p *pipe) Read(b []byte) (n int, err error) {
defer p.cond.L.Unlock()

if p.rLate {
return 0, ErrDeadline
}

if !p.readDeadline.IsZero() {
now := time.Now()
dur := p.readDeadline.Sub(now)
if dur <= 0 {
p.rLate = true
return 0, ErrDeadline
}
nextReadDone := make(chan struct{})
defer close(nextReadDone)
go func(dur time.Duration) {
timer := time.NewTimer(dur)
defer timer.Stop()
select {
case <-timer.C:
p.cond.L.Lock()
p.rLate = true
p.cond.L.Unlock()
p.cond.Broadcast()
case <-nextReadDone:
}
}(dur)
return 0, ErrReadDeadline
}

defer p.cond.Broadcast()
Expand All @@ -75,7 +56,7 @@ func (p *pipe) Read(b []byte) (n int, err error) {
}

if p.rLate {
return 0, ErrDeadline
return 0, ErrReadDeadline
}

return p.buf.Read(b)
Expand All @@ -92,31 +73,9 @@ func (p *pipe) Write(b []byte) (n int, err error) {
defer p.cond.L.Unlock()

if p.wLate {
return 0, ErrDeadline
}

if !p.writeDeadline.IsZero() {
now := time.Now()
dur := p.writeDeadline.Sub(now)
if dur <= 0 {
p.wLate = true
return 0, ErrDeadline
}
nextWriteDone := make(chan struct{})
defer close(nextWriteDone)
go func(dur time.Duration) {
timer := time.NewTimer(dur)
defer timer.Stop()
select {
case <-timer.C:
p.cond.L.Lock()
p.wLate = true
p.cond.L.Unlock()
p.cond.Broadcast()
case <-nextWriteDone:
}
}(dur)
return 0, ErrWriteDeadline
}

defer p.cond.Broadcast()

for p.buf.Free() < len(b) && !p.closed && !p.wLate {
Expand All @@ -125,7 +84,7 @@ func (p *pipe) Write(b []byte) (n int, err error) {
}

if p.wLate {
return 0, ErrDeadline
return 0, ErrWriteDeadline
}

return p.buf.Write(b)
Expand Down Expand Up @@ -179,26 +138,81 @@ func (a addr) Network() string { return "pipe" }
func (p *pipe) SetDeadline(t time.Time) error {
err := p.SetReadDeadline(t)
err2 := p.SetWriteDeadline(t)
if err != nil {
return err
}
return err2
return errors.Join(err, err2)
}

// SetWriteDeadline implements the net.Conn method
func (p *pipe) SetWriteDeadline(t time.Time) error {
// Let the previous goroutine exit, if it exists.
select {
case p.wdChan <- struct{}{}:
default:
}

p.cond.L.Lock()
p.writeDeadline = t
p.wLate = false
p.cond.L.Unlock()
defer p.cond.L.Unlock()

if t.IsZero() || t.After(time.Now()) {
p.wLate = false
} else {
p.wLate = true
p.cond.Broadcast()
return nil
}

if !t.IsZero() {
go func() {
timer := time.NewTimer(t.Sub(time.Now()))

Check failure on line 165 in util/netpipe/pipe.go

View workflow job for this annotation

GitHub Actions / Lint-Test-Build-MacOS

S1024: should use time.Until instead of t.Sub(time.Now()) (gosimple)

Check failure on line 165 in util/netpipe/pipe.go

View workflow job for this annotation

GitHub Actions / Lint-Test-Build-Windows

S1024: should use time.Until instead of t.Sub(time.Now()) (gosimple)

Check failure on line 165 in util/netpipe/pipe.go

View workflow job for this annotation

GitHub Actions / Lint-Test-Build-Linux

S1024: should use time.Until instead of t.Sub(time.Now()) (gosimple)
defer timer.Stop()

select {
case <-timer.C:
p.cond.L.Lock()
p.wLate = true
p.cond.Broadcast()
p.cond.L.Unlock()
case <-p.wdChan:
}
}()
}

return nil
}

// SetReadDeadline implements the net.Conn method
func (p *pipe) SetReadDeadline(t time.Time) error {
// Let the previous goroutine exit, if it exists.
select {
case p.rdChan <- struct{}{}:
default:
}

p.cond.L.Lock()
p.readDeadline = t
p.rLate = false
p.cond.L.Unlock()
defer p.cond.L.Unlock()

if t.IsZero() || t.After(time.Now()) {
p.rLate = false
} else {
p.rLate = true
p.cond.Broadcast()
return nil
}

if !t.IsZero() {
go func() {
timer := time.NewTimer(t.Sub(time.Now()))

Check failure on line 203 in util/netpipe/pipe.go

View workflow job for this annotation

GitHub Actions / Lint-Test-Build-MacOS

S1024: should use time.Until instead of t.Sub(time.Now()) (gosimple)

Check failure on line 203 in util/netpipe/pipe.go

View workflow job for this annotation

GitHub Actions / Lint-Test-Build-Windows

S1024: should use time.Until instead of t.Sub(time.Now()) (gosimple)

Check failure on line 203 in util/netpipe/pipe.go

View workflow job for this annotation

GitHub Actions / Lint-Test-Build-Linux

S1024: should use time.Until instead of t.Sub(time.Now()) (gosimple)
defer timer.Stop()

select {
case <-timer.C:
p.cond.L.Lock()
p.rLate = true
p.cond.Broadcast()
p.cond.L.Unlock()
case <-p.rdChan:
}
}()
}

return nil
}

0 comments on commit fec64d5

Please sign in to comment.