Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add batch subscription #632

Merged
merged 12 commits into from
Oct 15, 2024
Merged

add batch subscription #632

merged 12 commits into from
Oct 15, 2024

Conversation

DavidBadura
Copy link
Member

@DavidBadura DavidBadura commented Sep 19, 2024

This PR allows batching of events into the subscriber. This can provide significant performance benefits depending on how the user implements it. A new interface is added called BatchableSubscriber, which gives you several methods to control exactly what should happen. The subscriber engine then takes care of the rest.

Here is an example:

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;

#[Projector('profile_1')]
final class MigrationSubscriber implements BatchableSubscriber
{
    public function __construct(
        private readonly Connection $connection,
    ) {
    }

    /** @var array<string, int> */
    private array $nameChanged = [];

    #[Subscribe(NameChanged::class)]
    public function handleNameChanged(NameChanged $event): void
    {
        $this->nameChanged[$event->userId] = $event->name;
    }

    public function beginBatch(): void
    {
        $this->nameChanged = [];
        $this->connection->beginTransaction();
    }

    public function commitBatch(): void
    {
        foreach ($this->nameChanged as $userId => $name) {
            $this->connection->executeStatement(
                'UPDATE user SET name = :name WHERE id = :id',
                ['name' => $name, 'id' => $userId],
            );
        }

        $this->connection->commit();
        $this->nameChanged = [];
    }

    public function rollbackBatch(): void
    {
        $this->connection->rollBack();
    }

    public function forceCommit(): bool
    {
        return count($this->nameChanged) > 1000;
    }
}

@DavidBadura DavidBadura added this to the 3.5.0 milestone Sep 19, 2024
Copy link

github-actions bot commented Sep 19, 2024

Hello 👋

here is the most recent benchmark result:

SubscriptionEngineBench
=======================

+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
|                           | time (kde mode)                               | memory                                     |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
| subject                   | Tag: <current>  | Tag: base       | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
| benchHandle10000Events () | 3.004s (±0.00%) | 3.019s (±0.00%) | -0.49%    | 34.736mb        | 34.744mb   | -0.02%      |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+

SubscriptionEngineBatchBench
============================

+---------------------------+-------------------+-----------+-----------------+-------------+
|                           | time (kde mode)               | memory                        |
+---------------------------+-------------------+-----------+-----------------+-------------+
| subject                   | Tag: <current>    | time-diff | Tag: <current>  | memory-diff |
+---------------------------+-------------------+-----------+-----------------+-------------+
| benchHandle10000Events () | 68.244ms (±0.00%) | +0.00%    | 34.234mb        | +0.00%      |
+---------------------------+-------------------+-----------+-----------------+-------------+

SimpleSetupStreamStoreBench
===========================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 873.500μs (±0.00%) | 876.900μs (±0.00%) | -0.39%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchLoad10000Events ()                | 47.846ms (±0.00%)  | 47.845ms (±0.00%)  | +0.00%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave1Event ()                     | 888.000μs (±0.00%) | 957.900μs (±0.00%) | -7.30%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave10000Events ()                | 203.598ms (±0.00%) | 212.386ms (±0.00%) | -4.14%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave10000Aggregates ()            | 7.412s (±0.00%)    | 7.529s (±0.00%)    | -1.55%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 4.627s (±0.00%)    | 4.620s (±0.00%)    | +0.16%    | 34.744mb        | 34.744mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SnapshotsBench
==============

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad10000EventsMissingSnapshot () | 48.964ms (±0.00%)  | 47.953ms (±0.00%)  | +2.11%    | 33.835mb        | 33.835mb   | 0.00%       |
| benchLoad10000Events ()                | 897.300μs (±0.00%) | 893.600μs (±0.00%) | +0.41%    | 33.835mb        | 33.835mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SplitStreamBench
================

+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                         | time (kde mode)                                     | memory                                     |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                 | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad10000Events () | 4.344ms (±0.00%)   | 4.371ms (±0.00%)   | -0.61%    | 37.072mb        | 37.072mb   | 0.00%       |
| benchSave10000Events () | 338.018ms (±0.00%) | 343.986ms (±0.00%) | -1.73%    | 37.144mb        | 37.144mb   | -0.00%      |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

PersonalDataBench
=================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 925.100μs (±0.00%) | 871.800μs (±0.00%) | +6.11%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchLoad10000Events ()                | 80.437ms (±0.00%)  | 85.200ms (±0.00%)  | -5.59%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchSave1Event ()                     | 1.338ms (±0.00%)   | 1.338ms (±0.00%)   | +0.03%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchSave10000Events ()                | 241.396ms (±0.00%) | 243.336ms (±0.00%) | -0.80%    | 34.897mb        | 34.897mb   | 0.00%       |
| benchSave10000Aggregates ()            | 11.427s (±0.00%)   | 11.401s (±0.00%)   | +0.23%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 8.575s (±0.00%)    | 8.614s (±0.00%)    | -0.45%    | 35.396mb        | 35.396mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SimpleSetupBench
================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 935.600μs (±0.00%) | 918.800μs (±0.00%) | +1.83%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchLoad10000Events ()                | 48.442ms (±0.00%)  | 50.128ms (±0.00%)  | -3.36%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave1Event ()                     | 910.200μs (±0.00%) | 954.700μs (±0.00%) | -4.66%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave10000Events ()                | 214.159ms (±0.00%) | 213.322ms (±0.00%) | +0.39%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave10000Aggregates ()            | 7.351s (±0.00%)    | 7.460s (±0.00%)    | -1.47%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 4.658s (±0.00%)    | 4.636s (±0.00%)    | +0.46%    | 33.765mb        | 33.765mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

This comment gets update everytime a new commit comes in!

@DavidBadura DavidBadura force-pushed the batch-subscription branch 3 times, most recently from a5cb728 to 3ae0fd0 Compare September 20, 2024 17:37
@DavidBadura DavidBadura force-pushed the batch-subscription branch 5 times, most recently from ea08a3a to 0f6603d Compare October 4, 2024 16:51
@DavidBadura DavidBadura added the enhancement New feature or request label Oct 5, 2024
@DavidBadura DavidBadura merged commit 9c96e5a into 3.5.x Oct 15, 2024
39 of 40 checks passed
@DavidBadura DavidBadura deleted the batch-subscription branch October 15, 2024 11:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants