Skip to content

Commit

Permalink
Merge pull request #658 from redboltz/add_remove_outgoing_store
Browse files Browse the repository at this point in the history
Added removeOutgoingStore function.
  • Loading branch information
mcollina authored Jul 31, 2017
2 parents bdebe51 + 4988a48 commit 374667d
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 0 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ See `mqtt help <command>` for the command help.
* <a href="#subscribe"><code>mqtt.Client#<b>subscribe()</b></code></a>
* <a href="#unsubscribe"><code>mqtt.Client#<b>unsubscribe()</b></code></a>
* <a href="#end"><code>mqtt.Client#<b>end()</b></code></a>
* <a href="#removeOutgoingMessage"><code>mqtt.Client#<b>removeOutgoingMessage()</b></code></a>
* <a href="#handleMessage"><code>mqtt.Client#<b>handleMessage()</b></code></a>
* <a href="#connected"><code>mqtt.Client#<b>connected</b></code></a>
* <a href="#reconnecting"><code>mqtt.Client#<b>reconnecting</b></code></a>
Expand Down Expand Up @@ -354,6 +355,17 @@ Close the client, accepts the following options:
* `cb`: will be called when the client is closed. This parameter is
optional.

-------------------------------------------------------
<a name="removeOutgoingMessage"></a>
### mqtt.Client#removeOutgoingMessage(mid)

Remove a message from the outgoingStore.
The outgoing callback will be called withe Error('Message removed') if the message is removed.

After this function is called, the messageId is released and becomes reusable.

* `mid`: The messageId of the message in the outgoingStore.

-------------------------------------------------------
<a name="handleMessage"></a>
### mqtt.Client#handleMessage(packet, callback)
Expand Down
19 changes: 19 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,25 @@ MqttClient.prototype.end = function (force, cb) {
return this
}

/**
* removeOutgoingMessage - remove a message in outgoing store
* the outgoing callback will be called withe Error('Message removed') if the message is removed
*
* @param {Number} mid - messageId to remove message
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.removeOutgoingMessage(client.getLastMessageId());
*/
MqttClient.prototype.removeOutgoingMessage = function (mid) {
var cb = this.outgoing[mid]
delete this.outgoing[mid]
this.outgoingStore.del({messageId: mid}, function () {
cb(new Error('Message removed'))
})
return this
}

/**
* _reconnect - implement reconnection
* @api privateish
Expand Down
66 changes: 66 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,72 @@ module.exports = function (server, config) {
}
})

it('should not resend in-flight QoS 1 removed publish messages from the client', function (done) {
var client = connect({reconnectPeriod: 200})
var clientCalledBack = false

server.once('client', function (serverClient) {
serverClient.on('connect', function () {
setImmediate(function () {
serverClient.stream.destroy()
})
})

server.once('client', function (serverClientNew) {
serverClientNew.on('publish', function () {
should.fail()
done()
})
})
})

client.publish('hello', 'world', { qos: 1 }, function (err) {
clientCalledBack = true
should(err.message).be.equal('Message removed')
})
should(Object.keys(client.outgoing).length).be.equal(1)
should(Object.keys(client.outgoingStore._inflights).length).be.equal(1)
client.removeOutgoingMessage(client.getLastMessageId())
should(Object.keys(client.outgoing).length).be.equal(0)
should(Object.keys(client.outgoingStore._inflights).length).be.equal(0)
clientCalledBack.should.be.true()
client.end()
done()
})

it('should not resend in-flight QoS 2 removed publish messages from the client', function (done) {
var client = connect({reconnectPeriod: 200})
var clientCalledBack = false

server.once('client', function (serverClient) {
serverClient.on('connect', function () {
setImmediate(function () {
serverClient.stream.destroy()
})
})

server.once('client', function (serverClientNew) {
serverClientNew.on('publish', function () {
should.fail()
done()
})
})
})

client.publish('hello', 'world', { qos: 2 }, function (err) {
clientCalledBack = true
should(err.message).be.equal('Message removed')
})
should(Object.keys(client.outgoing).length).be.equal(1)
should(Object.keys(client.outgoingStore._inflights).length).be.equal(1)
client.removeOutgoingMessage(client.getLastMessageId())
should(Object.keys(client.outgoing).length).be.equal(0)
should(Object.keys(client.outgoingStore._inflights).length).be.equal(0)
clientCalledBack.should.be.true()
client.end()
done()
})

it('should resubscribe when reconnecting', function (done) {
var client = connect({ reconnectPeriod: 100 })
var tryReconnect = true
Expand Down
12 changes: 12 additions & 0 deletions types/lib/client.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ export declare class MqttClient extends events.EventEmitter {
*/
public end (force?: boolean, cb?: CloseCallback): this

/**
* removeOutgoingMessage - remove a message in outgoing store
* the outgoing callback will be called withe Error('Message removed') if the message is removed
*
* @param {Number} mid - messageId to remove message
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.removeOutgoingMessage(client.getLastMessageId());
*/
public removeOutgoingMessage (mid: number): this

/**
* Handle messages with backpressure support, one at a time.
* Override at will.
Expand Down

0 comments on commit 374667d

Please sign in to comment.