Skip to content

Commit

Permalink
transactions (#87)
Browse files Browse the repository at this point in the history
* transactions

* bump version 0.1.41
  • Loading branch information
geoffhendrey authored Oct 13, 2024
1 parent 6225553 commit 6bd3fea
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 17 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "stated-js",
"version": "0.1.40",
"version": "0.1.41",
"license": "Apache-2.0",
"description": "JSONata embedded in JSON",
"main": "./dist/src/index.js",
Expand Down
6 changes: 5 additions & 1 deletion src/JsonPointer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ export default class JsonPointer {
*/
static get(obj:object, pointer:JsonPointerString|JsonPointerStructureArray) {
const refTokens = Array.isArray(pointer) ? pointer : JsonPointer.parse(pointer as JsonPointerString);

//technically the json pointer for the root object is "". However I find this ridiculous and we adopt our
//more sensible convention that "/" is the root pointer. So the if block below is there to treat "" as "/"
if(refTokens[0] === "" && refTokens.length === 1){
return obj;
}
for (let i = 0; i < refTokens.length; ++i) {
const tok = refTokens[i];
if (!(typeof obj === 'object' && tok in obj)) {
Expand Down
150 changes: 135 additions & 15 deletions src/TemplateProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,43 @@ export type PlanStep = {
forkId:string,
didUpdate:boolean
}
export type Mutation = {jsonPtr:JsonPointerString, op:Op, value:any};

//A Transaction is a set of changes applied atomically.
export type Transaction ={
op: "transaction",
mutations: Mutation[]
}

/**
* a FunctionGenerator is used to generate functions that need the context of which expression they were called from
* which is made available to them in the MetaInf
*/
export type FunctionGenerator = (metaInfo: MetaInfo, templateProcessor: TemplateProcessor) => (Promise<(arg: any) => Promise<any>>);



/**
* defines the function signature for data change callbacks, called when data at the ptr changes
* A callback function that is triggered when data changes.
*
* This callback supports both the legacy `removed` boolean parameter and the new `op` parameter
* to avoid breaking existing clients while allowing for more descriptive operations.
*
* - When `removed` is provided, it indicates whether the data was removed (`true` for delete, `false` for set).
* - When `op` is provided, it specifies the operation performed on the data:
* - `"set"`: The data was set or updated.
* - `"delete"`: The data was deleted.
* - `"forceSetInternal"`: A forced internal set operation was performed.
*
* Both `removed` and `op` are optional. If both are provided, `op` takes precedence in interpreting the operation.
*
* @param data - The data that was changed and is pointed to by ptr
* @param ptr - The JSON pointer string indicating where in the root object the change occurred. In the case of callbacks
* registered on "/", ptr will be an array of JSON pointers into the `data` field represents the root object.
* @param removed - (optional) A boolean indicating whether the data was removed. `true` for delete, `false` for set.
* @param op - (optional) A string describing the operation. Can be `"set"`, `"delete"`, or `"forceSetInternal"`.
*/
export type DataChangeCallback = (data:any, ptr:JsonPointerString, removed?:boolean)=>void

export type DataChangeCallback = (data: any, ptr: JsonPointerString|JsonPointerString[], removed?: boolean, op?: Op) => void;


/**
Expand Down Expand Up @@ -276,7 +302,7 @@ export default class TemplateProcessor {
private executionPlans: { [key: JsonPointerString]: JsonPointerString[] }={};

/** A queue of execution plans awaiting processing. */
private readonly executionQueue:(Plan|SnapshotPlan)[] = [];
private readonly executionQueue:(Plan|SnapshotPlan|Transaction)[] = [];

/** function generators can be provided by a caller when functions need to be
* created in such a way that they are somehow 'responsive' or dependent on their
Expand Down Expand Up @@ -939,7 +965,7 @@ export default class TemplateProcessor {
if (jp.has(this.output, jsonPtr)) {
const current = jp.get(this.output, jsonPtr);
jp.remove(this.output, jsonPtr);
this.callDataChangeCallbacks(current, jsonPtr, true)
this.callDataChangeCallbacks(current, jsonPtr, true, "delete");
}
});
}
Expand Down Expand Up @@ -1161,11 +1187,13 @@ export default class TemplateProcessor {
private async drainExecutionQueue(removeTmpVars:boolean=true){
while (this.executionQueue.length > 0 && !this.isClosed) {
try {
const plan: Plan | SnapshotPlan = this.executionQueue[0];
const plan: Plan | SnapshotPlan | Transaction= this.executionQueue[0];
if (plan.op === "snapshot") {
(plan as SnapshotPlan).generatedSnapshot = this.executionStatus.toJsonString();;
} else {
await this.executePlan(plan);
} else if(plan.op === "transaction" ){
this.applyTransaction(plan as Transaction);
}else{
await this.executePlan(plan as Plan);
}
removeTmpVars && this.removeTemporaryVariables(this.tempVars, "/");
}finally {
Expand All @@ -1174,6 +1202,95 @@ export default class TemplateProcessor {
}
}

/**
* Applies a transaction by processing each mutation within the transaction.
*
* For each mutation, this method applies the specified operation (`set` or `delete`)
* to the `output` object based on the `jsonPtr` (JSON pointer).
* It also triggers data change callbacks after each mutation.
*
* @param transaction - The transaction object containing a list of mutations to apply.
* @throws {Error} If the operation (`op`) is neither `"set"` nor `"delete"`.
*
* The transaction is processed as follows:
* - `"set"`: Sets the value at the location specified by `jsonPtr` using `jp.set`.
* - `"delete"`: Removes the value at the location specified by `jsonPtr` using `jp.remove`.
*
* After each mutation, `callDataChangeCallbacks` is called to notify of the change.
* Finally, a batch data change callback is triggered for all affected JSON pointers.
*
* @private
*/
private async applyTransaction(transaction: Transaction) {
const ptrs: JsonPointerString[] = [];
for (const { jsonPtr, value, op } of transaction.mutations) {
ptrs.push(jsonPtr);
if (op === 'set') {
jp.set(this.output, jsonPtr, value);
} else if (op === 'delete') {
jp.remove(this.output, jsonPtr);
} else {
throw new Error(`Transaction cannot include Op type ${op}`);
}
await this.callDataChangeCallbacks(value, jsonPtr, op === 'delete', op);
}
await this.callDataChangeCallbacks(this.output, ptrs);
}

/**
* Registers a transaction callback to handle batched data changes.
*
* When setData is called, a set of changes (a DAG) is calculated and the changes are sequentially applied. These
* changes can be 'bundled' into a single Transaction for the purpose of capturing a single set of changes that
* if atomically applied, has the exact same effect as the DAG propagation. Therefore, a Transaction can be a
* less chatty way to capture and apply changes from one template instance A to template instance B without
* incurring the cost of for B to compute the change DAG.
*
* @param cb - A callback function that handles a `Transaction` object. The callback is expected
* to return a `Promise<void>`.
*
* @throws {Error} If the callback is registered for any path other than `'/'`.
*
* @public
*/
public setTransactionCallback(cb: (transaction: Transaction) => Promise<void>) {
const dataChangeCallback = async (value: any, jsonPtrs: JsonPointerString | JsonPointerString[], removed?: boolean, op?: Op) => {
if (!Array.isArray(jsonPtrs)) { // This is the case where the update is for the root document
throw new Error(`DataChangeHandler for transaction bundling was illegally registered on ${jsonPtrs} (it can only be registered on '/'`);
}
const mutations: Mutation[] = jsonPtrs.map(jsonPtr => {
const value = jp.has(this.output, jsonPtr) ? jp.get(this.output, jsonPtr) : undefined;
return {
value,
jsonPtr,
op: value === undefined ? "delete" : "set",
};
});
const transaction: Transaction = {
op: "transaction",
mutations
};
await cb(transaction);
};
this.setDataChangeCallback("/", dataChangeCallback);
}

/**
* Removes a previously registered transaction callback.
*
* This method removes the callback that was registered with `setTransactionCallback`
* for the root path `'/'`.
*
* @param cb - The callback function to remove, which should match the previously registered callback.
*
* @public
*/
public removeTransactionCallback(cb: DataChangeCallback) {
this.removeDataChangeCallback("/", cb);
}



private isEnabled(logLevel:Levels):boolean{
return LOG_LEVELS[this.logger.level] >= LOG_LEVELS[logLevel];
}
Expand Down Expand Up @@ -1299,12 +1416,14 @@ export default class TemplateProcessor {
}

if (anyUpdates || everything) {
const {op="set"} = plan;
// current callback APIs are not interested in deferred updates, so we reduce op to boolean "removed"
const removed = plan.op==="delete";
const removed = op==="delete";
//admittedly this structure of this common callback is disgusting. Essentially if you are using the
//common callback you don't want to get passed any data that changed because you are saying in essence
//"I don't care what changed".
await this.callDataChangeCallbacks(plan.output, jsonPtrArray, removed);
//ToDO - other calls to callDataChangeCallbacks are not awaiting. Reconcile this
await this.callDataChangeCallbacks(plan.output, jsonPtrArray, removed, op);
}
}

Expand Down Expand Up @@ -1465,7 +1584,7 @@ export default class TemplateProcessor {
"materialized": true
}
);
await this.callDataChangeCallbacks(data, jsonPtr);
await this.callDataChangeCallbacks(data, jsonPtr, op==="delete", op);
return true;
}

Expand Down Expand Up @@ -1552,7 +1671,7 @@ export default class TemplateProcessor {
if(op === 'delete'){
if(jp.has(output, jsonPtr)) {
jp.remove(output, jsonPtr);
this.callDataChangeCallbacks(data, jsonPtr, true);
this.callDataChangeCallbacks(data, jsonPtr, true, op);
return true;
}
return false;
Expand Down Expand Up @@ -1734,7 +1853,7 @@ export default class TemplateProcessor {
}

// TODO: change it to pass the plan
private async callDataChangeCallbacks(data: any, jsonPointer: JsonPointerString|JsonPointerString[], removed: boolean = false) {
private async callDataChangeCallbacks(data: any, jsonPointer: JsonPointerString|JsonPointerString[], removed: boolean = false, op:Op="set") {
let _jsonPointer:JsonPointerString;
if(Array.isArray(jsonPointer)){
_jsonPointer = "/"; //when an array of pointers is provided, it means it was a change callback on "/"
Expand All @@ -1750,14 +1869,15 @@ export default class TemplateProcessor {
if (callbacks) {
const promises = Array.from(callbacks).map(cbFn =>
Promise.resolve().then(() => {
cbFn(data, jsonPointer as JsonPointerString, removed)
//to do ... return here so asyn functions are actually awaited by Promise.all
cbFn(data, jsonPointer as JsonPointerString, removed, op);
}) //works with cbFn that is either sync or async by wrapping in promise
);

try {
await Promise.all(promises);
} catch (error:any) {
this.logger.error(`Error in dataChangeCallback at ${jsonPointer}: ${error.message}`);
this.logger.error(`Error in dataChangeCallback at ${JSON.stringify(jsonPointer)}: ${error.message}`);
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/test/TemplateProcessor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3219,4 +3219,34 @@ test("test lifecycle manager", async () => {
} finally {
await tp.close();
}
});

test("test transaction", async () => {
const o = {
"a": 1,
"b": "replace me",
"c": "remove me",
"d": "${41+1}"
};
const transaction = {
op: "transaction",
mutations:[
{op: "set", jsonPtr: "/b", value: 42},
{op: "delete", jsonPtr: "/c", value: undefined},
{op: "set", jsonPtr: "/d", value: 42}
]
}
const tp = new TemplateProcessor(o);
try {
await tp.initialize();
let receivedTransaction;
tp.setTransactionCallback((transaction)=>{
receivedTransaction = transaction;
});
await tp.applyTransaction(transaction);
await expect(tp.output).toStrictEqual({a:1, b:42, d:42})
expect(receivedTransaction).toStrictEqual(transaction);
} finally {
await tp.close();
}
});

0 comments on commit 6bd3fea

Please sign in to comment.