Skip to content

Commit

Permalink
feat: use memory
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Sep 4, 2022
1 parent b6d40a5 commit 8caa855
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 45 deletions.
77 changes: 35 additions & 42 deletions packages/analytics/src/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@ import get from 'lodash.get';
import set from 'lodash.set';
import debug from 'debug';
import assert from 'assert';
import crypto from 'crypto';
import { createStore, createPersistentStore } from '@ezs/store';
import hasher from 'node-object-hash';
import core from './core';

const hashCoerce = hasher({ sort: false, coerce: true });

const database = {};

async function saveIn(data, feed) {
if (this.isLast()) {
return feed.close();
}
const store = this.getEnv();
const key = get(data, 'id');
const isKey = Boolean(key);
const databaseID = this.getEnv();
const { id, value } = data;
const isKey = Boolean(id);
if (isKey) {
await store.put(key, data);
if (!database[databaseID]) {
database[databaseID] = {};
}
if (!database[databaseID][id]) {
database[databaseID][id] = value;
}
}
return feed.send(key);
return feed.send(id);
}

function sha1(input, salt) {
return crypto.createHash('sha1').update(JSON.stringify(input) + String(salt)).digest('hex');
}

/**
* Takes an `Object` and substitute a field with the corresponding value found in a external pipeline
Expand Down Expand Up @@ -59,23 +65,19 @@ function sha1(input, salt) {
* @name combine
* @param {String} [path] the path to substitute
* @param {String} [default] value if no substitution (otherwise value stay unchanged)
* @param {String} [primer=auto] Data to send to the external pipeline
* @param {String} [primer=n/a] Data to send to the external pipeline
* @param {String} [file] the external pipeline is described in a file
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [cacheName] Enable cache, with dedicated name
* @returns {Object}
*/
export default function combine(data, feed) {
const { ezs } = this;
const cacheName = this.getParam('cacheName');
const persistent = Boolean(cacheName);
let whenReady = Promise.resolve(true);
if (this.isFirst()) {
debug('ezs')('[combine] with sub pipeline.');
const location = this.getParam('location');
const primer = this.getParam('primer');
const primer = this.getParam('primer', 'n/a');
const input = ezs.createStream(ezs.objectMode());
const commands = ezs.createCommands({
file: this.getParam('file'),
Expand All @@ -85,41 +87,35 @@ export default function combine(data, feed) {
prepend: this.getParam('prepend'),
append: this.getParam('append'),
});
if (persistent) {
this.store = createPersistentStore(ezs, `combine${cacheName}`, location);
} else {
this.store = createStore(ezs, 'combine', location);
}
if (persistent && !this.store.isCreated()) {
whenReady = Promise.resolve(true);
} else {
const statements = ezs.compileCommands(commands, this.getEnv());
const output = ezs.createPipeline(input, statements)
.pipe(ezs(saveIn, null, this.store))
.pipe(ezs.catch())
.on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered
.on('error', (e) => feed.stop(e));
whenReady = new Promise((resolve) => output.on('end', resolve));
input.write(primer || this.store.id());
input.end();
}
this.databaseID = hashCoerce.hash({ primer, commands });
const statements = ezs.compileCommands(commands, this.getEnv());
const output = ezs.createPipeline(input, statements)
.pipe(ezs(saveIn, null, this.databaseID))
.pipe(ezs.catch())
.on('data', (d) => assert(d)) // WARNING: The data must be consumed, otherwise the "end" event has not been triggered
.on('error', (e) => feed.stop(e));
whenReady = new Promise((resolve) => output.on('end', resolve));
input.write(primer);
input.end();
}
if (this.isLast()) {
if (!persistent) {
this.store.close();
}
return feed.close();
}
return whenReady
.then(async () => {
.then(() => {
const defval = this.getParam('default', null);
const path = this.getParam('path');
const pathVal = get(data, path);
const keys = [].concat(pathVal).filter(Boolean);
if (keys.length === 0) {
return feed.send(data);
}
const values = await Promise.all(keys.map((key) => this.store.get(key)));
const values = keys.map((key) => {
if (!database[this.databaseID][key]) {
return null;
}
return core(key, database[this.databaseID][key]);
});
if (values.length && Array.isArray(pathVal)) {
set(data, path, values);
} else if (values.length && !Array.isArray(pathVal)) {
Expand All @@ -141,9 +137,6 @@ export default function combine(data, feed) {
return feed.send(data);
})
.catch((e) => {
if (this.store) {
this.store.close();
}
feed.stop(e);
});
}
8 changes: 5 additions & 3 deletions packages/analytics/test/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ describe('combine', () => {
output.push(chunk);
})
.on('end', () => {

assert.equal(output.length, 6);
console.log(output, output[0], output[0]);
assert.equal(output[0].b.value, 'aa');
assert.equal(output[1].b.value, 'bb');
assert.equal(output[2].b.value, 'cc');
Expand Down Expand Up @@ -340,7 +342,7 @@ describe('no combine', () => {
done(new Error('Error is the right behavior'));
});
});
test('with wrong location', (done) => {
test.skip('with wrong location', (done) => {
ezs.use(statements);
const input = [
{ a: 1, b: 'a' },
Expand Down Expand Up @@ -383,7 +385,7 @@ const env = {
};
const cacheName = Date.now();

test('combine with cache with script #1', (done) => {
test.skip('combine with cache with script #1', (done) => {
ezs.use(statements);
const input = [
{ a: 1, b: 'a' },
Expand Down Expand Up @@ -431,7 +433,7 @@ test('combine with cache with script #1', (done) => {
});
});

test('combine with cache with script #2', (done) => {
test.skip('combine with cache with script #2', (done) => {
ezs.use(statements);
const input = [
{ a: 1, b: 'a' },
Expand Down

0 comments on commit 8caa855

Please sign in to comment.