From cf15306531070939295ff8fa922795e1fd7a2fb2 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Sat, 2 Apr 2022 00:02:27 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20=F0=9F=90=9B=20erratic=20error?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/analytics/src/expand.js | 69 +++++++++++++++--------- packages/analytics/test/expand.js | 89 ++++++++++++++++++++++++------- packages/core/src/engine.js | 8 +-- packages/store/package-lock.json | 32 +++++------ packages/store/package.json | 4 +- packages/store/src/store.js | 3 ++ 6 files changed, 140 insertions(+), 65 deletions(-) diff --git a/packages/analytics/src/expand.js b/packages/analytics/src/expand.js index 0cb44e03..6ee62a09 100644 --- a/packages/analytics/src/expand.js +++ b/packages/analytics/src/expand.js @@ -29,10 +29,41 @@ async function mergeWith(data, feed) { set(obj, path, value); return feed.send(obj); } catch (e) { - return feed.send(e); + return feed.stop(e); } } +async function drainWith(data, feed) { + if (this.isLast()) { + const { + store, + stack, + bufferID, + } = this.getEnv(); + + return each( + Object.keys(stack[bufferID]), + async (cur, next) => { + let obj; + try { + obj = await store.get(cur); + } catch (e) { + feed.write(e); + } + if (obj === null) { + feed.stop(new Error(`Unable to find ${cur} in the store ${store.id()}`)); + } else { + feed.write(obj); + } + next(); + }, + () => feed.close(), + ); + } + return feed.send(data); +} + + /** * Takes an `Object` and substitute a field with the corresponding value found in a external pipeline * the internal pipeline receive a special object { id, value } id is the item identifier & value is the item path value @@ -87,7 +118,8 @@ export default async function expand(data, feed) { if (!this.store) { const location = this.getParam('location'); this.store = createStore(ezs, 'expand', location); - this.store.reset(); + await this.store.reset(); + this.flows = []; } if (!this.createStatements) { const commands = ezs.createCommands({ @@ -116,24 +148,12 @@ export default async function expand(data, feed) { stack: this.stack, bufferID, })) - .pipe(ezs.catch()) - .on('end', () => each( - Object.keys(this.stack[bufferID]), - async (cur, next) => { - let obj; - try { - obj = await this.store.get(cur); - } catch (e) { - feed.write(e); - } - if (obj === null) { - feed.write(new Error('id has been lost')); - } else { - feed.write(obj); - } - next(); - }, - )); + .pipe(ezs(drainWith, { path }, { + store: this.store, + cache: this.cache, + stack: this.stack, + bufferID, + })); const input = Array.from(this.buffer); this.buffer = []; this.bufferID += 1; @@ -155,10 +175,11 @@ export default async function expand(data, feed) { if (this.isLast()) { if (this.buffer && this.buffer.length > 0) { const strm = this.buffer2stream(this.bufferID); - strm.once('end', () => this.store.close()); - return feed.flow(strm); + this.flows.push(feed.flow(strm)); } - this.store.close(); + await Promise.all(this.flows); + await this.store.close(); + return feed.close(); } const value = get(data, path); @@ -181,7 +202,7 @@ export default async function expand(data, feed) { this.buffer.push(core(id, value)); if (this.buffer.length >= size) { const strm = this.buffer2stream(this.bufferID); - return feed.flow(strm); + return this.flows.push(feed.flow(strm)); } return feed.end(); } catch (e) { diff --git a/packages/analytics/test/expand.js b/packages/analytics/test/expand.js index 27157c4a..ba2c54da 100644 --- a/packages/analytics/test/expand.js +++ b/packages/analytics/test/expand.js @@ -682,14 +682,12 @@ test('with a script that loses some items', (done) => { output.push(chunk); }) .on('end', () => { - expect(output.length).toEqual(6); + expect(output.length).toEqual(5); expect(output[0].b).toEqual('A'); expect(output[1].b).toEqual('B'); - expect(output[2].b).toEqual('c'); - expect(output[3].b).toEqual('D'); - expect(output[4].b).toEqual('E'); - expect(output[5].b).toEqual('F'); - expect(env.executed).toEqual(false); + expect(output[2].b).toEqual('D'); + expect(output[3].b).toEqual('E'); + expect(output[4].b).toEqual('F'); done(); }); }); @@ -872,14 +870,12 @@ describe('with sub script and brute force write', () => { `; Promise.all(Array(size).fill(true).map(() => func(script))) - .then(() => { - // expand extract error because a error in sub pipeline cannot be rejectied in the main pipeline (no id) - done(new Error('Error is the right behavior')); - }) - .catch((e) => { + .then((r) => { + const e = r.map(x => x.filter(y => (y instanceof Error)).pop()).filter(Boolean).pop(); expect(e.message).toEqual(expect.stringContaining('Erratic Error')); done(); - }); + }) + .catch(done); }); test('erratic error on top', (done) => { @@ -940,7 +936,6 @@ describe('with sub script and brute force write', () => { [use] plugin = basics plugin = analytics - [replace] path = id value = get('a') @@ -972,13 +967,15 @@ describe('with sub script and brute force write', () => { value = get('value') `; func(script) - .then(() => { - done(new Error('Error is the right behavior')); - }) - .catch((e) => { - expect(e.message).toEqual(expect.stringContaining('No back pressure control ?')); + .then((r) => { + expect(r[0].b).toEqual('A'); + expect(r[1].b).toEqual('B'); + expect(r[2].b).toEqual('C'); + expect(r[3].b).toEqual('d'); + expect(r[4].b).toEqual('e'); done(); - }); + }) + .catch(done); }); test('truncated on top', (done) => { @@ -1027,3 +1024,57 @@ describe('with sub script and brute force write', () => { .catch(done); }); }); +test('deep script', (done) => { + ezs.use(statements); + ezs.use(statements); + const input = Array(30).fill(true).map((i, index) => ({ a: index, b: ['a', 'b', 'c', 'd', 'e', 'f'] })); + const output = []; + const script = ` + [use] + plugin = basics + plugin = analytics + + [replace] + path = id + value = get('a') + path = value + value = get('b') + + [expand] + path = value + + [expand/expand] + path = value + + [expand/expand/exploding] + + [expand/expand/expand] + path = value + size = 5 + + [expand/expand/expand/transit] + + [expand/expand/aggregate] + + [replace] + path = a + value = get('id') + path = b + value = get('value') + + `; + from(input) + .pipe(ezs('delegate', { script })) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + expect(output.length).toEqual(30); + expect(output[0].a).toEqual(0); + expect(output[5].b[0]).toEqual('a'); + done(); + }); +}); + diff --git a/packages/core/src/engine.js b/packages/core/src/engine.js index 45bc3de2..304d20cc 100644 --- a/packages/core/src/engine.js +++ b/packages/core/src/engine.js @@ -141,16 +141,16 @@ export default class Engine extends SafeTransform { } }; const push = (data) => { - if (this._readableState.ended) { + if (data === null) { + this.nullWasSent = true; + } + if (!this.nullWasSent && this._readableState.ended) { return warn(new Error('No back pressure control ?')); } if (data instanceof Error) { debug('ezs')(`Ignoring error at item #${currentIndex}`); return this.push(createErrorWith(data, currentIndex, this.funcName, chunk)); } - if (data === null) { - this.nullWasSent = true; - } return this.push(data); }; const wait = async () => { diff --git a/packages/store/package-lock.json b/packages/store/package-lock.json index 48c9cc54..4ed16741 100644 --- a/packages/store/package-lock.json +++ b/packages/store/package-lock.json @@ -6,12 +6,12 @@ "packages": { "": { "name": "@ezs/store", - "version": "1.3.7", + "version": "1.4.1", "license": "MIT", "dependencies": { - "debug": "4.3.3", + "debug": "4.3.4", "del": "6.0.0", - "leveldown": "6.1.0", + "leveldown": "6.1.1", "levelup": "5.1.1", "make-dir": "3.1.0", "path-exists": "4.0.0", @@ -22,7 +22,7 @@ "mkdirp": "1.0.4" }, "peerDependencies": { - "@ezs/core": "1.27.2" + "@ezs/core": "*" } }, "node_modules/@babel/code-frame": { @@ -507,9 +507,9 @@ "peer": true }, "node_modules/debug": { - "version": "4.3.3", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.3.tgz", - "integrity": "sha512-/zxw5+vh1Tfv+4Qn7a5nsbcJKPaSvCDhojn6FEl9vupwK2VCSDtEiEtqr8DFtzYFOdz63LBkxec7DYuc2jon6Q==", + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", "dependencies": { "ms": "2.1.2" }, @@ -1218,9 +1218,9 @@ } }, "node_modules/leveldown": { - "version": "6.1.0", - "resolved": "https://registry.npmjs.org/leveldown/-/leveldown-6.1.0.tgz", - "integrity": "sha512-8C7oJDT44JXxh04aSSsfcMI8YiaGRhOFI9/pMEL7nWJLVsWajDPTRxsSHTM2WcTVY5nXM+SuRHzPPi0GbnDX+w==", + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/leveldown/-/leveldown-6.1.1.tgz", + "integrity": "sha512-88c+E+Eizn4CkQOBHwqlCJaTNEjGpaEIikn1S+cINc5E9HEvJ77bqY4JY/HxT5u0caWqsc3P3DcFIKBI1vHt+A==", "hasInstallScript": true, "dependencies": { "abstract-leveldown": "^7.2.0", @@ -2447,9 +2447,9 @@ "peer": true }, "debug": { - "version": "4.3.3", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.3.tgz", - "integrity": "sha512-/zxw5+vh1Tfv+4Qn7a5nsbcJKPaSvCDhojn6FEl9vupwK2VCSDtEiEtqr8DFtzYFOdz63LBkxec7DYuc2jon6Q==", + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", "requires": { "ms": "2.1.2" } @@ -2968,9 +2968,9 @@ "integrity": "sha512-E486g1NCjW5cF78KGPrMDRBYzPuueMZ6VBXHT6gC7A8UYWGiM14fGgp+s/L1oFfDWSPV/+SFkYCmZ0SiESkRKA==" }, "leveldown": { - "version": "6.1.0", - "resolved": "https://registry.npmjs.org/leveldown/-/leveldown-6.1.0.tgz", - "integrity": "sha512-8C7oJDT44JXxh04aSSsfcMI8YiaGRhOFI9/pMEL7nWJLVsWajDPTRxsSHTM2WcTVY5nXM+SuRHzPPi0GbnDX+w==", + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/leveldown/-/leveldown-6.1.1.tgz", + "integrity": "sha512-88c+E+Eizn4CkQOBHwqlCJaTNEjGpaEIikn1S+cINc5E9HEvJ77bqY4JY/HxT5u0caWqsc3P3DcFIKBI1vHt+A==", "requires": { "abstract-leveldown": "^7.2.0", "napi-macros": "~2.0.0", diff --git a/packages/store/package.json b/packages/store/package.json index a512a464..e1d128b9 100644 --- a/packages/store/package.json +++ b/packages/store/package.json @@ -5,9 +5,9 @@ "author": "Nicolas Thouvenin ", "bugs": "https://github.com/Inist-CNRS/ezs/issues", "dependencies": { - "debug": "4.3.3", + "debug": "4.3.4", "del": "6.0.0", - "leveldown": "6.1.0", + "leveldown": "6.1.1", "levelup": "5.1.1", "make-dir": "3.1.0", "path-exists": "4.0.0", diff --git a/packages/store/src/store.js b/packages/store/src/store.js index 8a2aab28..a29887ae 100644 --- a/packages/store/src/store.js +++ b/packages/store/src/store.js @@ -54,6 +54,7 @@ class Store { }); this.hdb = (cb) => this.ready.then(() => { if (!this.handle) return cb(new Error('Store was closed')); + if (!this.handle.isOperational()) return cb(new Error('Store is not operational')); return cb(null, this.handle); }).catch(cb); } @@ -95,6 +96,7 @@ class Store { return db.get(key2, (err1, value) => { if (err1) { if (err1.notFound) { + console.error('WARNING', err1); return resolve(null); } return reject(err1); @@ -193,6 +195,7 @@ class Store { } async close() { + debug('ezs')(`DB from ${this.directory} is closing`); delete handle[this.directory]; if (!this.handle) { return del([this.directory], { force: true });