Skip to content

Commit

Permalink
perf: ⚡️ new strategy to sort stream
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Mar 10, 2020
1 parent bc90d18 commit 867be41
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 111 deletions.
5 changes: 5 additions & 0 deletions packages/analytics/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/analytics/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
},
"homepage": "https://github.com/Inist-CNRS/ezs/tree/master/packages/analytics#readme",
"dependencies": {
"fast-sort": "^2.1.1",
"lda": "~0.2.0",
"leveldown": "~5.1.1",
"levelup": "~4.1.0",
Expand Down
38 changes: 25 additions & 13 deletions packages/analytics/src/sort.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import get from 'lodash.get';
import fastsort from 'fast-sort';
import Store from './store';
import { normalize } from './tune';


const sorting = (arr, reverse = false) => {
if (!reverse) {
return fastsort(arr).asc();
}
return fastsort(arr).desc();
};
/**
* Take all `Object` and sort them with dedicated key
*
Expand Down Expand Up @@ -30,28 +39,31 @@ import Store from './store';
* @param {String} [path=id] path to use for id
* @returns {Object}
*/
export default function sort(data, feed) {
export default async function sort(data, feed) {
if (!this.store) {
this.store = new Store(this.ezs, 'sort');
this.store = new Store(this.ezs, `sort_${Date.now()}`);
this.table = [];
}
if (this.isLast()) {
const reverse = this.getParam('reverse', false);
this.store.cast({ reverse })
.on('data', (item) => feed.write(item.value))
.on('end', () => feed.close());
const sorted = sorting(this.table, reverse);
await sorted.reduce(async (prev, cur) => {
const val = await this.store.get(cur);
feed.write(val);
return prev;
}, Promise.resolve(true));
this.store.close();
feed.close();
} else {
const path = this.getParam('path', 'id');
const fields = Array.isArray(path) ? path : [path];
const values = fields
const keys = fields
.filter((k) => typeof k === 'string')
.map((key) => get(data, key))
.map((val) => (typeof val === 'number'
? val.toFixed(20).toString().padStart(40, '0')
: String(val).slice(0, 40).padEnd(40, '~')));

const key = fields.length > 1 ? values.join(',') : values[0];
.map((key) => get(data, key));
const key = keys.length > 1 ? keys.join(',') : keys[0];
const idx = this.getIndex().toString().padStart(20, '0');
const hash = key.concat('~').concat(idx);
const hash = normalize(key).concat('~').concat(idx).replace(/\s/g, '~');
this.table.push(hash);
this.store.put(hash, data).then(() => feed.end());
}
}
114 changes: 76 additions & 38 deletions packages/analytics/src/store.js
Original file line number Diff line number Diff line change
@@ -1,61 +1,99 @@
import levelup from 'levelup';
import leveldown from 'leveldown';
import { totalmem, cpus } from 'os';
import pathExists from 'path-exists';
import tmpFilepath from 'tmp-filepath';
import core from './core';
import makeDir from 'make-dir';
import lmdb from 'node-lmdb';

const encodeKey = (k) => JSON.stringify(k).split(' ').join('~');
const decodeKey = (k) => JSON.parse(String(k).split('~').join(' '));

const encodeValue = (k) => JSON.stringify(k);
const decodeValue = (k) => JSON.parse(String(k));


function decode(data, feed) {
if (this.isLast()) {
feed.close(); return;
const maxDbs = cpus().length ** 2;
const mapSize = totalmem() / 2;
const encodeKey = (k) => JSON.stringify(k);
const decodeKey = (k) => JSON.parse(String(k));
const encodeValue = (v) => JSON.stringify(v);
const decodeValue = (v) => JSON.parse(String(v));
let handle;
const lmdbEnv = () => {
if (handle) {
return handle;
}
const k = decodeKey(data.key);
const v = decodeValue(data.value);
feed.send(core(k, v));
}

const path = tmpFilepath('store');
if (!pathExists.sync(path)) {
makeDir.sync(path);
}
handle = new lmdb.Env();
handle.open({
path,
mapSize,
maxDbs,
});
return handle;
};

export default class Store {
constructor(ezs, source) {
this.file = tmpFilepath(`.${source}`);
this.db = levelup(leveldown(this.file));
constructor(ezs, domain) {
this.ezs = ezs;
this.dbi = lmdbEnv(this.ezs).openDbi({
name: domain,
create: true,
});
}

get(key) {
return this.db.get(encodeKey(key)).then((val) => new Promise((resolve) => resolve(decodeValue(val))));
}

set(key, value) {
return this.put(key, value);
return new Promise((resolve) => {
const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true });
const ekey = encodeKey(key);
const val = decodeValue(txn.getString(this.dbi, ekey));
txn.commit();
resolve(val);
});
}

put(key, value) {
return this.db.put(
encodeKey(key),
encodeValue(value),
);
return new Promise((resolve) => {
const txn = lmdbEnv(this.ezs).beginTxn();
const ekey = encodeKey(key);
txn.putString(this.dbi, ekey, encodeValue(value));
txn.commit();
resolve(true);
});
}

add(key, value) {
return this.get(key)
.then((val) => this.put(key, val.concat(value)))
.catch(() => this.put(key, [value]));
return new Promise((resolve) => {
const txn = lmdbEnv(this.ezs).beginTxn();
const ekey = encodeKey(key);
const vvalue = decodeValue(txn.getString(this.dbi, ekey));
if (vvalue) {
txn.putString(this.dbi, ekey, encodeValue(vvalue.concat(value)));
} else {
txn.putString(this.dbi, ekey, encodeValue([value]));
}
txn.commit();
resolve(true);
});
}

cast() {
const flow = this.ezs.createStream(this.ezs.objectMode())
.on('end', () => this.close());

cast(opt) {
return this.db.createReadStream(opt)
.on('end', () => this.close())
.pipe(this.ezs(decode));
process.nextTick(() => {
const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true });
const cursor = new lmdb.Cursor(txn, this.dbi);
for (let found = cursor.goToFirst();
found !== null;
found = cursor.goToNext()) {
const id = decodeKey(found);
console.log('get', id);
const value = decodeValue(txn.getString(this.dbi, found));
flow.write({ id, value });
}
flow.end();
txn.commit();
});
return flow;
}

close() {
return this.db.close();
return this.dbi.close();
}
}
3 changes: 1 addition & 2 deletions packages/analytics/src/tune.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import clone from 'lodash.clone';
import core from './core';


const normalize = (s) => {
export const normalize = (s) => {
if (typeof s === 'string') {
return String(s).normalize('NFD').replace(/[\u0300-\u036f]/g, '').padEnd(40, '~');
}
Expand All @@ -25,7 +25,6 @@ const normalize = (s) => {
}
return String(s);
};

const vector = (input, size) => {
const v = Array(size).fill(0);
input.split('').map((x) => x.charCodeAt(0)).forEach((x, i) => { v[i] = x; });
Expand Down
116 changes: 58 additions & 58 deletions packages/analytics/test/sort.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,17 @@ describe('sort ', () => {
{ id: 'électrifie', value: 9 },
{ id: 'collectivité', value: 10 },
])
.pipe(ezs('tune', { path: 'id', method: 'cosine' }))
.pipe(ezs('sort', { reverse: true }))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
.pipe(ezs('tune', { path: 'id', method: 'cosine' }))
.pipe(ezs('sort', { reverse: true }))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
})
.on('end', () => {
assert.equal(10, res[0].value);
assert.equal(1, res[9].value);
done();
.on('end', () => {
assert.equal(10, res[0].value);
assert.equal(1, res[9].value);
done();
});
});

Expand All @@ -235,19 +235,19 @@ describe('sort ', () => {
{ id: 'électrifie', value: 9 },
{ id: 'collectivité', value: 10 },
])
.pipe(ezs('tune', { path: 'id' }))
.pipe(ezs('sort'))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
.pipe(ezs('tune', { path: 'id' }))
.pipe(ezs('sort'))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
})
.on('end', () => {
assert.equal(10, res[0].value);
assert.equal(4, res[1].value);
assert.equal(8, res[2].value);
assert.equal(2, res[9].value);
done();
.on('end', () => {
assert.equal(10, res[0].value);
assert.equal(4, res[1].value);
assert.equal(8, res[2].value);
assert.equal(2, res[9].value);
done();
});
});

