Skip to content

Commit

Permalink
add node.IdleTimeout and close idle connections after a timeout (blue…
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Aug 7, 2023
1 parent 0e7760b commit 0c48e81
Show file tree
Hide file tree
Showing 13 changed files with 249 additions and 116 deletions.
10 changes: 2 additions & 8 deletions endpoint_broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
)

var _ endpointChannelSingle = (*endpointUDPBroadcast)(nil)
Expand All @@ -27,13 +26,8 @@ func (rw *readWriterFromFuncs) Write(p []byte) (int, error) {
}

func TestEndpointBroadcast(t *testing.T) {
dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointUDPBroadcast{"127.255.255.255:5602", ":5601"}},
Expand All @@ -51,7 +45,7 @@ func TestEndpointBroadcast(t *testing.T) {
require.NoError(t, err)
defer pc.Close()

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

rw, err := frame.NewReadWriter(frame.ReadWriterConf{
Expand Down
8 changes: 7 additions & 1 deletion endpoint_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,20 @@ func initEndpointClient(node *Node, conf endpointClientConf) (Endpoint, error) {
}
return "tcp4"
}()

timedContext, timedContextClose := context.WithTimeout(ctx, node.conf.ReadTimeout)
nconn, err := (&net.Dialer{}).DialContext(timedContext, network, conf.getAddress())
timedContextClose()

if err != nil {
return nil, err
}

return timednetconn.New(node.conf.WriteTimeout, nconn), nil
return timednetconn.New(
node.conf.IdleTimeout,
node.conf.WriteTimeout,
nconn,
), nil
},
),
}
Expand Down
114 changes: 96 additions & 18 deletions endpoint_client_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
package gomavlib

import (
"io"
"net"
"testing"
"time"

"github.com/pion/transport/v2/udp"
"github.com/stretchr/testify/require"

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
)

var _ endpointChannelSingle = (*endpointClient)(nil)

