Skip to content
This repository has been archived by the owner on Apr 3, 2021. It is now read-only.

Commit

Permalink
Remove intermediate buffering and solve the Read blocking issue
Browse files Browse the repository at this point in the history
  • Loading branch information
eycorsican committed Nov 27, 2020
1 parent 301549c commit 425fb2e
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 50 deletions.
4 changes: 3 additions & 1 deletion core/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ type TCPConn interface {
Sent(len uint16) error

// Receive will be called when data arrives from TUN.
Receive(data []byte) error
Receive() (<-chan []byte, error)

ReceiveDone(int)

// Err will be called when a fatal error has occurred on the connection.
// The corresponding pcb is already freed when this callback is called
Expand Down
32 changes: 21 additions & 11 deletions core/tcp_callback_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,7 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err
}
}

var buf []byte
var totlen = int(p.tot_len)
if p.tot_len == p.len {
buf = (*[1 << 30]byte)(unsafe.Pointer(p.payload))[:totlen:totlen]
} else {
buf = NewBytes(totlen)
defer FreeBytes(buf)
C.pbuf_copy_partial(p, unsafe.Pointer(&buf[0]), p.tot_len, 0)
}

rerr := conn.(TCPConn).Receive(buf[:totlen])
readCh, rerr := conn.(TCPConn).Receive()
if rerr != nil {
switch rerr.(*lwipError).Code {
case LWIP_ERR_ABRT:
Expand All @@ -109,6 +99,26 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err
}
}

select {
case buf, ok := <-readCh:
if !ok {
C.tcp_recved(tpcb, p.tot_len)
C.tcp_shutdown(tpcb, 1, 0)
return C.ERR_OK
}
if len(buf) < int(p.tot_len) {
conn.(TCPConn).ReceiveDone(-1)
shouldFreePbuf = false
return C.ERR_CONN
}
C.pbuf_copy_partial(p, unsafe.Pointer(&buf[0]), p.tot_len, 0)
conn.(TCPConn).ReceiveDone(int(p.tot_len))
C.tcp_recved(tpcb, p.tot_len)
default:
shouldFreePbuf = false
return C.ERR_CONN
}

return C.ERR_OK
}

Expand Down
65 changes: 33 additions & 32 deletions core/tcp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ type tcpConn struct {
connKey uint32
canWrite *sync.Cond // Condition variable to implement TCP backpressure.
state tcpConnState
sndPipeReader *io.PipeReader
sndPipeWriter *io.PipeWriter
readCh chan []byte
readDoneCh chan int
closeOnce sync.Once
readCloseOnce sync.Once
closeErr error
}

Expand All @@ -86,18 +87,17 @@ func newTCPConn(pcb *C.struct_tcp_pcb, handler TCPConnHandler) (TCPConn, error)
setTCPErrCallback(pcb)
setTCPPollCallback(pcb, C.u8_t(TCP_POLL_INTERVAL))

pipeReader, pipeWriter := io.Pipe()
conn := &tcpConn{
pcb: pcb,
handler: handler,
localAddr: ParseTCPAddr(ipAddrNTOA(pcb.remote_ip), uint16(pcb.remote_port)),
remoteAddr: ParseTCPAddr(ipAddrNTOA(pcb.local_ip), uint16(pcb.local_port)),
connKeyArg: connKeyArg,
connKey: connKey,
canWrite: sync.NewCond(&sync.Mutex{}),
state: tcpNewConn,
sndPipeReader: pipeReader,
sndPipeWriter: pipeWriter,
pcb: pcb,
handler: handler,
localAddr: ParseTCPAddr(ipAddrNTOA(pcb.remote_ip), uint16(pcb.remote_port)),
remoteAddr: ParseTCPAddr(ipAddrNTOA(pcb.local_ip), uint16(pcb.local_port)),
connKeyArg: connKeyArg,
connKey: connKey,
canWrite: sync.NewCond(&sync.Mutex{}),
state: tcpNewConn,
readCh: make(chan []byte, 1),
readDoneCh: make(chan int, 1),
}

// Associate conn with key and save to the global map.
Expand Down Expand Up @@ -180,16 +180,15 @@ func (conn *tcpConn) receiveCheck() error {
return nil
}

func (conn *tcpConn) Receive(data []byte) error {
func (conn *tcpConn) Receive() (<-chan []byte, error) {
if err := conn.receiveCheck(); err != nil {
return err
}
n, err := conn.sndPipeWriter.Write(data)
if err != nil {
return NewLWIPError(LWIP_ERR_CLSD)
return nil, err
}
C.tcp_recved(conn.pcb, C.u16_t(n))
return NewLWIPError(LWIP_ERR_OK)
return conn.readCh, nil
}

func (conn *tcpConn) ReceiveDone(n int) {
conn.readDoneCh <- n
}

func (conn *tcpConn) Read(data []byte) (int, error) {
Expand All @@ -204,12 +203,12 @@ func (conn *tcpConn) Read(data []byte) (int, error) {
}
conn.Unlock()

// Handler should get EOF.
n, err := conn.sndPipeReader.Read(data)
if err == io.ErrClosedPipe {
err = io.EOF
conn.readCh <- data
n := <-conn.readDoneCh
if n == -1 {
return 0, errors.New("insufficient read buffer")
}
return n, err
return n, nil
}

// writeInternal enqueues data to snd_buf, and treats ERR_MEM returned by tcp_write not an error,
Expand Down Expand Up @@ -312,7 +311,8 @@ func (conn *tcpConn) CloseWrite() error {
}

func (conn *tcpConn) CloseRead() error {
return conn.sndPipeReader.Close()
conn.readCloseOnce.Do(conn.closeReadCh)
return nil
}

func (conn *tcpConn) Sent(len uint16) error {
Expand Down Expand Up @@ -386,6 +386,11 @@ func (conn *tcpConn) close() {
}
}

func (conn *tcpConn) closeReadCh() {
close(conn.readCh)
close(conn.readDoneCh)
}

func (conn *tcpConn) setLocalClosed() error {
conn.Lock()
defer conn.Unlock()
Expand All @@ -394,9 +399,6 @@ func (conn *tcpConn) setLocalClosed() error {
return nil
}

// Causes the read half of the pipe returns.
conn.sndPipeWriter.Close()

if conn.state == tcpWriteClosed {
conn.state = tcpClosing
} else {
Expand Down Expand Up @@ -464,8 +466,7 @@ func (conn *tcpConn) release() {
freeConnKeyArg(conn.connKeyArg)
tcpConns.Delete(conn.connKey)
}
conn.sndPipeWriter.Close()
conn.sndPipeReader.Close()
conn.readCloseOnce.Do(conn.closeReadCh)
conn.state = tcpClosed
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ go 1.13

require (
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b
golang.org/x/net v0.0.0-20191021144547-ec77196f6094
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
)
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b h1:+y4hCMc/WKsDbAPsOQZgBSaSZ26uh2afyaWeVg/3s/c=
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b/go.mod h1:P5HUIBuIWKbyjl083/loAegFkfbFNx5i2qEP4CNbm7E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191021144547-ec77196f6094 h1:5O4U9trLjNpuhpynaDsqwCk+Tw6seqJz1EbqbnzHrc8=
golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

0 comments on commit 425fb2e

Please sign in to comment.