Skip to content

Commit

Permalink
New pinger implementation
Browse files Browse the repository at this point in the history
Resolves a range of issues with the old pinger.

Closes #77
Closes #137
  • Loading branch information
MattBrittan authored Jan 13, 2024
2 parents 4063955 + b5a4814 commit e379c50
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 114 deletions.
24 changes: 19 additions & 5 deletions internal/basictestserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,28 @@ func (t *TestServer) Stop() {

func (t *TestServer) Run() {
defer close(t.done)

incoming := make(chan *packets.ControlPacket, 65535)

// read incoming packets in a separate goroutine to avoid deadlocks due to unbuffered t.conn
go func() {
for {
recv, err := packets.ReadPacket(t.conn)
if err != nil {
t.logger.Println("error in test server reading packet", err)
close(incoming)
return
}
incoming <- recv
}
}()

for {
select {
case <-t.stop:
return
default:
recv, err := packets.ReadPacket(t.conn)
if err != nil {
t.logger.Println("error in test server reading packet", err)
case recv, ok := <-incoming:
if !ok {
return
}
t.logger.Println("test server received a control packet:", recv.PacketType())
Expand Down Expand Up @@ -179,7 +193,7 @@ func (t *TestServer) Run() {
t.logger.Println("test server sending pingresp")
pr := packets.NewControlPacket(packets.PINGRESP)
if _, err := pr.WriteTo(t.conn); err != nil {
t.logger.Println("error writing pingreq", err)
t.logger.Println("error writing pingresp", err)
}
}
}
Expand Down
34 changes: 20 additions & 14 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ type (
Session session.SessionManager
autoCloseSession bool

AuthHandler Auther
PingHandler Pinger
AuthHandler Auther
PingHandler Pinger
defaultPinger bool

// Router - new inbound messages will be passed to the `Route(*packets.Publish)` function.
//
Expand Down Expand Up @@ -199,9 +200,8 @@ func NewClient(conf ClientConfig) *Client {
c.onPublishReceivedTracker = make([]int, len(c.onPublishReceived)) // Must have the same number of elements as onPublishReceived

if c.config.PingHandler == nil {
c.config.PingHandler = DefaultPingerWithCustomFailHandler(func(e error) {
go c.error(e)
})
c.config.defaultPinger = true
c.config.PingHandler = NewDefaultPinger()
}
if c.config.OnClientError == nil {
c.config.OnClientError = func(e error) {}
Expand Down Expand Up @@ -340,15 +340,15 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
c.serverProps.SharedSubAvailable = ca.Properties.SharedSubAvailable
}

if keepalive > 0 { // "Keep Alive value of 0 has the effect of turning off..."
c.debug.Println("received CONNACK, starting PingHandler")
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from ping handler worker")
c.config.PingHandler.Start(c.config.Conn, time.Duration(keepalive)*time.Second)
}()
}
c.debug.Println("received CONNACK, starting PingHandler")
c.workers.Add(1)
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from ping handler worker")
if err := c.config.PingHandler.Run(c.config.Conn, keepalive); err != nil {
go c.error(fmt.Errorf("ping handler error: %w", err))
}
}()

c.debug.Println("starting publish packets loop")
c.workers.Add(1)
Expand Down Expand Up @@ -502,6 +502,7 @@ func (c *Client) incoming() {
go c.error(err)
return
}
c.config.PingHandler.PacketSent()
}
}
case packets.PUBLISH:
Expand Down Expand Up @@ -619,6 +620,7 @@ func (c *Client) Authenticate(ctx context.Context, a *Auth) (*AuthResponse, erro
if _, err := a.Packet().WriteTo(c.config.Conn); err != nil {
return nil, err
}
c.config.PingHandler.PacketSent()

var rp packets.ControlPacket
select {
Expand Down Expand Up @@ -679,6 +681,7 @@ func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) {
// The packet will remain in the session state until `Session` is notified of the disconnection.
return nil, err
}
c.config.PingHandler.PacketSent()

c.debug.Println("waiting for SUBACK")
subCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout)
Expand Down Expand Up @@ -743,6 +746,7 @@ func (c *Client) Unsubscribe(ctx context.Context, u *Unsubscribe) (*Unsuback, er
// The packet will remain in the session state until `Session` is notified of the disconnection.
return nil, err
}
c.config.PingHandler.PacketSent()

unsubCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout)
defer cf()
Expand Down Expand Up @@ -849,6 +853,7 @@ func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOp
go c.error(err)
return nil, err
}
c.config.PingHandler.PacketSent()
return nil, nil
case 1, 2:
return c.publishQoS12(ctx, pb, o)
Expand All @@ -875,6 +880,7 @@ func (c *Client) publishQoS12(ctx context.Context, pb *packets.Publish, o Publis
return nil, ErrNetworkErrorAfterStored // Async send, so we don't wait for the response (may add callbacks in the future to enable user to obtain status)
}
}
c.config.PingHandler.PacketSent()

