Skip to content

Commit

Permalink
feat: 🎸 add [combine] & [files]
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed May 15, 2020
1 parent 68f4965 commit b4ee1e0
Show file tree
Hide file tree
Showing 7 changed files with 413 additions and 0 deletions.
106 changes: 106 additions & 0 deletions packages/analytics/src/combine.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import get from 'lodash.get';
import set from 'lodash.set';
import debug from 'debug';
import Store from './store';

/**
 * Takes an `Object` and substitutes a field with the corresponding value found
 * in an external pipeline (the sub-pipeline).
 * The sub-pipeline must produce a stream of special objects (id, value).
 *
 * ```json
 * [
 *      { year: 2000, dept: 54 },
 *      { year: 2001, dept: 55 },
 *      { year: 2003, dept: 54 },
 * ]
 * ```
 *
 * Script:
 *
 * ```ini
 * [use]
 * plugin = analytics
 *
 * [combine]
 * path = dept
 * file = ./departement.ini
 *
 * ```
 *
 * Output:
 *
 * ```json
 * [
 *      { year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
 *      { year: 2001, dept: { id: 55, value: 'Meuse' } },
 *      { year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
 * ]
 * ```
 *
 * On the first call the sub-pipeline is compiled, fed with the primer and every
 * object it emits that has a truthy `id` is written to a store under that id.
 * Each incoming object is forwarded only once the sub-pipeline has ended and
 * all of those writes have settled. Objects whose `path` value is falsy, or
 * has no entry in the store, are forwarded unchanged. The incoming object is
 * modified in place.
 *
 * @name combine
 * @param {String} [path] the path to substitute
 * @param {String} [primer] Data to send to the external pipeline (defaults to a generated name)
 * @param {String} [file] the external pipeline is described in a file
 * @param {String} [script] the external pipeline is described in a string of characters
 * @param {Object} [commands] the external pipeline is described in an object
 * @param {String} [command] the external pipeline is described in a URL-like command
 * @param {String} [location] passed to the store constructor (presumably where the store keeps its data; verify against ./store)
 * @returns {Object}
 */
export default function combine(data, feed) {
    const { ezs } = this;
    let whenReady = Promise.resolve(true);
    if (this.isFirst()) {
        const file = this.getParam('file');
        const fileContent = ezs.loadScript(file);
        const script = this.getParam('script', fileContent);
        const cmd1 = ezs.compileScript(script).get();
        const command = this.getParam('command');
        const cmd2 = [].concat(command).map(ezs.parseCommand).filter(Boolean);
        const commands = this.getParam('commands', cmd1.concat(cmd2));
        const environment = this.getEnv();
        if (!commands || commands.length === 0) {
            return feed.stop(new Error('Invalid parmeter for [combine]'));
        }
        debug('ezs')('[combine] with sub pipeline.');
        const domain = `combine_${Math.random()}`;
        const primer = this.getParam('primer', domain);
        const location = this.getParam('location');
        this.store = new Store(ezs, domain, location);
        this.store.reset();
        const streams = ezs.compileCommands(commands, environment);
        const input = ezs.createStream(ezs.objectMode());
        // Writes are asynchronous: keep track of them so that lookups never
        // start before the last one has settled.
        const pendingWrites = [];
        const output = ezs.createPipeline(input, streams)
            .pipe(ezs.catch())
            .on('data', (item) => {
                const key = get(item, 'id');
                if (!key) {
                    return;
                }
                // Wrapping in a promise also captures a synchronous throw.
                const write = new Promise((resolve) => resolve(this.store.put(key, item)));
                // Mark the promise as handled; a failure is still reported
                // through Promise.all() below and ends up in feed.stop().
                write.catch(() => {});
                pendingWrites.push(write);
            })
            .on('error', (e) => feed.stop(e));
        whenReady = new Promise((resolve) => output.on('end', resolve))
            .then(() => Promise.all(pendingWrites));
        input.write(primer);
        input.end();
    }
    if (this.isLast()) {
        // The store does not exist when the first call was rejected because
        // of invalid parameters.
        if (this.store) {
            this.store.close();
        }
        return feed.close();
    }
    return whenReady
        .then(async () => {
            const path = this.getParam('path');
            const key = get(data, path);
            if (!key) {
                return feed.send(data);
            }
            const value = await this.store.get(key);
            if (value) {
                set(data, path, value);
            }
            return feed.send(data);
        })
        .catch((e) => feed.stop(e));
}
57 changes: 57 additions & 0 deletions packages/analytics/src/files.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { existsSync, createReadStream } from 'fs';
import { resolve, sep } from 'path';

/**
 * Takes a file name and emits the content of that file, chunk by chunk.
 *
 * Note: files must be under the working directory of the Node.js process.
 * A name that resolves outside of it, or to a file that does not exist,
 * produces no output.
 *
 * ```json
 * [ "file1.csv", "file2.csv" ]
 * ```
 *
 * Script:
 *
 * ```ini
 * [use]
 * plugin = analytics
 * plugin = basics
 *
 * [files]
 * [CSVParse]
 *
 * ```
 *
 * Output:
 *
 * ```json
 * [
 * (...)
 * ]
 * ```
 *
 * @name files
 * @param {String} [location=.] path location to find files (several locations may be given; the first one containing the file wins)
 * @returns {Object}
 */
