Skip to content

Commit

Permalink
feat: mongodb 4 support (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando authored Apr 7, 2022
1 parent b09f8bc commit 610878b
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [10.x, 12.x, 13.x]
node-version: [12.x, 14.x, 16.x]

steps:
- uses: actions/checkout@v1
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@
"concat-stream": "^2.0.0",
"faucet": "0.0.1",
"license-checker": "^25.0.1",
"mongo-clean": "^2.0.0",
"mqemitter-mongodb": "^7.0.1",
"mqemitter-mongodb": "^8.1.0",
"nyc": "^15.0.1",
"pre-commit": "^1.2.2",
"release-it": "^14.0.3",
Expand All @@ -67,7 +66,7 @@
"aedes-cached-persistence": "^8.1.1",
"escape-string-regexp": "^4.0.0",
"fastparallel": "^2.3.0",
"mongodb": "^3.6.0",
"mongodb": "^4.5.0",
"native-url": "^0.3.1",
"pump": "^3.0.0",
"qlobber": "^5.0.0",
Expand Down
23 changes: 14 additions & 9 deletions persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const qlobberOpts = {
match_empty_levels: true
}

function toStream (op) {
return op.stream ? op.stream() : op
}

function MongoPersistence (opts) {
if (!(this instanceof MongoPersistence)) {
return new MongoPersistence(opts)
Expand Down Expand Up @@ -102,9 +106,9 @@ MongoPersistence.prototype._setup = function () {

function initCollections () {
function finishInit () {
subscriptions.find({
toStream(subscriptions.find({
qos: { $gte: 0 }
}).on('data', function (chunk) {
})).on('data', function (chunk) {
that._trie.add(chunk.topic, chunk)
}).on('end', function () {
that.emit('ready')
Expand Down Expand Up @@ -312,9 +316,9 @@ MongoPersistence.prototype.createRetainedStreamCombi = function (patterns) {
regex = regex.join('|')

return pump(
this._cl.retained.find({
toStream(this._cl.retained.find({
topic: new RegExp(regex)
}),
})),
instance
)
}
Expand Down Expand Up @@ -357,6 +361,7 @@ MongoPersistence.prototype.addSubscriptions = function (client, subs, cb) {
function finish (err) {
errored = err
published++
console.log('published', published)
if (published === 2) {
cb(errored, client)
}
Expand Down Expand Up @@ -428,11 +433,11 @@ MongoPersistence.prototype.subscriptionsByClient = function (client, cb) {
MongoPersistence.prototype.countOffline = function (cb) {
var clientsCount = 0
var that = this
this._cl.subscriptions.aggregate([{
toStream(this._cl.subscriptions.aggregate([{
$group: {
_id: '$clientId'
}
}]).on('data', function () {
}])).on('data', function () {
clientsCount++
}).on('end', function () {
cb(null, that._trie.subscriptionsCount, clientsCount)
Expand Down Expand Up @@ -506,7 +511,7 @@ function asPacket (obj, enc, cb) {

MongoPersistence.prototype.outgoingStream = function (client) {
return pump(
this._cl.outgoing.find({ clientId: client.id }),
toStream(this._cl.outgoing.find({ clientId: client.id })),
through.obj(asPacket))
}

Expand Down Expand Up @@ -697,7 +702,7 @@ MongoPersistence.prototype.streamWill = function (brokers) {
if (brokers) {
query['packet.brokerId'] = { $nin: Object.keys(brokers) }
}
return pump(this._cl.will.find(query), through.obj(asPacket))
return pump(toStream(this._cl.will.find(query)), through.obj(asPacket))
}

MongoPersistence.prototype.getClientList = function (topic) {
Expand All @@ -707,7 +712,7 @@ MongoPersistence.prototype.getClientList = function (topic) {
query.topic = topic
}

return pump(this._cl.subscriptions.find(query), through.obj(function asPacket (obj, enc, cb) {
return pump(toStream(this._cl.subscriptions.find(query)), through.obj(function asPacket (obj, enc, cb) {
this.push(obj.clientId)
cb()
}))
Expand Down
44 changes: 27 additions & 17 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ var persistence = require('./')
var MongoClient = require('mongodb').MongoClient
var abs = require('aedes-cached-persistence/abstract')
var mqemitterMongo = require('mqemitter-mongodb')
var clean = require('mongo-clean')
var dbname = 'aedes-test'
var mongourl = 'mongodb://127.0.0.1/' + dbname
var cleanopts = {
action: 'deleteMany'
}
let clean = null

MongoClient.connect(mongourl, { useNewUrlParser: true, useUnifiedTopology: true, w: 1 }, function (err, client) {
if (err) {
Expand All @@ -19,13 +16,26 @@ MongoClient.connect(mongourl, { useNewUrlParser: true, useUnifiedTopology: true,

var db = client.db(dbname)

const collections = [
db.collection('subscriptions'),
db.collection('retained'),
db.collection('will'),
db.collection('outgoing'),
db.collection('incoming')
]

clean = async (cb) => {
await Promise.all(collections.map((c) => c.deleteMany({})))
cb()
}

// set ttl task to run every 2 seconds
db.executeDbAdminCommand({ setParameter: 1, ttlMonitorSleepSecs: 2 }, function (err) {
db.admin().command({ setParameter: 1, ttlMonitorSleepSecs: 2 }, function (err) {
if (err) {
throw err
}

clean(db, cleanopts, function (err, db) {
clean(function (err) {
if (err) {
throw err
}
Expand Down Expand Up @@ -53,7 +63,7 @@ function runTest (client, db) {
return emitter
},
persistence: function build (cb) {
clean(db, cleanopts, function (err) {
clean(function (err) {
if (err) {
return cb(err)
}
Expand Down Expand Up @@ -88,7 +98,7 @@ function runTest (client, db) {
test('multiple persistences', function (t) {
t.plan(12)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dbopts)
Expand Down Expand Up @@ -167,7 +177,7 @@ function runTest (client, db) {
test('multiple persistences with passed db object and url', function (t) {
t.plan(12)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dboptsWithDbObjectAndUrl)
Expand Down Expand Up @@ -245,7 +255,7 @@ function runTest (client, db) {
test('multiple persistences with passed only db object', function (t) {
t.plan(12)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dboptsWithOnlyDbObject)
Expand Down Expand Up @@ -319,7 +329,7 @@ function runTest (client, db) {
test('qos 0 subs restoration', function (t) {
t.plan(10)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

var emitter = mqemitterMongo(dbopts)
Expand Down Expand Up @@ -365,7 +375,7 @@ function runTest (client, db) {
})

test('look up for expire after seconds index', function (t) {
clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
Expand Down Expand Up @@ -416,7 +426,7 @@ function runTest (client, db) {
})

test('look up for query indexes', function (t) {
clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
Expand Down Expand Up @@ -471,7 +481,7 @@ function runTest (client, db) {
})

test('look up for packet with added property', function (t) {
clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
Expand Down Expand Up @@ -541,7 +551,7 @@ function runTest (client, db) {
})
}

clean(db, cleanopts, function (err) {
clean(function (err) {
t.notOk(err, 'no error')

dbopts.ttl = {
Expand Down Expand Up @@ -591,7 +601,7 @@ function runTest (client, db) {
test('look up for expired packets', function (t) {
t.plan(17)

clean(db, cleanopts, function (err) {
clean(function (err) {
t.error(err)

dbopts.ttl = {
Expand Down Expand Up @@ -669,7 +679,7 @@ function runTest (client, db) {
var dboptsWithUrlMongoOptions = {
url: mongourl,
mongoOptions: {
appname: 'TEST'
raw: true // must be a valid mongo option
}
}

Expand Down

0 comments on commit 610878b

Please sign in to comment.