From 6bd3feaf7ebfe6801aa00252351c4e44c0541e32 Mon Sep 17 00:00:00 2001 From: Geoffrey Hendrey Date: Sun, 13 Oct 2024 07:51:07 -0700 Subject: [PATCH] transactions (#87) * transactions * bump version 0.1.41 --- package.json | 2 +- src/JsonPointer.ts | 6 +- src/TemplateProcessor.ts | 150 ++++++++++++++++++++++++++--- src/test/TemplateProcessor.test.js | 30 ++++++ 4 files changed, 171 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 9fbf43f3..fea60159 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "stated-js", - "version": "0.1.40", + "version": "0.1.41", "license": "Apache-2.0", "description": "JSONata embedded in JSON", "main": "./dist/src/index.js", diff --git a/src/JsonPointer.ts b/src/JsonPointer.ts index 3c41eedb..a22d389a 100644 --- a/src/JsonPointer.ts +++ b/src/JsonPointer.ts @@ -71,7 +71,11 @@ export default class JsonPointer { */ static get(obj:object, pointer:JsonPointerString|JsonPointerStructureArray) { const refTokens = Array.isArray(pointer) ? pointer : JsonPointer.parse(pointer as JsonPointerString); - + //technically the json pointer for the root object is "". However I find this ridiculous and we adopt our + //more sensible convention that "/" is the root pointer. So the if block below is there to treat "" as "/" + if(refTokens[0] === "" && refTokens.length === 1){ + return obj; + } for (let i = 0; i < refTokens.length; ++i) { const tok = refTokens[i]; if (!(typeof obj === 'object' && tok in obj)) { diff --git a/src/TemplateProcessor.ts b/src/TemplateProcessor.ts index 8ec1181a..8b1341d8 100644 --- a/src/TemplateProcessor.ts +++ b/src/TemplateProcessor.ts @@ -72,17 +72,43 @@ export type PlanStep = { forkId:string, didUpdate:boolean } +export type Mutation = {jsonPtr:JsonPointerString, op:Op, value:any}; + +//A Transaction is a set of changes applied atomically. +export type Transaction ={ + op: "transaction", + mutations: Mutation[] +} + /** * a FunctionGenerator is used to generate functions that need the context of which expression they were called from * which is made available to them in the MetaInf */ export type FunctionGenerator = (metaInfo: MetaInfo, templateProcessor: TemplateProcessor) => (Promise<(arg: any) => Promise>); + + /** - * defines the function signature for data change callbacks, called when data at the ptr changes + * A callback function that is triggered when data changes. + * + * This callback supports both the legacy `removed` boolean parameter and the new `op` parameter + * to avoid breaking existing clients while allowing for more descriptive operations. + * + * - When `removed` is provided, it indicates whether the data was removed (`true` for delete, `false` for set). + * - When `op` is provided, it specifies the operation performed on the data: + * - `"set"`: The data was set or updated. + * - `"delete"`: The data was deleted. + * - `"forceSetInternal"`: A forced internal set operation was performed. + * + * Both `removed` and `op` are optional. If both are provided, `op` takes precedence in interpreting the operation. + * + * @param data - The data that was changed and is pointed to by ptr + * @param ptr - The JSON pointer string indicating where in the root object the change occurred. In the case of callbacks + * registered on "/", ptr will be an array of JSON pointers into the `data` field represents the root object. + * @param removed - (optional) A boolean indicating whether the data was removed. `true` for delete, `false` for set. + * @param op - (optional) A string describing the operation. Can be `"set"`, `"delete"`, or `"forceSetInternal"`. */ -export type DataChangeCallback = (data:any, ptr:JsonPointerString, removed?:boolean)=>void - +export type DataChangeCallback = (data: any, ptr: JsonPointerString|JsonPointerString[], removed?: boolean, op?: Op) => void; /** @@ -276,7 +302,7 @@ export default class TemplateProcessor { private executionPlans: { [key: JsonPointerString]: JsonPointerString[] }={}; /** A queue of execution plans awaiting processing. */ - private readonly executionQueue:(Plan|SnapshotPlan)[] = []; + private readonly executionQueue:(Plan|SnapshotPlan|Transaction)[] = []; /** function generators can be provided by a caller when functions need to be * created in such a way that they are somehow 'responsive' or dependent on their @@ -939,7 +965,7 @@ export default class TemplateProcessor { if (jp.has(this.output, jsonPtr)) { const current = jp.get(this.output, jsonPtr); jp.remove(this.output, jsonPtr); - this.callDataChangeCallbacks(current, jsonPtr, true) + this.callDataChangeCallbacks(current, jsonPtr, true, "delete"); } }); } @@ -1161,11 +1187,13 @@ export default class TemplateProcessor { private async drainExecutionQueue(removeTmpVars:boolean=true){ while (this.executionQueue.length > 0 && !this.isClosed) { try { - const plan: Plan | SnapshotPlan = this.executionQueue[0]; + const plan: Plan | SnapshotPlan | Transaction= this.executionQueue[0]; if (plan.op === "snapshot") { (plan as SnapshotPlan).generatedSnapshot = this.executionStatus.toJsonString();; - } else { - await this.executePlan(plan); + } else if(plan.op === "transaction" ){ + this.applyTransaction(plan as Transaction); + }else{ + await this.executePlan(plan as Plan); } removeTmpVars && this.removeTemporaryVariables(this.tempVars, "/"); }finally { @@ -1174,6 +1202,95 @@ export default class TemplateProcessor { } } + /** + * Applies a transaction by processing each mutation within the transaction. + * + * For each mutation, this method applies the specified operation (`set` or `delete`) + * to the `output` object based on the `jsonPtr` (JSON pointer). + * It also triggers data change callbacks after each mutation. + * + * @param transaction - The transaction object containing a list of mutations to apply. + * @throws {Error} If the operation (`op`) is neither `"set"` nor `"delete"`. + * + * The transaction is processed as follows: + * - `"set"`: Sets the value at the location specified by `jsonPtr` using `jp.set`. + * - `"delete"`: Removes the value at the location specified by `jsonPtr` using `jp.remove`. + * + * After each mutation, `callDataChangeCallbacks` is called to notify of the change. + * Finally, a batch data change callback is triggered for all affected JSON pointers. + * + * @private + */ + private async applyTransaction(transaction: Transaction) { + const ptrs: JsonPointerString[] = []; + for (const { jsonPtr, value, op } of transaction.mutations) { + ptrs.push(jsonPtr); + if (op === 'set') { + jp.set(this.output, jsonPtr, value); + } else if (op === 'delete') { + jp.remove(this.output, jsonPtr); + } else { + throw new Error(`Transaction cannot include Op type ${op}`); + } + await this.callDataChangeCallbacks(value, jsonPtr, op === 'delete', op); + } + await this.callDataChangeCallbacks(this.output, ptrs); + } + + /** + * Registers a transaction callback to handle batched data changes. + * + * When setData is called, a set of changes (a DAG) is calculated and the changes are sequentially applied. These + * changes can be 'bundled' into a single Transaction for the purpose of capturing a single set of changes that + * if atomically applied, has the exact same effect as the DAG propagation. Therefore, a Transaction can be a + * less chatty way to capture and apply changes from one template instance A to template instance B without + * incurring the cost of for B to compute the change DAG. + * + * @param cb - A callback function that handles a `Transaction` object. The callback is expected + * to return a `Promise`. + * + * @throws {Error} If the callback is registered for any path other than `'/'`. + * + * @public + */ + public setTransactionCallback(cb: (transaction: Transaction) => Promise) { + const dataChangeCallback = async (value: any, jsonPtrs: JsonPointerString | JsonPointerString[], removed?: boolean, op?: Op) => { + if (!Array.isArray(jsonPtrs)) { // This is the case where the update is for the root document + throw new Error(`DataChangeHandler for transaction bundling was illegally registered on ${jsonPtrs} (it can only be registered on '/'`); + } + const mutations: Mutation[] = jsonPtrs.map(jsonPtr => { + const value = jp.has(this.output, jsonPtr) ? jp.get(this.output, jsonPtr) : undefined; + return { + value, + jsonPtr, + op: value === undefined ? "delete" : "set", + }; + }); + const transaction: Transaction = { + op: "transaction", + mutations + }; + await cb(transaction); + }; + this.setDataChangeCallback("/", dataChangeCallback); + } + + /** + * Removes a previously registered transaction callback. + * + * This method removes the callback that was registered with `setTransactionCallback` + * for the root path `'/'`. + * + * @param cb - The callback function to remove, which should match the previously registered callback. + * + * @public + */ + public removeTransactionCallback(cb: DataChangeCallback) { + this.removeDataChangeCallback("/", cb); + } + + + private isEnabled(logLevel:Levels):boolean{ return LOG_LEVELS[this.logger.level] >= LOG_LEVELS[logLevel]; } @@ -1299,12 +1416,14 @@ export default class TemplateProcessor { } if (anyUpdates || everything) { + const {op="set"} = plan; // current callback APIs are not interested in deferred updates, so we reduce op to boolean "removed" - const removed = plan.op==="delete"; + const removed = op==="delete"; //admittedly this structure of this common callback is disgusting. Essentially if you are using the //common callback you don't want to get passed any data that changed because you are saying in essence //"I don't care what changed". - await this.callDataChangeCallbacks(plan.output, jsonPtrArray, removed); + //ToDO - other calls to callDataChangeCallbacks are not awaiting. Reconcile this + await this.callDataChangeCallbacks(plan.output, jsonPtrArray, removed, op); } } @@ -1465,7 +1584,7 @@ export default class TemplateProcessor { "materialized": true } ); - await this.callDataChangeCallbacks(data, jsonPtr); + await this.callDataChangeCallbacks(data, jsonPtr, op==="delete", op); return true; } @@ -1552,7 +1671,7 @@ export default class TemplateProcessor { if(op === 'delete'){ if(jp.has(output, jsonPtr)) { jp.remove(output, jsonPtr); - this.callDataChangeCallbacks(data, jsonPtr, true); + this.callDataChangeCallbacks(data, jsonPtr, true, op); return true; } return false; @@ -1734,7 +1853,7 @@ export default class TemplateProcessor { } // TODO: change it to pass the plan - private async callDataChangeCallbacks(data: any, jsonPointer: JsonPointerString|JsonPointerString[], removed: boolean = false) { + private async callDataChangeCallbacks(data: any, jsonPointer: JsonPointerString|JsonPointerString[], removed: boolean = false, op:Op="set") { let _jsonPointer:JsonPointerString; if(Array.isArray(jsonPointer)){ _jsonPointer = "/"; //when an array of pointers is provided, it means it was a change callback on "/" @@ -1750,14 +1869,15 @@ export default class TemplateProcessor { if (callbacks) { const promises = Array.from(callbacks).map(cbFn => Promise.resolve().then(() => { - cbFn(data, jsonPointer as JsonPointerString, removed) + //to do ... return here so asyn functions are actually awaited by Promise.all + cbFn(data, jsonPointer as JsonPointerString, removed, op); }) //works with cbFn that is either sync or async by wrapping in promise ); try { await Promise.all(promises); } catch (error:any) { - this.logger.error(`Error in dataChangeCallback at ${jsonPointer}: ${error.message}`); + this.logger.error(`Error in dataChangeCallback at ${JSON.stringify(jsonPointer)}: ${error.message}`); } } } diff --git a/src/test/TemplateProcessor.test.js b/src/test/TemplateProcessor.test.js index bff75f18..27e08d53 100644 --- a/src/test/TemplateProcessor.test.js +++ b/src/test/TemplateProcessor.test.js @@ -3219,4 +3219,34 @@ test("test lifecycle manager", async () => { } finally { await tp.close(); } +}); + +test("test transaction", async () => { + const o = { + "a": 1, + "b": "replace me", + "c": "remove me", + "d": "${41+1}" + }; + const transaction = { + op: "transaction", + mutations:[ + {op: "set", jsonPtr: "/b", value: 42}, + {op: "delete", jsonPtr: "/c", value: undefined}, + {op: "set", jsonPtr: "/d", value: 42} + ] + } + const tp = new TemplateProcessor(o); + try { + await tp.initialize(); + let receivedTransaction; + tp.setTransactionCallback((transaction)=>{ + receivedTransaction = transaction; + }); + await tp.applyTransaction(transaction); + await expect(tp.output).toStrictEqual({a:1, b:42, d:42}) + expect(receivedTransaction).toStrictEqual(transaction); + } finally { + await tp.close(); + } }); \ No newline at end of file