Skip to content

Commit

Permalink
prevent a channel from blocking all others by using a ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Aug 18, 2023
1 parent 2f27e25 commit d7b3e0e
Show file tree
Hide file tree
Showing 7 changed files with 398 additions and 120 deletions.
157 changes: 79 additions & 78 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
"github.com/bluenviron/gomavlib/v2/pkg/ringbuffer"
)

const (
// this is low in order to avoid accumulating messages
// when a channel is reconnecting
writeBufferSize = 8
)

func randomByte() (byte, error) {
Expand All @@ -19,15 +26,15 @@ func randomByte() (byte, error) {
// For instance, a TCP client endpoint creates a single channel, while a TCP
// server endpoint creates a channel for each incoming connection.
type Channel struct {
e Endpoint
label string
rwc io.ReadWriteCloser
n *Node
frw *frame.ReadWriter
running bool
e Endpoint
label string
rwc io.Closer
n *Node
frw *frame.ReadWriter
running bool
writeBuffer *ringbuffer.RingBuffer

// in
write chan interface{}
terminate chan struct{}
}

Expand Down Expand Up @@ -56,14 +63,19 @@ func newChannel(n *Node, e Endpoint, label string, rwc io.ReadWriteCloser) (*Cha
return nil, err
}

writeBuffer, err := ringbuffer.New(writeBufferSize)
if err != nil {
return nil, err
}

return &Channel{
e: e,
label: label,
rwc: rwc,
n: n,
frw: frw,
write: make(chan interface{}),
terminate: make(chan struct{}),
e: e,
label: label,
rwc: rwc,
n: n,
frw: frw,
writeBuffer: writeBuffer,
terminate: make(chan struct{}),
}, nil
}

Expand All @@ -85,88 +97,73 @@ func (ch *Channel) run() {
defer ch.n.channelsWg.Done()

readerDone := make(chan struct{})
go func() {
defer close(readerDone)

// wait client here, in order to allow the writer goroutine to start
// and allow clients to write messages before starting listening to events
select {
case ch.n.events <- &EventChannelOpen{ch}:
case <-ch.n.terminate:
}
go ch.runReader(readerDone)

for {
fr, err := ch.frw.Read()
if err != nil {
// ignore parse errors
if _, ok := err.(*frame.ReadError); ok {
select {
case ch.n.events <- &EventParseError{err, ch}:
case <-ch.n.terminate:
}
continue
}
return
}
writerDone := make(chan struct{})
go ch.runWriter(writerDone)

evt := &EventFrame{fr, ch}
select {
case <-readerDone:
ch.rwc.Close()

if ch.n.nodeStreamRequest != nil {
ch.n.nodeStreamRequest.onEventFrame(evt)
}
ch.writeBuffer.Close()
<-writerDone

select {
case ch.n.events <- evt:
case <-ch.n.terminate:
}
}
}()
case <-ch.terminate:
ch.writeBuffer.Close()
<-writerDone

writerDone := make(chan struct{})
go func() {
defer close(writerDone)
ch.rwc.Close()
<-readerDone
}

ch.n.pushEvent(&EventChannelClose{ch})
ch.n.closeChannel(ch)
}

func (ch *Channel) runReader(readerDone chan struct{}) {
defer close(readerDone)

for what := range ch.write {
switch wh := what.(type) {
case message.Message:
ch.frw.WriteMessage(wh) //nolint:errcheck
// wait client here, in order to allow the writer goroutine to start
// and allow clients to write messages before starting listening to events
ch.n.pushEvent(&EventChannelOpen{ch})

case frame.Frame:
ch.frw.WriteFrame(wh) //nolint:errcheck
for {
fr, err := ch.frw.Read()
if err != nil {
if _, ok := err.(*frame.ReadError); ok {
ch.n.pushEvent(&EventParseError{err, ch})
continue
}
return
}
}()

select {
case <-readerDone:
select {
case ch.n.events <- &EventChannelClose{ch}:
case <-ch.n.terminate:
}
evt := &EventFrame{fr, ch}

select {
case ch.n.channelClose <- ch:
case <-ch.n.terminate:
if ch.n.nodeStreamRequest != nil {
ch.n.nodeStreamRequest.onEventFrame(evt)
}

<-ch.terminate

close(ch.write)
<-writerDone
ch.n.pushEvent(evt)
}
}

ch.rwc.Close()
func (ch *Channel) runWriter(writerDone chan struct{}) {
defer close(writerDone)

case <-ch.terminate:
select {
case ch.n.events <- &EventChannelClose{ch}:
case <-ch.n.terminate:
for {
what, ok := ch.writeBuffer.Pull()
if !ok {
return
}

close(ch.write)
<-writerDone
switch wh := what.(type) {
case message.Message:
ch.frw.WriteMessage(wh) //nolint:errcheck

ch.rwc.Close()
<-readerDone
case frame.Frame:
ch.frw.WriteFrame(wh) //nolint:errcheck
}
}
}

Expand All @@ -179,3 +176,7 @@ func (ch *Channel) String() string {
func (ch *Channel) Endpoint() Endpoint {
return ch.e
}

func (ch *Channel) write(what interface{}) {
ch.writeBuffer.Push(what)
}
9 changes: 3 additions & 6 deletions channel_accepter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func (ca *channelAccepter) close() {

func (ca *channelAccepter) start() {
ca.n.channelAcceptersWg.Add(1)
go ca.runSingle()
go ca.run()
}

func (ca *channelAccepter) runSingle() {
func (ca *channelAccepter) run() {
defer ca.n.channelAcceptersWg.Done()

for {
Expand All @@ -42,9 +42,6 @@ func (ca *channelAccepter) runSingle() {
panic(fmt.Errorf("newChannel unexpected error: %s", err))
}

select {
case ca.n.channelNew <- ch:
case <-ca.n.terminate:
}
ca.n.newChannel(ch)
}
}
Loading

0 comments on commit d7b3e0e

Please sign in to comment.