Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Dec 13, 2020
1 parent c304c38 commit 89b434b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
17 changes: 13 additions & 4 deletions packages/core/src/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import debug from 'debug';
import queue from 'concurrent-queue';
import { hrtime } from 'process';
import pWaitFor from 'p-wait-for';
import Feed from './feed';
import Shell from './shell';

Expand Down Expand Up @@ -56,6 +57,7 @@ export default class Engine extends SafeTransform {
this.scope.isFirst = () => (this.index === 1);
this.scope.getIndex = () => this.index;
this.scope.isLast = () => (this.chunk === null);
this.scope.getCumulativeTime = () => nano2sec(hrtime.bigint() - this.stime);
this.scope.getParam = (name, defval) => {
if (this.params[name] !== undefined) {
return this.shell.run(this.params[name], this.chunk);
Expand Down Expand Up @@ -101,6 +103,11 @@ export default class Engine extends SafeTransform {
done();
});
}
isReady() {
return (!this._readableState.ended
&& (this._readableState.length < this._readableState.highWaterMark
|| this._readableState.length === 0));
}

execWith(chunk, done) {
if (this.errorWasSent || this.nullWasSent) {
Expand Down Expand Up @@ -128,10 +135,12 @@ export default class Engine extends SafeTransform {
}
return this.push(data);
};
const ready = () => (!this._readableState.ended
&& (this._readableState.length < this._readableState.highWaterMark
|| this._readableState.length === 0));
const feed = new Feed(push, done, warn, ready);
const wait = async () => {
this.pause();
await pWaitFor(() => this.isReady(), { interval: 20 });
return this.resume();
};
const feed = new Feed(push, done, warn, wait);
try {
this.chunk = chunk;
return Promise.resolve(this.func.call(this.scope, chunk, feed, currentIndex)).catch((e) => {
Expand Down
7 changes: 3 additions & 4 deletions packages/core/src/feed.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import pWaitFor from 'p-wait-for';
import once from 'once';

export default class Feed {
constructor(push, done, error, isReady) {
constructor(push, done, error, wait) {
this.push = push;
this.done = once(done);
this.error = once(error);
this.seal = once(() => { push(null); done(); });
this.isReady = isReady;
this.wait = wait;
}

write(something) {
Expand All @@ -22,7 +21,7 @@ export default class Feed {
stream.on('data', async (data) => {
if (!this.push(data)) {
stream.pause();
await pWaitFor(() => this.isReady());
await this.wait();
stream.resume();
}
});
Expand Down

0 comments on commit 89b434b

Please sign in to comment.