Skip to content

Commit

Permalink
fix: avoid to lose items
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Sep 15, 2022
1 parent 51729db commit c45d206
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 24 deletions.
27 changes: 6 additions & 21 deletions packages/analytics/src/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ async function mergeWith(data, feed) {
const {
store,
cachePath,
stack,
bufferID,
} = this.getEnv();
const { id, value } = data;
const path = this.getParam('path');
Expand All @@ -47,7 +45,6 @@ async function mergeWith(data, feed) {
if (cachePath && source) {
await cachePut(cachePath, source, value);
}
delete stack[bufferID][id];
set(obj, path, value);
return feed.send(obj);
} catch (e) {
Expand All @@ -60,12 +57,9 @@ async function drainWith(data, feed) {
if (this.isLast()) {
const {
store,
stack,
bufferID,
} = this.getEnv();

return each(
Object.keys(stack[bufferID]),
Object.keys(store),
async (cur, next) => {
let obj;
try {
Expand Down Expand Up @@ -163,43 +157,35 @@ export default async function expand(data, feed) {
}

if (!this.buffer2stream) {
this.buffer2stream = (bufferID) => {
this.buffer2stream = () => {
const statements = this.createStatements();
const stream = ezs.createStream(ezs.objectMode());
const output = ezs.createPipeline(stream, statements)
.pipe(ezs(mergeWith, { path }, {
store: this.store,
cachePath: this.cachePath,
stack: this.stack,
bufferID,
}))
.pipe(ezs(drainWith, { path }, {
store: this.store,
cachePath: this.cachePath,
stack: this.stack,
bufferID,
}));
}))
;
const input = Array.from(this.buffer);
this.buffer = [];
this.bufferID += 1;
this.stack[this.bufferID] = {};

each(input, (cur, next) => ezs.writeTo(stream, cur, next), () => stream.end());
return output;
};
}
if (!this.buffer) {
this.stack = [];
this.buffer = [];
this.bufferID = 0;
this.stack[this.bufferID] = {};
}

// Processing

if (this.isLast()) {
if (this.buffer && this.buffer.length > 0) {
const strm = this.buffer2stream(this.bufferID);
const strm = this.buffer2stream();
this.flows.push(feed.flow(strm));
}
await Promise.all(this.flows);
Expand All @@ -222,10 +208,9 @@ export default async function expand(data, feed) {

this.store[id] = data;

this.stack[this.bufferID][id] = true;
this.buffer.push(core(id, value));
if (this.buffer.length >= size) {
const strm = this.buffer2stream(this.bufferID);
const strm = this.buffer2stream();
return this.flows.push(feed.flow(strm));
}
return feed.end();
Expand Down
7 changes: 4 additions & 3 deletions packages/analytics/test/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,13 @@ test('with a script that loses some items', (done) => {
output.push(chunk);
})
.on('end', () => {
expect(output.length).toEqual(5);
expect(output.length).toEqual(6);
expect(output[0].b).toEqual('A');
expect(output[1].b).toEqual('B');
expect(output[2].b).toEqual('D');
expect(output[3].b).toEqual('E');
expect(output[4].b).toEqual('F');
expect(output[3].b).toEqual('c');
expect(output[4].b).toEqual('E');
expect(output[5].b).toEqual('F');
done();
});
});
Expand Down

0 comments on commit c45d206

Please sign in to comment.