if o.Method == PublishMethod_AsyncSend {
return nil, nil // Async send, so we don't wait for the response (may add callbacks in the future to enable user to obtain status)
Expand Down
20 changes: 10 additions & 10 deletions paho/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestClientSubscribe(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

Expand Down Expand Up @@ -176,7 +176,7 @@ func TestClientUnsubscribe(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

Expand Down Expand Up @@ -214,7 +214,7 @@ func TestClientPublishQoS0(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

Expand Down Expand Up @@ -256,7 +256,7 @@ func TestClientPublishQoS1(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

Expand Down Expand Up @@ -301,7 +301,7 @@ func TestClientPublishQoS2(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

Expand Down Expand Up @@ -348,7 +348,7 @@ func TestClientReceiveQoS0(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})
go c.routePublishPackets()
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestClientReceiveQoS1(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})
go c.routePublishPackets()
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestClientReceiveQoS2(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})
go c.routePublishPackets()
Expand Down Expand Up @@ -660,7 +660,7 @@ func TestReceiveServerDisconnect(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

Expand Down Expand Up @@ -701,7 +701,7 @@ func TestAuthenticate(t *testing.T) {
}()
go func() {
defer c.workers.Done()
c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
c.config.PingHandler.Run(c.config.Conn, 30)
}()
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

Expand Down
152 changes: 152 additions & 0 deletions paho/default_pinger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/

package paho

import (
"net"
"testing"
"time"

"github.com/eclipse/paho.golang/packets"
paholog "github.com/eclipse/paho.golang/paho/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDefaultPingerTimeout(t *testing.T) {
fakeServerConn, fakeClientConn := net.Pipe()

go func() {
// keep reading from fakeServerConn and throw away the data
buf := make([]byte, 1024)
for {
_, err := fakeServerConn.Read(buf)
if err != nil {
return
}
}
}()
defer fakeServerConn.Close()

pinger := NewDefaultPinger()
pinger.SetDebug(paholog.NewTestLogger(t, "DefaultPinger:"))

pingResult := make(chan error, 1)
go func() {
pingResult <- pinger.Run(fakeClientConn, 1)
}()
defer pinger.Stop()

select {
case err := <-pingResult:
require.NotNil(t, err)
assert.EqualError(t, err, "PINGRESP timed out")
case <-time.After(10 * time.Second):
t.Error("expected DefaultPinger to detect timeout and return error")
}
}

func TestDefaultPingerSuccess(t *testing.T) {
fakeClientConn, fakeServerConn := net.Pipe()

pinger := NewDefaultPinger()
pinger.SetDebug(paholog.NewTestLogger(t, "DefaultPinger:"))

pingResult := make(chan error, 1)
go func() {
pingResult <- pinger.Run(fakeClientConn, 3)
}()
defer pinger.Stop()

go func() {
// keep reading from fakeServerConn and call PingResp() when a PINGREQ is received
for {
recv, err := packets.ReadPacket(fakeServerConn)
if err != nil {
return
}
if recv.Type == packets.PINGREQ {
pinger.PingResp()
}
}
}()
defer fakeServerConn.Close()

select {
case err := <-pingResult:
t.Errorf("expected DefaultPinger to not return error, got %v", err)
case <-time.After(10 * time.Second):
// PASS
}
}

func TestDefaultPingerPacketSent(t *testing.T) {
fakeClientConn, fakeServerConn := net.Pipe()

pinger := NewDefaultPinger()
pinger.SetDebug(paholog.NewTestLogger(t, "DefaultPinger:"))

pingResult := make(chan error, 1)
go func() {
pingResult <- pinger.Run(fakeClientConn, 3)
}()
defer pinger.Stop()

// keep calling PacketSent() in a goroutine to check that the Pinger avoids sending PINGREQs when not needed
stop := make(chan struct{})
go func() {
for {
select {
case <-stop:
return
default:
}
// keep calling PacketSent()
pinger.PacketSent()
}
}()
defer close(stop)

// keep reading from fakeServerConn and call PingResp() when a PINGREQ is received
// if more than one PINGREQ is received, the test will fail
count := 0
tooManyPingreqs := make(chan struct{})
go func() {
for {
recv, err := packets.ReadPacket(fakeServerConn)
if err != nil {
return
}
if recv.Type == packets.PINGREQ {
count++
pinger.PingResp()
if count > 1 { // we allow the count to be 1 because the first PINGREQ is sent immediately
close(tooManyPingreqs)
}
}
}
}()
defer fakeServerConn.Close()

select {
case <-tooManyPingreqs:
t.Error("expected DefaultPinger to not send PINGREQs when not needed")
case err := <-pingResult:
t.Errorf("expected DefaultPinger to not return error, got %v", err)
case <-time.After(10 * time.Second):
// PASS
}
}
2 changes: 1 addition & 1 deletion paho/packet_ids_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestPackedIdNoExhaustion(t *testing.T) {
c.stop = make(chan struct{})
c.publishPackets = make(chan *packets.Publish)
go c.incoming()
go c.config.PingHandler.Start(c.config.Conn, 30*time.Second)
go c.config.PingHandler.Run(c.config.Conn, 30)
c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{})

for i := 0; i < 70000; i++ {
Expand Down
Loading

0 comments on commit e379c50

Please sign in to comment.