diff --git a/packages/analytics/src/combine.js b/packages/analytics/src/combine.js new file mode 100644 index 00000000..1ed1d3ff --- /dev/null +++ b/packages/analytics/src/combine.js @@ -0,0 +1,106 @@ +import get from 'lodash.get'; +import set from 'lodash.set'; +import debug from 'debug'; +import Store from './store'; + +/** + * Takes an `Object` and substitutes a field with the corresponding value found in an external pipeline + * the external pipeline must produce a stream of special objects (id, value) + * + * ```json + * [ + * { year: 2000, dept: 54 }, + * { year: 2001, dept: 55 }, + * { year: 2003, dept: 54 }, + * ] + * ``` + * + * Script: + * + * ```ini + * [use] + * plugin = analytics + * + * [combine] + * path = dept + * file = ./departement.ini + * + * ``` + * + * Output: + * + * ```json + * [ + * { year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } }, + * { year: 2001, dept: { id: 55, value: 'Meuse' } }, + * { year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } }, + * ] + * ``` + * + * @name combine + * @param {String} [path] the path to substitute + * @param {String} [primer=auto] Data to send to the external pipeline + * @param {String} [file] the external pipeline is described in a file + * @param {String} [script] the external pipeline is described in a string of characters + * @param {String} [commands] the external pipeline is described in an object + * @param {String} [command] the external pipeline is described in a URL-like command + * @returns {Object} + */ +export default function combine(data, feed) { + const { ezs } = this; + let whenReady = Promise.resolve(true); + if (this.isFirst()) { + const file = this.getParam('file'); + const fileContent = ezs.loadScript(file); + const script = this.getParam('script', fileContent); + const cmd1 = ezs.compileScript(script).get(); + const command = this.getParam('command'); + const cmd2 = [].concat(command).map(ezs.parseCommand).filter(Boolean); + const commands = this.getParam('commands', 
cmd1.concat(cmd2)); + const environment = this.getEnv(); + if (!commands || commands.length === 0) { + return feed.stop(new Error('Invalid parmeter for [combine]')); + } + debug('ezs')('[combine] with sub pipeline.'); + const domain = `combine_${Math.random()}`; + const primer = this.getParam('primer', domain); + const location = this.getParam('location'); + this.store = new Store(ezs, domain, location); + this.store.reset(); + const streams = ezs.compileCommands(commands, environment); + const input = ezs.createStream(ezs.objectMode()); + const output = ezs.createPipeline(input, streams) + .pipe(ezs.catch()) + .on('data', async (item) => { + const key = get(item, 'id'); + const isKey = Boolean(key); + + if (isKey) { + await this.store.put(key, item); + } + }) + .on('error', (e) => feed.stop(e)); + whenReady = new Promise((resolve) => output.on('end', resolve)); + input.write(primer); + input.end(); + } + if (this.isLast()) { + this.store.close(); + return feed.close(); + } + return whenReady + .then(async () => { + const path = this.getParam('path'); + const key = get(data, path); + const validKey = Boolean(key); + if (!validKey) { + return feed.send(data); + } + const value = await this.store.get(key); + if (value) { + set(data, path, value); + } + return feed.send(data); + }) + .catch((e) => feed.stop(e)); +} diff --git a/packages/analytics/src/files.js b/packages/analytics/src/files.js new file mode 100644 index 00000000..02e42b0b --- /dev/null +++ b/packages/analytics/src/files.js @@ -0,0 +1,57 @@ +import { existsSync, createReadStream } from 'fs'; +import { resolve } from 'path'; + +/** + * Takes a filename and emits the content of the file chunk by chunk + * + * Note: files must be under the working directory of the Node.js process. + * + * ```json + * [ file1.csv, file2.csv ] + * ``` + * + * Script: + * + * ```ini + * [use] + * plugin = analytics + * plugin = basics + * + * [files] + * [CSVParse] + * + * ``` + * + * Output: + * + * ```json + * [ + * (...) 
+ * ] + * ``` + * + * @name files + * @param {String} [location=.] path location to find files + * @returns {Object} + */ +export default function files(data, feed) { + if (this.isLast()) { + feed.close(); + return; + } + const cwd = process.cwd(); + const location = [].concat(this.getParam('location', cwd)); + const file = location + .filter(Boolean) + .map((dir) => resolve(dir, data)) + .filter((fil) => (fil.indexOf(cwd) === 0 && existsSync(fil))) + .shift(); + if (!file) { + feed.end(); + return; + } + createReadStream(file) + .on('data', (d) => feed.write(d)) + .on('error', (e) => feed.stop(e)) + .on('end', () => feed.end()); +} diff --git a/packages/analytics/src/index.js b/packages/analytics/src/index.js index c2d2c4c2..86b3c298 100644 --- a/packages/analytics/src/index.js +++ b/packages/analytics/src/index.js @@ -30,6 +30,8 @@ import multiply from './mulitply'; import distance from './distance'; import aggregate from './aggregate'; import statistics from './statistics'; +import combine from './combine'; +import files from './files'; export default { count, @@ -64,4 +66,6 @@ export default { distance, aggregate, statistics, + combine, + files, }; diff --git a/packages/analytics/test/combine.js b/packages/analytics/test/combine.js new file mode 100644 index 00000000..8e942639 --- /dev/null +++ b/packages/analytics/test/combine.js @@ -0,0 +1,167 @@ +import assert from 'assert'; +import from from 'from'; +import ezs from '../../core/src'; +import statements from '../src'; + +ezs.addPath(__dirname); +ezs.use(statements); + +describe('combine', () => { + test('with script', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 2, b: 'b' }, + { a: 3, b: 'c' }, + { a: 4, b: 'd' }, + { a: 5, b: 'e' }, + { a: 6, b: 'f' }, + ]; + const output = []; + const script = ` + [use] + plugin = analytics + + [replace] + path = value + value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'}) + + 
[exploding] + [value] + `; + + from(input) + .pipe(ezs('combine', { path: 'b', script })) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.equal(output[0].b.value, 'aa'); + assert.equal(output[1].b.value, 'bb'); + assert.equal(output[2].b.value, 'cc'); + assert.equal(output[3].b.value, 'dd'); + assert.equal(output[4].b.value, 'ee'); + assert.equal(output[5].b.value, 'ff'); + done(); + }); + }); + test('with file', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 2, b: 'b' }, + { a: 3, b: 'c' }, + { a: 4, b: 'd' }, + { a: 5, b: 'e' }, + { a: 6, b: 'f' }, + ]; + const output = []; + const file = './datasource01.ini'; + + from(input) + .pipe(ezs('combine', { path: 'b', file })) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.equal(output[0].b.value, 'aa'); + assert.equal(output[1].b.value, 'bb'); + assert.equal(output[2].b.value, 'cc'); + assert.equal(output[3].b.value, 'dd'); + assert.equal(output[4].b.value, 'ee'); + assert.equal(output[5].b.value, 'ff'); + done(); + }); + }); +}); +describe('no combine', () => { + test('with script', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 2, b: 'b' }, + { a: 3, b: 'c' }, + { a: 4, b: 'd' }, + { a: 5, b: 'e' }, + { a: 6, b: 'f' }, + ]; + const output = []; + const script = ` + [use] + plugin = analytics + + [replace] + path = value + value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'}) + + [exploding] + [value] + `; + + from(input) + .pipe(ezs('combine', { path: 'no exiting path', script })) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 6); + assert.equal(output[0].b, 'a'); + assert.equal(output[1].b, 
'b'); + assert.equal(output[2].b, 'c'); + assert.equal(output[3].b, 'd'); + assert.equal(output[4].b, 'e'); + assert.equal(output[5].b, 'f'); + done(); + }); + }); + test('with error script', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 2, b: 'b' }, + { a: 3, b: 'c' }, + { a: 4, b: 'd' }, + { a: 5, b: 'e' }, + { a: 6, b: 'f' }, + ]; + const script = ` + [use] + plugin = analytics + + [files] + location = ${__dirname} + `; + from(input) + .pipe(ezs('combine', { script, primer: 'forbidden' })) + .pipe(ezs.catch()) + .on('error', () => { + done(); + }) + .on('end', () => { + done(new Error('Error is the right behavior')); + }); + }); + test('with no in script', (done) => { + const input = [ + { a: 1, b: 'a' }, + { a: 2, b: 'b' }, + { a: 3, b: 'c' }, + { a: 4, b: 'd' }, + { a: 5, b: 'e' }, + { a: 6, b: 'f' }, + ]; + from(input) + .pipe(ezs('combine')) + .pipe(ezs.catch()) + .on('error', () => { + done(); + }) + .on('end', () => { + done(new Error('Error is the right behavior')); + }); + }); +}); diff --git a/packages/analytics/test/datasource01.ini b/packages/analytics/test/datasource01.ini new file mode 100644 index 00000000..f832a0f8 --- /dev/null +++ b/packages/analytics/test/datasource01.ini @@ -0,0 +1,10 @@ +[use] +plugin = analytics + +[replace] +path = value +value = fix({id:'a', value:'aa'},{id:'b', value:'bb'},{id:'c', value:'cc'},{id:'d', value:'dd'},{id:'e', value:'ee'},{id:'f', value:'ff'}) + +[exploding] +[value] + diff --git a/packages/analytics/test/files.js b/packages/analytics/test/files.js new file mode 100644 index 00000000..b7076be5 --- /dev/null +++ b/packages/analytics/test/files.js @@ -0,0 +1,69 @@ +import fs from 'fs'; +import assert from 'assert'; +import from from 'from'; +import ezs from '../../core/src'; +import statements from '../src'; +import basics from '../../basics/src'; + +ezs.addPath(__dirname); +ezs.use(statements); +ezs.use(basics); + +describe('files', () => { + test('with existing file', (done) => { + const input = [ 
+ 'data01.json', + ]; + const output = []; + from(input) + .pipe(ezs('files', { location: __dirname })) + .pipe(ezs('JSONParse')) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 336); + assert.equal(output[0].id, '9.928'); + assert.equal(output[0].value, 1); + done(); + }); + }); + + test('with not existing file', (done) => { + const input = [ + 'XXXXX.json', + ]; + const output = []; + from(input) + .pipe(ezs('files', { location: __dirname })) + .pipe(ezs('JSONParse')) + .pipe(ezs.catch()) + .on('error', done) + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + assert.equal(output.length, 0); + done(); + }); + }); + + test('with not authorized file', (done) => { + fs.chmodSync('./forbidden', 0o333); + const input = [ + 'forbidden', + ]; + from(input) + .pipe(ezs('files', { location: __dirname })) + .pipe(ezs('JSONParse')) + .pipe(ezs.catch()) + .on('error', () => { + done(); + }) + .on('end', () => { + done(new Error('Error is the right behavior')); + }); + }); +}); diff --git a/packages/analytics/test/forbidden b/packages/analytics/test/forbidden new file mode 100644 index 00000000..e69de29b