Skip to content

Commit

Permalink
bug: fix the blocking issue on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Nov 11, 2024
1 parent 3e1efa6 commit c898b3f
Showing 1 changed file with 60 additions and 34 deletions.
94 changes: 60 additions & 34 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *conn) Next(n int) (buf []byte, err error) {
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) >= n {
if len(head) == n {
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
Expand Down Expand Up @@ -198,8 +198,8 @@ func (c *conn) Peek(n int) (buf []byte, err error) {
return c.buffer.B[:n], err
}
head, tail := c.inboundBuffer.Peek(n)
if len(head) >= n {
return head[:n], err
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
Expand Down Expand Up @@ -435,13 +435,24 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
// func (c *conn) Gfd() gfd.GFD { return gfd.GFD{} }

func (c *conn) AsyncWrite(buf []byte, cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
}
_, err := c.Write(buf)
c.loop.ch <- func() error {
return cb(c, err)

callback := func() error {
if cb != nil {
_ = cb(c, err)
}
return err
}

select {
case c.loop.ch <- callback:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- callback
}()
}

return nil
}

Expand All @@ -460,46 +471,61 @@ func (c *conn) AsyncWritev(bs [][]byte, cb AsyncCallback) error {
}

func (c *conn) Wake(cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
}
c.loop.ch <- func() (err error) {
defer func() {
defer func() {
if err == nil {
err = cb(c, nil)
return
}
_ = cb(c, err)
}()
wakeFn := func() (err error) {
err = c.loop.wake(c)
if cb != nil {
_ = cb(c, err)
}
return
}

select {
case c.loop.ch <- wakeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- wakeFn
}()
return c.loop.wake(c)
}

return nil
}

func (c *conn) Close() error {
c.loop.ch <- func() error {
err := c.loop.close(c, nil)
return err
closeFn := func() error {
return c.loop.close(c, nil)
}

select {
case c.loop.ch <- closeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- closeFn
}()
}

return nil
}

func (c *conn) CloseWithCallback(cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
}
c.loop.ch <- func() (err error) {
defer func() {
if err == nil {
err = cb(c, nil)
return
}
closeFn := func() (err error) {
err = c.loop.close(c, nil)
if cb != nil {
_ = cb(c, err)
}
return
}

select {
case c.loop.ch <- closeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
c.loop.ch <- closeFn
}()
return c.loop.close(c, nil)
}

return nil
}

Expand Down

0 comments on commit c898b3f

Please sign in to comment.