Skip to content

Commit

Permalink
fix: 🐛 erratic error
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Apr 1, 2022
1 parent 0ccde3c commit cf15306
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 65 deletions.
69 changes: 45 additions & 24 deletions packages/analytics/src/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,41 @@ async function mergeWith(data, feed) {
set(obj, path, value);
return feed.send(obj);
} catch (e) {
return feed.send(e);
return feed.stop(e);
}
}

async function drainWith(data, feed) {
if (this.isLast()) {
const {
store,
stack,
bufferID,
} = this.getEnv();

return each(
Object.keys(stack[bufferID]),
async (cur, next) => {
let obj;
try {
obj = await store.get(cur);
} catch (e) {
feed.write(e);
}
if (obj === null) {
feed.stop(new Error(`Unable to find ${cur} in the store ${store.id()}`));
} else {
feed.write(obj);
}
next();
},
() => feed.close(),
);
}
return feed.send(data);
}


/**
* Takes an `Object` and substitute a field with the corresponding value found in a external pipeline
* the internal pipeline receive a special object { id, value } id is the item identifier & value is the item path value
Expand Down Expand Up @@ -87,7 +118,8 @@ export default async function expand(data, feed) {
if (!this.store) {
const location = this.getParam('location');
this.store = createStore(ezs, 'expand', location);
this.store.reset();
await this.store.reset();
this.flows = [];
}
if (!this.createStatements) {
const commands = ezs.createCommands({
Expand Down Expand Up @@ -116,24 +148,12 @@ export default async function expand(data, feed) {
stack: this.stack,
bufferID,
}))
.pipe(ezs.catch())
.on('end', () => each(
Object.keys(this.stack[bufferID]),
async (cur, next) => {
let obj;
try {
obj = await this.store.get(cur);
} catch (e) {
feed.write(e);
}
if (obj === null) {
feed.write(new Error('id has been lost'));
} else {
feed.write(obj);
}
next();
},
));
.pipe(ezs(drainWith, { path }, {
store: this.store,
cache: this.cache,
stack: this.stack,
bufferID,
}));
const input = Array.from(this.buffer);
this.buffer = [];
this.bufferID += 1;
Expand All @@ -155,10 +175,11 @@ export default async function expand(data, feed) {
if (this.isLast()) {
if (this.buffer && this.buffer.length > 0) {
const strm = this.buffer2stream(this.bufferID);
strm.once('end', () => this.store.close());
return feed.flow(strm);
this.flows.push(feed.flow(strm));
}
this.store.close();
await Promise.all(this.flows);
await this.store.close();

return feed.close();
}
const value = get(data, path);
Expand All @@ -181,7 +202,7 @@ export default async function expand(data, feed) {
this.buffer.push(core(id, value));
if (this.buffer.length >= size) {
const strm = this.buffer2stream(this.bufferID);
return feed.flow(strm);
return this.flows.push(feed.flow(strm));
}
return feed.end();
} catch (e) {
Expand Down
89 changes: 70 additions & 19 deletions packages/analytics/test/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -682,14 +682,12 @@ test('with a script that loses some items', (done) => {
output.push(chunk);
})
.on('end', () => {
expect(output.length).toEqual(6);
expect(output.length).toEqual(5);
expect(output[0].b).toEqual('A');
expect(output[1].b).toEqual('B');
expect(output[2].b).toEqual('c');
expect(output[3].b).toEqual('D');
expect(output[4].b).toEqual('E');
expect(output[5].b).toEqual('F');
expect(env.executed).toEqual(false);
expect(output[2].b).toEqual('D');
expect(output[3].b).toEqual('E');
expect(output[4].b).toEqual('F');
done();
});
});
Expand Down Expand Up @@ -872,14 +870,12 @@ describe('with sub script and brute force write', () => {
`;

Promise.all(Array(size).fill(true).map(() => func(script)))
.then(() => {
// expand extract error because a error in sub pipeline cannot be rejectied in the main pipeline (no id)
done(new Error('Error is the right behavior'));
})
.catch((e) => {
.then((r) => {
const e = r.map(x => x.filter(y => (y instanceof Error)).pop()).filter(Boolean).pop();
expect(e.message).toEqual(expect.stringContaining('Erratic Error'));
done();
});
})
.catch(done);
});

test('erratic error on top', (done) => {
Expand Down Expand Up @@ -940,7 +936,6 @@ describe('with sub script and brute force write', () => {
[use]
plugin = basics
plugin = analytics
[replace]
path = id
value = get('a')
Expand Down Expand Up @@ -972,13 +967,15 @@ describe('with sub script and brute force write', () => {
value = get('value')
`;
func(script)
.then(() => {
done(new Error('Error is the right behavior'));
})
.catch((e) => {
expect(e.message).toEqual(expect.stringContaining('No back pressure control ?'));
.then((r) => {
expect(r[0].b).toEqual('A');
expect(r[1].b).toEqual('B');
expect(r[2].b).toEqual('C');
expect(r[3].b).toEqual('d');
expect(r[4].b).toEqual('e');
done();
});
})
.catch(done);
});

test('truncated on top', (done) => {
Expand Down Expand Up @@ -1027,3 +1024,57 @@ describe('with sub script and brute force write', () => {
.catch(done);
});
});
test('deep script', (done) => {
ezs.use(statements);
ezs.use(statements);
const input = Array(30).fill(true).map((i, index) => ({ a: index, b: ['a', 'b', 'c', 'd', 'e', 'f'] }));
const output = [];
const script = `
[use]
plugin = basics
plugin = analytics
[replace]
path = id
value = get('a')
path = value
value = get('b')
[expand]
path = value
[expand/expand]
path = value
[expand/expand/exploding]
[expand/expand/expand]
path = value
size = 5
[expand/expand/expand/transit]
[expand/expand/aggregate]
[replace]
path = a
value = get('id')
path = b
value = get('value')
`;
from(input)
.pipe(ezs('delegate', { script }))
.pipe(ezs.catch())
.on('error', done)
.on('data', (chunk) => {
output.push(chunk);
})
.on('end', () => {
expect(output.length).toEqual(30);
expect(output[0].a).toEqual(0);
expect(output[5].b[0]).toEqual('a');
done();
});
});

8 changes: 4 additions & 4 deletions packages/core/src/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ export default class Engine extends SafeTransform {
}
};
const push = (data) => {
if (this._readableState.ended) {
if (data === null) {
this.nullWasSent = true;
}
if (!this.nullWasSent && this._readableState.ended) {
return warn(new Error('No back pressure control ?'));
}
if (data instanceof Error) {
debug('ezs')(`Ignoring error at item #${currentIndex}`);
return this.push(createErrorWith(data, currentIndex, this.funcName, chunk));
}
if (data === null) {
this.nullWasSent = true;
}
return this.push(data);
};
const wait = async () => {
Expand Down
32 changes: 16 additions & 16 deletions packages/store/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
"author": "Nicolas Thouvenin <[email protected]>",
"bugs": "https://github.com/Inist-CNRS/ezs/issues",
"dependencies": {
"debug": "4.3.3",
"debug": "4.3.4",
"del": "6.0.0",
"leveldown": "6.1.0",
"leveldown": "6.1.1",
"levelup": "5.1.1",
"make-dir": "3.1.0",
"path-exists": "4.0.0",
Expand Down
3 changes: 3 additions & 0 deletions packages/store/src/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Store {
});
this.hdb = (cb) => this.ready.then(() => {
if (!this.handle) return cb(new Error('Store was closed'));
if (!this.handle.isOperational()) return cb(new Error('Store is not operational'));
return cb(null, this.handle);
}).catch(cb);
}
Expand Down Expand Up @@ -95,6 +96,7 @@ class Store {
return db.get(key2, (err1, value) => {
if (err1) {
if (err1.notFound) {
console.error('WARNING', err1);
return resolve(null);
}
return reject(err1);
Expand Down Expand Up @@ -193,6 +195,7 @@ class Store {
}

async close() {
debug('ezs')(`DB from ${this.directory} is closing`);
delete handle[this.directory];
if (!this.handle) {
return del([this.directory], { force: true });
Expand Down

0 comments on commit cf15306

Please sign in to comment.