-
Notifications
You must be signed in to change notification settings - Fork 3
/
workflow.py
68 lines (56 loc) · 2.94 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from abc import ABC
from iwf.communication_schema import CommunicationSchema
from iwf.persistence_schema import PersistenceSchema
from iwf.state_schema import StateSchema
class ObjectWorkflow(ABC):
"""ObjectWorkflow is the interface to define a workflow definition.
ObjectWorkflow is a top level concept in iWF. Any object that is long-lasting
can be modeled as an ObjectWorkflow.
"""
def get_workflow_states(self) -> StateSchema:
"""
GetWorkflowStates defines the states of the workflow. A state represents
a step of the workflow state machine.
A state can execute some commands (signal/timer) and wait for result
See more details in the WorkflowState interface.
It can return an empty list, meaning no states.
There can be at most one startingState in the list.
If there is no startingState or with the default empty state list, the workflow
will not start any state execution after workflow stated. Application can still
use RPC to invoke new state execution in the future.
Returns:
A list of workflow state definitions. Default to empty.
"""
return StateSchema()
def get_persistence_schema(self) -> PersistenceSchema:
"""
GetPersistenceSchema defines all the persistence fields for this workflow, includes:
1. Data attributes
2. Search attributes
Data attributes can be read/upsert in WorkflowState WaitUntil/Execute API
Data attributes can also be read by getDataAttributes API by external applications using Client
Search attributes can be read/upsert in WorkflowState WaitUntil/Execute API
Search attributes can also be read by GetSearchAttributes Client API by external applications.
External applications can also use "SearchWorkflow" API to find workflows by SQL-like query
Returns:
A persistence schema. Default to empty.
"""
return PersistenceSchema()
def get_communication_schema(self) -> CommunicationSchema:
"""
GetCommunicationSchema defines all the communication methods for this workflow, this includes
1. Signal channel
2. Interstate channel
Signal channel is for external applications to send signal to workflow execution.
ObjectWorkflow execution can listen on the signal in the WorkflowState WaitUntil API and receive in
the WorkflowState Execute API
InterStateChannel is for synchronization communications between WorkflowStates.
E.g. WorkflowStateA will continue after receiving a value from WorkflowStateB
Returns:
A communication schema. Default to empty.
"""
return CommunicationSchema()
def get_workflow_type(wf: ObjectWorkflow) -> str:
return wf.__class__.__name__
def get_workflow_type_by_class(wf_class: type[ObjectWorkflow]) -> str:
return wf_class.__name__