proposal for reusable Process components #1154

Closed
10 changes: 9 additions & 1 deletion services/horizon/internal/docs/plans/new_horizon_ingest.md
@@ -131,7 +131,13 @@ Examples of sinks / stores:
- A postgres store for a wallet app that updates the app's `users` table any time a user's balance changes
- A streaming service that sends websockets notifications whenever the price for a particular asset changes

Unfortunately, `Store`s and `Process` objects are not reusable, because they have a particular storage schema baked in so that they know how to write out updates. This is why the `Store`s in the diagram below have names like `MyPostgresStore` rather than `PostgresStore`. However, `Store`s are extensible, because a developer can create and add a new store or `Process` to their app that writes to the same underlying storage (say a postgres database) and adds new tables or fields.
`Process` is an interface that transforms an input object into an output object. A `Process` takes a map as its input and returns a map as its result. The fields that a `Process` instance expects in its input map are defined by the `requiredFields() []string` method. The fields that a `Process` instance will output are defined by the `fieldUpdates() map[string]FieldAction` method, where `FieldAction` is an enum with the values `add`, `remove`, and `modify`. A field that passes through the `Process` instance untouched should not be included in the map returned by `fieldUpdates()`.
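
As a rough illustration of the two descriptive methods, here is a minimal Go sketch. The `Process` method names and the `add`/`remove`/`modify` values come from the paragraph above; the Go constant names, the `String()` helper, the `balanceDelta` type, and its field names are assumptions made up for this example.

```go
package main

import "fmt"

// FieldAction says what a Process does to one field of its input map.
type FieldAction int

const (
	Add FieldAction = iota
	Remove
	Modify
)

// String makes FieldAction values readable when printed.
func (a FieldAction) String() string {
	switch a {
	case Add:
		return "add"
	case Remove:
		return "remove"
	case Modify:
		return "modify"
	}
	return "unknown"
}

// Process declares only the two descriptive methods discussed so far.
type Process interface {
	requiredFields() []string
	fieldUpdates() map[string]FieldAction
}

// balanceDelta is a hypothetical Process: it reads two balances and adds a
// "delta" field. The balances pass through untouched, so they are
// deliberately absent from fieldUpdates().
type balanceDelta struct{}

func (balanceDelta) requiredFields() []string {
	return []string{"old_balance", "new_balance"}
}

func (balanceDelta) fieldUpdates() map[string]FieldAction {
	return map[string]FieldAction{"delta": Add}
}

func main() {
	var p Process = balanceDelta{}
	fmt.Println("requires:", p.requiredFields())
	fmt.Println("updates: ", p.fieldUpdates())
}
```

Keeping untouched fields out of `fieldUpdates()` means the map describes only the difference between input and output, which is the information a pipeline needs to reason about ordering.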

We can chain multiple `Process` instances in a tree structure controlled by a `ProcessingPipeline` instance, which makes each implementation of the `Process` interface a reusable component. `ProcessingPipeline` will `validate()` a group of `Process`es, checking that all their input and output fields are valid given the ordering and setup of the process chain. This catches an accidentally misconfigured `ProcessingPipeline` at app startup instead of at runtime. The onus of constructing a valid `ProcessingPipeline` is still ultimately on the user.
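
One possible shape for `validate()` is sketched below. It is a simplification under stated assumptions: the pipeline is a linear chain rather than a tree, the pipeline knows up front which fields the source provides (`sourceFields` is a hypothetical name), and `remove` or `modify` on a field that does not exist is treated as an error. The `stage` type is a table-driven stand-in for real `Process` implementations.

```go
package main

import "fmt"

type FieldAction int

const (
	Add FieldAction = iota
	Remove
	Modify
)

type Process interface {
	requiredFields() []string
	fieldUpdates() map[string]FieldAction
}

// stage is a hypothetical Process defined purely by its declarations.
type stage struct {
	required []string
	updates  map[string]FieldAction
}

func (s stage) requiredFields() []string             { return s.required }
func (s stage) fieldUpdates() map[string]FieldAction { return s.updates }

// ProcessingPipeline is modelled here as a linear chain (an assumption;
// the proposal describes a tree).
type ProcessingPipeline struct {
	sourceFields []string
	stages       []Process
}

// validate walks the chain in order, tracking which fields exist after each
// stage, and fails on the first stage whose declarations cannot be met.
func (p ProcessingPipeline) validate() error {
	available := map[string]bool{}
	for _, f := range p.sourceFields {
		available[f] = true
	}
	for i, s := range p.stages {
		for _, f := range s.requiredFields() {
			if !available[f] {
				return fmt.Errorf("stage %d: required field %q is not available", i, f)
			}
		}
		for f, action := range s.fieldUpdates() {
			switch action {
			case Add:
				available[f] = true
			case Remove, Modify:
				if !available[f] {
					return fmt.Errorf("stage %d: cannot update missing field %q", i, f)
				}
				if action == Remove {
					delete(available, f)
				}
			}
		}
	}
	return nil
}

func main() {
	pipeline := ProcessingPipeline{
		sourceFields: []string{"account", "balance"},
		stages: []Process{
			stage{required: []string{"balance"}, updates: map[string]FieldAction{"delta": Add}},
			stage{required: []string{"delta"}, updates: map[string]FieldAction{"balance": Remove}},
		},
	}
	fmt.Println("validation error:", pipeline.validate())
}
```

Because the check uses only the declared field names, it can run once at startup without processing any data. Swapping the two stages in `main` makes `validate()` return an error, since the stage that requires `delta` would run before the stage that adds it.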

`Process` will also have a `process(input map[string]interface{}) map[string]interface{}` method, so the output of one `Process` can be used as the input to the next `Process`. The output of a `ProcessingPipeline` will also be a `map[string]interface{}` that can be consumed by a `Store`. The `Store` will be capable of converting this map into the necessary database actions for its own specific database implementation.
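
The chaining described above might look roughly like the following sketch. Only the `process` signature is taken from the text; `runChain`, `setField`, `dropField`, `memoryStore`, the `Store.write` method, and the field names are all hypothetical and exist only to make the example runnable.

```go
package main

import "fmt"

type Process interface {
	process(input map[string]interface{}) map[string]interface{}
}

// Store is given a single hypothetical write method for this sketch.
type Store interface {
	write(record map[string]interface{}) error
}

// copyMap returns a shallow copy so a Process never mutates its caller's map.
func copyMap(in map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(in))
	for k, v := range in {
		out[k] = v
	}
	return out
}

// setField is a hypothetical Process that adds or overwrites one field.
type setField struct {
	name  string
	value interface{}
}

func (s setField) process(in map[string]interface{}) map[string]interface{} {
	out := copyMap(in)
	out[s.name] = s.value
	return out
}

// dropField is a hypothetical Process that removes one field.
type dropField struct{ name string }

func (d dropField) process(in map[string]interface{}) map[string]interface{} {
	out := copyMap(in)
	delete(out, d.name)
	return out
}

// runChain feeds each Process's output into the next one.
func runChain(input map[string]interface{}, chain ...Process) map[string]interface{} {
	out := input
	for _, p := range chain {
		out = p.process(out)
	}
	return out
}

// memoryStore is a hypothetical Store that keeps records in a slice; a real
// Store would translate the map into schema-specific database actions.
type memoryStore struct{ records []map[string]interface{} }

func (m *memoryStore) write(record map[string]interface{}) error {
	m.records = append(m.records, record)
	return nil
}

func main() {
	input := map[string]interface{}{"account": "GABC", "raw_xdr": "AAAA"}
	out := runChain(input, setField{"network", "testnet"}, dropField{"raw_xdr"})

	var store Store = &memoryStore{}
	if err := store.write(out); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println(out)
}
```

Copying the map in each `process` call is a design choice of this sketch, not something the proposal requires; it keeps a `Process` from changing data that a sibling branch of a tree-shaped pipeline might still be reading.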

Unfortunately, `Store`s are not reusable, because they have a particular storage schema baked in so that they know how to write out updates. This is why the `Store`s in the diagram below have names like `MyPostgresStore` rather than `PostgresStore`. However, `Store`s are extensible, because a developer can create and add a new store or `Process` to their app that writes to the same underlying storage (say a postgres database) and adds new tables or fields.

There's also a significant challenge in keeping stores in sync. If multiple `Process`es write to the same underlying store, or to entirely different stores, then any read operation that reads across stores or across data updated by multiple `Process`es is at risk of reading inconsistent values.

@@ -169,6 +175,8 @@ There are several open questions, and the design above isn't comprehensive. A fe
- How exactly do we organize `Store`s, `ProcessingPipeline`, and `Process`es?
- How do we keep reads from multiple stores consistent?
- How do we make the `Store`s and `Process`es in `ingest/stores` reusable?
- If the output of a `Process` results in a field with an empty list value, how does that affect `Process`es downstream?
- Should all `Process` objects be required to function correctly if provided with non-nil empty values for each required field?
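
The last two questions depend on a distinction that `map[string]interface{}` makes easy to blur: a field can be missing, present with a nil value, or present with a non-nil empty list. The sketch below only shows how a `Process` could tell these cases apart in Go; the `classify` helper and the field names are hypothetical, and it does not propose an answer to either question.

```go
package main

import "fmt"

// classify reports which of the cases a field falls into. It treats only
// []interface{} as a list, which is an assumption of this sketch.
func classify(input map[string]interface{}, field string) string {
	v, ok := input[field]
	if !ok {
		return "missing"
	}
	if v == nil {
		return "nil"
	}
	if list, isList := v.([]interface{}); isList && len(list) == 0 {
		return "empty list"
	}
	return "present"
}

func main() {
	input := map[string]interface{}{
		"signers": []interface{}{},
		"memo":    nil,
		"account": "GABC",
	}
	for _, f := range []string{"signers", "memo", "account", "balance"} {
		fmt.Printf("%s: %s\n", f, classify(input, f))
	}
}
```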

## Implementation Plan
