Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Persistence.Query: need backpressure control for queries #6404

Closed
Aaronontheweb opened this issue Feb 14, 2023 · 2 comments
Closed

Akka.Persistence.Query: need backpressure control for queries #6404

Aaronontheweb opened this issue Feb 14, 2023 · 2 comments

Comments

@Aaronontheweb
Copy link
Member

Is your feature request related to a problem? Please describe.

Here's a problem that affects users trying to run large-scale CQRS with Akka.NET: Akka.Persistence.Query does not currently constrain the number of parallel database connections being opened on any implementation of Akka.Persistence, therefore it's quite ease to instantly melt even a well-provisioned Postgres instance if each node is running thousands of EventsByEntyId queries, all of which poll at a rate of once per-second.

Akka.Persistence Recovery and writes both regulate the number of open parallel connections through the internals of the AsyncWriteJournal actor and in order to solve this very problem from occurring inside normal Akka.Persistence operations. We should incorporate an equivalent mechanism for adding backpressure support to Akka.Persistence.Query as well.

Describe the solution you'd like

I'd probably use Akka.Streams to do this - a pipeline with a fixed number of parallel queries (SelectAsync(degreeOfParallelism) which queues any pending query request until the ones ahead of it have completed. No need to worry about ordering here - a single requestor for a large, paginated query has to send its requests to the journal in chunks anyway.

Describe alternatives you've considered

We might have to implement this on a per-journal basis, as there's no unified implementation for handling them currently:

https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Persistence.Query/PersistenceQuery.cs

We could try to develop some unified APIs around this but that might need to occur after the initial 1.5 release.

@Aaronontheweb
Copy link
Member Author

Design

In order to rate-limit the number of open database connections at any given time, I'd propose we put a ReadJournal "front-end" in front of the AsyncWriteJournal implementation :

flowchart TD
    A[QuerySubmitted] -->|Offset,QueryType,Tag or EntityId| B(ReadJournal.ReceiveQuery)
    B --> |Buffer into Akka.Stream|C(SelectAsync w/ Max Parallelism = N)
    C --> D{SelectAsync}
    D --> |Capacity Avaiable| E(Query to AsyncWriteJournal)
    D --> |Capacity Unavailable - Backpressure| D
Loading

The SelectAsync is key - we're basically implementing a simple token bucket throttler that weights all queries the same. This likely means:

  • All journal implementation have to have some sort of custom acknowledgement message or
  • We bake a standard one into Akka.Persistence.Query that all journal implementations have to support in order to fulfill the Akka.Persistence.TCK.

Existing Problems with Akka.Persistence.Query's Design

In order to address this backpressure problem, there's a larger logistical issue at-hand - namely that every Akka.Persistence.Query implementation is decentralized: there's no consistent methodology or messaging designed to implement all of the query types optionally supported by the IReadJournal interface. It's all decentralized and up to each plugin to implement individually.

So this raises an interesting question - do we add a centralized means of rate-limiting or have each plugin follow a reproducible pattern on their own?

@Aaronontheweb
Copy link
Member Author

My Opinion it's probably most feasible to let each plugin implement this themselves. Some plugins, like Akka.Persistence.Azure, probably don't even need this feature since it's all HTTP-based anyway - no need to burden that plugin with backpressure support it doesn't need.

In addition to that - it'd be a pretty massive re-organization of Akka.Persistence.Query to force a common base implementation spread across all plugins. Organizing that, implementing it in the TCK, and getting all plugins up to date in a timely fashion to support Akka.NET v1.5 is probably the greater of two evils. Better to establish a pattern and have other plugins follow it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
No open projects
Development

No branches or pull requests

1 participant