From 867be415c1300ff8ea52f5618e816c4347977c2a Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Tue, 10 Mar 2020 22:47:54 +0100 Subject: [PATCH] =?UTF-8?q?perf:=20=E2=9A=A1=EF=B8=8F=20new=20strategy=20t?= =?UTF-8?q?o=20sort=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/analytics/package-lock.json | 5 ++ packages/analytics/package.json | 1 + packages/analytics/src/sort.js | 38 ++++++--- packages/analytics/src/store.js | 114 +++++++++++++++++--------- packages/analytics/src/tune.js | 3 +- packages/analytics/test/sort.js | 116 +++++++++++++-------------- packages/analytics/test/store.js | 80 ++++++++++++++++++ 7 files changed, 246 insertions(+), 111 deletions(-) create mode 100644 packages/analytics/test/store.js diff --git a/packages/analytics/package-lock.json b/packages/analytics/package-lock.json index 63a77926..ddb73341 100644 --- a/packages/analytics/package-lock.json +++ b/packages/analytics/package-lock.json @@ -440,6 +440,11 @@ } } }, + "fast-sort": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/fast-sort/-/fast-sort-2.1.1.tgz", + "integrity": "sha512-X8DNrUzoejZZ+AdAHCVXdKHUrbzz57jKG5/prsKKzyaVKwzSwzm0HQ76cPdo69LObWDShJ77Cn1nPMSXP87FOQ==" + }, "figgy-pudding": { "version": "3.5.1", "resolved": "https://registry.npmjs.org/figgy-pudding/-/figgy-pudding-3.5.1.tgz", diff --git a/packages/analytics/package.json b/packages/analytics/package.json index 0ec7cce4..714f65c9 100644 --- a/packages/analytics/package.json +++ b/packages/analytics/package.json @@ -19,6 +19,7 @@ }, "homepage": "https://github.com/Inist-CNRS/ezs/tree/master/packages/analytics#readme", "dependencies": { + "fast-sort": "^2.1.1", "lda": "~0.2.0", "leveldown": "~5.1.1", "levelup": "~4.1.0", diff --git a/packages/analytics/src/sort.js b/packages/analytics/src/sort.js index d2f10c4b..f112f275 100644 --- a/packages/analytics/src/sort.js +++ b/packages/analytics/src/sort.js @@ -1,6 +1,15 @@ import get from 'lodash.get'; +import fastsort from 'fast-sort'; import Store from './store'; +import { normalize } from './tune'; + +const sorting = (arr, reverse = false) => { + if (!reverse) { + return fastsort(arr).asc(); + } + return fastsort(arr).desc(); +}; /** * Take all `Object` and sort them with dedicated key * @@ -30,28 +39,31 @@ import Store from './store'; * @param {String} [path=id] path to use for id * @returns {Object} */ -export default function sort(data, feed) { +export default async function sort(data, feed) { if (!this.store) { - this.store = new Store(this.ezs, 'sort'); + this.store = new Store(this.ezs, `sort_${Date.now()}`); + this.table = []; } if (this.isLast()) { const reverse = this.getParam('reverse', false); - this.store.cast({ reverse }) - .on('data', (item) => feed.write(item.value)) - .on('end', () => feed.close()); + const sorted = sorting(this.table, reverse); + await sorted.reduce(async (prev, cur) => { + const val = await this.store.get(cur); + feed.write(val); + return prev; + }, Promise.resolve(true)); + this.store.close(); + feed.close(); } else { const path = this.getParam('path', 'id'); const fields = Array.isArray(path) ? path : [path]; - const values = fields + const keys = fields .filter((k) => typeof k === 'string') - .map((key) => get(data, key)) - .map((val) => (typeof val === 'number' - ? val.toFixed(20).toString().padStart(40, '0') - : String(val).slice(0, 40).padEnd(40, '~'))); - - const key = fields.length > 1 ? values.join(',') : values[0]; + .map((key) => get(data, key)); + const key = keys.length > 1 ? keys.join(',') : keys[0]; const idx = this.getIndex().toString().padStart(20, '0'); - const hash = key.concat('~').concat(idx); + const hash = normalize(key).concat('~').concat(idx).replace(/\s/g, '~'); + this.table.push(hash); this.store.put(hash, data).then(() => feed.end()); } } diff --git a/packages/analytics/src/store.js b/packages/analytics/src/store.js index 44d36941..2c80f0bd 100644 --- a/packages/analytics/src/store.js +++ b/packages/analytics/src/store.js @@ -1,61 +1,99 @@ -import levelup from 'levelup'; -import leveldown from 'leveldown'; +import { totalmem, cpus } from 'os'; +import pathExists from 'path-exists'; import tmpFilepath from 'tmp-filepath'; -import core from './core'; +import makeDir from 'make-dir'; +import lmdb from 'node-lmdb'; -const encodeKey = (k) => JSON.stringify(k).split(' ').join('~'); -const decodeKey = (k) => JSON.parse(String(k).split('~').join(' ')); - -const encodeValue = (k) => JSON.stringify(k); -const decodeValue = (k) => JSON.parse(String(k)); - - -function decode(data, feed) { - if (this.isLast()) { - feed.close(); return; +const maxDbs = cpus().length ** 2; +const mapSize = totalmem() / 2; +const encodeKey = (k) => JSON.stringify(k); +const decodeKey = (k) => JSON.parse(String(k)); +const encodeValue = (v) => JSON.stringify(v); +const decodeValue = (v) => JSON.parse(String(v)); +let handle; +const lmdbEnv = () => { + if (handle) { + return handle; } - const k = decodeKey(data.key); - const v = decodeValue(data.value); - feed.send(core(k, v)); -} - + const path = tmpFilepath('store'); + if (!pathExists.sync(path)) { + makeDir.sync(path); + } + handle = new lmdb.Env(); + handle.open({ + path, + mapSize, + maxDbs, + }); + return handle; +}; export default class Store { - constructor(ezs, source) { - this.file = tmpFilepath(`.${source}`); - this.db = levelup(leveldown(this.file)); + constructor(ezs, domain) { this.ezs = ezs; + this.dbi = lmdbEnv(this.ezs).openDbi({ + name: domain, + create: true, + }); } get(key) { - return this.db.get(encodeKey(key)).then((val) => new Promise((resolve) => resolve(decodeValue(val)))); - } - - set(key, value) { - return this.put(key, value); + return new Promise((resolve) => { + const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true }); + const ekey = encodeKey(key); + const val = decodeValue(txn.getString(this.dbi, ekey)); + txn.commit(); + resolve(val); + }); } put(key, value) { - return this.db.put( - encodeKey(key), - encodeValue(value), - ); + return new Promise((resolve) => { + const txn = lmdbEnv(this.ezs).beginTxn(); + const ekey = encodeKey(key); + txn.putString(this.dbi, ekey, encodeValue(value)); + txn.commit(); + resolve(true); + }); } add(key, value) { - return this.get(key) - .then((val) => this.put(key, val.concat(value))) - .catch(() => this.put(key, [value])); + return new Promise((resolve) => { + const txn = lmdbEnv(this.ezs).beginTxn(); + const ekey = encodeKey(key); + const vvalue = decodeValue(txn.getString(this.dbi, ekey)); + if (vvalue) { + txn.putString(this.dbi, ekey, encodeValue(vvalue.concat(value))); + } else { + txn.putString(this.dbi, ekey, encodeValue([value])); + } + txn.commit(); + resolve(true); + }); } + cast() { + const flow = this.ezs.createStream(this.ezs.objectMode()) + .on('end', () => this.close()); - cast(opt) { - return this.db.createReadStream(opt) - .on('end', () => this.close()) - .pipe(this.ezs(decode)); + process.nextTick(() => { + const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true }); + const cursor = new lmdb.Cursor(txn, this.dbi); + for (let found = cursor.goToFirst(); + found !== null; + found = cursor.goToNext()) { + const id = decodeKey(found); + console.log('get', id); + const value = decodeValue(txn.getString(this.dbi, found)); + flow.write({ id, value }); + } + flow.end(); + txn.commit(); + }); + return flow; } close() { - return this.db.close(); + return this.dbi.close(); } } diff --git a/packages/analytics/src/tune.js b/packages/analytics/src/tune.js index 4ff5dfc0..f999f286 100644 --- a/packages/analytics/src/tune.js +++ b/packages/analytics/src/tune.js @@ -16,7 +16,7 @@ import clone from 'lodash.clone'; import core from './core'; -const normalize = (s) => { +export const normalize = (s) => { if (typeof s === 'string') { return String(s).normalize('NFD').replace(/[\u0300-\u036f]/g, '').padEnd(40, '~'); } @@ -25,7 +25,6 @@ const normalize = (s) => { } return String(s); }; - const vector = (input, size) => { const v = Array(size).fill(0); input.split('').map((x) => x.charCodeAt(0)).forEach((x, i) => { v[i] = x; }); diff --git a/packages/analytics/test/sort.js b/packages/analytics/test/sort.js index 125c80f5..89eb00c4 100644 --- a/packages/analytics/test/sort.js +++ b/packages/analytics/test/sort.js @@ -205,17 +205,17 @@ describe('sort ', () => { { id: 'électrifie', value: 9 }, { id: 'collectivité', value: 10 }, ]) - .pipe(ezs('tune', { path: 'id', method: 'cosine' })) - .pipe(ezs('sort', { reverse: true })) - .pipe(ezs('value')) - .on('data', (chunk) => { - assert(typeof chunk === 'object'); - res.push(chunk); + .pipe(ezs('tune', { path: 'id', method: 'cosine' })) + .pipe(ezs('sort', { reverse: true })) + .pipe(ezs('value')) + .on('data', (chunk) => { + assert(typeof chunk === 'object'); + res.push(chunk); }) - .on('end', () => { - assert.equal(10, res[0].value); - assert.equal(1, res[9].value); - done(); + .on('end', () => { + assert.equal(10, res[0].value); + assert.equal(1, res[9].value); + done(); }); }); @@ -235,19 +235,19 @@ describe('sort ', () => { { id: 'électrifie', value: 9 }, { id: 'collectivité', value: 10 }, ]) - .pipe(ezs('tune', { path: 'id' })) - .pipe(ezs('sort')) - .pipe(ezs('value')) - .on('data', (chunk) => { - assert(typeof chunk === 'object'); - res.push(chunk); + .pipe(ezs('tune', { path: 'id' })) + .pipe(ezs('sort')) + .pipe(ezs('value')) + .on('data', (chunk) => { + assert(typeof chunk === 'object'); + res.push(chunk); }) - .on('end', () => { - assert.equal(10, res[0].value); - assert.equal(4, res[1].value); - assert.equal(8, res[2].value); - assert.equal(2, res[9].value); - done(); + .on('end', () => { + assert.equal(10, res[0].value); + assert.equal(4, res[1].value); + assert.equal(8, res[2].value); + assert.equal(2, res[9].value); + done(); }); }); @@ -265,25 +265,25 @@ describe('sort ', () => { { id: 'électrifié', value: 40 }, { id: 'collectivité', value: 10 }, ]) - .pipe(ezs('tune', { path: 'value' })) - .pipe(ezs('sort')) - .pipe(ezs('value')) - .on('data', (chunk) => { - assert(typeof chunk === 'object'); - res.push(chunk); + .pipe(ezs('tune', { path: 'value' })) + .pipe(ezs('sort')) + .pipe(ezs('value')) + .on('data', (chunk) => { + assert(typeof chunk === 'object'); + res.push(chunk); }) - .on('end', () => { - assert.equal(1, res[0].value); - assert.equal(2, res[1].value); - assert.equal(4, res[2].value); - assert.equal(6, res[3].value); - assert.equal(8, res[4].value); - assert.equal(10, res[5].value); - assert.equal(20, res[6].value); - assert.equal(40, res[7].value); - assert.equal(60, res[8].value); - assert.equal(900, res[9].value); - done(); + .on('end', () => { + assert.equal(1, res[0].value); + assert.equal(2, res[1].value); + assert.equal(4, res[2].value); + assert.equal(6, res[3].value); + assert.equal(8, res[4].value); + assert.equal(10, res[5].value); + assert.equal(20, res[6].value); + assert.equal(40, res[7].value); + assert.equal(60, res[8].value); + assert.equal(900, res[9].value); + done(); }); }); @@ -301,25 +301,25 @@ describe('sort ', () => { { id: 'électrifié', value: 40 }, { id: 'collectivité', value: 10 }, ]) - .pipe(ezs('tune', { path: 'value' })) - .pipe(ezs('sort', { reverse: true })) - .pipe(ezs('value')) - .on('data', (chunk) => { - assert(typeof chunk === 'object'); - res.push(chunk); + .pipe(ezs('tune', { path: 'value' })) + .pipe(ezs('sort', { reverse: true })) + .pipe(ezs('value')) + .on('data', (chunk) => { + assert(typeof chunk === 'object'); + res.push(chunk); }) - .on('end', () => { - assert.equal(1, res[9].value); - assert.equal(2, res[8].value); - assert.equal(4, res[7].value); - assert.equal(6, res[6].value); - assert.equal(8, res[5].value); - assert.equal(10, res[4].value); - assert.equal(20, res[3].value); - assert.equal(40, res[2].value); - assert.equal(60, res[1].value); - assert.equal(900, res[0].value); - done(); + .on('end', () => { + assert.equal(1, res[9].value); + assert.equal(2, res[8].value); + assert.equal(4, res[7].value); + assert.equal(6, res[6].value); + assert.equal(8, res[5].value); + assert.equal(10, res[4].value); + assert.equal(20, res[3].value); + assert.equal(40, res[2].value); + assert.equal(60, res[1].value); + assert.equal(900, res[0].value); + done(); }); }); }); diff --git a/packages/analytics/test/store.js b/packages/analytics/test/store.js new file mode 100644 index 00000000..9520d747 --- /dev/null +++ b/packages/analytics/test/store.js @@ -0,0 +1,80 @@ +import ezs from '../../core/src'; +import Store from '../src/store'; + +describe('Store', () => { + it('add distinct values', async (done) => { + const store = new Store(ezs, 'test_store1'); + await Promise.all([ + store.add(1, 'A'), + store.add(2, 'B'), + store.add(3, 'C'), + ]); + const output = []; + store + .cast() + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + expect(output.length).toEqual(3); + expect(output[0].id).toEqual(1); + expect(output[0].value[0]).toEqual('A'); + done(); + }); + }); + + it('add duplicate keys', async (done) => { + const store = new Store(ezs, 'test_store2'); + await store.add(1, 'A'); + await store.add(2, 'B'); + await store.add(2, 'X'); + await store.add(2, 'D'); + await store.add(3, 'C'); + await store.add(1, 'A'); + await store.add(1, 'A'); + await store.add(1, 'R'); + const output = []; + store + .cast() + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + expect(output.length).toEqual(3); + expect(output[0].id).toEqual(1); + expect(output[0].value.length).toEqual(4); + expect(output[1].value.length).toEqual(3); + expect(output[2].value.length).toEqual(1); + done(); + }); + }); + + it('put duplicate keys', async (done) => { + const store = new Store(ezs, 'test_store2'); + await store.put(1, 'A'); + await store.put(2, 'B'); + await store.put(2, 'X'); + await store.put(2, 'D'); + await store.put(3, 'C'); + await store.put(1, 'A'); + await store.put(1, 'A'); + await store.put(1, 'R'); + const output = []; + store + .cast() + .on('data', (chunk) => { + output.push(chunk); + }) + .on('end', () => { + expect(output.length).toEqual(3); + expect(output[0].id).toEqual(1); + expect(output[0].value).toEqual('R'); + expect(output[1].value.length).toEqual(1); + expect(output[1].value).toEqual('D'); + expect(output[2].value.length).toEqual(1); + expect(output[2].value).toEqual('C'); + done(); + }); + }); + +});