Skip to content

Commit

Permalink
feat(Ash.Reactor): Add bulk_update step type. (#1185)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimsynz authored May 22, 2024
1 parent a964b3b commit ab3786e
Show file tree
Hide file tree
Showing 11 changed files with 1,084 additions and 15 deletions.
256 changes: 252 additions & 4 deletions documentation/dsls/DSL:-Ash.Reactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ Caveats/differences from `Ash.bulk_create/4`:

### Examples
```
create :create_posts, MyApp.Post, :create do
initial inputs(:titles)
actor(result(:get_user))
tenant(result(:get_organisation, [:id]))
bulk_create :create_posts, MyApp.Post, :create do
initial input(:titles)
actor result(:get_user)
tenant result(:get_organisation, [:id])
end
```
Expand Down Expand Up @@ -421,6 +421,254 @@ Target: `Ash.Reactor.Dsl.BulkCreate`



## reactor.bulk_update
```elixir
bulk_update name, resource, action \\ nil
```


Declares a step which will call an update action on a resource with a collection of inputs.

> ### Check the docs! {: .warning}
>
> Make sure to thoroughly read and understand the documentation in `Ash.bulk_update/4` before using. Read each option and note the default values. By default, bulk updates don't return records or errors, and don't emit notifications.
Caveats/differences from `Ash.bulk_update/4`:

1. `max_concurrency` specifies the number of tasks that Ash will start to process batches, and has no effect on Reactor concurrency targets. It's could be possible to create a very large number of processes if a number of steps are running bulk actions with a high degree of concurrency.
2. Setting `notify?` to `true` will cause both `notify?` and `return_notifications?` to be set to true in the underlying call to `Ash.bulk_create/4`. Notifications will then be managed by the `Ash.Reactor.Notifications` Reactor middleware.
3. If you specify an undo action it must be a generic action which takes the bulk result as it's only argument.

> #### Undo behaviour {: .tip}
>
> This step has three different modes of undo.
>
> * `never` - The result of the action is never undone. This is the default.
> * `always` - The `undo_action` will always be called.
> * `outside_transaction` - The `undo_action` will not be called when running inside a `transaction` block, but will be otherwise.


### Nested DSLs
* [actor](#reactor-bulk_update-actor)
* [inputs](#reactor-bulk_update-inputs)
* [tenant](#reactor-bulk_update-tenant)
* [wait_for](#reactor-bulk_update-wait_for)


### Examples
```
bulk_update :publish_posts, MyApp.Post, :publish do
initial input(:posts),
actor result(:get_user)
end
```



### Arguments

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`name`](#reactor-bulk_update-name){: #reactor-bulk_update-name .spark-required} | `atom` | | A unique name for the step. |
| [`resource`](#reactor-bulk_update-resource){: #reactor-bulk_update-resource .spark-required} | `module` | | The resource to call the action on. |
| [`action`](#reactor-bulk_update-action){: #reactor-bulk_update-action } | `atom` | | The name of the action to call on the resource. |
### Options

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`initial`](#reactor-bulk_update-initial){: #reactor-bulk_update-initial .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | A collection of inputs to pass to the create action. Must implement the `Enumerable` protocol. |
| [`allow_stream_with`](#reactor-bulk_update-allow_stream_with){: #reactor-bulk_update-allow_stream_with } | `:keyset \| :offset \| :full_read` | `:keyset` | The 'worst' strategy allowed to be used to fetch records if the :stream strategy is chosen. See the `Ash.stream!/2` docs for more. |
| [`assume_casted?`](#reactor-bulk_update-assume_casted?){: #reactor-bulk_update-assume_casted? } | `boolean` | `false` | Whether or not to cast attributes and arguments as input. This is an optimization for cases where the input is already casted and/or not in need of casting |
| [`atomic_update`](#reactor-bulk_update-atomic_update){: #reactor-bulk_update-atomic_update } | `map` | | A map of atomic updates to apply. See `Ash.Changeset.atomic_update/3` for more. |
| [`authorize_changeset_with`](#reactor-bulk_update-authorize_changeset_with){: #reactor-bulk_update-authorize_changeset_with } | `:filter \| :error` | `:filter` | If set to `:error`, instead of filtering unauthorized changes, unauthorized changes will raise an appropriate forbidden error |
| [`authorize_query_with`](#reactor-bulk_update-authorize_query_with){: #reactor-bulk_update-authorize_query_with } | `:filter \| :error` | `:filter` | If set to `:error`, instead of filtering unauthorized query results, unauthorized query results will raise an appropriate forbidden error |
| [`authorize_query?`](#reactor-bulk_update-authorize_query?){: #reactor-bulk_update-authorize_query? } | `boolean` | `true` | If a query is given, determines whether or not authorization is run on that query. |
| [`batch_size`](#reactor-bulk_update-batch_size){: #reactor-bulk_update-batch_size } | `nil \| pos_integer` | | The number of records to include in each batch. Defaults to the `default_limit` or `max_page_size` of the action, or 100. |
| [`filter`](#reactor-bulk_update-filter){: #reactor-bulk_update-filter } | `map \| keyword` | | A filter to apply to records. This is also applied to a stream of inputs. |
| [`load`](#reactor-bulk_update-load){: #reactor-bulk_update-load } | `atom \| list(atom)` | `[]` | A load statement to apply to records. Ignored if `return_records?` is not true. |
| [`lock`](#reactor-bulk_update-lock){: #reactor-bulk_update-lock } | `any` | | A lock statement to add onto the query. |
| [`max_concurrency`](#reactor-bulk_update-max_concurrency){: #reactor-bulk_update-max_concurrency } | `non_neg_integer` | `0` | If set to a value greater than 0, up to that many tasks will be started to run batches asynchronously. |
| [`notification_metadata`](#reactor-bulk_update-notification_metadata){: #reactor-bulk_update-notification_metadata } | `map \| Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | `%{}` | Metadata to be merged into the metadata field for all notifications sent from this operation. |
| [`notify?`](#reactor-bulk_update-notify?){: #reactor-bulk_update-notify? } | `boolean` | `false` | Whether or not to generate any notifications. This may be intensive for large bulk actions. |
| [`page`](#reactor-bulk_update-page){: #reactor-bulk_update-page } | `keyword` | `[]` | Pagination options, see [the pagination docs for more](read-actions.md#pagination). |
| [`read_action`](#reactor-bulk_update-read_action){: #reactor-bulk_update-read_action } | `atom` | | The action to use when building the read query. |
| [`return_errors?`](#reactor-bulk_update-return_errors?){: #reactor-bulk_update-return_errors? } | `boolean` | `false` | Whether or not to return all of the errors that occur. Defaults to false to account for large inserts. |
| [`return_records?`](#reactor-bulk_update-return_records?){: #reactor-bulk_update-return_records? } | `boolean` | `false` | Whether or not to return all of the records that were inserted. Defaults to false to account for large inserts. |
| [`return_stream?`](#reactor-bulk_update-return_stream?){: #reactor-bulk_update-return_stream? } | `boolean` | `false` | If set to `true`, instead of an `Ash.BulkResult`, a mixed stream is returned. |
| [`reuse_values?`](#reactor-bulk_update-reuse_values?){: #reactor-bulk_update-reuse_values? } | `boolean` | `false` | Whether calculations are allowed to reuse values that have already been loaded, or must refetch them from the data layer. |
| [`rollback_on_error?`](#reactor-bulk_update-rollback_on_error?){: #reactor-bulk_update-rollback_on_error? } | `boolean` | `true` | Whether or not to rollback the transaction on error, if the resource is in a transaction. |
| [`select`](#reactor-bulk_update-select){: #reactor-bulk_update-select } | `atom \| list(atom)` | | A select statement to apply to records. Ignored if `return_records?` is not `true`. |
| [`skip_unknown_inputs`](#reactor-bulk_update-skip_unknown_inputs){: #reactor-bulk_update-skip_unknown_inputs } | `atom \| list(atom)` | | A list of inputs that, if provided, will be ignored if they are not recognized by the action. |
| [`sorted?`](#reactor-bulk_update-sorted?){: #reactor-bulk_update-sorted? } | `boolean` | `false` | Whether or not to sort results by their input position, in cases where `return_records?` is set to `true`. |
| [`stop_on_error?`](#reactor-bulk_update-stop_on_error?){: #reactor-bulk_update-stop_on_error? } | `boolean` | `false` | If `true`, the first encountered error will stop the action and be returned. Otherwise, errors will be skipped. |
| [`strategy`](#reactor-bulk_update-strategy){: #reactor-bulk_update-strategy } | `list(:atomic \| :atomic_batches \| :stream)` | `[:atomic]` | The strategy or strategies to enable. `:stream` is used in all cases if the data layer does not support atomics. |
| [`stream_batch_size`](#reactor-bulk_update-stream_batch_size){: #reactor-bulk_update-stream_batch_size } | `pos_integer` | | Batch size to use if provided a query and the query must be streamed. |
| [`stream_with`](#reactor-bulk_update-stream_with){: #reactor-bulk_update-stream_with } | `:keyset \| :offset \| :full_read` | | The specific strategy to use to fetch records. See `Ash.stream!/2` docs for more. |
| [`success_state`](#reactor-bulk_update-success_state){: #reactor-bulk_update-success_state } | `:success \| :partial_success` | `:success` | Bulk results can be entirely or partially successful. Chooses the `Ash.BulkResult` state to consider the step a success. |
| [`timeout`](#reactor-bulk_update-timeout){: #reactor-bulk_update-timeout } | `timeout` | | If none is provided, the timeout configured on the domain is used (which defaults to `30_000`). |
| [`transaction`](#reactor-bulk_update-transaction){: #reactor-bulk_update-transaction } | `:all \| :batch \| false` | `:batch` | Whether or not to wrap the entire execution in a transaction, each batch, or not at all. |
| [`domain`](#reactor-bulk_update-domain){: #reactor-bulk_update-domain } | `module` | | The Domain to use when calling the action. Defaults to the Domain set on the resource or in the `ash` section. |
| [`async?`](#reactor-bulk_update-async?){: #reactor-bulk_update-async? } | `boolean` | `true` | When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`. |
| [`authorize?`](#reactor-bulk_update-authorize?){: #reactor-bulk_update-authorize? } | `boolean \| nil` | | Explicitly enable or disable authorization for the action. |
| [`description`](#reactor-bulk_update-description){: #reactor-bulk_update-description } | `String.t` | | A description for the step |
| [`undo_action`](#reactor-bulk_update-undo_action){: #reactor-bulk_update-undo_action } | `atom` | | The name of the action to call on the resource when the step is to be undone. |
| [`undo`](#reactor-bulk_update-undo){: #reactor-bulk_update-undo } | `:always \| :never \| :outside_transaction` | `:never` | How to handle undoing this action |


## reactor.bulk_update.actor
```elixir
actor source
```


Specifies the action actor





### Arguments

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`source`](#reactor-bulk_update-actor-source){: #reactor-bulk_update-actor-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the actor. |
### Options

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`transform`](#reactor-bulk_update-actor-transform){: #reactor-bulk_update-actor-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the actor before it is passed to the action. |





### Introspection

Target: `Ash.Reactor.Dsl.Actor`

## reactor.bulk_update.inputs
```elixir
inputs template
```


Specify the inputs for an action



### Examples
```
inputs %{
author: result(:get_user),
title: input(:title),
body: input(:body)
}
```

```
inputs(author: result(:get_user))
```



### Arguments

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`template`](#reactor-bulk_update-inputs-template){: #reactor-bulk_update-inputs-template .spark-required} | `%{optional(atom) => Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value} \| keyword(Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value)` | | |
### Options

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`transform`](#reactor-bulk_update-inputs-transform){: #reactor-bulk_update-inputs-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which will transform the inputs before executing the action. |





### Introspection

Target: `Ash.Reactor.Dsl.Inputs`

## reactor.bulk_update.tenant
```elixir
tenant source
```


Specifies the action tenant





### Arguments

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`source`](#reactor-bulk_update-tenant-source){: #reactor-bulk_update-tenant-source .spark-required} | `Reactor.Template.Input \| Reactor.Template.Result \| Reactor.Template.Value` | | What to use as the source of the tenant. |
### Options

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`transform`](#reactor-bulk_update-tenant-transform){: #reactor-bulk_update-tenant-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the tenant before it is passed to the action. |





### Introspection

Target: `Ash.Reactor.Dsl.Tenant`

## reactor.bulk_update.wait_for
```elixir
wait_for names
```


Wait for the named step to complete before allowing this one to start.

Desugars to `argument :_, result(step_to_wait_for)`




### Examples
```
wait_for :create_user
```



### Arguments

| Name | Type | Default | Docs |
|------|------|---------|------|
| [`names`](#reactor-bulk_update-wait_for-names){: #reactor-bulk_update-wait_for-names .spark-required} | `atom \| list(atom)` | | The name of the step to wait for. |






### Introspection

Target: `Reactor.Dsl.WaitFor`




### Introspection

Target: `Ash.Reactor.Dsl.BulkUpdate`



## reactor.change
```elixir
change name, change
Expand Down
3 changes: 2 additions & 1 deletion lib/ash.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ defmodule Ash do
@read_opts_schema Spark.Options.merge(
[
page: [
doc: "Pagination options, see the pagination docs for more",
doc:
"Pagination options, see [the pagination docs for more](read-actions.md#pagination).",
type: {:custom, Ash.Page, :page_opts, []}
],
load: [
Expand Down
3 changes: 2 additions & 1 deletion lib/ash/reactor/builders/bulk_create.ex
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ defimpl Reactor.Dsl.Build, for: Ash.Reactor.Dsl.BulkCreate do
DslError.exception(
module: Transformer.get_persisted(dsl_state, :module),
path: error_path,
message: "The undo action for an create step should take a single `bulk_result` argument."
message:
"The undo action for a bulk create step should take a single `bulk_result` argument."
)}
end
end
Loading

0 comments on commit ab3786e

Please sign in to comment.