diff --git a/packages/analytics/src/distribute.js b/packages/analytics/src/distribute.js index cc87bac8..52ac0eff 100644 --- a/packages/analytics/src/distribute.js +++ b/packages/analytics/src/distribute.js @@ -60,7 +60,7 @@ export default function distribute(data, feed) { j += 1; } let x = 0; - this.store.cast() + this.store.empty() .on('data', (item) => { const key = parseInt(item.id, 10); const idx = ruler.indexOf(key); diff --git a/packages/analytics/src/reducing.js b/packages/analytics/src/reducing.js index 75cf0deb..08223377 100644 --- a/packages/analytics/src/reducing.js +++ b/packages/analytics/src/reducing.js @@ -42,7 +42,7 @@ export default function reducing(data, feed) { this.store = new Store(this.ezs, 'reducing'); } if (this.isLast()) { - this.store.cast() + this.store.empty() .on('data', (item) => feed.write(item)) .on('end', () => feed.close()); } else { diff --git a/packages/analytics/src/store.js b/packages/analytics/src/store.js index a985b6a2..04cc1ce4 100644 --- a/packages/analytics/src/store.js +++ b/packages/analytics/src/store.js @@ -28,71 +28,118 @@ const lmdbEnv = () => { return handle; }; +export const validKey = (input) => (Boolean(input) && typeof input === 'string' && input.search(/\w+:(\/?\/?)[^\s]+/g) >= 0); + export default class Store { constructor(ezs, domain) { this.ezs = ezs; - this.dbi = lmdbEnv(this.ezs).openDbi({ - name: domain, + this.domain = domain; + this.open(); + } + + open() { + this.handle = lmdbEnv().openDbi({ + name: this.domain, create: true, }); } + dbi() { + return this.handle; + } + get(key) { - return new Promise((resolve) => { - const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true }); + return new Promise((resolve, reject) => { + const txn = lmdbEnv().beginTxn({ readOnly: true }); const ekey = encodeKey(key); - const val = decodeValue(txn.getString(this.dbi, ekey)); - txn.commit(); - resolve(val); + try { + const val = decodeValue(txn.getString(this.dbi(), ekey)); + txn.abort(); + resolve(val); + } catch (e) 
{ + txn.abort(); + reject(e); + } }); } put(key, value) { - return new Promise((resolve) => { - const txn = lmdbEnv(this.ezs).beginTxn(); + return new Promise((resolve, reject) => { + const txn = lmdbEnv().beginTxn(); const ekey = encodeKey(key); - txn.putString(this.dbi, ekey, encodeValue(value)); + try { + txn.putString(this.dbi(), ekey, encodeValue(value)); + } catch (e) { + txn.abort(); + reject(e); + } txn.commit(); resolve(true); }); } add(key, value) { - return new Promise((resolve) => { - const txn = lmdbEnv(this.ezs).beginTxn(); + return new Promise((resolve, reject) => { + const txn = lmdbEnv().beginTxn(); const ekey = encodeKey(key); - const vvalue = decodeValue(txn.getString(this.dbi, ekey)); - if (vvalue) { - txn.putString(this.dbi, ekey, encodeValue(vvalue.concat(value))); - } else { - txn.putString(this.dbi, ekey, encodeValue([value])); + const vvalue = decodeValue(txn.getString(this.dbi(), ekey)); + try { + if (vvalue) { + txn.putString(this.dbi(), ekey, encodeValue(vvalue.concat(value))); + } else { + txn.putString(this.dbi(), ekey, encodeValue([value])); + } + } catch (e) { + txn.abort(); + reject(e); } txn.commit(); resolve(true); }); } + stream() { + return this.cast(); + } + + empty() { + return this.cast().on('end', () => this.reset()); + } + cast() { - const flow = this.ezs.createStream(this.ezs.objectMode()) - .on('end', () => this.close()); + const flow = this.ezs.createStream(this.ezs.objectMode()); process.nextTick(() => { const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true }); - const cursor = new lmdb.Cursor(txn, this.dbi); - for (let found = cursor.goToFirst(); - found !== null; - found = cursor.goToNext()) { - const id = decodeKey(found); - const value = decodeValue(txn.getString(this.dbi, found)); - flow.write({ id, value }); - } - flow.end(); - txn.commit(); + const cursor = new lmdb.Cursor(txn, this.dbi()); + const walker = (found, done) => { + if (found) { + const id = decodeKey(found); + const value = 
decodeValue(txn.getString(this.dbi(), found)); + this.ezs.writeTo(flow, { id, value }, (err, writable) => { + if (err || writable === false) { + return done(); + } + return walker(cursor.goToNext(), done); + }); + } else { + done(); + } + }; + walker(cursor.goToFirst(), () => { + flow.end(); + txn.abort(); + }); }); return flow; } + reset() { + this.dbi().drop(); + this.open(); + } + close() { - return this.dbi.drop(); + this.dbi().close(); } } diff --git a/packages/lodex/src/formatOutput.js b/packages/lodex/src/formatOutput.js index 01aabdf8..fb304401 100644 --- a/packages/lodex/src/formatOutput.js +++ b/packages/lodex/src/formatOutput.js @@ -48,8 +48,8 @@ function formatOutput(data, feed) { let check = false; keys.forEach((k, index) => { if (values[index]) { + feed.write(!check ? ' ' : ','); check = true; - feed.write(index === 0 ? ' ' : ','); feed.write(json(k)); feed.write(':'); feed.write(json(values[index])); diff --git a/packages/storage/src/boost.js b/packages/storage/src/boost.js index 33af1580..0166791a 100644 --- a/packages/storage/src/boost.js +++ b/packages/storage/src/boost.js @@ -1,18 +1,24 @@ import hasher from 'node-object-hash'; import DateDiff from 'date-diff'; import debug from 'debug'; -import { - decodeValue, encodeKey, encodeValue, lmdbEnv, -} from './store'; - - -let dbi; +import Store from './store'; const hashCoerce = hasher({ sort: false, coerce: true, }); +function createURI(data, feed) { + if (this.isLast()) { + return feed.close(); + } + const uri = 'uid:'.concat(this.getIndex().toString().padStart(10, '0')); + return feed.send({ + value: [data], + uri, + }); +} + const computeHash = (commands, environment, chunk) => { const commandsHash = hashCoerce.hash(commands); const environmentHash = hashCoerce.hash(environment); @@ -45,7 +51,7 @@ function hitThe(cache, ttl) { * @param {Number} [cleanupDelay=600] Frequency (seconds) to cleanup the cache (10 min) * @returns {Object} */ -export default function boost(data, feed) { +export default 
async function boost(data, feed) { const { ezs } = this; if (this.isFirst()) { const file = this.getParam('file'); @@ -55,22 +61,16 @@ export default function boost(data, feed) { const commands = this.getParam('commands', cmds.get()); const cleanupDelay = Number(this.getParam('cleanupDelay', 10 * 60)); const environment = this.getEnv(); - if (!dbi) { - dbi = lmdbEnv(this.ezs).openDbi({ - name: 'cache_index', - create: true, - }); + if (!this.store) { + this.store = new Store(this.ezs, 'cache_index'); } - if (!commands || commands.length === 0) { return feed.stop(new Error('Invalid parameter for booster')); } const streams = ezs.compileCommands(commands, environment); const uniqHash = String(this.getParam('key') || computeHash(commands, environment, data)); - const txn = lmdbEnv().beginTxn({ readOnly: true }); - const cache = decodeValue(txn.getString(dbi, uniqHash)); - txn.commit(); + const cache = await this.store.get(uniqHash); if (hitThe(cache, cleanupDelay)) { debug('ezs')('Boost using cache with hash', uniqHash); @@ -96,8 +96,7 @@ export default function boost(data, feed) { .pipe(ezs.catch()) .on('error', (e) => feed.write(e)) .on('data', (d) => feed.write(d)) - .pipe(ezs((da, fe) => (da === null ? 
fe.close() : fe.send({ value: [da] })))) - .pipe(ezs('storage:identify')) + .pipe(ezs(createURI)) .pipe(ezs('storage:save', { domain: uniqHash, reset: true })) .pipe(ezs.catch()); this.whenFinish = new Promise((cacheSaved) => { @@ -105,11 +104,14 @@ export default function boost(data, feed) { debug('ezs')('Error catched, no cache created with hash', uniqHash, error); cacheSaved(); }); - cacheSetOutput.on('end', () => { + cacheSetOutput.on('end', async () => { debug('ezs')('Registering cache with hash', uniqHash); - const txn2 = lmdbEnv().beginTxn(); - txn2.putString(dbi, encodeKey(uniqHash), encodeValue({ createdDate: Date.now() })); - txn2.commit(); + try { + await this.store.put(uniqHash, { createdDate: Date.now() }); + } catch (error) { + debug('ezs')('Error catched, no cache created with hash', uniqHash, error); + cacheSaved(); + } cacheSaved(); }); }); @@ -128,6 +130,7 @@ export default function boost(data, feed) { debug('ezs')(`${this.getIndex()} chunks have been boosted`); this.whenFinish .then(() => { + this.store.close(); feed.close(); }) .catch((e) => feed.stop(e)); diff --git a/packages/storage/src/flow.js b/packages/storage/src/flow.js index f9c911f3..d7ec21c3 100644 --- a/packages/storage/src/flow.js +++ b/packages/storage/src/flow.js @@ -1,5 +1,4 @@ -import lmdb from 'node-lmdb'; -import { encodeKey, decodeValue, lmdbEnv } from './store'; +import Store from './store'; /** * Take an `Object` and replace it with all the objects of the same domain contained in the store. @@ -11,29 +10,21 @@ import { encodeKey, decodeValue, lmdbEnv } from './store'; */ export default async function flow(data, feed) { const length = Number(this.getParam('length', -1)); + const statement = length === -1 ? 'transit' : 'truncate'; const domainName = this.getParam('domain', 'ezs'); const domain = Array.isArray(domainName) ? 
domainName.shift() : domainName; - if (this.isFirst()) { - this.dbi = lmdbEnv(this.ezs).openDbi({ - name: domain, - }); + if (!this.store) { + this.store = new Store(this.ezs, domain); } if (this.isLast()) { - if (this.dbi) { - this.dbi.close(); - } + this.store.close(); return feed.close(); } - const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true }); - const cursor = new lmdb.Cursor(txn, this.dbi); - let counter = 0; - for (let found = cursor.goToFirst(); - (found !== null && (counter < length || length < 0)); - found = cursor.goToNext()) { - counter += 1; - const value = txn.getString(this.dbi, encodeKey(found)); - feed.write(decodeValue(value)); - } - txn.commit(); - return feed.end(); + + return this.store.stream() + .pipe(this.ezs('extract', { path: 'value' })) + .pipe(this.ezs(statement, { length })) + .on('data', (item) => feed.write(item)) + .on('error', (e) => feed.stop(e)) + .on('end', () => feed.end()); } diff --git a/packages/storage/src/load.js b/packages/storage/src/load.js index 1fdf88c5..1900fe47 100644 --- a/packages/storage/src/load.js +++ b/packages/storage/src/load.js @@ -1,6 +1,4 @@ -import { - validKey, encodeKey, decodeValue, lmdbEnv, -} from './store'; +import Store, { validKey } from './store'; /** * With a `String`, containing a URI throw all the documents that match @@ -10,23 +8,17 @@ import { */ export default async function load(data, feed) { const domain = this.getParam('domain', 'ezs'); - if (this.isFirst()) { - this.dbi = lmdbEnv(this.ezs).openDbi({ - name: domain, - create: true, - }); + if (!this.store) { + this.store = new Store(this.ezs, domain); } if (this.isLast()) { - if (this.dbi) { - this.dbi.close(); - } + this.store.close(); return feed.close(); } if (!validKey(data)) { return feed.end(); } - const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true }); - const value = feed.send(decodeValue(txn.getString(this.dbi, encodeKey(data)))); - txn.commit(); + + const value = await this.store.get(data); return feed.send(value); } 
diff --git a/packages/storage/src/save.js b/packages/storage/src/save.js index 6c2ddc1a..99efe806 100644 --- a/packages/storage/src/save.js +++ b/packages/storage/src/save.js @@ -1,8 +1,6 @@ import { hostname } from 'os'; import get from 'lodash.get'; -import { - validKey, encodeKey, encodeValue, lmdbEnv, -} from './store'; +import Store, { validKey } from './store'; /** * Take `Object`, to save it into a store and throw an URL @@ -22,32 +20,20 @@ export default async function save(data, feed) { const uri = get(data, path); const domainName = this.getParam('domain', 'ezs'); const domain = Array.isArray(domainName) ? domainName.shift() : domainName; - - if (this.isFirst()) { - this.dbi = lmdbEnv(this.ezs).openDbi({ - name: domain, - create: true, - }); + if (!this.store) { + this.store = new Store(this.ezs, domain); + } + if (this.isFirst() && reset === true) { + this.store.reset(); } if (this.isLast()) { - if (this.dbi) { - this.dbi.close(); - } + this.store.close(); return feed.close(); } if (!validKey(uri)) { return feed.end(); } - if (this.isFirst() && reset === true) { - this.dbi.drop(); - this.dbi = lmdbEnv(this.ezs).openDbi({ - name: domain, - create: true, - }); - } - const txn = lmdbEnv(this.ezs).beginTxn(); - txn.putString(this.dbi, encodeKey(uri), encodeValue(data)); - txn.commit(); + await this.store.put(uri, data); if (protocol && host) { return feed.send(`${protocol}//${host}/${uri}`); diff --git a/packages/storage/src/store.js b/packages/storage/src/store.js index 5cdb53d3..04cc1ce4 100644 --- a/packages/storage/src/store.js +++ b/packages/storage/src/store.js @@ -1,22 +1,21 @@ -import { join } from 'path'; -import { tmpdir, totalmem, cpus } from 'os'; +import { totalmem, cpus } from 'os'; import pathExists from 'path-exists'; +import tmpFilepath from 'tmp-filepath'; import makeDir from 'make-dir'; import lmdb from 'node-lmdb'; -const EZS_STORAGE_PATH = process.env.EZS_STORAGE_PATH || tmpdir(); const maxDbs = cpus().length ** 2; const mapSize = 
totalmem() / 2; +const encodeKey = (k) => JSON.stringify(k); +const decodeKey = (k) => JSON.parse(String(k)); +const encodeValue = (v) => JSON.stringify(v); +const decodeValue = (v) => JSON.parse(String(v)); let handle; -export const validKey = (input) => (Boolean(input) && typeof input === 'string' && input.search(/\w+:(\/?\/?)[^\s]+/g) >= 0); -export const encodeKey = (k) => k; -export const encodeValue = (v) => JSON.stringify(v); -export const decodeValue = (v) => JSON.parse(String(v)); -export const lmdbEnv = () => { +const lmdbEnv = () => { if (handle) { return handle; } - const path = join(EZS_STORAGE_PATH, 'store'); + const path = tmpFilepath('store'); if (!pathExists.sync(path)) { makeDir.sync(path); } @@ -29,3 +28,118 @@ export const lmdbEnv = () => { return handle; }; +export const validKey = (input) => (Boolean(input) && typeof input === 'string' && input.search(/\w+:(\/?\/?)[^\s]+/g) >= 0); + +export default class Store { + constructor(ezs, domain) { + this.ezs = ezs; + this.domain = domain; + this.open(); + } + + open() { + this.handle = lmdbEnv().openDbi({ + name: this.domain, + create: true, + }); + } + + dbi() { + return this.handle; + } + + get(key) { + return new Promise((resolve, reject) => { + const txn = lmdbEnv().beginTxn({ readOnly: true }); + const ekey = encodeKey(key); + try { + const val = decodeValue(txn.getString(this.dbi(), ekey)); + txn.abort(); + resolve(val); + } catch (e) { + txn.abort(); + reject(e); + } + }); + } + + put(key, value) { + return new Promise((resolve, reject) => { + const txn = lmdbEnv().beginTxn(); + const ekey = encodeKey(key); + try { + txn.putString(this.dbi(), ekey, encodeValue(value)); + } catch (e) { + txn.abort(); + reject(e); + } + txn.commit(); + resolve(true); + }); + } + + add(key, value) { + return new Promise((resolve, reject) => { + const txn = lmdbEnv().beginTxn(); + const ekey = encodeKey(key); + const vvalue = decodeValue(txn.getString(this.dbi(), ekey)); + try { + if (vvalue) { + 
txn.putString(this.dbi(), ekey, encodeValue(vvalue.concat(value)));
+                } else {
+                    txn.putString(this.dbi(), ekey, encodeValue([value]));
+                }
+            } catch (e) {
+                txn.abort();
+                return reject(e);
+            }
+            txn.commit();
+            resolve(true);
+        });
+    }
+
+    stream() {
+        return this.cast();
+    }
+
+    empty() {
+        return this.cast().on('end', () => this.reset());
+    }
+
+    cast() {
+        const flow = this.ezs.createStream(this.ezs.objectMode());
+
+        process.nextTick(() => {
+            const txn = lmdbEnv().beginTxn({ readOnly: true });
+            const cursor = new lmdb.Cursor(txn, this.dbi());
+            const walker = (found, done) => {
+                if (found) {
+                    const id = decodeKey(found);
+                    const value = decodeValue(txn.getString(this.dbi(), found));
+                    this.ezs.writeTo(flow, { id, value }, (err, writable) => {
+                        if (err || writable === false) {
+                            return done();
+                        }
+                        return walker(cursor.goToNext(), done);
+                    });
+                } else {
+                    done();
+                }
+            };
+            walker(cursor.goToFirst(), () => {
+                flow.end();
+                txn.abort();
+            });
+        });
+        return flow;
+    }
+
+    reset() {
+        this.dbi().drop();
+        this.open();
+    }
+
+    close() {
+        this.dbi().close();
+    }
+}
diff --git a/packages/storage/test/boost.js b/packages/storage/test/boost.js
index 9c525e68..1210e88c 100644
--- a/packages/storage/test/boost.js
+++ b/packages/storage/test/boost.js
@@ -512,7 +512,7 @@ describe('boost', () => {
     });
 
-    describe('with a pipeline & a fixed key in parallel', () => {
+    describe('with a pipeline & a fixed key in parallel', () => {
         const script = `
             [transit]
@@ -558,7 +558,7 @@ describe('boost', () => {
             .on('cache:created', (id) => { cid = id; })
-            .on('error', (e) => expect(e).toBetoBeUndefined())
+            .on('error', (e) => expect(e).toBeUndefined())
             .on('data', (chunk) => { res += chunk; })