From c45d2066d25f66cd72d907a50b6713ffd1ad7dca Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Thu, 15 Sep 2022 14:20:33 +0200 Subject: [PATCH] fix: avoid to lose items --- packages/analytics/src/expand.js | 27 ++++++--------------------- packages/analytics/test/expand.js | 7 ++++--- 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/packages/analytics/src/expand.js b/packages/analytics/src/expand.js index ffd390b9..3787171d 100644 --- a/packages/analytics/src/expand.js +++ b/packages/analytics/src/expand.js @@ -32,8 +32,6 @@ async function mergeWith(data, feed) { const { store, cachePath, - stack, - bufferID, } = this.getEnv(); const { id, value } = data; const path = this.getParam('path'); @@ -47,7 +45,6 @@ async function mergeWith(data, feed) { if (cachePath && source) { await cachePut(cachePath, source, value); } - delete stack[bufferID][id]; set(obj, path, value); return feed.send(obj); } catch (e) { @@ -60,12 +57,9 @@ async function drainWith(data, feed) { if (this.isLast()) { const { store, - stack, - bufferID, } = this.getEnv(); - return each( - Object.keys(stack[bufferID]), + Object.keys(store), async (cur, next) => { let obj; try { @@ -163,43 +157,35 @@ export default async function expand(data, feed) { } if (!this.buffer2stream) { - this.buffer2stream = (bufferID) => { + this.buffer2stream = () => { const statements = this.createStatements(); const stream = ezs.createStream(ezs.objectMode()); const output = ezs.createPipeline(stream, statements) .pipe(ezs(mergeWith, { path }, { store: this.store, cachePath: this.cachePath, - stack: this.stack, - bufferID, })) .pipe(ezs(drainWith, { path }, { store: this.store, cachePath: this.cachePath, - stack: this.stack, - bufferID, - })); + })) + ; const input = Array.from(this.buffer); this.buffer = []; - this.bufferID += 1; - this.stack[this.bufferID] = {}; each(input, (cur, next) => ezs.writeTo(stream, cur, next), () => stream.end()); return output; }; } if (!this.buffer) { - this.stack = []; this.buffer = []; - this.bufferID = 0; - this.stack[this.bufferID] = {}; } // Processing if (this.isLast()) { if (this.buffer && this.buffer.length > 0) { - const strm = this.buffer2stream(this.bufferID); + const strm = this.buffer2stream(); this.flows.push(feed.flow(strm)); } await Promise.all(this.flows); @@ -222,10 +208,9 @@ export default async function expand(data, feed) { this.store[id] = data; - this.stack[this.bufferID][id] = true; this.buffer.push(core(id, value)); if (this.buffer.length >= size) { - const strm = this.buffer2stream(this.bufferID); + const strm = this.buffer2stream(); return this.flows.push(feed.flow(strm)); } return feed.end(); diff --git a/packages/analytics/test/expand.js b/packages/analytics/test/expand.js index f76de5ce..3df268dd 100644 --- a/packages/analytics/test/expand.js +++ b/packages/analytics/test/expand.js @@ -633,12 +633,13 @@ test('with a script that loses some items', (done) => { output.push(chunk); }) .on('end', () => { - expect(output.length).toEqual(5); + expect(output.length).toEqual(6); expect(output[0].b).toEqual('A'); expect(output[1].b).toEqual('B'); expect(output[2].b).toEqual('D'); - expect(output[3].b).toEqual('E'); - expect(output[4].b).toEqual('F'); + expect(output[3].b).toEqual('c'); + expect(output[4].b).toEqual('E'); + expect(output[5].b).toEqual('F'); done(); }); });