From e73f14042eb60b464c9e021345e479d24ba0ec81 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Mon, 12 Sep 2022 08:20:09 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20=F0=9F=90=9B=20feed.flow=20return=20a=20?= =?UTF-8?q?Promise?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/analytics/src/bufferize.js | 3 ++- packages/analytics/src/files.js | 4 ++-- packages/basics/src/file-load.js | 10 ++++------ packages/core/src/statements/loop.js | 5 +++-- packages/core/src/statements/spawn.js | 4 ++-- packages/lodex/src/aggregateQuery.js | 2 +- packages/lodex/src/joinQuery.js | 2 +- packages/lodex/src/reduceQuery.js | 2 +- packages/lodex/src/runQuery.js | 2 +- packages/loterre/src/skos-pathenum.js | 3 ++- 10 files changed, 19 insertions(+), 18 deletions(-) diff --git a/packages/analytics/src/bufferize.js b/packages/analytics/src/bufferize.js index 49175258..a31094c6 100644 --- a/packages/analytics/src/bufferize.js +++ b/packages/analytics/src/bufferize.js @@ -49,7 +49,8 @@ export default async function bufferize(data, feed) { const output = stream .pipe(this.ezs('extract', { path: 'value' })) .once('end', ()=>feed.close()); - return feed.flow(output); + await feed.flow(output); + return; } const path = this.getParam('path', 'bufferID'); const key = this.getIndex().toString().padStart(20, '0'); diff --git a/packages/analytics/src/files.js b/packages/analytics/src/files.js index 40b902d8..a4f3bbdf 100644 --- a/packages/analytics/src/files.js +++ b/packages/analytics/src/files.js @@ -34,7 +34,7 @@ import { resolve } from 'path'; * @param {String} [location=.] path location to find files * @returns {Object} */ -export default function files(data, feed) { +export default async function files(data, feed) { if (this.isLast()) { feed.close(); return; @@ -50,5 +50,5 @@ export default function files(data, feed) { feed.end(); return; } - feed.flow(createReadStream(file)); + await feed.flow(createReadStream(file)); } diff --git a/packages/basics/src/file-load.js b/packages/basics/src/file-load.js index 1a67dd70..cf451ffb 100644 --- a/packages/basics/src/file-load.js +++ b/packages/basics/src/file-load.js @@ -36,7 +36,7 @@ import { tmpdir } from 'os'; * @param {Boolean} [compress=false] Enable gzip compression * @returns {Object} */ -export default function FILELoad(data, feed) { +export default async function FILELoad(data, feed) { if (this.isLast()) { feed.close(); return; @@ -55,9 +55,7 @@ export default function FILELoad(data, feed) { feed.end(); return; } - if (compress) { - feed.flow(createReadStream(file).pipe(createGunzip())); - return; - } - feed.flow(createReadStream(file)); + const stream = compress ? createReadStream(file).pipe(createGunzip()) : createReadStream(file); + await feed.flow(stream); + return; } diff --git a/packages/core/src/statements/loop.js b/packages/core/src/statements/loop.js index 64cef69c..a2895514 100644 --- a/packages/core/src/statements/loop.js +++ b/packages/core/src/statements/loop.js @@ -1,4 +1,4 @@ -function loopFunc(data, feed) { +async function loopFunc(data, feed) { const { ezs } = this; if (this.isLast()) { return feed.close(); @@ -25,7 +25,8 @@ function loopFunc(data, feed) { if (tests.every((test) => test)) { input.write(data); input.end(); - return feed.flow(output); + await feed.flow(output); + return; } return feed.end(); } diff --git a/packages/core/src/statements/spawn.js b/packages/core/src/statements/spawn.js index 43905117..2b62ca0d 100644 --- a/packages/core/src/statements/spawn.js +++ b/packages/core/src/statements/spawn.js @@ -11,7 +11,7 @@ * @param {String} [cache] Use a specific ezs statement to run commands (advanced) * @returns {Object} */ -export default function spawn(data, feed) { +export default async function spawn(data, feed) { if (this.isLast()) { return feed.close(); } @@ -35,5 +35,5 @@ export default function spawn(data, feed) { const output = ezs.createPipeline(input, statements) .pipe(ezs.catch((e) => feed.write(e))); // avoid to break pipeline at each error ezs.writeTo(input, data, () => input.end()); - return feed.flow(output); + await feed.flow(output); } diff --git a/packages/lodex/src/aggregateQuery.js b/packages/lodex/src/aggregateQuery.js index be68587c..43771b15 100644 --- a/packages/lodex/src/aggregateQuery.js +++ b/packages/lodex/src/aggregateQuery.js @@ -56,7 +56,7 @@ export const createFunction = () => async function LodexAggregateQuery(data, fee .skip(Number(skip)) .limit(Number(limit)) .pipe(ezs('assign', { path, value })); - feed.flow(stream); + await feed.flow(stream); }; export default { diff --git a/packages/lodex/src/joinQuery.js b/packages/lodex/src/joinQuery.js index ba9448ea..69b4fbaa 100644 --- a/packages/lodex/src/joinQuery.js +++ b/packages/lodex/src/joinQuery.js @@ -123,5 +123,5 @@ export default async function LodexJoinQuery(data, feed) { output.send(input); })); - feed.flow(stream); + await feed.flow(stream); } diff --git a/packages/lodex/src/reduceQuery.js b/packages/lodex/src/reduceQuery.js index a6a7b873..acfb2504 100644 --- a/packages/lodex/src/reduceQuery.js +++ b/packages/lodex/src/reduceQuery.js @@ -112,7 +112,7 @@ export const createFunction = () => async function LodexReduceQuery(data, feed) .skip(Number(skip)) .limit(Number(limit)) .pipe(ezs('assign', { path, value })); - feed.flow(stream); + await feed.flow(stream); }; export default { diff --git a/packages/lodex/src/runQuery.js b/packages/lodex/src/runQuery.js index 48a9006b..37545738 100644 --- a/packages/lodex/src/runQuery.js +++ b/packages/lodex/src/runQuery.js @@ -64,7 +64,7 @@ export const createFunction = () => async function LodexRunQuery(data, feed) { .skip(Number(skip)) .limit(Number(limit)) .pipe(ezs('assign', { path, value })); - feed.flow(stream); + await feed.flow(stream); }; export default { diff --git a/packages/loterre/src/skos-pathenum.js b/packages/loterre/src/skos-pathenum.js index 5024d508..1726aed2 100644 --- a/packages/loterre/src/skos-pathenum.js +++ b/packages/loterre/src/skos-pathenum.js @@ -91,7 +91,8 @@ async function SKOSPathEnum(data, feed) { this.store.close(); }); ; - return feed.flow(output); + await feed.flow(output); + return; } await this.store.add(data.rdf$about, data); feed.end();