From 665dbeae7dfb63b8662bdf76ec376b2b03f4ed42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sat, 2 Sep 2023 15:11:47 +0200 Subject: [PATCH] add loop node --- .../__snapshots__/runFlow.test.ts.snap | 60 ++++++++ .../src/engines/dataflow/constants.ts | 21 +-- .../src/engines/dataflow/runFlow.test.ts | 50 +++++++ .../ps-chains/src/engines/dataflow/runFlow.ts | 23 ++- .../@pufflig/ps-chains/src/mocks/chains.ts | 136 +++++++++++++++++- .../@pufflig/ps-chains/src/mocks/nodes.ts | 88 ++++++++++++ .../ps-nodes-config/src/core/forin.ts | 44 ++++++ .../ps-nodes-config/src/core/index.ts | 16 ++- .../ps-nodes-config/src/core/input.ts | 2 + .../ps-nodes-config/src/core/output.ts | 2 + .../@pufflig/ps-nodes-config/src/index.ts | 3 +- packages/@pufflig/ps-nodes/src/core/forin.ts | 38 +++++ packages/@pufflig/ps-nodes/src/core/index.ts | 11 +- packages/@pufflig/ps-nodes/src/core/input.ts | 6 +- packages/@pufflig/ps-nodes/src/core/output.ts | 6 +- packages/@pufflig/ps-types/src/types/nodes.ts | 16 ++- websites/docs/pages/changelog.md | 1 + 17 files changed, 485 insertions(+), 38 deletions(-) create mode 100644 packages/@pufflig/ps-nodes-config/src/core/forin.ts create mode 100644 packages/@pufflig/ps-nodes/src/core/forin.ts diff --git a/packages/@pufflig/ps-chains/src/engines/dataflow/__snapshots__/runFlow.test.ts.snap b/packages/@pufflig/ps-chains/src/engines/dataflow/__snapshots__/runFlow.test.ts.snap index 33d50da..7f1ee8c 100644 --- a/packages/@pufflig/ps-chains/src/engines/dataflow/__snapshots__/runFlow.test.ts.snap +++ b/packages/@pufflig/ps-chains/src/engines/dataflow/__snapshots__/runFlow.test.ts.snap @@ -1,5 +1,65 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP +exports[`a node can run its children multiple times 1`] = ` +{ + "n1": { + "input": { + "data": "", + }, + }, + "n2": { + "input": { + "list": [ + "a", + "b", + ], + }, + "status": "idle", + }, + "n3": { + "input": { + "data": "b", + }, + }, + "n4": { + "input": { + "data": "b", + }, + }, +} +`; + +exports[`a node can run its children multiple times 2 1`] = ` +{ + "n1": { + "input": { + "list": [ + "a", + "b", + ], + }, + "status": "idle", + }, + "n2": { + "input": { + "data": "", + "list": [ + "a", + "b", + ], + }, + }, + "n3": { + "input": { + "data": [ + "a", + "b", + ], + }, + }, +} +`; + exports[`avoid hanging on loops 1`] = ` { "n1": { diff --git a/packages/@pufflig/ps-chains/src/engines/dataflow/constants.ts b/packages/@pufflig/ps-chains/src/engines/dataflow/constants.ts index 5df25bd..3a486c7 100644 --- a/packages/@pufflig/ps-chains/src/engines/dataflow/constants.ts +++ b/packages/@pufflig/ps-chains/src/engines/dataflow/constants.ts @@ -1,16 +1,17 @@ -import { Node, ParamValueMap } from "@pufflig/ps-types"; +import { NextNode, Node, ParamValueMap } from "@pufflig/ps-types"; export const delimiterStart = "${{ps:ref:" as const; export const delimiterEnd = "}}" as const; export const executionPrefix = "exec:"; export const identity = (i: ParamValueMap, _: Partial) => i; -export const getDefaultTargets = (node: Node) => (results: ParamValueMap) => { - if (!node.execution?.outputs?.[0]?.id) return []; - return [ - { - execSource: node.execution.outputs[0].id, - inputs: results, - }, - ]; -}; +export const getDefaultTargets = + (node: Node) => (_input: ParamValueMap, _prev: ParamValueMap, results: ParamValueMap) => { + if (!node.execution?.outputs?.[0]?.id) return []; + return [ + { + execSource: node.execution.outputs[0].id, + inputs: results, + } as NextNode, + ]; + }; diff --git a/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.test.ts b/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.test.ts index 88210ae..910ed09 100644 --- a/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.test.ts +++ b/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.test.ts @@ -1,5 +1,7 @@ import { configOnlyFlow, + execWithLoop, + loopWithJoin, mappedExample, multiInput, multiInputWithOutput, @@ -419,3 +421,51 @@ test("when running a flow in dataflow mode, do no run child executable nodes", a expect(onNodeRunError).toHaveBeenCalledTimes(0); expect(res).toMatchSnapshot(); }); + +/** + * (1) => (🔄2) =2> (3) =2> (4) + * + */ +test("a node can run its children multiple times", async () => { + const onNodeInputUpdate = jest.fn(); + const onNodeRunComplete = jest.fn(); + const onNodeRunError = jest.fn(); + const res = await runFlow( + execWithLoop, + "n1", + {}, + { + onNodeRunComplete, + onNodeInputUpdate, + onNodeRunError, + } + ); + expect(onNodeInputUpdate).toHaveBeenCalledTimes(6); + expect(onNodeRunComplete).toHaveBeenCalledTimes(4); + expect(onNodeRunError).toHaveBeenCalledTimes(0); + expect(res).toMatchSnapshot(); +}); + +/** + * (🔄1) =2> (2) =2> (3) + */ +test("a node can run its children multiple times 2", async () => { + const onNodeInputUpdate = jest.fn(); + const onNodeRunComplete = jest.fn(); + const onNodeRunError = jest.fn(); + const res = await runFlow( + loopWithJoin, + "n1", + {}, + { + logLevel: "debug", + onNodeRunComplete, + onNodeInputUpdate, + onNodeRunError, + } + ); + expect(onNodeInputUpdate).toHaveBeenCalledTimes(7); + expect(onNodeRunComplete).toHaveBeenCalledTimes(3); + expect(onNodeRunError).toHaveBeenCalledTimes(0); + expect(res).toMatchSnapshot(); +}); diff --git a/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.ts b/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.ts index 24b6acb..3312c92 100644 --- a/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.ts +++ b/packages/@pufflig/ps-chains/src/engines/dataflow/runFlow.ts @@ -50,7 +50,7 @@ export async function runFlow(flow: Flow, nodeId: string, input: Record 0; if (isAlreadyRun) { - logger.debug("Node already run"); + logger.debug({ nodeId }, "Node already run"); return; } @@ -147,14 +147,29 @@ export async function runFlow(flow: Flow, nodeId: string, input: Record edge.sourceHandle.startsWith(executionPrefix)); - for (const execution of executionStack) { + logger.debug({ executions: executionOrder }, "Defined executions"); + + for (const execution of executionOrder) { visitedEdges.push(...executionTargets.map((e) => e.id)); const targetId = executionTargets.find((e) => e.sourceHandle === execution.execSource)?.target; + if (targetId) { - await run_flow_recursive(targetId, {}); + // override the infinite loop guard + runs[targetId] = runs[targetId] - 1; + + // data edges between this node and the target node + const dataEdges = Object.values(flow.definition.edges).filter( + (e) => e.source === nodeId && e.target === targetId && !e.sourceHandle.startsWith("exec:") + ); + + const edgeMap = getEdgeMap(dataEdges); + const mappedInput = edgeMap ? mapOutputToInput(execution.inputs, edgeMap) : {}; + + logger.debug({ nodeId, targetId, mappedInput }, "Running executable node"); + await run_flow_recursive(targetId, mappedInput); } } }; diff --git a/packages/@pufflig/ps-chains/src/mocks/chains.ts b/packages/@pufflig/ps-chains/src/mocks/chains.ts index 9e1a3b3..af5da6e 100644 --- a/packages/@pufflig/ps-chains/src/mocks/chains.ts +++ b/packages/@pufflig/ps-chains/src/mocks/chains.ts @@ -1,5 +1,12 @@ import { Flow } from "../types"; -import { configOnlyNode, multiInputDataNode, simpleDataNode, simpleExecNode } from "./nodes"; +import { + configOnlyNode, + joinNodeConfig, + loopNodeConfig, + multiInputDataNode, + simpleDataNode, + simpleExecNode, +} from "./nodes"; export const singleNodeFlow: Flow = { nodeTypes: { @@ -599,3 +606,130 @@ export const simpleExecWithData: Flow = { }, state: {}, }; + +export const execWithLoop: Flow = { + nodeTypes: { + simple_node: simpleExecNode, + loop_node: loopNodeConfig, + }, + definition: { + edges: { + e1: { + id: "e1", + source: "n1", + target: "n2", + sourceHandle: "exec:output", + targetHandle: "exec:input", + }, + e2: { + id: "e2", + source: "n2", + target: "n3", + sourceHandle: "exec:output", + targetHandle: "exec:input", + }, + e3: { + id: "e3", + source: "n2", + target: "n3", + sourceHandle: "data", + targetHandle: "data", + }, + e4: { + id: "e4", + source: "n3", + target: "n4", + sourceHandle: "data", + targetHandle: "data", + }, + }, + nodes: { + n1: { + id: "n1", + type: "simple_node", + }, + n2: { + id: "n2", + type: "loop_node", + }, + n3: { + id: "n3", + type: "simple_node", + }, + n4: { + id: "n4", + type: "simple_node", + }, + }, + }, + state: { + n2: { + status: "idle", + input: { + list: ["a", "b"], + }, + }, + }, +}; + +export const loopWithJoin: Flow = { + nodeTypes: { + simple_node: simpleExecNode, + loop_node: loopNodeConfig, + join_node: joinNodeConfig, + }, + definition: { + edges: { + e1: { + id: "e1", + source: "n1", + target: "n2", + sourceHandle: "exec:output", + targetHandle: "exec:input", + }, + e2: { + id: "e2", + source: "n2", + target: "n3", + sourceHandle: "exec:output", + targetHandle: "exec:input", + }, + e3: { + id: "e3", + source: "n1", + target: "n2", + sourceHandle: "data", + targetHandle: "data", + }, + e4: { + id: "e4", + source: "n2", + target: "n3", + sourceHandle: "list", + targetHandle: "data", + }, + }, + nodes: { + n1: { + id: "n1", + type: "loop_node", + }, + n2: { + id: "n2", + type: "join_node", + }, + n3: { + id: "n3", + type: "simple_node", + }, + }, + }, + state: { + n1: { + status: "idle", + input: { + list: ["a", "b"], + }, + }, + }, +}; diff --git a/packages/@pufflig/ps-chains/src/mocks/nodes.ts b/packages/@pufflig/ps-chains/src/mocks/nodes.ts index c5e9888..dbdf4b9 100644 --- a/packages/@pufflig/ps-chains/src/mocks/nodes.ts +++ b/packages/@pufflig/ps-chains/src/mocks/nodes.ts @@ -5,6 +5,22 @@ export const passthroughNode: NodeActions = { mapInput: async (i) => i, }; +export const loopNode: NodeActions = { + getTargets: async ({ list }) => { + return [ + { execSource: "exec:output", inputs: { data: list?.[0] } }, + { execSource: "exec:output", inputs: { data: list?.[1] } }, + { execSource: "exec:complete", inputs: {} }, + ]; + }, +}; + +export const joinNode: NodeActions<{ data: string; list: string[] }> = { + mapInput: async ({ data }, prev) => { + return { list: [...(prev?.list || []), data] }; + }, +}; + export const simpleDataNode: Node = { name: "simpleNode", parameters: [], @@ -150,3 +166,75 @@ export const simpleExecNode: Node = { ], ...passthroughNode, }; + +export const loopNodeConfig: Node = { + name: "loopNode", + execution: { + inputs: [ + { + id: "exec:input", + }, + ], + outputs: [ + { + id: "exec:output", + }, + ], + }, + parameters: [], + inputs: [ + { + name: "list", + type: "list", + defaultValue: [], + description: "", + id: "list", + }, + ], + outputs: [ + { + name: "data", + type: "text", + defaultValue: "", + description: "", + id: "data", + }, + ], + ...loopNode, +}; + +export const joinNodeConfig: Node = { + name: "joinNode", + execution: { + inputs: [ + { + id: "exec:input", + }, + ], + outputs: [ + { + id: "exec:output", + }, + ], + }, + parameters: [], + inputs: [ + { + name: "data", + type: "text", + defaultValue: "", + description: "", + id: "data", + }, + ], + outputs: [ + { + name: "list", + type: "list", + defaultValue: [], + description: "", + id: "list", + }, + ], + ...joinNode, +}; diff --git a/packages/@pufflig/ps-nodes-config/src/core/forin.ts b/packages/@pufflig/ps-nodes-config/src/core/forin.ts new file mode 100644 index 0000000..08d6bac --- /dev/null +++ b/packages/@pufflig/ps-nodes-config/src/core/forin.ts @@ -0,0 +1,44 @@ +import { NodeConfig } from "@pufflig/ps-types"; + +export const forinNodeType = "core/forin" as const; + +export const forinNodeConfig: NodeConfig = { + name: "Loop", + description: "Loop over items in a list", + tags: ["core", "forin"], + execution: { + inputs: [ + { + id: "input", + }, + ], + outputs: [ + { + id: "output", + }, + { + id: "complete", + }, + ], + }, + customSchema: "output", + parameters: [], + outputs: [ + { + id: "item", + type: "text", + defaultValue: "", + description: "Single item from the loop", + name: "Item", + }, + ], + inputs: [ + { + id: "list", + type: "list", + defaultValue: [], + description: "List of items to loop over", + name: "List", + }, + ], +}; diff --git a/packages/@pufflig/ps-nodes-config/src/core/index.ts b/packages/@pufflig/ps-nodes-config/src/core/index.ts index d558ef1..1879dd8 100644 --- a/packages/@pufflig/ps-nodes-config/src/core/index.ts +++ b/packages/@pufflig/ps-nodes-config/src/core/index.ts @@ -1,7 +1,15 @@ -import { inputNodeConfig } from "./input"; -import { outputNodeConfig } from "./output"; +import { forinNodeConfig, forinNodeType } from "./forin"; +import { inputNodeConfig, inputNodeType } from "./input"; +import { outputNodeConfig, outputNodeType } from "./output"; export const coreNodes = { - "core/input": inputNodeConfig, - "core/output": outputNodeConfig, + [inputNodeType]: inputNodeConfig, + [outputNodeType]: outputNodeConfig, + [forinNodeType]: forinNodeConfig, +}; + +export const coreNodeTypes = { + forinNodeType, + inputNodeType, + outputNodeType, }; diff --git a/packages/@pufflig/ps-nodes-config/src/core/input.ts b/packages/@pufflig/ps-nodes-config/src/core/input.ts index f26f423..f0df2bc 100644 --- a/packages/@pufflig/ps-nodes-config/src/core/input.ts +++ b/packages/@pufflig/ps-nodes-config/src/core/input.ts @@ -1,5 +1,7 @@ import { NodeConfig } from "@pufflig/ps-types"; +export const inputNodeType = "core/input" as const; + export const inputNodeConfig: NodeConfig = { name: "Start", description: "First node in a workflow", diff --git a/packages/@pufflig/ps-nodes-config/src/core/output.ts b/packages/@pufflig/ps-nodes-config/src/core/output.ts index ecc7a1f..573fa1f 100644 --- a/packages/@pufflig/ps-nodes-config/src/core/output.ts +++ b/packages/@pufflig/ps-nodes-config/src/core/output.ts @@ -1,5 +1,7 @@ import { NodeConfig } from "@pufflig/ps-types"; +export const outputNodeType = "core/output" as const; + export const outputNodeConfig: NodeConfig = { name: "End", description: "Final node in a workflow", diff --git a/packages/@pufflig/ps-nodes-config/src/index.ts b/packages/@pufflig/ps-nodes-config/src/index.ts index 4f81b8a..9d1296f 100644 --- a/packages/@pufflig/ps-nodes-config/src/index.ts +++ b/packages/@pufflig/ps-nodes-config/src/index.ts @@ -1,5 +1,5 @@ import { adapterNodeTypes, adapterNodes, adapterSettings, modelConfig } from "./adapters"; -import { coreNodes } from "./core"; +import { coreNodeTypes, coreNodes } from "./core"; import { dataNodeTypes, dataNodes } from "./data"; import { modifierNodes } from "./modifiers"; @@ -26,6 +26,7 @@ export const models = { export const nodeTypes = { ...dataNodeTypes, ...adapterNodeTypes, + ...coreNodeTypes, }; export * from "./types"; diff --git a/packages/@pufflig/ps-nodes/src/core/forin.ts b/packages/@pufflig/ps-nodes/src/core/forin.ts new file mode 100644 index 0000000..33bf4f6 --- /dev/null +++ b/packages/@pufflig/ps-nodes/src/core/forin.ts @@ -0,0 +1,38 @@ +import { nodeTypes, nodes } from "@pufflig/ps-nodes-config"; +import { NextNode, Node } from "@pufflig/ps-types"; + +export interface ForinNodeInputs { + list: Array; +} + +export interface ForinNodeOutputs { + item: string; +} + +const getTargets = async (input: ForinNodeInputs) => { + const stack: NextNode[] = []; + const { list } = input; + + for (let i = 0; i < list.length; i++) { + const item = list[i]; + const nextNode: NextNode = { + execSource: "output", + inputs: { + item, + }, + }; + stack.push(nextNode); + } + + stack.push({ + execSource: "complete", + inputs: {}, + }); + + return stack; +}; + +export const forinNode: Node = { + ...nodes[nodeTypes.forinNodeType], + getTargets, +}; diff --git a/packages/@pufflig/ps-nodes/src/core/index.ts b/packages/@pufflig/ps-nodes/src/core/index.ts index 810cd55..f3942d0 100644 --- a/packages/@pufflig/ps-nodes/src/core/index.ts +++ b/packages/@pufflig/ps-nodes/src/core/index.ts @@ -1,7 +1,10 @@ -import { inputNode, inputNodeType } from "./input"; -import { outputNode, outputNodeType } from "./output"; +import { nodeTypes } from "@pufflig/ps-nodes-config"; +import { forinNode } from "./forin"; +import { inputNode } from "./input"; +import { outputNode } from "./output"; export const coreNodes = { - [inputNodeType]: inputNode, - [outputNodeType]: outputNode, + [nodeTypes.inputNodeType]: inputNode, + [nodeTypes.outputNodeType]: outputNode, + [nodeTypes.forinNodeType]: forinNode, }; diff --git a/packages/@pufflig/ps-nodes/src/core/input.ts b/packages/@pufflig/ps-nodes/src/core/input.ts index 5a68c47..5639980 100644 --- a/packages/@pufflig/ps-nodes/src/core/input.ts +++ b/packages/@pufflig/ps-nodes/src/core/input.ts @@ -1,8 +1,6 @@ -import { nodes } from "@pufflig/ps-nodes-config"; +import { nodeTypes, nodes } from "@pufflig/ps-nodes-config"; import { Node } from "@pufflig/ps-types"; -export const inputNodeType = "core/input" as const; - export const inputNode: Node = { - ...nodes[inputNodeType], + ...nodes[nodeTypes.inputNodeType], }; diff --git a/packages/@pufflig/ps-nodes/src/core/output.ts b/packages/@pufflig/ps-nodes/src/core/output.ts index 3fb4258..ddf1c6d 100644 --- a/packages/@pufflig/ps-nodes/src/core/output.ts +++ b/packages/@pufflig/ps-nodes/src/core/output.ts @@ -1,8 +1,6 @@ -import { nodes } from "@pufflig/ps-nodes-config"; +import { nodeTypes, nodes } from "@pufflig/ps-nodes-config"; import { Node } from "@pufflig/ps-types"; -export const outputNodeType = "core/output" as const; - export const outputNode: Node = { - ...nodes[outputNodeType], + ...nodes[nodeTypes.outputNodeType], }; diff --git a/packages/@pufflig/ps-types/src/types/nodes.ts b/packages/@pufflig/ps-types/src/types/nodes.ts index d6ff303..f477a47 100644 --- a/packages/@pufflig/ps-types/src/types/nodes.ts +++ b/packages/@pufflig/ps-types/src/types/nodes.ts @@ -1,13 +1,13 @@ -import { Param, ParamValueMap } from "./params"; +import { Param } from "./params"; export interface Exec { id: string; name?: string; } -export interface NextNode { +export interface NextNode { execSource: string; - inputs: ParamValueMap; + inputs: I; } export interface NodeConfig { @@ -25,13 +25,17 @@ export interface NodeConfig { } export type Execute = (input: I, prevInput?: Partial) => Promise; -export type MapInput = (input: I, prevInput?: Partial) => Promise; -export type GetTargets = (result: O) => Promise; +export type MapInput = (input: I, prevInput?: Partial) => Promise | null>; +export type GetTargets = ( + input: I, + prevInput?: Partial, + result?: O +) => Promise>[]>; export interface NodeActions { execute?: Execute; mapInput?: MapInput; - getTargets?: GetTargets; + getTargets?: GetTargets; } export type Node = NodeConfig & NodeActions; diff --git a/websites/docs/pages/changelog.md b/websites/docs/pages/changelog.md index 38a19aa..36f78c3 100644 --- a/websites/docs/pages/changelog.md +++ b/websites/docs/pages/changelog.md @@ -20,6 +20,7 @@ head: - :warning: **API** [@pufflig/ps-chains] rename `parseInput` method to `mapInput` - **API** [@pufflig/ps-chains] add `getNext` lifecycle method. With this method a node can decide what target execution nodes to run and in what order. This method can be used to create loops / conditionals. - [@pufflig/ps-nodes] add a node to generate embeddings using the openai api. +- [@pufflig/ps-nodes] add a node to run loops ## 0.15.0