Akka.Persistence.Query Throttling implementation - "QueryPermitter" #6436
…f` param refactor all Akka.Persistence.Query implementations take an `IActorRef` param so we can swap the journal reference for a "throttler" actor in the middle
This reverts commit e9d7639.
SQLite test suite passed locally. Going to try running this inside https://github.com/Aaronontheweb/AkkaSqlQueryCrushTest
Added a "crush test" to apply maximum stress to Akka.Persistence.SqlServer: https://github.com/Aaronontheweb/AkkaSqlQueryCrushTest
Workload: 1000 recoveries, 1000 * 10 writes, 1000 * 10 persistence queries doing 1 event per second.
Baseline: Akka.NET v1.4.49 and the latest Akka.Persistence.SqlServer.
This PR: 1.5.0-beta2 with this PR and max concurrent queries = 30.
Redesigned the benchmark to eliminate setup overhead.
[Results: Baseline vs. This PR]
Ran the benchmarks with write-side disabled, queries only - might be seeing some of the divergence I expected now...
[Results: Baseline vs. This PR]
Going to try this with 10,000 parallel queries.
10,000 concurrent queries, running continuously.
[Results: Baseline vs. This PR]
Users were able to reproduce PostgreSql absolutely melting down at around 3,000 concurrent queries, but my dinky SQL Server instance running inside a Docker container appears to shrug off 10,000. Maybe I need to increase the event counts in the journal to strain the indices some.
Dialing up the tests to do 100,000 entities / queries with 100 events each.
Buffer.DeliverBuffer(TotalDemand);
if (Buffer.IsEmpty && CurrentOffset > ToOffset)
OnCompleteThenStop();
EDIT: Not the cause of the test failure
This reverts commit 6ae4f41.
New test - 100,000 entities with 100 events each, querying up to 10 events at a time.
Baseline: recovery ran for 20 minutes, with multiple timeout failures occurring. Only 1/100_000 entities successfully recovered.
This PR: (WIP)
This PR: recovered 1m events in
Some notes:
Completely recovered 10m events with 100,000 queries running continuously
Going to run our control one more time...
Control experiment started erroring out persistently after about 1 minute; beyond that point only about 20% of query attempts made it through without an error. The throttler definitely preserves the integrity of the system.
Performed a review of my own changes
@@ -40,39 +40,19 @@
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>
<PropertyGroup>
<PackageReleaseNotes>Version 1.5.0-beta1 contains **breaking API changes** and new API changes for Akka.NET.
Ignore - this is just `build.cmd` output.
private Continue() { }
}

- public static Props Props(long fromOffset, TimeSpan? refreshInterval, int maxBufferSize, string writeJournalPluginId)
+ public static Props Props(long fromOffset, TimeSpan? refreshInterval, int maxBufferSize, IActorRef writeJournal, IActorRef queryPermitter)
The `IActorRef queryPermitter` parameter has been added to all queries - this is the actor responsible for managing token buckets for permitting individual queries to run.
Become(WaitingForQueryPermit);
}

protected bool WaitingForQueryPermit(object message)
All queries now run with an execution flow that looks similar to this:
stateDiagram-v2
[*] --> Init: Request (from Akka.Streams)
Init --> RequestQueryPermit: RequestQueryStart
RequestQueryPermit --> Replay: QueryStartGranted
Replay --> Replay: ReplayedMessage
Replay --> Idle: RecoverySuccess (return token to permitter)
Idle --> RequestQueryPermit: Continue
At each stage in the query, we need permission from the QueryPermitter to continue - this is designed to help rate-limit the number of queries that can hit the database at any given time.
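As a rough illustration of the diagram above, the transitions can be written as a small lookup in plain C#. This is only a sketch: `QueryState`, `QuerySignal`, `QueryFlow` and `FlowDemo` are hypothetical names invented for this example, and the PR itself implements these phases as actor behaviors rather than as an enum-based state machine.

```csharp
using System;

// Hypothetical names for illustration only; the PR models these phases as actor behaviors.
public enum QueryState { Init, RequestQueryPermit, Replay, Idle }

public enum QuerySignal { RequestQueryStart, QueryStartGranted, ReplayedMessage, RecoverySuccess, Continue }

public static class QueryFlow
{
    // Mirrors the edges of the state diagram; any other combination is rejected.
    public static QueryState Next(QueryState state, QuerySignal signal) => (state, signal) switch
    {
        (QueryState.Init, QuerySignal.RequestQueryStart) => QueryState.RequestQueryPermit,
        (QueryState.RequestQueryPermit, QuerySignal.QueryStartGranted) => QueryState.Replay,
        (QueryState.Replay, QuerySignal.ReplayedMessage) => QueryState.Replay,
        // The permit goes back to the permitter at this point.
        (QueryState.Replay, QuerySignal.RecoverySuccess) => QueryState.Idle,
        // Every further iteration has to ask for a permit again.
        (QueryState.Idle, QuerySignal.Continue) => QueryState.RequestQueryPermit,
        _ => throw new InvalidOperationException($"Unexpected {signal} while in {state}")
    };
}

public static class FlowDemo
{
    public static void Main()
    {
        var state = QueryState.Init;
        var signals = new[]
        {
            QuerySignal.RequestQueryStart,
            QuerySignal.QueryStartGranted,
            QuerySignal.ReplayedMessage,
            QuerySignal.RecoverySuccess,
            QuerySignal.Continue
        };

        foreach (var signal in signals)
        {
            state = QueryFlow.Next(state, signal);
            Console.WriteLine($"{signal} -> {state}");
        }
    }
}
```

The point to notice is the last edge: a query that wakes up from `Idle` does not go straight back to `Replay`, it goes through `RequestQueryPermit` again.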
@@ -129,6 +157,7 @@ protected bool Replaying( object message )
Log.Error(failure.Cause, "event replay failed, due to [{0}]", failure.Cause.Message);
Buffer.DeliverBuffer(TotalDemand);
OnErrorThenStop(failure.Cause);
QueryPermitter.Tell(ReturnQueryStart.Instance); // return token to permitter
Return tokens when we fail and when we succeed - always return them after the database has completed the query, successfully or otherwise.
/// <summary>
/// Request token from throttler
/// </summary>
internal sealed class RequestQueryStart
Message for requesting a token
/// <summary>
/// Return token to throttler
/// </summary>
internal sealed class ReturnQueryStart
Token returned to pool.
/// <remarks>
/// Works identically to the RecoveryPermitter built into Akka.Persistence.
/// </remarks>
internal sealed class QueryThrottler : ReceiveActor
This actor is almost an exact duplicate of the `RecoveryPermitter` built into base Akka.Persistence - that design has been working successfully for years to solve this problem during `PersistentActor` recovery.
Receive<RequestQueryStart>(_ =>
{
Context.Watch(Sender);
Death-watch the requestors of tokens, so that tokens held by dead requestors can be returned to the pool.
private void ReturnQueryPermit(IActorRef actorRef)
{
_usedPermits--;
Context.Unwatch(actorRef);
Always unwatch someone who returns a permit.
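To make the watch / unwatch bookkeeping from the review comments concrete, here is a minimal sketch of the permit accounting in plain C#, with no Akka.NET dependency. It is not the PR's actor: `PermitPool`, `PoolDemo` and every member name are hypothetical, string ids stand in for `IActorRef`, and a `HashSet` stands in for death watch. It assumes the same rules the comments describe: watch on request, unwatch on return, and treat a terminated permit holder as an implicit return.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Illustrative only: hypothetical names, string ids instead of IActorRef.
public sealed class PermitPool
{
    private readonly int _maxPermits;
    private int _usedPermits;
    private readonly List<string> _pending = new List<string>();
    private readonly HashSet<string> _watched = new HashSet<string>();

    public PermitPool(int maxPermits) => _maxPermits = maxPermits;

    public int UsedPermits => _usedPermits;
    public int PendingCount => _pending.Count;
    public bool IsWatching(string requester) => _watched.Contains(requester);

    // Returns true when a permit is granted immediately, false when the request is queued.
    public bool Request(string requester)
    {
        _watched.Add(requester); // stand-in for Context.Watch(Sender)
        if (_usedPermits >= _maxPermits)
        {
            _pending.Add(requester);
            return false;
        }
        _usedPermits++;
        return true;
    }

    // Frees a permit and returns the queued requester that receives it, or null if nobody waits.
    public string? Return(string requester)
    {
        _usedPermits--;
        _watched.Remove(requester); // stand-in for Context.Unwatch(actorRef)
        if (_pending.Count == 0) return null;
        var next = _pending[0];
        _pending.RemoveAt(0);
        _usedPermits++;
        return next;
    }

    // A dead requester either leaves the queue or gives back the permit it was holding.
    public string? Terminated(string requester)
    {
        if (_pending.Remove(requester))
        {
            _watched.Remove(requester);
            return null;
        }
        return Return(requester);
    }
}

public static class PoolDemo
{
    public static void Main()
    {
        var pool = new PermitPool(maxPermits: 1);
        Console.WriteLine(pool.Request("query-a"));    // True: permit granted
        Console.WriteLine(pool.Request("query-b"));    // False: queued behind query-a
        Console.WriteLine(pool.Terminated("query-a")); // query-b: the freed permit is handed over
        Console.WriteLine(pool.UsedPermits);           // 1
    }
}
```

Because a terminated holder is handled the same way as an explicit return, a query that dies mid-flight cannot leak its permit in this model.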
_maxBufferSize = config.GetInt("max-buffer-size", 0);
_maxConcurrentQueries = config.GetInt("max-concurrent-queries", 50);
Make this value configurable in HOCON - 50 by default.
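A HOCON override for this setting might look roughly like the fragment below. The key name `max-concurrent-queries` comes from the diff above; the section path `akka.persistence.query.journal.sql` is an assumption about where the SQL read journal keeps its settings and may differ per plugin. The value 30 is the one used for the benchmark runs earlier in this thread.

```hocon
# Assumed section path for the SQL read journal; adjust to the plugin in use.
akka.persistence.query.journal.sql {
  # Upper bound on queries allowed to hit the database at once (default in this PR: 50).
  max-concurrent-queries = 30
}
```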
Going to add some documentation for this feature in the "what's new" page - but we don't have a dedicated page for fine-tuning Akka.Persistence.Query today.
Question regarding the token system: if queries fail midway, will the available tokens eventually run out because the failing queries never returned them?
If the stream stage fails, the tokens are automatically returned when the actor kills itself. If the journal returns an error, we return the tokens then too. @Arkatufus
LGTM
Changes
Implements #6404 using the same design as Akka.Persistence's base `RecoveryPermitter`. Gets used on every single iteration of each query.
Related: #6417, #6433