Skip to content

Commit

Permalink
Merge pull request #6 from GavinDmello/master
Browse files Browse the repository at this point in the history
Saving clientIds against topics
  • Loading branch information
mcollina authored Sep 4, 2016
2 parents c103483 + 858d7de commit 9325c3f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 1 deletion.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ npm install aedes-persistence --save
* <a href="#getWill"><code>instance.<b>getWill()</b></code></a>
* <a href="#delWill"><code>instance.<b>delWill()</b></code></a>
* <a href="#streamWill"><code>instance.<b>streamWill()</b></code></a>

* <a href="#getClientList"><code>instance.<b>getClientList()</b></code></a>
-------------------------------------------------------
<a name="constructor"></a>
### persistence([opts])
Expand Down Expand Up @@ -219,6 +219,13 @@ format:
}
```

-------------------------------------------------------
<a name="getCLientList"></a>
### instance.getClientList(topic)

Returns a stream which has all the clientIds subscribed to the
specified topic

<a name="implement"></a>
## Implement another persistence

Expand Down
45 changes: 45 additions & 0 deletions abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,51 @@ function abstractPersistence (opts) {
})
})

testInstance('get client list after subscriptions', function (t, instance) {
var client1 = { id: 'abcde' }
var client2 = { id: 'efghi' }
var subs = [{
topic: 'helloagain',
qos: 1
}]

instance.addSubscriptions(client1, subs, function (err) {
t.notOk(err, 'no error for client 1')
instance.addSubscriptions(client2, subs, function (err) {
t.notOk(err, 'no error for client 2')
var stream = instance.getClientList(subs[0].topic)
stream.pipe(concat({encoding: 'object'}, function (out) {
t.deepEqual(out, [client1.id, client2.id])
instance.destroy(t.end.bind(t))
}))
})
})
})

testInstance('get client list after an unsubscribe', function (t, instance) {
var client1 = { id: 'abcde' }
var client2 = { id: 'efghi' }
var subs = [{
topic: 'helloagain',
qos: 1
}]

instance.addSubscriptions(client1, subs, function (err) {
t.notOk(err, 'no error for client 1')
instance.addSubscriptions(client2, subs, function (err) {
t.notOk(err, 'no error for client 2')
instance.removeSubscriptions(client2, [subs[0].topic], function (err, reClient) {
t.notOk(err, 'no error for removeSubscriptions')
var stream = instance.getClientList(subs[0].topic)
stream.pipe(concat({encoding: 'object'}, function (out) {
t.deepEqual(out, [client1.id])
instance.destroy(t.end.bind(t))
}))
})
})
})
})

testInstance('QoS 0 subscriptions, restored but not matched', function (t, instance) {
var client = { id: 'abcde' }
var subs = [{
Expand Down
22 changes: 22 additions & 0 deletions persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,28 @@ MemoryPersistence.prototype.streamWill = function (brokers) {
})
}

MemoryPersistence.prototype.getClientList = function (topic) {
var clientSubs = this._subscriptions
var keys = Object.keys(clientSubs)
return from2.obj(function match (size, next) {
var clientKey
while ((clientKey = keys.shift()) != null) {
var subs = clientSubs[clientKey]
var current = 0
while (current < subs.length) {
if (subs[current].topic === topic) {
setImmediate(next, null, subs[current].clientId)
current++
return
}
}
}
if (!clientKey) {
next(null, null)
}
})
}

MemoryPersistence.prototype.destroy = function (cb) {
this._retained = null
if (cb) {
Expand Down

0 comments on commit 9325c3f

Please sign in to comment.