Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Aug 18, 2023
1 parent 2f27e25 commit 0d8c480
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 106 deletions.
139 changes: 72 additions & 67 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
"github.com/bluenviron/gomavlib/v2/pkg/ringbuffer"
)

const (
// this is low in order to avoid accumulating messages
// when a channel is reconnecting
writeBufferSize = 8
)

func randomByte() (byte, error) {
Expand All @@ -21,13 +28,13 @@ func randomByte() (byte, error) {
type Channel struct {
e Endpoint
label string
rwc io.ReadWriteCloser
rwc io.Closer
n *Node
frw *frame.ReadWriter
running bool
writeBuffer *ringbuffer.RingBuffer

// in
write chan interface{}
terminate chan struct{}
}

Expand Down Expand Up @@ -56,13 +63,18 @@ func newChannel(n *Node, e Endpoint, label string, rwc io.ReadWriteCloser) (*Cha
return nil, err
}

writeBuffer, err := ringbuffer.New(writeBufferSize)
if err != nil {
return nil, err
}

return &Channel{
e: e,
label: label,
rwc: rwc,
n: n,
frw: frw,
write: make(chan interface{}),
writeBuffer: writeBuffer,
terminate: make(chan struct{}),
}, nil
}
Expand All @@ -85,91 +97,80 @@ func (ch *Channel) run() {
defer ch.n.channelsWg.Done()

readerDone := make(chan struct{})
go func() {
defer close(readerDone)

// wait client here, in order to allow the writer goroutine to start
// and allow clients to write messages before starting listening to events
select {
case ch.n.events <- &EventChannelOpen{ch}:
case <-ch.n.terminate:
}

for {
fr, err := ch.frw.Read()
if err != nil {
// ignore parse errors
if _, ok := err.(*frame.ReadError); ok {
select {
case ch.n.events <- &EventParseError{err, ch}:
case <-ch.n.terminate:
}
continue
}
return
}

evt := &EventFrame{fr, ch}

if ch.n.nodeStreamRequest != nil {
ch.n.nodeStreamRequest.onEventFrame(evt)
}

select {
case ch.n.events <- evt:
case <-ch.n.terminate:
}
}
}()
go ch.runReader(readerDone)

writerDone := make(chan struct{})
go func() {
defer close(writerDone)

for what := range ch.write {
switch wh := what.(type) {
case message.Message:
ch.frw.WriteMessage(wh) //nolint:errcheck

case frame.Frame:
ch.frw.WriteFrame(wh) //nolint:errcheck
}
}
}()
go ch.runWriter(writerDone)

select {
case <-readerDone:
select {
case ch.n.events <- &EventChannelClose{ch}:
case <-ch.n.terminate:
}

select {
case ch.n.channelClose <- ch:
case <-ch.n.terminate:
}
ch.n.pushEvent(&EventChannelClose{ch})
ch.n.closeChannel(ch)

<-ch.terminate

close(ch.write)
ch.writeBuffer.Close()
<-writerDone

ch.rwc.Close()

case <-ch.terminate:
select {
case ch.n.events <- &EventChannelClose{ch}:
case <-ch.n.terminate:
}
ch.n.pushEvent(&EventChannelClose{ch})

close(ch.write)
ch.writeBuffer.Close()
<-writerDone

ch.rwc.Close()
<-readerDone
}
}

func (ch *Channel) runReader(readerDone chan struct{}) {
defer close(readerDone)

// wait client here, in order to allow the writer goroutine to start
// and allow clients to write messages before starting listening to events
ch.n.pushEvent(&EventChannelOpen{ch})

for {
fr, err := ch.frw.Read()
if err != nil {
if _, ok := err.(*frame.ReadError); ok {
ch.n.pushEvent(&EventParseError{err, ch})
continue
}
return
}

evt := &EventFrame{fr, ch}

if ch.n.nodeStreamRequest != nil {
ch.n.nodeStreamRequest.onEventFrame(evt)
}

ch.n.pushEvent(evt)
}
}

func (ch *Channel) runWriter(writerDone chan struct{}) {
defer close(writerDone)

for {
what, ok := ch.writeBuffer.Pull()
if !ok {
return
}

switch wh := what.(type) {
case message.Message:
ch.frw.WriteMessage(wh) //nolint:errcheck

case frame.Frame:
ch.frw.WriteFrame(wh) //nolint:errcheck
}
}
}

// String implements fmt.Stringer.
func (ch *Channel) String() string {
return ch.label
Expand All @@ -179,3 +180,7 @@ func (ch *Channel) String() string {
func (ch *Channel) Endpoint() Endpoint {
return ch.e
}

func (ch *Channel) write(what interface{}) {
ch.writeBuffer.Push(what)
}
9 changes: 3 additions & 6 deletions channel_accepter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func (ca *channelAccepter) close() {

func (ca *channelAccepter) start() {
ca.n.channelAcceptersWg.Add(1)
go ca.runSingle()
go ca.run()
}

func (ca *channelAccepter) runSingle() {
func (ca *channelAccepter) run() {
defer ca.n.channelAcceptersWg.Done()

for {
Expand All @@ -42,9 +42,6 @@ func (ca *channelAccepter) runSingle() {
panic(fmt.Errorf("newChannel unexpected error: %s", err))
}

select {
case ca.n.channelNew <- ch:
case <-ca.n.terminate:
}
ca.n.newChannel(ch)
}
}
Loading

0 comments on commit 0d8c480

Please sign in to comment.