Skip to content

Commit

Permalink
fix: 🐛 fix push after end
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Jul 24, 2020
1 parent 9527f65 commit 66750ed
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
34 changes: 22 additions & 12 deletions packages/analytics/src/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,26 @@ import debug from 'debug';
import core from './core';
import { createStore } from './store';

async function mergeWith(data, feed) {
if (this.isLast()) {
return feed.close();
}
const store = this.getEnv();
const { id, value } = data;
const path = this.getParam('path');
try {
const obj = await store.get(id);
if (obj === null) {
return feed.send(new Error('id was corrupted'));
}
set(obj, path, value);
feed.send(obj);
} catch (e) {
feed.send(e);
}
return;
}

/**
* Takes an `Object` and substitute a field with the corresponding value found in a external pipeline
* the internal pipeline receive a special object { id, value } id is the item identifier & value is the item path value
Expand Down Expand Up @@ -66,20 +86,10 @@ export default async function expand(data, feed) {
const statements = ezs.compileCommands(commands, this.getEnv());
this.input = ezs.createStream(ezs.objectMode());
const output = ezs.createPipeline(this.input, statements)
.pipe(ezs(mergeWith, { path }, this.store))
.pipe(ezs.catch())
.on('error', (e) => feed.stop(e))
.on('data', async ({ id, value }) => {
try {
const obj = await this.store.get(id);
if (obj === null) {
feed.write(new Error('id was corrupted'));
}
set(obj, path, value);
feed.write(obj);
} catch (e) {
feed.write(e);
}
});
.on('data', (d) => feed.write(d));
this.whenFinish = new Promise((resolve) => output.on('end', resolve));
}
if (this.isLast()) {
Expand Down
4 changes: 1 addition & 3 deletions packages/analytics/test/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,7 @@ test('with a script that break the identifier', (done) => {
})
.on('error', (e) => {
expect(output.length).toEqual(1);
expect(() => {
throw e.sourceError;
}).toThrow(new Error('id was corrupted'));
expect(e).toEqual(expect.not.stringContaining('id was corrupted'));
done();
})
.on('end', () => {
Expand Down

0 comments on commit 66750ed

Please sign in to comment.