From 65fe917049f6804a4f26fa3c51c72c2a3d7ee6e6 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Thu, 12 Mar 2020 15:29:41 +0100 Subject: [PATCH] =?UTF-8?q?perf:=20=E2=9A=A1=EF=B8=8F=20new=20statement=20?= =?UTF-8?q?[boost]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/analytics/src/store.js | 1 - packages/storage/package-lock.json | 10 + packages/storage/package.json | 2 + packages/storage/src/boost.js | 144 +++++++++++ packages/storage/src/index.js | 2 + packages/storage/src/store.js | 1 + packages/storage/test/boost.js | 385 +++++++++++++++++++++++++++++ packages/storage/test/locals.js | 179 ++++++++++++++ 8 files changed, 723 insertions(+), 1 deletion(-) create mode 100644 packages/storage/src/boost.js create mode 100644 packages/storage/test/boost.js create mode 100644 packages/storage/test/locals.js diff --git a/packages/analytics/src/store.js b/packages/analytics/src/store.js index 2c80f0bd..64228b95 100644 --- a/packages/analytics/src/store.js +++ b/packages/analytics/src/store.js @@ -83,7 +83,6 @@ export default class Store { found !== null; found = cursor.goToNext()) { const id = decodeKey(found); - console.log('get', id); const value = decodeValue(txn.getString(this.dbi, found)); flow.write({ id, value }); } diff --git a/packages/storage/package-lock.json b/packages/storage/package-lock.json index fe4e8e74..7af73dfa 100644 --- a/packages/storage/package-lock.json +++ b/packages/storage/package-lock.json @@ -98,6 +98,11 @@ "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" }, + "date-diff": { + "version": "0.2.1", + "resolved": "https://registry.npmjs.org/date-diff/-/date-diff-0.2.1.tgz", + "integrity": "sha512-Cjq+RuGGqLSIyWo9It3ShVEAmHSjifF/UAPSTlT0GBT8O5zgfY0hwaH4y1n9uoB71ey3pfybbcRpvUwCEclLKw==" + }, "decompress-response": { "version": "4.2.1", "resolved": 
"https://registry.npmjs.org/decompress-response/-/decompress-response-4.2.1.tgz", @@ -355,6 +360,11 @@ "prebuild-install": "^5.2.5" } }, + "node-object-hash": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/node-object-hash/-/node-object-hash-2.0.0.tgz", + "integrity": "sha512-VZR0zroAusy1ETZMZiGeLkdu50LGjG5U1KHZqTruqtTyQ2wfWhHG2Ow4nsUbfTFGlaREgNHcCWoM/OzEm6p+NQ==" + }, "noop-logger": { "version": "0.1.1", "resolved": "https://registry.npmjs.org/noop-logger/-/noop-logger-0.1.1.tgz", diff --git a/packages/storage/package.json b/packages/storage/package.json index fd82b6ba..a36064b4 100644 --- a/packages/storage/package.json +++ b/packages/storage/package.json @@ -19,6 +19,7 @@ }, "homepage": "https://github.com/Inist-CNRS/ezs/tree/master/packages/storage#readme", "dependencies": { + "date-diff": "^0.2.1", "leveldown": "^5.4.1", "levelup": "^4.3.2", "lodash.get": "^4.4.2", @@ -26,6 +27,7 @@ "nanoid": "^2.1.8", "nanoid-dictionary": "^2.0.0", "node-lmdb": "^0.8.0", + "node-object-hash": "^2.0.0", "path-exists": "^4.0.0" }, "main": "./lib/index.js", diff --git a/packages/storage/src/boost.js b/packages/storage/src/boost.js new file mode 100644 index 00000000..8e28b024 --- /dev/null +++ b/packages/storage/src/boost.js @@ -0,0 +1,144 @@ +import hasher from 'node-object-hash'; +import DateDiff from 'date-diff'; +import debug from 'debug'; +import { + decodeValue, encodeKey, encodeValue, lmdbEnv, +} from './store'; + + +const hashCoerce = hasher({ + sort: false, + coerce: true, +}); + + debug.enable('ezs'); +const computeHash = (commands, environment, chunk) => { + const commandsHash = hashCoerce.hash(commands); + const environmentHash = hashCoerce.hash(environment); + const firstChunkHash = hashCoerce.hash(chunk); + const hashs = [commandsHash, environmentHash, firstChunkHash]; + debug('ezs')('Compute cache hash with', hashs.map((h) => h.slice(0, 5).concat('...'))); + return hashCoerce.hash(hashs); +}; + +const hitThe = (cache, ttl) => { + if (!cache) { + 
return false; + } + const diff = new DateDiff(Date.now(), cache.createdDate); + if (diff.seconds() <= ttl) { + return true; + } + return false; +}; + +/** + * Takes an `Object`, delegates processing to an external pipeline and caches the result + * + * @param {String} [file] the external pipeline is described in a file + * @param {String} [script] the external pipeline is described in a string of characters + * @param {String} [commands] the external pipeline is described in an object + * @param {String} [key] the cache key for the stream; if not provided, it is computed from the first chunk + * @param {Number} [cleanupDelay=600] Frequency (seconds) to clean up the cache (10 min) + * @returns {Object} + */ +export default function boost(data, feed) { + const { ezs } = this; + if (this.isFirst()) { + const file = this.getParam('file'); + const fileContent = ezs.loadScript(file); + const script = this.getParam('script', fileContent); + const cmds = ezs.compileScript(script); + const commands = this.getParam('commands', cmds.get()); + const key = this.getParam('key'); + const cleanupDelay = Number(this.getParam('cleanupDelay', 10 * 60)); + const environment = this.getEnv(); + if (!this.dbi) { + this.dbi = lmdbEnv(this.ezs).openDbi({ + name: 'cache_index', + create: true, + }); + } + + if (!commands || commands.length === 0) { + return feed.stop(new Error('Invalid parameter for booster')); + } + + const streams = ezs.compileCommands(commands, environment); + const uniqHash = key || computeHash(commands, environment, data); + const resetCacheOnError = (error, action) => { + debug('ezs')(`Error while ${action} cache with hash`, uniqHash, error); + const txn3 = lmdbEnv().beginTxn(); + txn3.putString(this.dbi, encodeKey(uniqHash), encodeValue(Date.now())); + txn3.commit(); + debug('ezs')('Error while deleting cache with hash', uniqHash, error); + }; + const txn = lmdbEnv().beginTxn({ readOnly: true }); + const cache = decodeValue(txn.getString(this.dbi, uniqHash)); + txn.commit();
+ + if (hitThe(cache, cleanupDelay)) { + debug('ezs')('Using cache with hash', uniqHash); + this.emit('cache:connected', uniqHash); + const cacheGetInput = ezs.createStream(ezs.objectMode()); + + cacheGetInput.pipe(ezs('storage:flow', { domain: uniqHash })) + .pipe(ezs('extract', { path: 'value.0' })) + .pipe(ezs.catch((e) => e)) + .on('error', (e) => feed.stop(e)) + .on('data', (d) => feed.write(d)) + .on('end', () => { + debug('ezs')('Cache with hash', uniqHash, 'was readed'); + feed.close(); + }); + cacheGetInput.end('GO'); + return true; + } + debug('ezs')('Creating cache with hash', uniqHash); + this.emit('cache:created', uniqHash); + this.cacheSetInput = ezs.createStream(ezs.objectMode()); + const cacheSetOutput = ezs.createPipeline(this.cacheSetInput, streams) + .pipe(ezs.catch((e) => e)) + .on('error', (e) => feed.write(e)) + .on('data', (d) => feed.write(d)) + .pipe(ezs((da, fe) => (da === null ? fe.close() : fe.send({ value: [da] })))) + .pipe(ezs('storage:identify')) + .pipe(ezs('storage:save', { domain: uniqHash, reset: true })) + .pipe(ezs.catch((e) => e)) + .on('error', (e) => resetCacheOnError(e, 'after saving')); + this.whenFinish = new Promise((cacheSaved) => { + cacheSetOutput.on('end', () => { + debug('ezs')('Registering cache with hash', uniqHash); + const txn2 = lmdbEnv().beginTxn(); + txn2.putString(this.dbi, encodeKey(uniqHash), encodeValue({ createdDate: Date.now() })); + txn2.commit(); + cacheSaved(); + }); + }); + debug('ezs')( + `Booster first chunk #${this.getIndex()} containing ${Object.keys(data).length || 0} keys`, + ); + this.whenReady = new Promise((cacheCreated) => ezs.writeTo(this.cacheSetInput, data, () => { + cacheSetOutput.resume(); // empty the pipeline because no processing reads the data it contains. 
+ feed.end(); + cacheCreated(); + })); + } else { + this.whenReady + .then(() => { + if (this.isLast()) { + this.whenFinish + .then(() => { + this.dbi.close(); + feed.close(); + }) + .catch((e) => feed.stop(e)); + this.cacheSetInput.end(); + return true; + } + debug('ezs')(`Booster chunk #${this.getIndex()} containing ${Object.keys(data).length || 0} keys`); + return ezs.writeTo(this.cacheSetInput, data, () => feed.end()); + }); + } + return true; +} diff --git a/packages/storage/src/index.js b/packages/storage/src/index.js index e8c2c7c9..32f8bbfe 100644 --- a/packages/storage/src/index.js +++ b/packages/storage/src/index.js @@ -1,11 +1,13 @@ import save from './save'; import load from './load'; import flow from './flow'; +import boost from './boost'; import identify from './identify'; export default { save, load, flow, + boost, identify, }; diff --git a/packages/storage/src/store.js b/packages/storage/src/store.js index dcbfa1bc..5cdb53d3 100644 --- a/packages/storage/src/store.js +++ b/packages/storage/src/store.js @@ -28,3 +28,4 @@ export const lmdbEnv = () => { }); return handle; }; + diff --git a/packages/storage/test/boost.js b/packages/storage/test/boost.js new file mode 100644 index 00000000..e4c67278 --- /dev/null +++ b/packages/storage/test/boost.js @@ -0,0 +1,385 @@ +import assert from 'assert'; +import { Readable } from 'stream'; +import fs from 'fs'; +import ezs from '../../core/src'; + +ezs.use(require('../src')); +ezs.use(require('./locals')); + + +const environment = { + launchedDate: Date.now(), +} + +class Decade extends Readable { + constructor() { + super({ objectMode: true }); + this.i = 0; + } + + _read() { + this.i += 1; + if (this.i >= 10) { + this.push(null); + } else { + this.push(this.i); + } + } +} + +describe('boost', () => { + describe('Boost script #1', () => { + const script = ` + [transit] + + [transit] + `; + it('with pipeline', (done) => { + let res = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + 
output.send(input); + })) + .pipe(ezs('delegate', { script })) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 45); + done(); + }); + }); + describe('first call', () => { + it('with boost', (done) => { + let res = 0; + let cid = null; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + output.send(input); + })) + .pipe(ezs('boost', { script }, environment)) + .on('cache:created', (id) => { + cid = id; + }) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.notEqual(cid, null); + assert.strictEqual(res, 45); + done(); + }); + }); + }); + describe('second call', () => { + it('with boost', (done) => { + let res = 0; + let cid = null; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + // to fool the cache + output.send(input === 2 ? 1 : input); + })) + .pipe(ezs('boost', { script }, environment)) + .on('cache:connected', (id) => { + cid = id; + }) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.notEqual(cid, null); + assert.strictEqual(res, 45); + done(); + }); + }); + }); + /**/ + }); + describe('Boost script #2', () => { + const script = ` + [increment] + step = 2 + + [decrement] + step = 1 + `; + it('with pipeline', (done) => { + let res = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + output.send(input); + })) + .pipe(ezs('delegate', { script })) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 54); + done(); + }); + }); + it('with boost', (done) => { + let res = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + output.send(input); + })) + .pipe(ezs('boost', { script }, environment)) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 54); + 
done(); + }); + }); + /**/ + }); + describe.skip('Boost script #3 (with error)', () => { + const script = ` + [transit] + + [transit] + + [transit] + `; + let cid = null; + describe('first call', () => { + it('with boost', (done) => { + let res = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + output.send(input); + })) + .pipe(ezs('boost', { script }, environment)) + .on('cache:created', (id) => { + cid = id; + }) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 45); + assert.notEqual(cid, null); + done(); + }); + }); + }); + describe('second call', () => { + it('with boost', (done) => { + // force error + fs.writeFileSync(`/tmp/ezs/${cid}`, Buffer.from('')); + + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + // to fool the cache + output.send(input === 2 ? 1 : input); + })) + .pipe(ezs('boost', { script }, environment)) + .on('error', (error) => { + assert(error instanceof Error); + done(); + }); + }); + }); + /**/ + }); + describe.skip('Boost script #4 (with error)', () => { + const script = ` + [transit] + + [transit] + + [transit] + + [transit] + `; + let cid = null; + describe('first call', () => { + it('with boost', (done) => { + let res = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + output.send(input); + })) + .pipe(ezs('boost', { script })) + .on('cache:created', (id) => { + cid = id; + }) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 45); + assert.notEqual(cid, null); + done(); + }); + }); + }); + describe('second call', () => { + it('with boost', (done) => { + // force error + fs.writeFileSync(`/tmp/ezs/${cid}`, Buffer.from('H4sIAJjfd1wAA4vmAgB+f0P4AgAAAA==', 'base64')); + + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + // to fool the cache + output.send(input === 2 ? 
1 : input); + })) + .pipe(ezs('boost', { script })) + .pipe(ezs('transit')) + .pipe(ezs.catch((e) => e)) + .on('error', (error) => { + assert(error instanceof Error); + done(); + }); + }); + }); + + /**/ + }); + + + describe('Boost script #5 (with error)', () => { + const script = ` + [transit] + [transit] + [transit] + [transit] + [transit] + `; + let cid = null; + describe('first call', () => { + it('with boost', (done) => { + let res = 0; + let cnt = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + cnt += 1; + if (cnt === 5) { + output.send(new Error('Paf')); + } else { + output.send(input); + } + })) + .pipe(ezs('boost', { script }, environment)) + .on('cache:created', (id) => { + cid = id; + }) + .pipe(ezs.catch((e) => assert(e))) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 40); + assert.notEqual(cid, null); + done(); + }); + }); + }); + describe('second call', () => { + it('with boost', (done) => { + let res = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + // to fool the cache + output.send(input === 2 ? 
1 : input); + })) + .pipe(ezs('boost', { script }, environment)) + .pipe(ezs('transit')) + .pipe(ezs.catch((e) => e)) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 40); + assert.notEqual(cid, null); + done(); + }); + }); + }); + }); + +/* + describe('Boost script #6 (with error)', () => { + const script = ` + [transit] + [transit] + [transit] + [transit] + [transit] + [transit] + `; + let cid = null; + describe('first call', () => { + it('with boost', (done) => { + let res = 0; + let cnt = 0; + const ten = new Decade(); + const boost = ezs.boost(statements); + ten + .pipe(boost) + .pipe(ezs((input, output) => { + cnt += 1; + if (cnt === 5) { + boost.emit('error', new Error('Pif')); + } + output.send(input); + })) + .on('error', assert.ifError) + .on('cache:created', (id) => { + cid = id; + }) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 0); + assert.notEqual(cid, null); + done(); + }); + }); + }); + describe('second call', () => { + it('with boost', (done) => { + let res = 0; + const ten = new Decade(); + ten + .pipe(ezs((input, output) => { + throw new Error('Pif'); + })) + .pipe(ezs.boost(statements)) + .on('error', assert.ifError) + .on('data', (chunk) => { + res += chunk; + }) + .on('end', () => { + assert.strictEqual(res, 40); + assert.notEqual(cid, null); + done(); + }); + }); + }); + }); +*/ +}); diff --git a/packages/storage/test/locals.js b/packages/storage/test/locals.js new file mode 100644 index 00000000..7125d221 --- /dev/null +++ b/packages/storage/test/locals.js @@ -0,0 +1,179 @@ +function plus1(data, feed) { + feed.send(data + 1); +} + +function increment(data, feed) { + if (!this.isLast()) { + const step = this.getParam('step', 1); + const value = data || 0; + feed.send(value + step); + } else { + feed.send(data); + } +} + +function decrement(data, feed) { + if (!this.isLast()) { + const step = this.getParam('step', 1); 
+ const value = data || 0; + feed.send(value - step); + } else { + feed.send(data); + } +} + +function stepper(data, feed) { + if (!this.isLast()) { + const step = this.getParam('step'); + const sign = this.getParam('sign', '+'); + const value = data || 0; + if (sign === '+') { + feed.send(value + step); + } else { + feed.send(value - step); + } + } else { + feed.send(data); + } +} + +function slow(data, feed) { + const time2sleep = Number(this.getParam('time', 200)); + if (this.isLast()) { + return setTimeout(() => feed.close(), 1); + } + return setTimeout(() => { + feed.write(data); + setTimeout(() => { + feed.write(data); + feed.end(); + }, time2sleep); + }, time2sleep); +} + +function bad(data, feed) { + if (this.isLast()) { + return feed.close(); + } + feed.end(); + return feed.write(data); +} + +function accu(data, feed) { + if (!this.buff) { + this.buff = []; + } + if (this.isLast()) { + setTimeout(() => { + this.buff.forEach((item) => { + feed.write(item); + }); + feed.close(); + }, 500); + } else { + this.buff.push(data); + feed.end(); + } +} + +function ignoreMe(data, feed) { + this.getParam('object', {}); + return feed.send(data); +} + +function beat(data, feed) { + if (this.isLast()) { + return feed.close(); + } + return setTimeout(() => { + feed.write({ beat: 1 }); + feed.end(); + }, 1); +} + +function boum(data, feed) { + if (this.isLast()) { + return feed.close(); + } + return feed.send(new Error('Boum!')); +} + +// WARNING : https://bytearcher.com/articles/why-asynchronous-exceptions-are-uncatchable/ +function badaboum(data, feed) { + if (this.isLast()) { + return feed.close(); + } + return setTimeout(() => { + throw new Error('Badaboum!'); + }, 1); +} + +function bang() { + throw new Error('Bang!'); +} + +function plouf(data, feed) { + if (this.isLast()) { + return feed.close(); + } + return setTimeout(() => { + feed.stop(new Error(`Plouf #${this.getIndex()}`)); + }, 1); +} + +function plaf(data, feed) { + if (this.isLast()) { + return 
feed.close(); + } + return setTimeout(() => { + if (data === 7) { + feed.stop(new Error('Plaf!')); + } else { + feed.send(data); + } + }, 1); +} + +function splish(data, feed) { + if (this.isLast()) { + return feed.close(); + } + const p = new Promise((resolve) => { + resolve(data); + }); + return p + .then((d) => feed.send(d)) + .catch(() => feed.end()); +} + +function splash(data, feed) { + if (this.isLast()) { + return feed.close(); + } + const p = new Promise((resolve, reject) => { + reject(data); + }); + return p + .then((d) => feed.send(d)) + .catch(() => feed.end()); +} + + +module.exports = { + plus1, + boum, + increment, + decrement, + stepper, + slow, + bad, + accu, + beat, + ignoreMe, + badaboum, + bang, + plouf, + plaf, + splish, + splash, +};