Skip to content

Commit

Permalink
feat: 🎸 improve : mode / cache / cli
Browse files Browse the repository at this point in the history
mode is just a statement like
delegate,parallel,singleton,dispatch,boost,booster ; memoize can be
disable with EZS_CACHE=false ; cli is compatible with append/prepend
meta
  • Loading branch information
touv committed May 7, 2020
1 parent 44cccf0 commit 8ee1993
Show file tree
Hide file tree
Showing 21 changed files with 193 additions and 133 deletions.
21 changes: 13 additions & 8 deletions packages/analytics/src/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ import core from './core';
*
* ```json
* [
* { id: 'x', value: 3 },
* { id: 't', value: 1 },
* { id: 'x', value: [ 2, 3, 5] },
* { id: 't', value: [ 2 ] },
* ]
* ```
*
* @name aggregate
* @param {String} [path=id] path to use for id
* @param {String} [value=value] path to use for value (if not found 1 is the default value)
* @returns {Object}
*/
export default function aggregate(data, feed) {
Expand All @@ -47,14 +48,18 @@ export default function aggregate(data, feed) {
}
feed.close();
} else {
const path = this.getParam('path', 'id');
const path = this.getParam('id', 'id');
const paths = Array.isArray(path) ? path : [path];
paths.forEach((p) => {
const id = JSON.stringify(get(data, p));
if (this.store[id]) {
this.store[id] += 1;
} else {
this.store[id] = 1;
const key = get(data, p);
if (key) {
const id = JSON.stringify(key);
const value = get(data, this.getParam('value', 'value'), 1);
if (this.store[id]) {
this.store[id].push(value);
} else {
this.store[id] = [value];
}
}
});
feed.end();
Expand Down
51 changes: 49 additions & 2 deletions packages/analytics/test/tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,8 @@ describe('test', () => {
{ a: 'x', b: 'z' },
{ a: 'x', b: 'z' },
])
.pipe(ezs('aggregate', { path: 'a' }))
.pipe(ezs('aggregate', { id: 'a' }))
.pipe(ezs('summing'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
Expand All @@ -1131,7 +1132,8 @@ describe('test', () => {
{ a: ['x', 'x'], b: 'z' },
{ a: ['x', 'x'], b: 'z' },
])
.pipe(ezs('aggregate', { path: 'a' }))
.pipe(ezs('aggregate', { id: 'a' }))
.pipe(ezs('summing'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
Expand All @@ -1143,5 +1145,50 @@ describe('test', () => {
done();
});
});
it('aggregate #3', (done) => {
const res = [];
from([
{ a: 'x', b: 3 },
{ a: 't', b: 2 },
{ a: 't', b: 3 },
{ a: 'x', b: 1 },
{ a: 'x', b: 4 },
])
.pipe(ezs('aggregate', { id: ['a', 'x'], value: 'b' }))
.pipe(ezs('summing'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
})
.on('end', () => {
assert.equal(res.length, 2);
assert.equal(res[0].value, 8);
assert.equal(res[1].value, 5);
done();
});
});

it('aggregate #4', (done) => {
const res = [];
from([
{ a: 'x', b: 3 },
{ a: 't', b: 2 },
{ a: 't', b: 3 },
{ a: 'x', b: 1 },
{ a: 'x', b: 4 },
])
.pipe(ezs('aggregate', { id: 'unknow' }))
.pipe(ezs('summing'))
.on('data', (chunk) => {
assert(typeof chunk === 'object');
res.push(chunk);
})
.on('end', () => {
assert.equal(res.length, 0);
done();
});
});



});
15 changes: 9 additions & 6 deletions packages/booster/src/booster.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const computeHash = (commands, environment, chunk) => {
/**
* Takes an `Object` delegate processing to an external pipeline and cache the result
*
* @param {String} [file] the external pipeline is descrbied in a file
* @param {String} [script] the external pipeline is descrbied in a sting of characters
* @param {String} [commands] the external pipeline is descrbied in object
* @param {String} [file] the external pipeline is described in a file
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [key] the cache key form the stream, in not provided, it's computed with the first chunk
* @param {Number} [hitsByCheck=1000] Number of hits to verify the cache
* @param {Number} [maxFiles=100] Number of hits to verify the cache
Expand All @@ -38,16 +39,18 @@ export default function booster(data, feed) {
const file = this.getParam('file');
const fileContent = ezs.loadScript(file);
const script = this.getParam('script', fileContent);
const cmds = ezs.compileScript(script);
const commands = this.getParam('commands', cmds.get());
const cmd1 = ezs.compileScript(script).get();
const command = this.getParam('command');
const cmd2 = [].concat(command).map(ezs.parseCommand).filter(Boolean);
const commands = this.getParam('commands', cmd1.concat(cmd2));
const environment = this.getEnv();
const key = this.getParam('key');
const hitsByCheck = Number(this.getParam('hitsByCheck', 1000));
const maxTotalSize = Number(this.getParam('maxTotalSize'));
const maxFiles = Number(this.getParam('maxFiles', 100));
const cleanupDelay = Number(this.getParam('cleanupDelay', 10 * 60 * 1000));
const rootPath = this.getParam('rootPath');
const directoryPath = this.getParam('directoryPath');
const environment = this.getEnv();

if (!cacheHandle) {
cacheHandle = new Cache({
Expand Down
28 changes: 19 additions & 9 deletions packages/core/src/cli.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import _ from 'lodash';
import { realpathSync, createReadStream } from 'fs';
import { PassThrough } from 'stream';
import yargs from 'yargs';
import debug from 'debug';
import ezs from '.';
import Commands from './commands';
import File from './file';
import { version } from '../package.json';
import settings from './settings';
Expand Down Expand Up @@ -59,7 +59,7 @@ export default function cli(errlog) {
errlog(`Error: ${argv.daemon} doesn't exists.`);
process.exit(1);
}
debug('ezs')(`Serving ${serverPath} with ${settings.nShards} shards`);
debug('ezs')(`Serving ${serverPath} with ${settings.concurrency} shards`);
return ezs.createCluster(settings.port, serverPath);
}
if (argv._.length === 0) {
Expand Down Expand Up @@ -87,25 +87,35 @@ export default function cli(errlog) {
input = process.stdin;
input.resume();
}
const onlyOne = (item) => (Array.isArray(item) ? item.shift() : item);
const params = Array.isArray(argv.param) ? argv.param : [argv.param];
const environement = params
.filter(Boolean)
.map((p) => p.split('='))
.reduce((obj, item) => ({ ...obj, [item[0]]: item[1] }), {});

const runScriptLocal = (strm, cmds) => strm
.pipe(ezs('delegate', { commands: cmds.get() }, environement));
const output = argv._
const scripts = argv._
.map((arg) => {
const script = File(ezs, arg);
if (!script) {
errlog(`Error: ${arg} isn't a file.`);
process.exit(1);
}
return script;
})
.map((script) => new Commands(ezs.parseString(script)))
.reduce(runScriptLocal, input)
});
const meta = scripts.map((script) => ezs.metaString(script)).reduce((prev, cur) => _.merge(cur, prev), {});
const { prepend, append } = meta;
const prepend2Pipeline = ezs.parseCommand(onlyOne(prepend));
const append2Pipeline = ezs.parseCommand(onlyOne(append));
const { server, delegate } = settings;
const execMode = server ? 'dispatch' : delegate;
const statements = scripts.map((script) => ezs(execMode, { script, server }, environement));
if (prepend2Pipeline) {
statements.unshift(ezs.createCommand(prepend2Pipeline, environement));
}
if (append2Pipeline) {
statements.push(ezs.createCommand(append2Pipeline, environement));
}
const output = ezs.createPipeline(input, statements)
.pipe(ezs.catch((e) => e))
.on('error', (e) => {
errlog(e.message.split('\n').shift());
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export const inspectServers = (servers, commands, environment, ns) => ensureArra
.filter(Boolean)
.filter((elem, pos, arr) => arr.indexOf(elem) === pos)
.map(parseAddress(commands, environment))
.map((s) => Array(ns || settings.nShards).fill(s)) // multiple each line
.map((s) => Array(ns || settings.concurrency).fill(s)) // multiple each line
.reduce((a, b) => a.concat(b), []); // flatten all

export const connectServer = (ezs) => (serverOptions, index) => {
Expand Down
5 changes: 2 additions & 3 deletions packages/core/src/commands.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import assert from 'assert';
import { M_DISPATCH } from './constants';

export default class Commands {
constructor(commands) {
Expand All @@ -23,9 +22,9 @@ export default class Commands {
this.commands
.filter((c) => c.name !== 'use')
.forEach((c) => {
const currentMode = c.mode === M_DISPATCH ? 'dispatch' : 'pipeline';
const currentMode = c.mode;
if (currentMode !== previousMode) {
if (currentMode === 'dispatch' || first) {
if (first) {
newCmds.push({
func: currentMode,
cmds: [...this.useCommands, c],
Expand Down
4 changes: 0 additions & 4 deletions packages/core/src/constants.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import filenameRegex from 'filename-regex';
import { version } from '../package.json';

export const M_NORMAL = 'normal';
export const M_SINGLE = 'unique';
export const M_DISPATCH = 'detachable';
export const M_ALL = [M_DISPATCH, M_NORMAL, M_SINGLE, 'single'];
export const VERSION = version;
export const STARTED_AT = Date.now();
export const RX_IDENTIFIER = new RegExp(/^[a-z]+:\//);
Expand Down
18 changes: 5 additions & 13 deletions packages/core/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@ import Meta from './meta';
import Server from './server';
import settings from './settings';
import { compressStream, uncompressStream } from './compactor';
import {
M_SINGLE, M_DISPATCH, M_NORMAL,
} from './constants';


const ezs = (name, options, environment) => new Engine(ezs, Statement.get(ezs, name, options), options, environment);
const ezsPath = [resolve(__dirname, '../..'), process.cwd(), globalModules];
const ezsCache = new LRU(settings.cache);

ezs.memoize = (key, func) => {
if (!key) {
if (!key || !settings.cacheEnable) {
return func();
}
const cached = ezsCache.get(key);
Expand Down Expand Up @@ -63,28 +59,24 @@ ezs.loadScript = (file) => ezs.memoize(`ezs.loadScript>${file}`, () => File(ezs,
ezs.compileScript = (script) => new Commands(ezs.parseString(script));
ezs.parseCommand = (command) => ezs.memoize(`ezs.parseCommand>${command}`, () => parseCommand(command));
ezs.createCommand = (command, environment) => {
const mode = command.mode || M_NORMAL;
if (!command.name) {
throw new Error(`Bad command : ${command.name}`);
}
if (command.use) {
Statement.load(ezs, command.use);
}
if (mode === M_NORMAL || mode === M_DISPATCH) {
return ezs(command.name, command.args, environment);
}
if (mode === M_SINGLE || mode === 'single' /* Backward compatibility */) {
return ezs('singleton', { ...command.args, statement: command.name }, environment);
if (Statement.exists(ezs, command.mode)) {
return ezs(command.mode, { commands: [{ name: command.name, args: command.args }] }, environment);
}
throw new Error(`Bad mode: ${mode}`);
return ezs(command.name, command.args, environment);
};
ezs.compileCommands = (commands, environment) => {
if (!Array.isArray(commands)) {
throw new Error('Pipeline works with an array of commands.');
}
const cmds = [...commands];
cmds.push({
mode: M_NORMAL,
mode: null,
name: 'transit',
args: { },
});
Expand Down
20 changes: 11 additions & 9 deletions packages/core/src/script.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { parse } from 'querystring';
import { parse, unescape } from 'querystring';
import autocast from 'autocast';
import Expression from './expression';
import { M_NORMAL, M_ALL } from './constants';

const regex = {
section: /^\s*\[\s*([^\]]*)\s*\]\s*$/,
param: /^\s*([\w.\-_]+)\s*[=: ]\s*(.*?)\s*$/,
comment: /^\s*[;#].*$/,
};

const decodeURIComponent = (s) => autocast(unescape(s));
const parseOpts = (obj) => {
const res = {};
Object.keys(obj).forEach((key) => {
Expand All @@ -18,22 +18,24 @@ const parseOpts = (obj) => {
};

export const parseCommand = (cmdline) => {
if (!cmdline) {
return cmdline;
if (!cmdline || typeof cmdline !== 'string') {
return false;
}
const matches1 = cmdline.match(/([:\w]+)\?(.*)/);
let args = {};
let mode = M_NORMAL;
let mode = null;
let name = 'debug';
let use = '';
const test = '';
if (Array.isArray(matches1)) {
let qstr;
[, name, qstr] = matches1;
args = { ...parse(qstr) };
mode = M_ALL.reduce((prev, cur) => ((args[cur] !== undefined) ? cur : prev), M_NORMAL);
args = {
...parse(qstr, null, null, { decodeURIComponent }),
};
mode = qstr;
} else {
mode = M_NORMAL;
mode = null;
name = cmdline;
}
if (name.indexOf(':') !== -1) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function createServer(ezs, serverPort, serverPath) {
function createCluster(ezs, serverPort, serverPath) {
let term = false;
if (cluster.isMaster) {
for (let i = 0; i < settings.nShards; i += 1) {
for (let i = 0; i < settings.concurrency; i += 1) {
cluster.fork();
}
cluster.on('exit', () => {
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/server/serverInformation.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {

const getInformations = (dirPath) => new Promise((resolve) => {
const infos = {
concurrency: settings.queue.concurrency,
nShards: settings.nShards,
concurrency: settings.concurrency,
encoding: settings.encoding,
uptime: Date.now() - STARTED_AT,
timestamp: Date.now(),
Expand Down
Loading

0 comments on commit 8ee1993

Please sign in to comment.