export default function files(data, feed) {
    if (this.isLast()) {
        feed.close();
        return;
    }
    const cwd = process.cwd();
    // Compare against "cwd + separator": a plain string-prefix test would also
    // accept sibling directories such as "<cwd>-backup/".
    const root = cwd.endsWith(sep) ? cwd : `${cwd}${sep}`;
    const isUnderCwd = (fil) => fil === cwd || fil.startsWith(root);
    const file = []
        .concat(this.getParam('location', cwd))
        .filter(Boolean)
        .map((dir) => resolve(dir, data))
        .find((fil) => isUnderCwd(fil) && existsSync(fil));
    if (!file) {
        feed.end();
        return;
    }
    createReadStream(file)
        .on('data', (d) => feed.write(d))
        .on('error', (e) => feed.stop(e))
        .on('end', () => feed.end());
}
4 changes: 4 additions & 0 deletions packages/analytics/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import multiply from './mulitply';
import distance from './distance';
import aggregate from './aggregate';
import statistics from './statistics';
import combine from './combine';
import files from './files';

export default {
count,
Expand Down Expand Up @@ -64,4 +66,6 @@ export default {
distance,
aggregate,
statistics,
combine,
files,
};
167 changes: 167 additions & 0 deletions packages/analytics/test/combine.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import assert from 'assert';
import from from 'from';
import ezs from '../../core/src';
import statements from '../src';

ezs.addPath(__dirname);
ezs.use(statements);

// Checks that [combine] replaces the field at `path` with the matching
// (id, value) object produced by the sub-pipeline, whether the sub-pipeline
// is given inline (script) or through a file.
describe('combine', () => {
    // Every test gets its own rows: [combine] modifies the objects in place.
    const makeRows = () => ['a', 'b', 'c', 'd', 'e', 'f'].map((b, index) => ({ a: index + 1, b }));
    const expectedValues = ['aa', 'bb', 'cc', 'dd', 'ee', 'ff'];

    test('with script', (done) => {
        const received = [];
        const script = `
[use]
plugin = analytics
[replace]
path = value
value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'})
[exploding]
[value]
`;
        const stream = from(makeRows())
            .pipe(ezs('combine', { path: 'b', script }))
            .pipe(ezs.catch());

        stream.on('error', done);
        stream.on('data', (chunk) => {
            received.push(chunk);
        });
        stream.on('end', () => {
            assert.equal(received.length, 6);
            expectedValues.forEach((expected, index) => {
                assert.equal(received[index].b.value, expected);
            });
            done();
        });
    });

    test('with file', (done) => {
        const received = [];
        const file = './datasource01.ini';
        const stream = from(makeRows())
            .pipe(ezs('combine', { path: 'b', file }))
            .pipe(ezs.catch());

        stream.on('error', done);
        stream.on('data', (chunk) => {
            received.push(chunk);
        });
        stream.on('end', () => {
            assert.equal(received.length, 6);
            expectedValues.forEach((expected, index) => {
                assert.equal(received[index].b.value, expected);
            });
            done();
        });
    });
});
// Checks the cases where [combine] must not substitute anything: a path that
// matches no field, a failing sub-pipeline, and a missing sub-pipeline.
describe('no combine', () => {
    // Same six rows for every test, rebuilt each time.
    const makeRows = () => ['a', 'b', 'c', 'd', 'e', 'f'].map((b, index) => ({ a: index + 1, b }));

    test('with script', (done) => {
        const received = [];
        const script = `
[use]
plugin = analytics
[replace]
path = value
value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'})
[exploding]
[value]
`;
        const stream = from(makeRows())
            .pipe(ezs('combine', { path: 'no exiting path', script }))
            .pipe(ezs.catch());

        stream.on('error', done);
        stream.on('data', (chunk) => {
            received.push(chunk);
        });
        stream.on('end', () => {
            assert.equal(received.length, 6);
            // The rows must come out untouched, in their original order.
            ['a', 'b', 'c', 'd', 'e', 'f'].forEach((expected, index) => {
                assert.equal(received[index].b, expected);
            });
            done();
        });
    });

    test('with error script', (done) => {
        const script = `
[use]
plugin = analytics
[files]
location = ${__dirname}
`;
        const stream = from(makeRows())
            .pipe(ezs('combine', { script, primer: 'forbidden' }))
            .pipe(ezs.catch());

        // Only an error is an acceptable outcome here.
        stream.on('error', () => {
            done();
        });
        stream.on('end', () => {
            done(new Error('Error is the right behavior'));
        });
    });

    test('with no in script', (done) => {
        const stream = from(makeRows())
            .pipe(ezs('combine'))
            .pipe(ezs.catch());

        // Only an error is an acceptable outcome here.
        stream.on('error', () => {
            done();
        });
        stream.on('end', () => {
            done(new Error('Error is the right behavior'));
        });
    });
});
10 changes: 10 additions & 0 deletions packages/analytics/test/datasource01.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[use]
plugin = analytics

[replace]
path = value
value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'})

[exploding]
[value]

Loading

0 comments on commit b4ee1e0

Please sign in to comment.