Skip to content

Commit

Permalink
feat: [exchange] can use one pipeline per chunk (or not)
Browse files Browse the repository at this point in the history
  • Loading branch information
touv committed Dec 13, 2020
1 parent 675028c commit c6fd334
Showing 1 changed file with 51 additions and 20 deletions.
71 changes: 51 additions & 20 deletions packages/analytics/src/expand.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,44 +64,75 @@ async function mergeWith(data, feed) {
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @param {String} [persistent=false] The internal database will be reused until it is deleted
* @param {String} [cache] Use a specific ezs statement to run commands (advanced)
* @returns {Object}
*/
export default async function expand(data, feed) {
const { ezs } = this;
if (this.isLast()) {
if (this.input) {
this.whenFinish.finally(() => feed.close());
return this.input.end();
}
return feed.close();
}

const path = this.getParam('path');
if (this.isFirst()) {
debug('ezs')('[expand] with sub pipeline.');
const id = this.getIndex().toString().padStart(20, '0');
const value = get(data, path);
const { ezs } = this;
const persistent = this.getParam('persistent', false);

if (value === undefined) {
return feed.send(data);
}

if (!this.store) {
const location = this.getParam('location');
this.store = createStore(ezs, 'expand', location);
this.store.reset();
const commands = ezs.createCommands({
}

if (!this.commands) {
this.commands = ezs.createCommands({
file: this.getParam('file'),
script: this.getParam('script'),
command: this.getParam('command'),
commands: this.getParam('commands'),
prepend: this.getParam('prepend'),
append: this.getParam('append'),
});
const statements = ezs.compileCommands(commands, this.getEnv());
this.input = ezs.createStream(ezs.objectMode());
const output = ezs.createPipeline(this.input, statements)
.pipe(ezs(mergeWith, { path }, this.store))
.pipe(ezs.catch());
this.whenFinish = feed.flow(output);
}
if (this.isLast()) {
this.whenFinish.finally(() => feed.close());
return this.input.end();
}
try {
const id = this.getIndex().toString().padStart(20, '0');
await this.store.put(id, data);
const value = get(data, path);
if (value !== undefined) {
return ezs.writeTo(this.input, core(id, value), () => feed.end());
}
return feed.send(data);
} catch (e) {
return feed.stop(e);
}

if (persistent) {
if (!this.input) {
debug('ezs')('[expand] with sub pipeline.');
this.input = ezs.createStream(ezs.objectMode());
const statements = ezs.compileCommands(this.commands, this.getEnv());
const output = ezs.createPipeline(this.input, statements)
.pipe(ezs(mergeWith, { path }, this.store))
.pipe(ezs.catch());
this.whenFinish = feed.flow(output);
}
return ezs.writeTo(this.input, core(id, value), () => feed.end());
}
const cache = this.getParam('cache');
let statements;
const { commands } = this;
if (cache) {
statements = [ezs(cache, { commands }, this.getEnv())];
} else {
statements = ezs.compileCommands(commands, this.getEnv());
}
const stream = ezs.createStream(ezs.objectMode());
const output = ezs.createPipeline(stream, statements)
.pipe(ezs(mergeWith, { path }, this.store))
.pipe(ezs.catch());
ezs.writeTo(stream, core(id, value), () => stream.end());
feed.flow(output);
}

0 comments on commit c6fd334

Please sign in to comment.