From 3ae0fd08541a66cdc76a9ab73a771a3f7f5e078d Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 20 Sep 2024 19:22:26 +0200 Subject: [PATCH] improve batching --- baseline.xml | 18 +- .../AttributeSubscriberMetadataFactory.php | 20 +- .../Subscriber/DuplicateBeginBatchMethod.php | 1 + .../Subscriber/DuplicateCommitBatchMethod.php | 1 + .../DuplicateRollbackBatchMethod.php | 1 + .../Engine/DefaultSubscriptionEngine.php | 218 ++++++++++-------- .../Projection/BatchProfileProjector.php | 8 +- .../Subscriber/ProfileProjection.php | 1 - 8 files changed, 152 insertions(+), 116 deletions(-) diff --git a/baseline.xml b/baseline.xml index cf017af90..5448dfd0e 100644 --- a/baseline.xml +++ b/baseline.xml @@ -1,5 +1,5 @@ - + @@ -102,6 +102,11 @@ + + + + + @@ -119,6 +124,9 @@ + + + @@ -174,6 +182,14 @@ + + + + + + + + diff --git a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php index 7d8e0981b..c29681c4f 100644 --- a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php +++ b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php @@ -112,17 +112,19 @@ public function metadata(string $subscriber): SubscriberMetadata $commitBatchMethod = $method->getName(); } - if ($method->getAttributes(RollbackBatch::class)) { - if ($rollbackBatchMethod !== null) { - throw new DuplicateBeginBatchMethod( - $subscriber, - $rollbackBatchMethod, - $method->getName(), - ); - } + if (!$method->getAttributes(RollbackBatch::class)) { + continue; + } - $rollbackBatchMethod = $method->getName(); + if ($rollbackBatchMethod !== null) { + throw new DuplicateBeginBatchMethod( + $subscriber, + $rollbackBatchMethod, + $method->getName(), + ); } + + $rollbackBatchMethod = $method->getName(); } $metadata = new SubscriberMetadata( diff --git a/src/Metadata/Subscriber/DuplicateBeginBatchMethod.php b/src/Metadata/Subscriber/DuplicateBeginBatchMethod.php index 1a275f9a9..d9a7f02a7 100644 --- a/src/Metadata/Subscriber/DuplicateBeginBatchMethod.php +++ b/src/Metadata/Subscriber/DuplicateBeginBatchMethod.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Metadata\Subscriber; use Patchlevel\EventSourcing\Metadata\MetadataException; + use function sprintf; final class DuplicateBeginBatchMethod extends MetadataException diff --git a/src/Metadata/Subscriber/DuplicateCommitBatchMethod.php b/src/Metadata/Subscriber/DuplicateCommitBatchMethod.php index 16c102321..aaa85937e 100644 --- a/src/Metadata/Subscriber/DuplicateCommitBatchMethod.php +++ b/src/Metadata/Subscriber/DuplicateCommitBatchMethod.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Metadata\Subscriber; use Patchlevel\EventSourcing\Metadata\MetadataException; + use function sprintf; final class DuplicateCommitBatchMethod extends MetadataException diff --git a/src/Metadata/Subscriber/DuplicateRollbackBatchMethod.php b/src/Metadata/Subscriber/DuplicateRollbackBatchMethod.php index 1cd8336a8..b48cabc45 100644 --- a/src/Metadata/Subscriber/DuplicateRollbackBatchMethod.php +++ b/src/Metadata/Subscriber/DuplicateRollbackBatchMethod.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Metadata\Subscriber; use Patchlevel\EventSourcing\Metadata\MetadataException; + use function sprintf; final class DuplicateRollbackBatchMethod extends MetadataException diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index e72dabd77..d95a8037e 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -22,6 +22,7 @@ use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; use Throwable; +use WeakMap; use function count; use function in_array; @@ -31,6 +32,9 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine { private bool $processing = false; + /** @var WeakMap */ + private WeakMap $batching; + public function __construct( private readonly Store $messageStore, private readonly SubscriptionStore $subscriptionStore, @@ -38,6 +42,7 @@ public function __construct( private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(), private readonly LoggerInterface|null $logger = null, ) { + $this->batching = new WeakMap(); } public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result @@ -181,12 +186,11 @@ function ($subscriptions) use ($limit): ProcessedResult { ), ); + /** @var list $errors */ + $errors = []; $stream = null; $messageCounter = 0; - /** @var list $errors */ - $errors = $this->beginBatch($subscriptions); - try { $stream = $this->messageStore->load( new Criteria(new FromIndexCriterion($startIndex)), @@ -236,10 +240,15 @@ function ($subscriptions) use ($limit): ProcessedResult { ); if ($limit !== null && $messageCounter >= $limit) { - $errors = array_merge( - $errors, - $this->commitBatch($subscriptions, $index), - ); + foreach ($subscriptions as $subscription) { + $error = $this->ensureCommitBatch($subscription, $index); + + if (!$error) { + continue; + } + + $errors[] = $error; + } $this->logger?->info( sprintf( @@ -259,15 +268,16 @@ function ($subscriptions) use ($limit): ProcessedResult { $endIndex = $stream?->index() ?: $startIndex; $stream?->close(); - $errors = array_merge( - $errors, - $this->commitBatch($subscriptions, $endIndex), - ); - if ($messageCounter > 0) { foreach ($subscriptions as $subscription) { + $error = $this->ensureCommitBatch($subscription, $endIndex); + + if ($error) { + $errors[] = $error; + } + if (!$subscription->isBooting()) { - continue; + continue; // why? } $this->subscriptionStore->update($subscription); @@ -358,12 +368,11 @@ function (array $subscriptions) use ($limit): ProcessedResult { ), ); + /** @var list $errors */ + $errors = []; $stream = null; $messageCounter = 0; - /** @var list $errors */ - $errors = $this->beginBatch($subscriptions); - try { $criteria = new Criteria(new FromIndexCriterion($startIndex)); $stream = $this->messageStore->load($criteria); @@ -410,10 +419,15 @@ function (array $subscriptions) use ($limit): ProcessedResult { )); if ($limit !== null && $messageCounter >= $limit) { - $errors = array_merge( - $errors, - $this->commitBatch($subscriptions, $index), - ); + foreach ($subscriptions as $subscription) { + $error = $this->ensureCommitBatch($subscription, $index); + + if (!$error) { + continue; + } + + $errors[] = $error; + } $this->logger?->info( sprintf( @@ -429,15 +443,16 @@ function (array $subscriptions) use ($limit): ProcessedResult { $endIndex = $stream?->index() ?: $startIndex; $stream?->close(); - $errors = array_merge( - $errors, - $this->commitBatch($subscriptions, $endIndex), - ); - if ($messageCounter > 0) { foreach ($subscriptions as $subscription) { + $error = $this->ensureCommitBatch($subscription, $endIndex); + + if ($error) { + $errors[] = $error; + } + if (!$subscription->isActive()) { - continue; + continue; // why? } $this->subscriptionStore->update($subscription); @@ -780,8 +795,7 @@ private function handleMessage(int $index, Message $message, Subscription $subsc $subscribeMethods = $subscriber->subscribeMethods($message->event()::class); if ($subscribeMethods === []) { - - if (!$subscriber instanceof BatchSubscriberAccessor || !$subscriber->batch()) { + if (!isset($this->batching[$subscription])) { $subscription->changePosition($index); } @@ -797,6 +811,12 @@ private function handleMessage(int $index, Message $message, Subscription $subsc return null; } + $error = $this->checkAndBeginBatch($subscription); + + if ($error) { + return $error; + } + try { foreach ($subscribeMethods as $subscribeMethod) { $subscribeMethod($message); @@ -821,7 +841,7 @@ private function handleMessage(int $index, Message $message, Subscription $subsc ); } - if (!$subscriber instanceof BatchSubscriberAccessor || !$subscriber->batch()) { + if (!isset($this->batching[$subscription])) { $subscription->changePosition($index); } @@ -1018,10 +1038,16 @@ private function handleError(Subscription $subscription, Throwable $throwable): $subscription->error($throwable); $this->subscriptionStore->update($subscription); + if (!isset($this->batching[$subscription])) { + return; + } + + unset($this->batching[$subscription]); + $subscriber = $this->subscriber($subscription->id()); - if (!$subscriber instanceof BatchSubscriberAccessor || !$subscriber->batch()) { - return; + if (!$subscriber instanceof BatchSubscriberAccessor) { + throw new UnexpectedError('should not happen'); } $rollbackBatchMethod = $subscriber->rollbackBatchMethod(); @@ -1042,97 +1068,89 @@ private function handleError(Subscription $subscription, Throwable $throwable): } } - /** - * @param list $subscriptions - * @return list - */ - private function commitBatch(array $subscriptions, int $index): array + private function ensureCommitBatch(Subscription $subscription, int $index): Error|null { - $errors = []; + if (!isset($this->batching[$subscription])) { + return null; + } - foreach ($subscriptions as $subscription) { - if ($subscription->isError()) { - continue; - } + unset($this->batching[$subscription]); - $subscriber = $this->subscriber($subscription->id()); + $subscriber = $this->subscriber($subscription->id()); - if (!$subscriber instanceof BatchSubscriberAccessor || !$subscriber->batch()) { - continue; - } + if (!$subscriber instanceof BatchSubscriberAccessor) { + throw new UnexpectedError('should not happen'); + } - $commitBatchMethod = $subscriber->commitBatchMethod(); + $commitBatchMethod = $subscriber->commitBatchMethod(); - if (!$commitBatchMethod) { - $subscription->changePosition($index); - continue; - } + if (!$commitBatchMethod) { + $subscription->changePosition($index); - try { - $commitBatchMethod(); - $subscription->changePosition($index); - } catch (Throwable $e) { - $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" has an error in the commit batch method: %s', - $subscriber::class, - $subscription->id(), - $e->getMessage(), - )); + return null; + } - $this->handleError($subscription, $e); + try { + $commitBatchMethod(); + $subscription->changePosition($index); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has an error in the commit batch method: %s', + $subscriber::class, + $subscription->id(), + $e->getMessage(), + )); - $errors[] = new Error( - $subscription->id(), - $e->getMessage(), - $e, - ); - } + $this->handleError($subscription, $e); + + return new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } - return $errors; + return null; } - /** - * @param list $subscriptions - * @return list - */ - private function beginBatch(array $subscriptions): array + private function checkAndBeginBatch(Subscription $subscription): Error|null { - $errors = []; + if (isset($this->batching[$subscription])) { + return null; + } - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscriber($subscription->id()); + $subscriber = $this->subscriber($subscription->id()); - if (!$subscriber instanceof BatchSubscriberAccessor || !$subscriber->batch()) { - continue; - } + if (!$subscriber instanceof BatchSubscriberAccessor || !$subscriber->batch()) { + return null; + } - $beginMethod = $subscriber->beginBatchMethod(); + $this->batching[$subscription] = true; + $beginMethod = $subscriber->beginBatchMethod(); - if (!$beginMethod) { - continue; - } + if (!$beginMethod) { + return null; + } - try { - $beginMethod(); - } catch (Throwable $e) { - $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" has an error in the begin batch method: %s', - $subscriber::class, - $subscription->id(), - $e->getMessage(), - )); + try { + $beginMethod(); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has an error in the begin batch method: %s', + $subscriber::class, + $subscription->id(), + $e->getMessage(), + )); - $this->handleError($subscription, $e); + $this->handleError($subscription, $e); - $errors[] = new Error( - $subscription->id(), - $e->getMessage(), - $e, - ); - } + return new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } - return $errors; + return null; } } diff --git a/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php b/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php index 0175d40c9..a771d22f9 100644 --- a/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php +++ b/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php @@ -24,9 +24,7 @@ final class BatchProfileProjector { use SubscriberUtil; - /** - * @var array - */ + /** @var array */ private array $nameChanged = []; public function __construct( @@ -61,7 +59,7 @@ public function onProfileCreated(ProfileCreated $profileCreated): void #[Subscribe(NameChanged::class)] public function onNameChanged(NameChanged $nameChanged, ProfileId $profileId): void { - $nameChanged[$profileId->toString()] = $nameChanged->name; + $this->nameChanged[$profileId->toString()] = $nameChanged->name; } public function table(): string @@ -79,7 +77,7 @@ public function beginBatch(): void public function commitBatch(): void { try { - $this->connection->transactional(function () { + $this->connection->transactional(function (): void { foreach ($this->nameChanged as $profileId => $name) { $this->connection->update( $this->table(), diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjection.php b/tests/Integration/Subscription/Subscriber/ProfileProjection.php index 429aba6af..a4f461d49 100644 --- a/tests/Integration/Subscription/Subscriber/ProfileProjection.php +++ b/tests/Integration/Subscription/Subscriber/ProfileProjection.php @@ -62,7 +62,6 @@ private function tableName(): string return 'projection_' . $this->subscriberId(); } - #[BeginBatch] public function beginBatch(): void {