From f6baae0527a80acfd423e4efe1c2f2b79e60bb8c Mon Sep 17 00:00:00 2001 From: Niels Klomp Date: Sat, 9 Mar 2024 01:07:47 +0100 Subject: [PATCH] feat: Add support to start and resume xstate statemachines, with automatic persistence on state changes --- .../MachineStatePersistenceAgentLogic.ts | 108 ++++++++- .../src/agent/MachineStatePersistence.ts | 6 +- .../src/functions/machineRegistration.ts | 214 +++++++++++++++++- .../src/functions/stateMapper.ts | 4 + .../xstate-persistence/src/types/types.ts | 37 ++- 5 files changed, 358 insertions(+), 11 deletions(-) diff --git a/packages/xstate-persistence/src/__tests__/shared/MachineStatePersistenceAgentLogic.ts b/packages/xstate-persistence/src/__tests__/shared/MachineStatePersistenceAgentLogic.ts index 1f7259a4b..87701ee5e 100644 --- a/packages/xstate-persistence/src/__tests__/shared/MachineStatePersistenceAgentLogic.ts +++ b/packages/xstate-persistence/src/__tests__/shared/MachineStatePersistenceAgentLogic.ts @@ -10,7 +10,7 @@ import { ServiceMap, TypegenDisabled, } from 'xstate' -import { IMachineStatePersistence, MachineStatePersistArgs, machineStatePersistRegistration } from '../../index' +import { IMachineStatePersistence, interpreterStartOrResume, MachineStatePersistArgs, machineStatePersistRegistration } from '../../index' type ConfiguredAgent = TAgent @@ -60,7 +60,7 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro let context: IAgentContext beforeEach(() => { - instance = interpret(counterMachine).start() + instance = interpret(counterMachine) }) afterEach(() => { @@ -76,6 +76,7 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro afterAll(testContext.tearDown) it('should store xstate state changes', async (): Promise => { + instance.start() const machineStateInit = await agent.machineStateInit({ machineName: counterMachine.id, expiresAt: new Date(new Date().getTime() + 100000), @@ -133,6 +134,7 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro }) it('should automatically store xstate state changes', async (): Promise => { + instance.start() const init = await machineStatePersistRegistration({ context, interpreter: instance, machineName: instance.machine.id }) console.log(JSON.stringify(init, null, 2)) if (!init) { @@ -143,11 +145,11 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro const { instanceId, machineName } = init // Wait some time since events are async - await new Promise((res) => setTimeout(res, 100)) + await new Promise((res) => setTimeout(res, 50)) instance.send('increment') // Wait some time since events are async - await new Promise((res) => setTimeout(res, 100)) + await new Promise((res) => setTimeout(res, 50)) let activeStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) expect(activeStates).toHaveLength(1) expect(activeStates[0].instanceId).toEqual(instanceId) @@ -158,7 +160,7 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro instance.send('increment') // Wait some time since events are async - await new Promise((res) => setTimeout(res, 100)) + await new Promise((res) => setTimeout(res, 50)) activeStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) expect(activeStates).toHaveLength(1) expect(activeStates[0].state.context.count).toEqual(2) @@ -175,7 +177,7 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro // Let's move to the final state. There should be no more active state available afterwards instance.send('finalize') // Wait some time since events are async - await new Promise((res) => setTimeout(res, 100)) + await new Promise((res) => setTimeout(res, 50)) const finalActiveStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) expect(finalActiveStates).toHaveLength(0) @@ -193,6 +195,100 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro await expect(agent.machineStatesDeleteExpired({ deleteDoneStates: true, machineName })).resolves.toEqual(1) await expect(agent.machineStateGet({ instanceId })).rejects.toThrowError() }) + + it('should automatically start a new state machine with provided id', async (): Promise => { + await interpreterStartOrResume({ + stateType: 'new', + machineName: 'counter', + instanceId: 'autoStart', + context, + singletonCheck: true, + interpreter: instance, + }) + + instance.send('increment') + + // Wait some time since events are async + await new Promise((res) => setTimeout(res, 50)) + let activeStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) + expect(activeStates).toHaveLength(1) + expect(activeStates[0].state).toBeDefined() + await agent.machineStateDelete({ instanceId: 'autoStart' }) + }) + + it('should not automatically start a new state machine with for the same machine in case singleton check is true', async (): Promise => { + await interpreterStartOrResume({ stateType: 'new', machineName: 'counter', context, singletonCheck: true, interpreter: instance }) + // Wait some time since events are async + await new Promise((res) => setTimeout(res, 50)) + let activeStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) + expect(activeStates).toHaveLength(1) + expect(activeStates[0].state).toBeDefined() + + await expect( + interpreterStartOrResume({ stateType: 'new', machineName: 'counter', context, singletonCheck: true, interpreter: interpret(counterMachine) }) + ).rejects.toThrowError() + await agent.machineStateDelete({ instanceId: activeStates[0].instanceId }) + }) + + it('should automatically start 2 new state machines with for the same machine in case singleton check is false', async (): Promise => { + await interpreterStartOrResume({ stateType: 'new', machineName: 'counter', context, singletonCheck: false, interpreter: instance }) + // Wait some time since events are async + await new Promise((res) => setTimeout(res, 50)) + let activeStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) + expect(activeStates).toHaveLength(1) + expect(activeStates[0].state).toBeDefined() + + await expect( + interpreterStartOrResume({ stateType: 'new', machineName: 'counter', context, singletonCheck: false, interpreter: interpret(counterMachine) }) + ).resolves.toBeDefined() + await new Promise((res) => setTimeout(res, 50)) + activeStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) + + expect(activeStates).toHaveLength(2) + expect(activeStates[1].state).toBeDefined() + activeStates.forEach(async (state) => await agent.machineStateDelete({ instanceId: state.instanceId })) + }) + + it('should automatically start 1 new state machine and resume it after it was stopped', async (): Promise => { + const info = await interpreterStartOrResume({ stateType: 'new', machineName: 'counter', context, singletonCheck: true, interpreter: instance }) + // Wait some time since events are async + await new Promise((res) => setTimeout(res, 50)) + instance.send('increment') + + // Wait some time since events are async + await new Promise((res) => setTimeout(res, 50)) + let activeStates = await agent.machineStatesFindActive({ machineName: info.init.machineName }) + expect(activeStates).toHaveLength(1) + console.log(JSON.stringify(activeStates[0], null, 2)) + const originalSessionId = instance.sessionId + instance.stop() + + const resumeInterpreter = interpret(counterMachine) + const resumeInfo = await interpreterStartOrResume({ + stateType: 'existing', + instanceId: info.init.instanceId, + machineName: 'counter', + context, + singletonCheck: true, + interpreter: resumeInterpreter, + }) + expect(originalSessionId).not.toEqual(resumeInterpreter.sessionId) + expect(resumeInfo.init.instanceId).toEqual(info.init.instanceId) + await new Promise((res) => setTimeout(res, 50)) + activeStates = await agent.machineStatesFindActive({ machineName: instance.machine.id }) + + expect(activeStates).toHaveLength(1) + expect(activeStates[0].state).toBeDefined() + + resumeInterpreter.send('increment') + // Wait some time since events are async + await new Promise((res) => setTimeout(res, 50)) + activeStates = await agent.machineStatesFindActive({ machineName: info.init.machineName }) + expect(activeStates).toHaveLength(1) + console.log(JSON.stringify(activeStates[0], null, 2)) + + activeStates.forEach(async (state) => await agent.machineStateDelete({ instanceId: state.instanceId })) + }) }) } diff --git a/packages/xstate-persistence/src/agent/MachineStatePersistence.ts b/packages/xstate-persistence/src/agent/MachineStatePersistence.ts index 78760d170..d328ee2f3 100644 --- a/packages/xstate-persistence/src/agent/MachineStatePersistence.ts +++ b/packages/xstate-persistence/src/agent/MachineStatePersistence.ts @@ -99,13 +99,13 @@ export class MachineStatePersistence implements IAgentPlugin { if (customInstanceId && existingInstanceId) { return Promise.reject(new Error(`Cannot have both a custom and existing instance id at the same time`)) } else if (existingInstanceId) { - // An instanceId is provided. First lookup whether this id is persisted, if not an error is thrown + // A existing instanceId is provided. First lookup whether this id is persisted, if not an error is thrown debug(`machineStateInit is using a previously persisted instance id (${existingInstanceId})`) const state = await this.store.getMachineState({ tenantId, instanceId: existingInstanceId }) machineInit = storeInfoToMachineInit({ ...state, stateType: 'existing' }) } else if (customInstanceId) { - // An custom instanceId is provided. - debug(`machineStateInit is using a custom instance id (${existingInstanceId})`) + // A custom instanceId is provided. + debug(`machineStateInit is using a custom instance id (${customInstanceId})`) } if (!machineInit) { machineInit = { diff --git a/packages/xstate-persistence/src/functions/machineRegistration.ts b/packages/xstate-persistence/src/functions/machineRegistration.ts index 81585eb71..a5a712b8e 100644 --- a/packages/xstate-persistence/src/functions/machineRegistration.ts +++ b/packages/xstate-persistence/src/functions/machineRegistration.ts @@ -1,7 +1,17 @@ import { IAgentContext } from '@veramo/core' import { DefaultContext, EventObject, Interpreter, StateSchema, TypegenDisabled, Typestate } from 'xstate' -import { IMachineStatePersistence, InitMachineStateArgs, MachineStateInit, MachineStatePersistenceOpts, MachineStatePersistEventType } from '../types' +import { + IMachineStatePersistence, + InitMachineStateArgs, + MachineStateInfo, + MachineStateInit, + MachineStateInitType, + MachineStatePersistenceOpts, + MachineStatePersistEventType, + StartedInterpreterInfo, +} from '../types' import { emitMachineStatePersistEvent } from './stateEventEmitter' +import { machineStateToMachineInit, machineStateToStoreInfo } from './stateMapper' /** * Initialize the machine state persistence. Returns a unique instanceId and the machine name amongst others @@ -113,3 +123,205 @@ export const machineStatePersistRegistration = async < } return init } + +const assertNonExpired = (args: { expiresAt?: Date; machineName: string }) => { + const { expiresAt, machineName } = args + if (expiresAt && expiresAt.getTime() < Date.now()) { + throw new Error(`Cannot resume ${machineName}. It expired at ${expiresAt.toLocaleString()}`) + } +} + +/** + * Resumes the interpreter from a given state. + * + * @param {Object} args - The arguments for resuming the interpreter. + * @param {MachineStateInfo} args.machineState - The machine state information. + * @param {boolean} [args.noRegistration] - If true, no registration will be performed. + * @param {Interpreter} args.interpreter - The interpreter instance. + * @param {IAgentContext} args.context - The context for machine state persistence. + * + * @returns {Promise} - A promise that resolves to the resumed interpreter. + */ +export const interpreterResumeFromState = async < + TContext = DefaultContext, + TStateSchema extends StateSchema = any, + TEvent extends EventObject = EventObject, + TTypestate extends Typestate = { + value: any + context: TContext + }, + TResolvedTypesMeta = TypegenDisabled +>(args: { + machineState: MachineStateInfo + noRegistration?: boolean + interpreter: Interpreter + context: IAgentContext +}): Promise> => { + const { interpreter, machineState, context, noRegistration } = args + const { machineName, instanceId, tenantId } = machineState + assertNonExpired(machineState) + if (noRegistration !== true) { + await machineStatePersistRegistration({ + stateType: 'existing', + machineName, + tenantId, + existingInstanceId: instanceId, + context, + interpreter, + }) + } + + return { + machineState, + init: machineStateToMachineInit( + { + ...machineState, + stateType: 'existing', + }, + machineStateToStoreInfo({ ...machineState, stateType: 'existing' }) + ), + // @ts-ignore + interpreter: interpreter.start(machineState.state), + } +} + +/** + * Resumes or starts the interpreter from the initial machine state. + * + * @async + * @param {Object} args - The arguments for the function. + * @param {MachineStateInit & {stateType?: MachineStateInitType}} args.init - The initialization state of the machine. + * @param {boolean} args.noRegistration - Whether registration is required, defaults to false. + * @param {Interpreter} args.interpreter - The interpreter object. + * @param {IAgentContext} args.context - The context object. + * @returns {Promise} - A promise that resolves to the interpreter instance. + * @throws {Error} - If the machine name from init does not match the interpreter id. + */ +export const interpreterStartOrResumeFromInit = async < + TContext = DefaultContext, + TStateSchema extends StateSchema = any, + TEvent extends EventObject = EventObject, + TTypestate extends Typestate = { + value: any + context: TContext + }, + TResolvedTypesMeta = TypegenDisabled +>(args: { + init: MachineStateInit & { stateType?: MachineStateInitType } + noRegistration?: boolean + interpreter: Interpreter + context: IAgentContext +}): Promise> => { + const { init, noRegistration, interpreter, context } = args + const { stateType, instanceId, machineName, tenantId, expiresAt } = init + if (init.machineName !== interpreter.id) { + throw new Error(`Machine state init machine name ${init.machineName} does not match name from state machine interpreter ${interpreter.id}`) + } + assertNonExpired({ machineName, expiresAt }) + if (noRegistration !== true) { + await machineStatePersistRegistration({ + stateType: stateType ?? 'existing', + machineName, + tenantId, + ...(stateType === 'existing' && { existingInstanceId: instanceId }), + ...(stateType === 'new' && { customInstanceId: instanceId }), + context, + interpreter, + }) + } + let machineState: MachineStateInfo | undefined + if (stateType === 'new') { + interpreter.start() + } else { + machineState = await context.agent.machineStateGet({ tenantId, instanceId }) + // @ts-ignore + interpreter.start(machineState.state) + } + return { + interpreter, + machineState, + init, + } +} + +/** + * Starts or resumes the given state machine interpreter. + * + * @async + * @param {Object} args - The arguments for starting or resuming the interpreter. + * @param {MachineStateInitType | 'auto'} [args.stateType] - The state type. Defaults to 'auto'. + * @param {string} [args.instanceId] - The instance ID. + * @param {string} [args.machineName] - The machine name. + * @param {string} [args.tenantId] - The tenant ID. + * @param {boolean} args.singletonCheck - Whether to perform a singleton check or not. If more than one machine instance is found an error will be thrown + * @param {boolean} [args.noRegistration] - Whether to skip state change event registration or not. + * @param {Interpreter} args.interpreter - The interpreter to start or resume. + * @param {IAgentContext} args.context - The agent context. + * @returns {Promise} A promise that resolves when the interpreter is started or resumed. + * @throws {Error} If there are multiple active instances of the machine and singletonCheck is true. + * @throws {Error} If a new instance was requested with the same ID as an existing active instance. + * @throws {Error} If the existing state machine with the given machine name and instance ID cannot be found. + */ +export const interpreterStartOrResume = async < + TContext = DefaultContext, + TStateSchema extends StateSchema = any, + TEvent extends EventObject = EventObject, + TTypestate extends Typestate = { + value: any + context: TContext + }, + TResolvedTypesMeta = TypegenDisabled +>(args: { + stateType?: MachineStateInitType | 'auto' + instanceId?: string + machineName?: string + tenantId?: string + singletonCheck: boolean + noRegistration?: boolean + interpreter: Interpreter + context: IAgentContext +}): Promise> => { + const { stateType, singletonCheck, instanceId, tenantId, noRegistration, context, interpreter } = args + const machineName = args.machineName ?? interpreter.id + const activeStates = await context.agent.machineStatesFindActive({ + machineName, + tenantId, + instanceId, + }) + if (singletonCheck && activeStates.length > 0) { + if (stateType === 'new' || activeStates.every((state) => state.instanceId !== instanceId)) { + return Promise.reject(new Error(`Found ${activeStates.length} active '${machineName}' instances, but only one is allows at the same time`)) + } + } + if (stateType === 'new') { + if (instanceId && activeStates.length > 0) { + // Since an instanceId was provided it means the activeStates includes a machine with this instance. But stateType is 'new' + return Promise.reject( + new Error(`Found an active '${machineName}' instance with id ${instanceId}, but a new instance was requested with the same id`) + ) + } + const init = await context.agent.machineStateInit({ + stateType: 'new', + customInstanceId: instanceId, + machineName: machineName ?? interpreter.id, + tenantId, + }) + return await interpreterStartOrResumeFromInit({ init, noRegistration, interpreter, context }) + } + if (activeStates.length === 0) { + if (stateType === 'existing') { + return Promise.reject(new Error(`Could not find existing state machine ${machineName}, instanceId ${instanceId}`)) + } + const init = await context.agent.machineStateInit({ + stateType: 'new', + customInstanceId: instanceId, + machineName: machineName ?? interpreter.id, + tenantId, + }) + return await interpreterStartOrResumeFromInit({ init, noRegistration, interpreter, context }) + } + + // activeStates length >= 1 + const activeState = activeStates[0] + return interpreterResumeFromState({ machineState: activeState, noRegistration, interpreter, context }) +} diff --git a/packages/xstate-persistence/src/functions/stateMapper.ts b/packages/xstate-persistence/src/functions/stateMapper.ts index 4aabe1acd..fcd4b50c7 100644 --- a/packages/xstate-persistence/src/functions/stateMapper.ts +++ b/packages/xstate-persistence/src/functions/stateMapper.ts @@ -52,6 +52,10 @@ export const storeInfoToMachineInit = (machineState: StoreMachineStateInfo & { s } } +export const machineStateToMachineInit = (machineInfo: MachineStatePersistArgs, existingState: Partial): MachineStateInit => { + return storeInfoToMachineInit({ ...machineStateToStoreInfo(machineInfo, existingState), stateType: 'existing' }) +} + /** * Serializes a machine state to a string representation. * diff --git a/packages/xstate-persistence/src/types/types.ts b/packages/xstate-persistence/src/types/types.ts index 0d85bbf19..0983d627b 100644 --- a/packages/xstate-persistence/src/types/types.ts +++ b/packages/xstate-persistence/src/types/types.ts @@ -5,7 +5,18 @@ import { StoreMachineStatesFindActiveArgs, } from '@sphereon/ssi-sdk.data-store' import { IAgentContext } from '@veramo/core' -import { AnyEventObject, EventObject, HistoryValue, SCXML, StateValue } from 'xstate' +import { + AnyEventObject, + DefaultContext, + EventObject, + HistoryValue, + Interpreter, + SCXML, + StateSchema, + StateValue, + TypegenDisabled, + Typestate, +} from 'xstate' import { IMachineStatePersistence } from './IMachineStatePersistence' @@ -135,6 +146,30 @@ export type MachineStateGetArgs = Pick +/** + * Represents the information for a started interpreter. + * + * @template TContext The type of the context object. + * @template TStateSchema The type of the state schema. + * @template TEvent The type of the event object. + * @template TTypestate The type of the typestate object. + * @template TResolvedTypesMeta The type of the resolved types meta object. + */ +export type StartedInterpreterInfo< + TContext = DefaultContext, + TStateSchema extends StateSchema = any, + TEvent extends EventObject = EventObject, + TTypestate extends Typestate = { + value: any + context: TContext + }, + TResolvedTypesMeta = TypegenDisabled +> = { + interpreter: Interpreter + machineState?: MachineStateInfo + init: MachineStateInit +} + /** * Represents the serializable state of a machine. *