Skip to content

Commit

Permalink
Merge pull request #253 from Inist-CNRS/fix-feed-flow
Browse files Browse the repository at this point in the history
fix: 🐛 feed.flow return a Promise
  • Loading branch information
touv authored Sep 12, 2022
2 parents 8aa2a16 + e73f140 commit 56396ea
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 18 deletions.
3 changes: 2 additions & 1 deletion packages/analytics/src/bufferize.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ export default async function bufferize(data, feed) {
const output = stream
.pipe(this.ezs('extract', { path: 'value' }))
.once('end', ()=>feed.close());
return feed.flow(output);
await feed.flow(output);
return;
}
const path = this.getParam('path', 'bufferID');
const key = this.getIndex().toString().padStart(20, '0');
Expand Down
4 changes: 2 additions & 2 deletions packages/analytics/src/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { resolve } from 'path';
* @param {String} [location=.] path location to find files
* @returns {Object}
*/
export default function files(data, feed) {
export default async function files(data, feed) {
if (this.isLast()) {
feed.close();
return;
Expand All @@ -50,5 +50,5 @@ export default function files(data, feed) {
feed.end();
return;
}
feed.flow(createReadStream(file));
await feed.flow(createReadStream(file));
}
10 changes: 4 additions & 6 deletions packages/basics/src/file-load.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { tmpdir } from 'os';
* @param {Boolean} [compress=false] Enable gzip compression
* @returns {Object}
*/
export default function FILELoad(data, feed) {
export default async function FILELoad(data, feed) {
if (this.isLast()) {
feed.close();
return;
Expand All @@ -55,9 +55,7 @@ export default function FILELoad(data, feed) {
feed.end();
return;
}
if (compress) {
feed.flow(createReadStream(file).pipe(createGunzip()));
return;
}
feed.flow(createReadStream(file));
const stream = compress ? createReadStream(file).pipe(createGunzip()) : createReadStream(file);
await feed.flow(stream);
return;
}
5 changes: 3 additions & 2 deletions packages/core/src/statements/loop.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
function loopFunc(data, feed) {
async function loopFunc(data, feed) {
const { ezs } = this;
if (this.isLast()) {
return feed.close();
Expand All @@ -25,7 +25,8 @@ function loopFunc(data, feed) {
if (tests.every((test) => test)) {
input.write(data);
input.end();
return feed.flow(output);
await feed.flow(output);
return;
}
return feed.end();
}
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/statements/spawn.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* @param {String} [cache] Use a specific ezs statement to run commands (advanced)
* @returns {Object}
*/
export default function spawn(data, feed) {
export default async function spawn(data, feed) {
if (this.isLast()) {
return feed.close();
}
Expand All @@ -35,5 +35,5 @@ export default function spawn(data, feed) {
const output = ezs.createPipeline(input, statements)
.pipe(ezs.catch((e) => feed.write(e))); // avoid to break pipeline at each error
ezs.writeTo(input, data, () => input.end());
return feed.flow(output);
await feed.flow(output);
}
2 changes: 1 addition & 1 deletion packages/lodex/src/aggregateQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const createFunction = () => async function LodexAggregateQuery(data, fee
.skip(Number(skip))
.limit(Number(limit))
.pipe(ezs('assign', { path, value }));
feed.flow(stream);
await feed.flow(stream);
};

export default {
Expand Down
2 changes: 1 addition & 1 deletion packages/lodex/src/joinQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,5 @@ export default async function LodexJoinQuery(data, feed) {
output.send(input);
}));

feed.flow(stream);
await feed.flow(stream);
}
2 changes: 1 addition & 1 deletion packages/lodex/src/reduceQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export const createFunction = () => async function LodexReduceQuery(data, feed)
.skip(Number(skip))
.limit(Number(limit))
.pipe(ezs('assign', { path, value }));
feed.flow(stream);
await feed.flow(stream);
};

export default {
Expand Down
2 changes: 1 addition & 1 deletion packages/lodex/src/runQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export const createFunction = () => async function LodexRunQuery(data, feed) {
.skip(Number(skip))
.limit(Number(limit))
.pipe(ezs('assign', { path, value }));
feed.flow(stream);
await feed.flow(stream);
};

export default {
Expand Down
3 changes: 2 additions & 1 deletion packages/loterre/src/skos-pathenum.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ async function SKOSPathEnum(data, feed) {
this.store.close();
});
;
return feed.flow(output);
await feed.flow(output);
return;
}
await this.store.add(data.rdf$about, data);
feed.end();
Expand Down

0 comments on commit 56396ea

Please sign in to comment.