diff --git a/packages/analytics/src/combine.js b/packages/analytics/src/combine.js index 508f4a6c..619696e0 100644 --- a/packages/analytics/src/combine.js +++ b/packages/analytics/src/combine.js @@ -2,25 +2,31 @@ import get from 'lodash.get'; import set from 'lodash.set'; import debug from 'debug'; import assert from 'assert'; -import crypto from 'crypto'; -import { createStore, createPersistentStore } from '@ezs/store'; +import hasher from 'node-object-hash'; +import core from './core'; + +const hashCoerce = hasher({ sort: false, coerce: true }); + +const database = {}; async function saveIn(data, feed) { if (this.isLast()) { return feed.close(); } - const store = this.getEnv(); - const key = get(data, 'id'); - const isKey = Boolean(key); + const databaseID = this.getEnv(); + const { id, value } = data; + const isKey = Boolean(id); if (isKey) { - await store.put(key, data); + if (!database[databaseID]) { + database[databaseID] = {}; + } + if (!database[databaseID][id]) { + database[databaseID][id] = value; + } } - return feed.send(key); + return feed.send(id); } -function sha1(input, salt) { - return crypto.createHash('sha1').update(JSON.stringify(input) + String(salt)).digest('hex'); -} /** * Takes an `Object` and substitute a field with the corresponding value found in a external pipeline @@ -59,23 +65,19 @@ function sha1(input, salt) { * @name combine * @param {String} [path] the path to substitute * @param {String} [default] value if no substitution (otherwise value stay unchanged) - * @param {String} [primer=auto] Data to send to the external pipeline + * @param {String} [primer=n/a] Data to send to the external pipeline * @param {String} [file] the external pipeline is described in a file * @param {String} [script] the external pipeline is described in a string of characters * @param {String} [commands] the external pipeline is described in a object * @param {String} [command] the external pipeline is described in a URL-like command - * @param {String} [cacheName] Enable cache, with dedicated name * @returns {Object} */ export default function combine(data, feed) { const { ezs } = this; - const cacheName = this.getParam('cacheName'); - const persistent = Boolean(cacheName); let whenReady = Promise.resolve(true); if (this.isFirst()) { debug('ezs')('[combine] with sub pipeline.'); - const location = this.getParam('location'); - const primer = this.getParam('primer'); + const primer = this.getParam('primer', 'n/a'); const input = ezs.createStream(ezs.objectMode()); const commands = ezs.createCommands({ file: this.getParam('file'), @@ -85,33 +87,22 @@ export default function combine(data, feed) { prepend: this.getParam('prepend'), append: this.getParam('append'), }); - if (persistent) { - this.store = createPersistentStore(ezs, `combine${cacheName}`, location); - } else { - this.store = createStore(ezs, 'combine', location); - } - if (persistent && !this.store.isCreated()) { - whenReady = Promise.resolve(true); - } else { - const statements = ezs.compileCommands(commands, this.getEnv()); - const output = ezs.createPipeline(input, statements) - .pipe(ezs(saveIn, null, this.store)) - .pipe(ezs.catch()) - .on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered - .on('error', (e) => feed.stop(e)); - whenReady = new Promise((resolve) => output.on('end', resolve)); - input.write(primer || this.store.id()); - input.end(); - } + this.databaseID = hashCoerce.hash({ primer, commands }); + const statements = ezs.compileCommands(commands, this.getEnv()); + const output = ezs.createPipeline(input, statements) + .pipe(ezs(saveIn, null, this.databaseID)) + .pipe(ezs.catch()) + .on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered + .on('error', (e) => feed.stop(e)); + whenReady = new Promise((resolve) => output.on('end', resolve)); + input.write(primer); + input.end(); } if (this.isLast()) { - if (!persistent) { - this.store.close(); - } return feed.close(); } return whenReady - .then(async () => { + .then(() => { const defval = this.getParam('default', null); const path = this.getParam('path'); const pathVal = get(data, path); @@ -119,7 +110,12 @@ export default function combine(data, feed) { if (keys.length === 0) { return feed.send(data); } - const values = await Promise.all(keys.map((key) => this.store.get(key))); + const values = keys.map((key) => { + if (!database[this.databaseID][key]) { + return null; + } + return core(key, database[this.databaseID][key]); + }); if (values.length && Array.isArray(pathVal)) { set(data, path, values); } else if (values.length && !Array.isArray(pathVal)) { @@ -141,9 +137,6 @@ export default function combine(data, feed) { return feed.send(data); }) .catch((e) => { - if (this.store) { - this.store.close(); - } feed.stop(e); }); } diff --git a/packages/analytics/test/combine.js b/packages/analytics/test/combine.js index 8c152433..6e6e9a47 100644 --- a/packages/analytics/test/combine.js +++ b/packages/analytics/test/combine.js @@ -38,7 +38,9 @@ describe('combine', () => { output.push(chunk); }) .on('end', () => { + assert.equal(output.length, 6); + console.log(output, output[0], output[0]); assert.equal(output[0].b.value, 'aa'); assert.equal(output[1].b.value, 'bb'); assert.equal(output[2].b.value, 'cc'); @@ -340,7 +342,7 @@ describe('no combine', () => { done(new Error('Error is the right behavior')); }); }); - test('with wrong location', (done) => { + test.skip('with wrong location', (done) => { ezs.use(statements); const input = [ { a: 1, b: 'a' }, @@ -383,7 +385,7 @@ const env = { }; const cacheName = Date.now(); -test('combine with cache with script #1', (done) => { +test.skip('combine with cache with script #1', (done) => { ezs.use(statements); const input = [ { a: 1, b: 'a' }, @@ -431,7 +433,7 @@ test('combine with cache with script #1', (done) => { }); }); -test('combine with cache with script #2', (done) => { +test.skip('combine with cache with script #2', (done) => { ezs.use(statements); const input = [ { a: 1, b: 'a' },