Skip to content

Commit

Permalink
Upgrade aedes-persistence to 9.0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousme committed May 7, 2022
1 parent 60b8648 commit 1f407fa
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 77 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"tsd": "^0.14.0"
},
"dependencies": {
"aedes-persistence": "^8.1.3",
"aedes-persistence": "^9.0.3",
"fastparallel": "^2.4.0",
"multistream": "^4.0.1",
"qlobber": "^5.0.3"
Expand Down
117 changes: 41 additions & 76 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,91 +1,56 @@
'use strict'

const test = require('tape').test
const CachedPersistence = require('./')
const util = require('util')
const Memory = require('aedes-persistence')
const abs = require('./abstract')

function MyPersistence () {
if (!(this instanceof MyPersistence)) {
return new MyPersistence()
class MyPersistence extends CachedPersistence {
constructor (opts) {
super(opts)
this.backend = opts.backend

// link methods
const methods = ['storeRetained', 'countOffline', 'outgoingEnqueue',
'outgoingUpdate', 'outgoingClearMessageId',
'incomingStorePacket', 'incomingGetPacket',
'incomingDelPacket', 'delWill',
'createRetainedStream',
'outgoingStream', 'subscriptionsByClient',
'getWill', 'streamWill', 'getClientList', 'destroy']
methods.forEach((key) => {
this[key] = this.backend[key].bind(this.backend)
})
// putWill is a special because it needs this.broker.id
this.putWill = (client, packet, cb) => {
this.backend.broker = this.broker
this.backend.putWill(client, packet, cb)
}
}

// copied from Memory
this._retained = []
this._subscriptions = new Map()
this._clientsCount = 0
this._outgoing = {}
this._incoming = {}
this._wills = {}

CachedPersistence.call(this)
}

util.inherits(MyPersistence, CachedPersistence)

// copy over methods
;['storeRetained', 'countOffline', 'outgoingEnqueue',
'outgoingUpdate', 'outgoingClearMessageId',
'incomingStorePacket', 'incomingGetPacket',
'incomingDelPacket', 'putWill', 'delWill',
'createRetainedStream',
'outgoingStream', 'subscriptionsByClient',
'getWill', 'streamWill', 'getClientList', 'destroy'].forEach(function (key) {
MyPersistence.prototype[key] = Memory.prototype[key]
})

MyPersistence.prototype.addSubscriptions = function (client, subs, cb) {
let stored = this._subscriptions.get(client.id)

if (!stored) {
stored = new Map()
this._subscriptions.set(client.id, stored)
this._clientsCount++
addSubscriptions (client, subs, cb) {
this.backend.addSubscriptions(client, subs, (err) => {
if (err) {
return cb(err)
}
this._addedSubscriptions(client, subs, cb)
})
}

const subsObjs = subs.map(function mapSub (sub) {
stored.set(sub.topic, sub.qos)
return {
clientId: client.id,
topic: sub.topic,
qos: sub.qos
}
})

this._addedSubscriptions(client, subsObjs, cb)
}

MyPersistence.prototype.removeSubscriptions = function (client, subs, cb) {
const stored = this._subscriptions.get(client.id)
const removed = []

if (stored) {
for (let i = 0; i < subs.length; i += 1) {
const topic = subs[i]
const qos = stored.get(topic)
if (qos !== undefined) {
if (qos > 0) {
removed.push({
clientId: client.id,
topic: topic,
qos: qos
})
}
stored.delete(topic)
removeSubscriptions (client, topics, cb) {
this.backend.removeSubscriptions(client, topics, (err) => {
if (err) {
return cb(err)
}
}

if (stored.size === 0) {
this._clientsCount--
this._subscriptions.delete(client.id)
}
const subsObjs = topics.map(function mapSub (topic) {
return { topic }
})
this._removedSubscriptions(client, subsObjs, cb)
})
}

this._removedSubscriptions(client, removed, cb)
}

const persistence = () => new MyPersistence({ backend: Memory() })

abs({
test: test,
persistence: MyPersistence
test,
persistence
})

0 comments on commit 1f407fa

Please sign in to comment.