Skip to content

Commit

Permalink
prevent a channel from blocking all others by using a ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Aug 18, 2023
1 parent 0d8c480 commit 35cb584
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 29 deletions.
36 changes: 16 additions & 20 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ func randomByte() (byte, error) {
// For instance, a TCP client endpoint creates a single channel, while a TCP
// server endpoint creates a channel for each incoming connection.
type Channel struct {
e Endpoint
label string
rwc io.Closer
n *Node
frw *frame.ReadWriter
running bool
e Endpoint
label string
rwc io.Closer
n *Node
frw *frame.ReadWriter
running bool
writeBuffer *ringbuffer.RingBuffer

// in
Expand Down Expand Up @@ -69,13 +69,13 @@ func newChannel(n *Node, e Endpoint, label string, rwc io.ReadWriteCloser) (*Cha
}

return &Channel{
e: e,
label: label,
rwc: rwc,
n: n,
frw: frw,
e: e,
label: label,
rwc: rwc,
n: n,
frw: frw,
writeBuffer: writeBuffer,
terminate: make(chan struct{}),
terminate: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -104,25 +104,21 @@ func (ch *Channel) run() {

select {
case <-readerDone:
ch.n.pushEvent(&EventChannelClose{ch})
ch.n.closeChannel(ch)

<-ch.terminate
ch.rwc.Close()

ch.writeBuffer.Close()
<-writerDone

ch.rwc.Close()

case <-ch.terminate:
ch.n.pushEvent(&EventChannelClose{ch})

ch.writeBuffer.Close()
<-writerDone

ch.rwc.Close()
<-readerDone
}

ch.n.pushEvent(&EventChannelClose{ch})
ch.n.closeChannel(ch)
}

func (ch *Channel) runReader(readerDone chan struct{}) {
Expand Down
17 changes: 8 additions & 9 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ type Node struct {
chWriteTo chan writeToReq
chWriteAll chan interface{}
chWriteExcept chan writeExceptReq
terminate chan struct{}
terminate chan struct{}

// out
chEvent chan Event
done chan struct{}
done chan struct{}
}

// NewNode allocates a Node. See NodeConf for the options.
Expand Down Expand Up @@ -169,13 +169,13 @@ func NewNode(conf NodeConf) (*Node, error) {
dialectRW: dialectRW,
channelAccepters: make(map[*channelAccepter]struct{}),
channels: make(map[*Channel]struct{}),
chNewChannel: make(chan *Channel),
chCloseChannel: make(chan *Channel),
chWriteTo: make(chan writeToReq),
chWriteAll: make(chan interface{}),
chWriteExcept: make(chan writeExceptReq),
chNewChannel: make(chan *Channel),
chCloseChannel: make(chan *Channel),
chWriteTo: make(chan writeToReq),
chWriteAll: make(chan interface{}),
chWriteExcept: make(chan writeExceptReq),
terminate: make(chan struct{}),
chEvent: make(chan Event),
chEvent: make(chan Event),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -262,7 +262,6 @@ outer:

case ch := <-n.chCloseChannel:
delete(n.channels, ch)
ch.close()

case req := <-n.chWriteTo:
if _, ok := n.channels[req.ch]; !ok {
Expand Down

0 comments on commit 35cb584

Please sign in to comment.