From c6fd334387753a3e6636f3ae37eb78e1bb6bde36 Mon Sep 17 00:00:00 2001 From: Nicolas Thouvenin Date: Sun, 13 Dec 2020 23:12:33 +0100 Subject: [PATCH] feat: [exchange] can use one pipeline per chunk (or not) --- packages/analytics/src/expand.js | 71 +++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 20 deletions(-) diff --git a/packages/analytics/src/expand.js b/packages/analytics/src/expand.js index 4cc1a284..bbd47149 100644 --- a/packages/analytics/src/expand.js +++ b/packages/analytics/src/expand.js @@ -64,17 +64,37 @@ async function mergeWith(data, feed) { * @param {String} [script] the external pipeline is described in a string of characters * @param {String} [commands] the external pipeline is described in a object * @param {String} [command] the external pipeline is described in a URL-like command + * @param {String} [persistent=false] The internal database will be reused until it is deleted + * @param {String} [cache] Use a specific ezs statement to run commands (advanced) * @returns {Object} */ export default async function expand(data, feed) { - const { ezs } = this; + if (this.isLast()) { + if (this.input) { + this.whenFinish.finally(() => feed.close()); + return this.input.end(); + } + return feed.close(); + } + const path = this.getParam('path'); - if (this.isFirst()) { - debug('ezs')('[expand] with sub pipeline.'); + const id = this.getIndex().toString().padStart(20, '0'); + const value = get(data, path); + const { ezs } = this; + const persistent = this.getParam('persistent', false); + + if (value === undefined) { + return feed.send(data); + } + + if (!this.store) { const location = this.getParam('location'); this.store = createStore(ezs, 'expand', location); this.store.reset(); - const commands = ezs.createCommands({ + } + + if (!this.commands) { + this.commands = ezs.createCommands({ file: this.getParam('file'), script: this.getParam('script'), command: this.getParam('command'), @@ -82,26 +102,37 @@ export default async function expand(data, feed) { prepend: this.getParam('prepend'), append: this.getParam('append'), }); - const statements = ezs.compileCommands(commands, this.getEnv()); - this.input = ezs.createStream(ezs.objectMode()); - const output = ezs.createPipeline(this.input, statements) - .pipe(ezs(mergeWith, { path }, this.store)) - .pipe(ezs.catch()); - this.whenFinish = feed.flow(output); - } - if (this.isLast()) { - this.whenFinish.finally(() => feed.close()); - return this.input.end(); } try { - const id = this.getIndex().toString().padStart(20, '0'); await this.store.put(id, data); - const value = get(data, path); - if (value !== undefined) { - return ezs.writeTo(this.input, core(id, value), () => feed.end()); - } - return feed.send(data); } catch (e) { return feed.stop(e); } + + if (persistent) { + if (!this.input) { + debug('ezs')('[expand] with sub pipeline.'); + this.input = ezs.createStream(ezs.objectMode()); + const statements = ezs.compileCommands(this.commands, this.getEnv()); + const output = ezs.createPipeline(this.input, statements) + .pipe(ezs(mergeWith, { path }, this.store)) + .pipe(ezs.catch()); + this.whenFinish = feed.flow(output); + } + return ezs.writeTo(this.input, core(id, value), () => feed.end()); + } + const cache = this.getParam('cache'); + let statements; + const { commands } = this; + if (cache) { + statements = [ezs(cache, { commands }, this.getEnv())]; + } else { + statements = ezs.compileCommands(commands, this.getEnv()); + } + const stream = ezs.createStream(ezs.objectMode()); + const output = ezs.createPipeline(stream, statements) + .pipe(ezs(mergeWith, { path }, this.store)) + .pipe(ezs.catch()); + ezs.writeTo(stream, core(id, value), () => stream.end()); + feed.flow(output); }