Skip to content

Commit

Permalink
fix: 🐛 bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Mar 27, 2020
1 parent aab8dbe commit 277de15
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 123 deletions.
2 changes: 1 addition & 1 deletion packages/analytics/src/distribute.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export default function distribute(data, feed) {
j += 1;
}
let x = 0;
this.store.cast()
this.store.empty()
.on('data', (item) => {
const key = parseInt(item.id, 10);
const idx = ruler.indexOf(key);
Expand Down
2 changes: 1 addition & 1 deletion packages/analytics/src/reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default function reducing(data, feed) {
this.store = new Store(this.ezs, 'reducing');
}
if (this.isLast()) {
this.store.cast()
this.store.empty()
.on('data', (item) => feed.write(item))
.on('end', () => feed.close());
} else {
Expand Down
107 changes: 77 additions & 30 deletions packages/analytics/src/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,71 +28,118 @@ const lmdbEnv = () => {
return handle;
};

export const validKey = (input) => (Boolean(input) && typeof input === 'string' && input.search(/\w+:(\/?\/?)[^\s]+/g) >= 0);

export default class Store {
constructor(ezs, domain) {
this.ezs = ezs;
this.dbi = lmdbEnv(this.ezs).openDbi({
name: domain,
this.domain = domain;
this.open();
}

open() {
this.handle = lmdbEnv().openDbi({
name: this.domain,
create: true,
});
}

dbi() {
return this.handle;
}

get(key) {
return new Promise((resolve) => {
const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true });
return new Promise((resolve, reject) => {
const txn = lmdbEnv().beginTxn({ readOnly: true });
const ekey = encodeKey(key);
const val = decodeValue(txn.getString(this.dbi, ekey));
txn.commit();
resolve(val);
try {
const val = decodeValue(txn.getString(this.dbi(), ekey));
txn.abort();
resolve(val);
} catch (e) {
txn.abort();
reject(e);
}
});
}

put(key, value) {
return new Promise((resolve) => {
const txn = lmdbEnv(this.ezs).beginTxn();
return new Promise((resolve, reject) => {
const txn = lmdbEnv().beginTxn();
const ekey = encodeKey(key);
txn.putString(this.dbi, ekey, encodeValue(value));
try {
txn.putString(this.dbi(), ekey, encodeValue(value));
} catch (e) {
txn.abort();
reject(e);
}
txn.commit();
resolve(true);
});
}

add(key, value) {
return new Promise((resolve) => {
const txn = lmdbEnv(this.ezs).beginTxn();
return new Promise((resolve, reject) => {
const txn = lmdbEnv().beginTxn();
const ekey = encodeKey(key);
const vvalue = decodeValue(txn.getString(this.dbi, ekey));
if (vvalue) {
txn.putString(this.dbi, ekey, encodeValue(vvalue.concat(value)));
} else {
txn.putString(this.dbi, ekey, encodeValue([value]));
const vvalue = decodeValue(txn.getString(this.dbi(), ekey));
try {
if (vvalue) {
txn.putString(this.dbi(), ekey, encodeValue(vvalue.concat(value)));
} else {
txn.putString(this.dbi(), ekey, encodeValue([value]));
}
} catch (e) {
txn.abort();
reject(e);
}
txn.commit();
resolve(true);
});
}

stream() {
return this.cast();
}

empty() {
return this.cast().on('end', () => this.reset());
}

cast() {
const flow = this.ezs.createStream(this.ezs.objectMode())
.on('end', () => this.close());
const flow = this.ezs.createStream(this.ezs.objectMode());

process.nextTick(() => {
const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true });
const cursor = new lmdb.Cursor(txn, this.dbi);
for (let found = cursor.goToFirst();
found !== null;
found = cursor.goToNext()) {
const id = decodeKey(found);
const value = decodeValue(txn.getString(this.dbi, found));
flow.write({ id, value });
}
flow.end();
txn.commit();
const cursor = new lmdb.Cursor(txn, this.dbi());
const walker = (found, done) => {
if (found) {
const id = decodeKey(found);
const value = decodeValue(txn.getString(this.dbi(), found));
this.ezs.writeTo(flow, { id, value }, (err, writable) => {
if (err || writable === false) {
return done();
}
return walker(cursor.goToNext(), done);
});
} else {
done();
}
};
walker(cursor.goToFirst(), () => {
flow.end();
txn.abort();
});
});
return flow;
}

reset() {
this.dbi().drop();
this.open();
}

close() {
return this.dbi.drop();
this.dbi().close();
}
}
2 changes: 1 addition & 1 deletion packages/lodex/src/formatOutput.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ function formatOutput(data, feed) {
let check = false;
keys.forEach((k, index) => {
if (values[index]) {
feed.write(!check ? ' ' : ',');
check = true;
feed.write(index === 0 ? ' ' : ',');
feed.write(json(k));
feed.write(':');
feed.write(json(values[index]));
Expand Down
47 changes: 25 additions & 22 deletions packages/storage/src/boost.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import hasher from 'node-object-hash';
import DateDiff from 'date-diff';
import debug from 'debug';
import {
decodeValue, encodeKey, encodeValue, lmdbEnv,
} from './store';


let dbi;
import Store from './store';

const hashCoerce = hasher({
sort: false,
coerce: true,
});

function createURI(data, feed) {
if (this.isLast()) {
return feed.close();
}
const uri = 'uid:'.concat(this.getIndex().toString().padStart(10, '0'));
return feed.send({
value: [data],
uri,
});
}

const computeHash = (commands, environment, chunk) => {
const commandsHash = hashCoerce.hash(commands);
const environmentHash = hashCoerce.hash(environment);
Expand Down Expand Up @@ -45,7 +51,7 @@ function hitThe(cache, ttl) {
* @param {Number} [cleanupDelay=600] Frequency (seconds) to cleanup the cache (10 min)
* @returns {Object}
*/
export default function boost(data, feed) {
export default async function boost(data, feed) {
const { ezs } = this;
if (this.isFirst()) {
const file = this.getParam('file');
Expand All @@ -55,22 +61,16 @@ export default function boost(data, feed) {
const commands = this.getParam('commands', cmds.get());
const cleanupDelay = Number(this.getParam('cleanupDelay', 10 * 60));
const environment = this.getEnv();
if (!dbi) {
dbi = lmdbEnv(this.ezs).openDbi({
name: 'cache_index',
create: true,
});
if (!this.store) {
this.store = new Store(this.ezs, 'cache_index');
}

if (!commands || commands.length === 0) {
return feed.stop(new Error('Invalid parameter for booster'));
}

const streams = ezs.compileCommands(commands, environment);
const uniqHash = String(this.getParam('key') || computeHash(commands, environment, data));
const txn = lmdbEnv().beginTxn({ readOnly: true });
const cache = decodeValue(txn.getString(dbi, uniqHash));
txn.commit();
const cache = await this.store.get(uniqHash);

if (hitThe(cache, cleanupDelay)) {
debug('ezs')('Boost using cache with hash', uniqHash);
Expand All @@ -96,20 +96,22 @@ export default function boost(data, feed) {
.pipe(ezs.catch())
.on('error', (e) => feed.write(e))
.on('data', (d) => feed.write(d))
.pipe(ezs((da, fe) => (da === null ? fe.close() : fe.send({ value: [da] }))))
.pipe(ezs('storage:identify'))
.pipe(ezs(createURI))
.pipe(ezs('storage:save', { domain: uniqHash, reset: true }))
.pipe(ezs.catch());
this.whenFinish = new Promise((cacheSaved) => {
cacheSetOutput.on('error', (error) => {
debug('ezs')('Error catched, no cache created with hash', uniqHash, error);
cacheSaved();
});
cacheSetOutput.on('end', () => {
cacheSetOutput.on('end', async () => {
debug('ezs')('Registering cache with hash', uniqHash);
const txn2 = lmdbEnv().beginTxn();
txn2.putString(dbi, encodeKey(uniqHash), encodeValue({ createdDate: Date.now() }));
txn2.commit();
try {
await this.store.put(uniqHash, { createdDate: Date.now() });
} catch (error) {
debug('ezs')('Error catched, no cache created with hash', uniqHash, error);
cacheSaved();
}
cacheSaved();
});
});
Expand All @@ -128,6 +130,7 @@ export default function boost(data, feed) {
debug('ezs')(`${this.getIndex()} chunks have been boosted`);
this.whenFinish
.then(() => {
this.store.close();
feed.close();
})
.catch((e) => feed.stop(e));
Expand Down
33 changes: 12 additions & 21 deletions packages/storage/src/flow.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import lmdb from 'node-lmdb';
import { encodeKey, decodeValue, lmdbEnv } from './store';
import Store from './store';

/**
* Take an `Object` and replace it with all the objects of the same domain contained in the store.
Expand All @@ -11,29 +10,21 @@ import { encodeKey, decodeValue, lmdbEnv } from './store';
*/
export default async function flow(data, feed) {
const length = Number(this.getParam('length', -1));
const statement = length === -1 ? 'transit' : 'truncate';
const domainName = this.getParam('domain', 'ezs');
const domain = Array.isArray(domainName) ? domainName.shift() : domainName;
if (this.isFirst()) {
this.dbi = lmdbEnv(this.ezs).openDbi({
name: domain,
});
if (!this.store) {
this.store = new Store(this.ezs, domain);
}
if (this.isLast()) {
if (this.dbi) {
this.dbi.close();
}
this.store.close();
return feed.close();
}
const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true });
const cursor = new lmdb.Cursor(txn, this.dbi);
let counter = 0;
for (let found = cursor.goToFirst();
(found !== null && (counter < length || length < 0));
found = cursor.goToNext()) {
counter += 1;
const value = txn.getString(this.dbi, encodeKey(found));
feed.write(decodeValue(value));
}
txn.commit();
return feed.end();

return this.store.stream()
.pipe(this.ezs('extract', { path: 'value' }))
.pipe(this.ezs(statement, { length }))
.on('data', (item) => feed.write(item))
.on('error', (e) => feed.stop(e))
.on('end', () => feed.end());
}
20 changes: 6 additions & 14 deletions packages/storage/src/load.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {
validKey, encodeKey, decodeValue, lmdbEnv,
} from './store';
import Store, { validKey } from './store';

/**
* With a `String`, containing a URI throw all the documents that match
Expand All @@ -10,23 +8,17 @@ import {
*/
export default async function load(data, feed) {
const domain = this.getParam('domain', 'ezs');
if (this.isFirst()) {
this.dbi = lmdbEnv(this.ezs).openDbi({
name: domain,
create: true,
});
if (!this.store) {
this.store = new Store(this.ezs, domain);
}
if (this.isLast()) {
if (this.dbi) {
this.dbi.close();
}
this.store.close();
return feed.close();
}
if (!validKey(data)) {
return feed.end();
}
const txn = lmdbEnv(this.ezs).beginTxn({ readOnly: true });
const value = feed.send(decodeValue(txn.getString(this.dbi, encodeKey(data))));
txn.commit();

const value = await this.store.get(data);
return feed.send(value);
}
Loading

0 comments on commit 277de15

Please sign in to comment.