Skip to content

Commit

Permalink
[Fix serverlessworkflow#677] Atomic produce and consume actions
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
  • Loading branch information
fjtirado committed Feb 28, 2024
1 parent 3b6f976 commit f7ae643
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 49 deletions.
13 changes: 9 additions & 4 deletions examples/event-based-service-invocation.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@
"actions": [
{
"name": "make-appointment-action",
"eventRef": {
"produceEventRef": "make-vet-appointment",
"data": "${ .patientInfo }",
"consumeEventRef": "vet-appointment-info"
"produceEventRef": {
"name": "make-vet-appointment",
"data": "${ .patientInfo }"
}
},
{
"name": "wait-appointement-confirmation",
"consumeEventRef": {
"name": "vet-appointment-info"
},
"actionDataFilter": {
"results": "${ .appointmentInfo }"
Expand Down
48 changes: 31 additions & 17 deletions schema/workflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,13 @@
"description": "References a function to be invoked",
"$ref": "#/definitions/functionref"
},
"eventRef": {
"description": "References a `produce` and `consume` reusable event definitions",
"$ref": "#/definitions/eventref"
"produceEventRef": {
"description": "References a `produce` reusable event definition",
"$ref": "#/definitions/produceEventref"
},
"consumeEventRef": {
"description": "References a `consume` reusable event definition",
"$ref": "#/definitions/consumeEventref"
},
"subFlowRef": {
"description": "References a sub-workflow to invoke",
Expand Down Expand Up @@ -521,24 +525,15 @@
}
]
},
"eventref": {
"produceEventRef": {
"type": "object",
"description": "Event References",
"description": "Publish an event",
"properties": {
"produceEventRef": {
"name": {
"type": "string",
"description": "Reference to the unique name of a 'produced' event definition",
"pattern": "^[a-z0-9](-?[a-z0-9])*$"
},
"consumeEventRef": {
"type": "string",
"description": "Reference to the unique name of a 'consumed' event definition",
"pattern": "^[a-z0-9](-?[a-z0-9])*$"
},
"consumeEventTimeout": {
"type": "string",
"description": "Maximum amount of time (ISO 8601 format) to wait for the result event. If not defined it should default to the actionExecutionTimeout"
},
"data": {
"type": [
"string",
Expand All @@ -556,7 +551,26 @@
},
"additionalProperties": false,
"required": [
"produceEventRef"
"name", "data"
]
},
"consumeEventref": {
"type": "object",
"description": "Waits for an event",
"properties": {
"name": {
"type": "string",
"description": "Reference to the unique name of a 'consumed' event definition",
"pattern": "^[a-z0-9](-?[a-z0-9])*$"
},
"consumeEventTimeout": {
"type": "string",
"description": "Maximum amount of time (ISO 8601 format) to wait for the result event. If not defined it should default to the actionExecutionTimeout"
}
},
"additionalProperties": false,
"required": [
"name"
]
},
"subflowref": {
Expand Down Expand Up @@ -1690,7 +1704,7 @@
"name",
"type",
"action",
"eventRef"
""
]
},
"else": {
Expand Down
86 changes: 58 additions & 28 deletions specification.md
Original file line number Diff line number Diff line change
Expand Up @@ -3987,7 +3987,8 @@ Actions specify invocations of services or other workflows during workflow execu
Service invocation can be done in three different ways:

* Reference [functions definitions](#Function-Definition) by its unique name using the `functionRef` property.
* Reference a `produced` and `consumed` [event definitions](#Event-Definition) via the `eventRef` property.
* Reference a `produced` [event definitions](#Event-Definition) via the `produceEventRef` property.
* Reference a `consumer` [event definitions](#Event-Definition) via the `consumeEventRef` property.
* Reference a sub-workflow invocation via the `subFlowRef` property.

Note that `functionRef`, `eventRef`, and `subFlowRef` are mutually exclusive, meaning that only one of them can be
Expand All @@ -3997,8 +3998,7 @@ The `name` property specifies the action name.

In the event-based scenario a service, or a set of services we want to invoke
are not exposed via a specific resource URI for example, but can only be invoked via an event.
The [eventRef](#EventRef-Definition) property defines the
referenced `produced` event via its `produceEventRef` property and a `consumed` event via its `consumeEventRef` property.
In that case, a `produced` event might be referenced via its `produceEventRef` property and a `consumed` event via its `consumeEventRef` property.

The `sleep` property can be used to define time periods that workflow execution should sleep
before and/or after function execution. It can have two properties:
Expand Down Expand Up @@ -4125,18 +4125,15 @@ In addition, functions that are invoked async do not propagate their errors to t
workflow state, meaning that any errors that happen during their execution cannot be handled in the workflow states
onErrors definition. Note that errors raised during functions that are invoked async should not fail workflow execution.

##### EventRef Definition
##### ProduceEventRef Definition

Allows defining invocation of a function via event.
Publish an event. It references the unique name of a `produced` event definition. Must follow the [Serverless Workflow Naming Convention](#naming-convention)

| Parameter | Description | Type | Required |
| --- | --- | --- | --- |
| [produceEventRef](#Event-Definition) | Reference to the unique name of a `produced` event definition. Must follow the [Serverless Workflow Naming Convention](#naming-convention) | string | yes |
| [consumeEventRef](#Event-Definition) | Reference to the unique name of a `consumed` event definition. Must follow the [Serverless Workflow Naming Convention](#naming-convention) | string | no |
| consumeEventTimeout | Maximum amount of time (ISO 8601 format literal or expression) to wait for the consume event. If not defined it be set to the [actionExecutionTimeout](#Workflow-Timeout-Definition) | string | no |
| data | If string type, an expression which selects parts of the states data output to become the data (payload) of the event referenced by `produceEventRef`. If object type, a custom object to become the data (payload) of the event referenced by `produceEventRef`. | string or object | no |
| [name](#Event-Definition) | Reference to the unique name of a `produced` event definition. Must follow the [Serverless Workflow Naming Convention](#naming-convention) | string | yes |
| data | If string type, an expression which selects parts of the states data output to become the data (payload) of the event referenced by `produceEventRef`. If object type, a custom object to become the data (payload) of the event referenced by `produceEventRef`. | string or object | yes |
| contextAttributes | Add additional event extension context attributes to the trigger/produced event | object | no |
| invoke | Specifies if the function should be invoked sync or async. Default is sync | enum | no |

<details><summary><strong>Click to view example definition</strong></summary>
<p>
Expand All @@ -4151,10 +4148,9 @@ Allows defining invocation of a function via event.

```json
{
"eventRef": {
"produceEventRef": "make-vet-appointment",
"produceEventRef": {
"name": "make-vet-appointment",
"data": "${ .patientInfo }",
"consumeEventRef": "vet-appointment-info"
}
}
```
Expand All @@ -4163,10 +4159,9 @@ Allows defining invocation of a function via event.
<td valign="top">

```yaml
eventRef:
produceEventRef: make-vet-appointment
produceEventRef:
name: make-vet-appointment
data: "${ .patientInfo }"
consumeEventRef: vet-appointment-info
```

</td>
Expand All @@ -4175,27 +4170,62 @@ eventRef:

</details>

References a `produced` and `consumed` [event definitions](#Event-Definition) via the `produceEventRef` and `consumeEventRef` properties, respectively.
References a `produced` [event definitions](#Event-Definition) via the `name` property.

The `data` property can have two types: string or object. If it is of string type, it is an expression that can select parts of state data
to be used as payload of the event referenced by `produceEventRef`. If it is of object type, you can define a custom object to be the event payload.

The `contextAttributes` property allows you to add one or more [extension context attributes](https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#extension-context-attributes)
to the trigger/produced event.

##### ConsumeEventRef Definition

Wait for an event to arrive.

| Parameter | Description | Type | Required |
| --- | --- | --- | --- |
| [name](#Event-Definition) | Reference to the unique name of a `consumed` event definition. Must follow the [Serverless Workflow Naming Convention](#naming-convention) | string | yes |
| consumeEventTimeout | Maximum amount of time (ISO 8601 format literal or expression) to wait for the consume event. If not defined it be set to the [actionExecutionTimeout](#Workflow-Timeout-Definition) | string | no |

<details><summary><strong>Click to view example definition</strong></summary>
<p>

<table>
<tr>
<th>JSON</th>
<th>YAML</th>
</tr>
<tr>
<td valign="top">

```json
{
"consumeEventRef": {
"name": "approved-appointment",
}
}
```

</td>
<td valign="top">

```yaml
eventRef:
consumeEventRef: approved-appointment
```

</td>
</tr>
</table>

</details>

References a `consumed` [event definitions](#Event-Definition) via the `name` property.

The `consumeEventTimeout` property defines the maximum amount of time (ISO 8601 format literal or expression) to wait for the result event. If not defined it should default to the [actionExecutionTimeout](#Workflow-Timeout-Definition).
If the event defined by the `consumeEventRef` property is not received in that set time, action invocation should raise an error
that can be handled in the states `onErrors` definition. In case the `consumeEventRef` is not defined, the `consumeEventTimeout` property is ignored.
If the event defined by the `name` property is not received in that set time, action invocation should raise an error that can be handled in the states `onErrors` definition.

The `invoke` property defines how the function is invoked (sync or async). Default value of this property is
`sync`, meaning that workflow execution should wait until the function completes (the result event is received).
If set to `async`, workflow execution should just produce the trigger event and should not wait for the result event.
Note that in this case the action does not produce any results (payload of the result event) and the associated actions eventDataFilter as well as
its retry definition, if defined, should be ignored.
Functions that are invoked via events (sync or async) do not propagate their errors to the associated action definition and the
workflow state, meaning that any errors that happen during their execution cannot be handled in the workflow states
onErrors definition. Note that errors raised during functions that are invoked sync or async in this case
should not fail workflow execution.

##### SubFlowRef Definition

Expand Down

0 comments on commit f7ae643

Please sign in to comment.