func TestEndpointClient(t *testing.T) {
for _, ca := range []string{"tcp", "udp"} {
t.Run(ca, func(t *testing.T) {
dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

var ln net.Listener
if ca == "tcp" {
var err error
Expand All @@ -37,14 +33,6 @@ func TestEndpointClient(t *testing.T) {

defer ln.Close()

connOpened := make(chan net.Conn)

go func() {
conn, err := ln.Accept()
require.NoError(t, err)
connOpened <- conn
}()

var e EndpointConf
if ca == "tcp" {
e = EndpointTCPClient{"127.0.0.1:5601"}
Expand All @@ -53,7 +41,7 @@ func TestEndpointClient(t *testing.T) {
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{e},
Expand All @@ -67,7 +55,6 @@ func TestEndpointClient(t *testing.T) {
Channel: evt.(*EventChannelOpen).Channel,
}, evt)

var conn net.Conn
var rw *frame.ReadWriter

for i := 0; i < 3; i++ {
Expand All @@ -82,10 +69,11 @@ func TestEndpointClient(t *testing.T) {
node.WriteMessageAll(msg)

if i == 0 {
conn = <-connOpened
conn, err := ln.Accept()
require.NoError(t, err)
defer conn.Close()

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

rw, err = frame.NewReadWriter(frame.ReadWriterConf{
Expand Down Expand Up @@ -133,3 +121,93 @@ func TestEndpointClient(t *testing.T) {
})
}
}

func TestEndpointClientIdleTimeout(t *testing.T) {
for _, ca := range []string{"tcp"} {
t.Run(ca, func(t *testing.T) {
var ln net.Listener
var err error
ln, err = net.Listen("tcp", "127.0.0.1:5603")
require.NoError(t, err)

defer ln.Close()

var e EndpointConf
if ca == "tcp" {
e = EndpointTCPClient{"127.0.0.1:5603"}
} else {
e = EndpointUDPClient{"127.0.0.1:5603"}
}

node, err := NewNode(NodeConf{
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{e},
HeartbeatDisable: true,
IdleTimeout: 500 * time.Millisecond,
})
require.NoError(t, err)
defer node.Close()

evt := <-node.Events()
require.Equal(t, &EventChannelOpen{
Channel: evt.(*EventChannelOpen).Channel,
}, evt)

msg := &MessageHeartbeat{
Type: 1,
Autopilot: 2,
BaseMode: 3,
CustomMode: 6,
SystemStatus: 4,
MavlinkVersion: 5,
}
node.WriteMessageAll(msg)

conn, err := ln.Accept()
require.NoError(t, err)

dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

rw, err := frame.NewReadWriter(frame.ReadWriterConf{
ReadWriter: conn,
DialectRW: dialectRW,
OutVersion: frame.V2,
OutSystemID: 11,
})
require.NoError(t, err)

fr, err := rw.Read()
require.NoError(t, err)
require.Equal(t, &frame.V2Frame{
SequenceID: 0,
SystemID: 10,
ComponentID: 1,
Message: msg,
Checksum: fr.GetChecksum(),
}, fr)

closed := make(chan struct{})

go func() {
_, err = rw.Read()
require.Equal(t, io.EOF, err)
conn.Close()
close(closed)
}()

select {
case <-closed:
case <-time.After(1 * time.Second):
t.Errorf("should not happen")
}

// the client reconnects to the server due to autoReconnector
conn, err = ln.Accept()
require.NoError(t, err)
conn.Close()
})
}
}
10 changes: 2 additions & 8 deletions endpoint_custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
)

var _ endpointChannelSingle = (*endpointCustom)(nil)
Expand Down Expand Up @@ -65,15 +64,10 @@ func (e *dummyEndpoint) Write(p []byte) (int, error) {
}

func TestEndpointCustom(t *testing.T) {
dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

de := newDummyEndpoint()

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointCustom{de}},
Expand All @@ -87,7 +81,7 @@ func TestEndpointCustom(t *testing.T) {
Channel: evt.(*EventChannelOpen).Channel,
}, evt)

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

var buf bytes.Buffer
Expand Down
19 changes: 4 additions & 15 deletions endpoint_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/bluenviron/gomavlib/v2/pkg/dialect"
"github.com/bluenviron/gomavlib/v2/pkg/frame"
"github.com/bluenviron/gomavlib/v2/pkg/message"
)

var _ endpointChannelSingle = (*endpointSerial)(nil)
Expand All @@ -22,13 +21,8 @@ func TestEndpointSerial(t *testing.T) {
return de, nil
}

dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointSerial{
Expand All @@ -49,7 +43,7 @@ func TestEndpointSerial(t *testing.T) {

de := <-endpointCreated

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

var buf bytes.Buffer
Expand Down Expand Up @@ -120,13 +114,8 @@ func TestEndpointSerialReconnect(t *testing.T) {
return de, nil
}

dial := &dialect.Dialect{
Version: 3,
Messages: []message.Message{&MessageHeartbeat{}},
}

node, err := NewNode(NodeConf{
Dialect: dial,
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointSerial{
Expand All @@ -147,7 +136,7 @@ func TestEndpointSerialReconnect(t *testing.T) {

de := <-endpointCreated

dialectRW, err := dialect.NewReadWriter(dial)
dialectRW, err := dialect.NewReadWriter(testDialect)
require.NoError(t, err)

var buf bytes.Buffer
Expand Down
7 changes: 6 additions & 1 deletion endpoint_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type endpointServer struct {
conf endpointServerConf
listener net.Listener
writeTimeout time.Duration
idleTimeout time.Duration

// in
terminate chan struct{}
Expand Down Expand Up @@ -94,6 +95,7 @@ func initEndpointServer(node *Node, conf endpointServerConf) (Endpoint, error) {
t := &endpointServer{
conf: conf,
writeTimeout: node.conf.WriteTimeout,
idleTimeout: node.conf.IdleTimeout,
listener: ln,
terminate: make(chan struct{}),
}
Expand Down Expand Up @@ -127,7 +129,10 @@ func (t *endpointServer) accept() (string, io.ReadWriteCloser, error) {
return "tcp"
}(), nconn.RemoteAddr())

conn := timednetconn.New(t.writeTimeout, nconn)
conn := timednetconn.New(
t.idleTimeout,
t.writeTimeout,
nconn)

return label, conn, nil
}
Loading

0 comments on commit 0c48e81

Please sign in to comment.