Skip to content

Commit

Permalink
fix: 🐛 add warning for back pressure control
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Jan 29, 2022
1 parent ef5f954 commit beb67f8
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 27 deletions.
105 changes: 78 additions & 27 deletions packages/analytics/test/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -711,29 +711,32 @@ describe('with sub script and brute force write', () => {
const func = (script) => new Promise((resolve, reject) => {
const output = [];
const strm = new PassThrough({ objectMode: true });
strm
.pipe(ezs('delegate', { script }))
.on('data', (chunk) => {
output.push(chunk);
})
.on('end', () => {
resolve(output);
})
.on('error', (e) => {
reject(e);
});

// brute force write ! (no back pressure control)
for (const entry of input) {
strm.write(entry);
try {
strm
.pipe(ezs('delegate', { script }))
.on('data', (chunk) => {
output.push(chunk);
})
.on('end', () => {
resolve(output);
})
.on('error', (e) => {
reject(e);
});
// brute force write ! (no back pressure control)
for (const entry of input) {
strm.write(entry);
}
strm.end();
} catch(e) {
reject(e);
}
strm.end();
});

beforeAll(() => jest.setTimeout(60000));
afterAll(() => jest.setTimeout(5000));

test('with no error', (done) => {
test('no error', (done) => {
ezs.use(statements);
const script = `
[use]
Expand Down Expand Up @@ -783,7 +786,7 @@ describe('with sub script and brute force write', () => {
.catch(done);
});

test('stopped with erratic error', (done) => {
test('erratic fatal error in depth', (done) => {
ezs.use(statements);
const script = `
[use]
Expand Down Expand Up @@ -830,7 +833,7 @@ describe('with sub script and brute force write', () => {
});


test('corrupted with erratic error', (done) => {
test('erratic error in depth', (done) => {
ezs.use(statements);
const script = `
[use]
Expand Down Expand Up @@ -879,7 +882,7 @@ describe('with sub script and brute force write', () => {
});
});

test('improper with erratic error', (done) => {
test('erratic error on top', (done) => {
ezs.use(statements);
const script = `
[use]
Expand Down Expand Up @@ -930,7 +933,8 @@ describe('with sub script and brute force write', () => {
})
.catch(done);
});
test('improper with erratic error', (done) => {

test('truncated in depth', (done) => {
ezs.use(statements);
const script = `
[use]
Expand Down Expand Up @@ -958,21 +962,68 @@ describe('with sub script and brute force write', () => {
path = value
value = get('value').toUpper()
[erraticError]
stop = true
[expand/truncate]
length = 3
[replace]
path = a
value = get('id')
path = b
value = get('value')
`;

Promise.all(Array(size).fill(true).map(() => func(script)))
.then(() => done(new Error('Error is the right behavior')))
func(script)
.then(() => {
done(new Error('Error is the right behavior'));
})
.catch((e) => {
expect(e.message).toEqual(expect.stringContaining('Erratic Error'));
expect(e.message).toEqual(expect.stringContaining('No back pressure control ?'));
done();
});
});

test('truncated on top', (done) => {
ezs.use(statements);
const script = `
[use]
plugin = basics
plugin = analytics
[replace]
path = id
value = get('a')
path = value
value = get('b')
[validate]
path = id
rule = required
path = value
rule = required
[expand]
size = 100
path = value
[expand/assign]
path = value
value = get('value').toUpper()
[truncate]
length = 3
[replace]
path = a
value = get('id')
path = b
value = get('value')func(script)
`;
Promise.all(Array(size).fill(true).map(() => func(script)))
.then((r) => {
expect(r.length).toBe(size);
expect(r[0].length).toBe(3);
done();
})
.catch(done);
});
});
3 changes: 3 additions & 0 deletions packages/core/src/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ export default class Engine extends SafeTransform {
}
};
const push = (data) => {
if (this._readableState.ended) {
return warn(new Error('No back pressure control ?'));
}
if (data instanceof Error) {
debug('ezs')(`Ignoring error at item #${currentIndex}`);
return this.push(createErrorWith(data, currentIndex, this.funcName, chunk));
Expand Down

0 comments on commit beb67f8

Please sign in to comment.