diff --git a/aedes.js b/aedes.js index 029f4298..75d5914d 100644 --- a/aedes.js +++ b/aedes.js @@ -5,14 +5,13 @@ const util = require('util') const parallel = require('fastparallel') const series = require('fastseries') const { v4: uuidv4 } = require('uuid') -const bulk = require('bulk-write-stream') const reusify = require('reusify') const { pipeline } = require('stream') const Packet = require('aedes-packet') const memory = require('aedes-persistence') const mqemitter = require('mqemitter') const Client = require('./lib/client') -const { $SYS_PREFIX } = require('./lib/utils') +const { $SYS_PREFIX, bulk } = require('./lib/utils') module.exports = Aedes.Server = Aedes @@ -102,7 +101,7 @@ function Aedes (opts) { pipeline( that.persistence.streamWill(that.brokers), - bulk.obj(receiveWills), + bulk(receiveWills), function done (err) { if (err) { that.emit('error', err) diff --git a/lib/utils.js b/lib/utils.js index 0f51d51c..ea7c8dbf 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,6 +1,6 @@ 'use strict' -const { Transform } = require('stream') +const { Transform, Writable } = require('stream') function validateTopic (topic, message) { const end = topic.length - 1 @@ -37,8 +37,18 @@ function through (transform) { }) } +function bulk (fn) { + return new Writable({ + objectMode: true, + writev: function (chunks, cb) { + fn(chunks.map(chunk => chunk.chunk), cb) + } + }) +} + module.exports = { validateTopic, through, + bulk, $SYS_PREFIX: '$SYS/' } diff --git a/package.json b/package.json index 45f81873..c682942f 100644 --- a/package.json +++ b/package.json @@ -120,7 +120,6 @@ "dependencies": { "aedes-packet": "^2.3.1", "aedes-persistence": "^8.1.3", - "bulk-write-stream": "^2.0.1", "end-of-stream": "^1.4.4", "fastfall": "^1.5.1", "fastparallel": "^2.4.1",