Expand All @@ -265,25 +265,25 @@ describe('sort ', () => {
{ id: 'électrifié', value: 40 },
{ id: 'collectivité', value: 10 },
])
.pipe(ezs('tune', { path: 'value' }))
.pipe(ezs('sort'))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
.pipe(ezs('tune', { path: 'value' }))
.pipe(ezs('sort'))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
})
.on('end', () => {
assert.equal(1, res[0].value);
assert.equal(2, res[1].value);
assert.equal(4, res[2].value);
assert.equal(6, res[3].value);
assert.equal(8, res[4].value);
assert.equal(10, res[5].value);
assert.equal(20, res[6].value);
assert.equal(40, res[7].value);
assert.equal(60, res[8].value);
assert.equal(900, res[9].value);
done();
.on('end', () => {
assert.equal(1, res[0].value);
assert.equal(2, res[1].value);
assert.equal(4, res[2].value);
assert.equal(6, res[3].value);
assert.equal(8, res[4].value);
assert.equal(10, res[5].value);
assert.equal(20, res[6].value);
assert.equal(40, res[7].value);
assert.equal(60, res[8].value);
assert.equal(900, res[9].value);
done();
});
});

Expand All @@ -301,25 +301,25 @@ describe('sort ', () => {
{ id: 'électrifié', value: 40 },
{ id: 'collectivité', value: 10 },
])
.pipe(ezs('tune', { path: 'value' }))
.pipe(ezs('sort', { reverse: true }))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
.pipe(ezs('tune', { path: 'value' }))
.pipe(ezs('sort', { reverse: true }))
.pipe(ezs('value'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
})
.on('end', () => {
assert.equal(1, res[9].value);
assert.equal(2, res[8].value);
assert.equal(4, res[7].value);
assert.equal(6, res[6].value);
assert.equal(8, res[5].value);
assert.equal(10, res[4].value);
assert.equal(20, res[3].value);
assert.equal(40, res[2].value);
assert.equal(60, res[1].value);
assert.equal(900, res[0].value);
done();
.on('end', () => {
assert.equal(1, res[9].value);
assert.equal(2, res[8].value);
assert.equal(4, res[7].value);
assert.equal(6, res[6].value);
assert.equal(8, res[5].value);
assert.equal(10, res[4].value);
assert.equal(20, res[3].value);
assert.equal(40, res[2].value);
assert.equal(60, res[1].value);
assert.equal(900, res[0].value);
done();
});
});
});
Loading

0 comments on commit 867be41

Please sign in to comment.