-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New pinger implementation #222
Changes from 2 commits
8d29f4e
a6d2894
06f9be5
b5a4814
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,8 +70,9 @@ type ( | |
Session session.SessionManager | ||
autoCloseSession bool | ||
|
||
AuthHandler Auther | ||
PingHandler Pinger | ||
AuthHandler Auther | ||
PingHandler Pinger | ||
defaultPinger bool | ||
|
||
// Router - new inbound messages will be passed to the `Route(*packets.Publish)` function. | ||
// | ||
|
@@ -199,9 +200,8 @@ func NewClient(conf ClientConfig) *Client { | |
c.onPublishReceivedTracker = make([]int, len(c.onPublishReceived)) // Must have the same number of elements as onPublishReceived | ||
|
||
if c.config.PingHandler == nil { | ||
c.config.PingHandler = DefaultPingerWithCustomFailHandler(func(e error) { | ||
go c.error(e) | ||
}) | ||
c.config.defaultPinger = true | ||
c.config.PingHandler = NewDefaultPinger() | ||
} | ||
if c.config.OnClientError == nil { | ||
c.config.OnClientError = func(e error) {} | ||
|
@@ -340,15 +340,15 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { | |
c.serverProps.SharedSubAvailable = ca.Properties.SharedSubAvailable | ||
} | ||
|
||
if keepalive > 0 { // "Keep Alive value of 0 has the effect of turning off..." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the keepalive check as the Pinger Run() method is supposed to return immediately if given a keepalive of 0, so this check is unnecessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense - the check was put there because the previous pinger panicked on a 0 keepalive. |
||
c.debug.Println("received CONNACK, starting PingHandler") | ||
c.workers.Add(1) | ||
go func() { | ||
defer c.workers.Done() | ||
defer c.debug.Println("returning from ping handler worker") | ||
c.config.PingHandler.Start(c.config.Conn, time.Duration(keepalive)*time.Second) | ||
}() | ||
} | ||
c.debug.Println("received CONNACK, starting PingHandler") | ||
c.workers.Add(1) | ||
go func() { | ||
defer c.workers.Done() | ||
defer c.debug.Println("returning from ping handler worker") | ||
if err := c.config.PingHandler.Run(c.config.Conn, keepalive); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this approach - really easy to see what is happening. |
||
go c.error(fmt.Errorf("ping handler error: %w", err)) | ||
} | ||
}() | ||
|
||
c.debug.Println("starting publish packets loop") | ||
c.workers.Add(1) | ||
|
@@ -502,6 +502,7 @@ func (c *Client) incoming() { | |
go c.error(err) | ||
return | ||
} | ||
c.config.PingHandler.PacketSent() | ||
} | ||
} | ||
case packets.PUBLISH: | ||
|
@@ -619,6 +620,7 @@ func (c *Client) Authenticate(ctx context.Context, a *Auth) (*AuthResponse, erro | |
if _, err := a.Packet().WriteTo(c.config.Conn); err != nil { | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
var rp packets.ControlPacket | ||
select { | ||
|
@@ -679,6 +681,7 @@ func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) { | |
// The packet will remain in the session state until `Session` is notified of the disconnection. | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
c.debug.Println("waiting for SUBACK") | ||
subCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) | ||
|
@@ -743,6 +746,7 @@ func (c *Client) Unsubscribe(ctx context.Context, u *Unsubscribe) (*Unsuback, er | |
// The packet will remain in the session state until `Session` is notified of the disconnection. | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
unsubCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) | ||
defer cf() | ||
|
@@ -849,6 +853,7 @@ func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOp | |
go c.error(err) | ||
return nil, err | ||
} | ||
c.config.PingHandler.PacketSent() | ||
return nil, nil | ||
case 1, 2: | ||
return c.publishQoS12(ctx, pb, o) | ||
|
@@ -875,6 +880,7 @@ func (c *Client) publishQoS12(ctx context.Context, pb *packets.Publish, o Publis | |
return nil, ErrNetworkErrorAfterStored // Async send, so we don't wait for the response (may add callbacks in the future to enable user to obtain status) | ||
} | ||
} | ||
c.config.PingHandler.PacketSent() | ||
|
||
if o.Method == PublishMethod_AsyncSend { | ||
return nil, nil // Async send, so we don't wait for the response (may add callbacks in the future to enable user to obtain status) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? (it's set but never read; I suspect it was used in a previous iteration?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. This was supposed to be used to determine whether the client's
SetDebugLogger()
should callSetDebug()
on the pinger. So right now, thePinger
s logger will won't be set if the user is relying on the default one. This would be a very small fix, so could be tacked on to some other PR.