diff --git a/abstract.js b/abstract.js index a6a58fe..6b835ed 100644 --- a/abstract.js +++ b/abstract.js @@ -70,7 +70,7 @@ function abstractPersistence (opts) { async function streamForEach (stream, fn) { for await (const item of iterableStream(stream)) { - fn(item) + await fn(item) } } @@ -936,12 +936,15 @@ function abstractPersistence (opts) { enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => { const stream = instance.outgoingStream(client) - function clearQueue (data) { - instance.outgoingUpdate(client, data, - (err, client, packet) => { - t.notOk(err, 'no error') - queue.push(packet) - }) + async function clearQueue (data) { + return new Promise((resolve, reject) => { + instance.outgoingUpdate(client, data, + (err, client, packet) => { + t.notOk(err, 'no error') + queue.push(packet) + resolve() + }) + }) } streamForEach(stream, clearQueue).then(function done () { t.equal(queue.length, 2) @@ -1162,9 +1165,7 @@ function abstractPersistence (opts) { qos: 1, dup: false, length: 14, - retain: false, - brokerId: instance.broker.id, - brokerCounter: 42 + retain: false } function outStream (instance, client) { @@ -1175,9 +1176,26 @@ function abstractPersistence (opts) { const stream = outStream(instance, client) const total = stream.readableHighWaterMark * 2 - async function submitMessage (id) { + function submitMessage (id) { + return new Promise((resolve, reject) => { + const p = new Packet(packet, instance.broker) + p.messageId = id + instance.outgoingEnqueue(sub, p, (err) => { + if (err) { + return reject(err) + } + instance.outgoingUpdate(client, p, resolve) + }) + }) + } + + function clearMessage (p) { return new Promise((resolve, reject) => { - enqueueAndUpdate(t, instance, client, sub, packet, id, resolve) + instance.outgoingClearMessageId(client, p, (err, received) => { + t.error(err) + t.deepEqual(received, p, 'must return the packet') + resolve() + }) }) } @@ -1194,10 +1212,7 @@ function abstractPersistence (opts) { t.equal(queued, total, `outgoing queue must hold ${total} items`) for await (const p of outStream(instance, client)) { - instance.outgoingClearMessageId(client, p, (err, received) => { - t.error(err) - t.deepEqual(received, p, 'must return the packet') - }) + await clearMessage(p) } let queued2 = 0