-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New pinger implementation #222
Changes from all commits
8d29f4e
a6d2894
06f9be5
b5a4814
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,8 +70,9 @@ type ( | |
Session session.SessionManager | ||
autoCloseSession bool | ||
|
||
AuthHandler Auther | ||
PingHandler Pinger | ||
AuthHandler Auther | ||
PingHandler Pinger | ||
defaultPinger bool | ||
|
||
// Router - new inbound messages will be passed to the `Route(*packets.Publish)` function. | ||
// | ||
|
@@ -199,9 +200,8 @@ func NewClient(conf ClientConfig) *Client { | |
c.onPublishReceivedTracker = make([]int, len(c.onPublishReceived)) // Must have the same number of elements as onPublishReceived | ||
|
||
if c.config.PingHandler == nil { | ||
c.config.PingHandler = DefaultPingerWithCustomFailHandler(func(e error) { | ||
go c.error(e) | ||
}) | ||
c.config.defaultPinger = true | ||
c.config.PingHandler = NewDefaultPinger() | ||
} | ||
if c.config.OnClientError == nil { | ||
c.config.OnClientError = func(e error) {} | ||
|
@@ -340,15 +340,15 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { | |
c.serverProps.SharedSubAvailable = ca.Properties.SharedSubAvailable | ||
} | ||
|
||
if keepalive > 0 { // "Keep Alive value of 0 has the effect of turning off..." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the keepalive check as the Pinger Run() method is supposed to return immediately if given a keepalive of 0, so this check is unnecessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense - the check was put there because the previous pinger panicked on a 0 keepalive. |
||
c.debug.Println("received CONNACK, starting PingHandler") | ||
c.workers.Add(1) | ||
go func() { | ||
defer c.workers.Done() | ||
defer c.debug.Println("returning from ping handler worker") | ||
c.config.PingHandler.Start(c.config.Conn, time.Duration(keepalive)*time.Second) | ||
}() | ||
} | ||
c.debug.Println("received CONNACK, starting PingHandler") | ||
c.workers.Add(1) | ||
go func() { | ||
defer c.workers.Done() | ||
defer c.debug.Println("returning from ping handler worker") | ||
if err := c.config.PingHandler.Run(c.config.Conn, keepalive); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this approach - really easy to see what is happening. |
||
go c.error(fmt.Errorf("ping handler error: %w", err)) | ||
} | ||
}() | ||
|
||
c.debug.Println("starting publish packets loop") | ||
c.workers.Add(1) | ||
|
@@ -502,6 +502,7 @@ func (c *Client) incoming() { | |
go c.error(err) | ||
return | ||
} | ||
c.config.PingHandler.PacketSent() | ||
} | ||
} | ||
case packets.PUBLISH: | ||
|
@@ -619,6 +620,7 @@ func (c *Client) Authenticate(ctx context.Context, a *Auth) (*AuthResponse, erro | |
if _, err := a.Packet().WriteTo(c.config.Conn); err != nil { | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
var rp packets.ControlPacket | ||
select { | ||
|
@@ -679,6 +681,7 @@ func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) { | |
// The packet will remain in the session state until `Session` is notified of the disconnection. | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
c.debug.Println("waiting for SUBACK") | ||
subCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) | ||
|
@@ -743,6 +746,7 @@ func (c *Client) Unsubscribe(ctx context.Context, u *Unsubscribe) (*Unsuback, er | |
// The packet will remain in the session state until `Session` is notified of the disconnection. | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
unsubCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) | ||
defer cf() | ||
|
@@ -849,6 +853,7 @@ func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOp | |
go c.error(err) | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
return nil, nil | ||
case 1, 2: | ||
return c.publishQoS12(ctx, pb, o) | ||
|
@@ -875,6 +880,7 @@ func (c *Client) publishQoS12(ctx context.Context, pb *packets.Publish, o Publis | |
return nil, ErrNetworkErrorAfterStored // Async send, so we don't wait for the response (may add callbacks in the future to enable user to obtain status) | ||
} | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
if o.Method == PublishMethod_AsyncSend { | ||
return nil, nil // Async send, so we don't wait for the response (may add callbacks in the future to enable user to obtain status) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
/* | ||
* Copyright (c) 2024 Contributors to the Eclipse Foundation | ||
* | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v2.0 | ||
* and Eclipse Distribution License v1.0 which accompany this distribution. | ||
* | ||
* The Eclipse Public License is available at | ||
* https://www.eclipse.org/legal/epl-2.0/ | ||
* and the Eclipse Distribution License is available at | ||
* http://www.eclipse.org/org/documents/edl-v10.php. | ||
* | ||
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause | ||
*/ | ||
|
||
package paho | ||
|
||
import ( | ||
"net" | ||
"testing" | ||
"time" | ||
|
||
"github.com/eclipse/paho.golang/packets" | ||
paholog "github.com/eclipse/paho.golang/paho/log" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestDefaultPingerTimeout(t *testing.T) { | ||
fakeServerConn, fakeClientConn := net.Pipe() | ||
|
||
go func() { | ||
// keep reading from fakeServerConn and throw away the data | ||
buf := make([]byte, 1024) | ||
for { | ||
_, err := fakeServerConn.Read(buf) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
}() | ||
defer fakeServerConn.Close() | ||
|
||
pinger := NewDefaultPinger() | ||
pinger.SetDebug(paholog.NewTestLogger(t, "DefaultPinger:")) | ||
|
||
pingResult := make(chan error, 1) | ||
go func() { | ||
pingResult <- pinger.Run(fakeClientConn, 1) | ||
}() | ||
defer pinger.Stop() | ||
|
||
select { | ||
case err := <-pingResult: | ||
require.NotNil(t, err) | ||
assert.EqualError(t, err, "PINGRESP timed out") | ||
case <-time.After(10 * time.Second): | ||
t.Error("expected DefaultPinger to detect timeout and return error") | ||
} | ||
} | ||
|
||
func TestDefaultPingerSuccess(t *testing.T) { | ||
fakeClientConn, fakeServerConn := net.Pipe() | ||
|
||
pinger := NewDefaultPinger() | ||
pinger.SetDebug(paholog.NewTestLogger(t, "DefaultPinger:")) | ||
|
||
pingResult := make(chan error, 1) | ||
go func() { | ||
pingResult <- pinger.Run(fakeClientConn, 3) | ||
}() | ||
defer pinger.Stop() | ||
|
||
go func() { | ||
// keep reading from fakeServerConn and call PingResp() when a PINGREQ is received | ||
for { | ||
recv, err := packets.ReadPacket(fakeServerConn) | ||
if err != nil { | ||
return | ||
} | ||
if recv.Type == packets.PINGREQ { | ||
pinger.PingResp() | ||
} | ||
} | ||
}() | ||
defer fakeServerConn.Close() | ||
|
||
select { | ||
case err := <-pingResult: | ||
t.Errorf("expected DefaultPinger to not return error, got %v", err) | ||
case <-time.After(10 * time.Second): | ||
// PASS | ||
} | ||
} | ||
|
||
func TestDefaultPingerPacketSent(t *testing.T) { | ||
fakeClientConn, fakeServerConn := net.Pipe() | ||
|
||
pinger := NewDefaultPinger() | ||
pinger.SetDebug(paholog.NewTestLogger(t, "DefaultPinger:")) | ||
|
||
pingResult := make(chan error, 1) | ||
go func() { | ||
pingResult <- pinger.Run(fakeClientConn, 3) | ||
}() | ||
defer pinger.Stop() | ||
|
||
// keep calling PacketSent() in a goroutine to check that the Pinger avoids sending PINGREQs when not needed | ||
stop := make(chan struct{}) | ||
go func() { | ||
for { | ||
select { | ||
case <-stop: | ||
return | ||
default: | ||
} | ||
// keep calling PacketSent() | ||
pinger.PacketSent() | ||
} | ||
}() | ||
defer close(stop) | ||
|
||
// keep reading from fakeServerConn and call PingResp() when a PINGREQ is received | ||
// if more than one PINGREQ is received, the test will fail | ||
count := 0 | ||
tooManyPingreqs := make(chan struct{}) | ||
go func() { | ||
for { | ||
recv, err := packets.ReadPacket(fakeServerConn) | ||
if err != nil { | ||
return | ||
} | ||
if recv.Type == packets.PINGREQ { | ||
count++ | ||
pinger.PingResp() | ||
if count > 1 { // we allow the count to be 1 because the first PINGREQ is sent immediately | ||
close(tooManyPingreqs) | ||
} | ||
} | ||
} | ||
}() | ||
defer fakeServerConn.Close() | ||
|
||
select { | ||
case <-tooManyPingreqs: | ||
t.Error("expected DefaultPinger to not send PINGREQs when not needed") | ||
case err := <-pingResult: | ||
t.Errorf("expected DefaultPinger to not return error, got %v", err) | ||
case <-time.After(10 * time.Second): | ||
// PASS | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? (it's set but never read; I suspect it was used in a previous iteration?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. This was supposed to be used to determine whether the client's
SetDebugLogger()
should callSetDebug()
on the pinger. So right now, thePinger
s logger will won't be set if the user is relying on the default one. This would be a very small fix, so could be tacked on to some other PR.