Skip to content

Commit

Permalink
add subscription manager
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Sep 27, 2024
1 parent 534e192 commit 7be9347
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 102 deletions.
94 changes: 34 additions & 60 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Closure;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
Expand All @@ -13,7 +12,6 @@
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Status;
use Patchlevel\EventSourcing\Subscription\Store\LockableSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchSubscriberAccessor;
Expand All @@ -30,18 +28,21 @@

final class DefaultSubscriptionEngine implements SubscriptionEngine
{
private SubscriptionManager $subscriptionManager;

private bool $processing = false;

/** @var WeakMap<Subscription, true> */
private WeakMap $batching;

public function __construct(
private readonly Store $messageStore,
private readonly SubscriptionStore $subscriptionStore,
SubscriptionStore $subscriptionStore,
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly LoggerInterface|null $logger = null,
) {
$this->subscriptionManager = new SubscriptionManager($subscriptionStore);
$this->batching = new WeakMap();
}

Expand All @@ -56,7 +57,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk
$this->discoverNewSubscriptions();
$this->retrySubscriptions($criteria);

return $this->findForUpdate(
return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand Down Expand Up @@ -91,7 +92,7 @@ function (array $subscriptions) use ($skipBooting): Result {
$skipBooting ? $subscription->active() : $subscription->booting();
}

$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->debug(sprintf(
'Subscription Engine: Subscriber "%s" for "%s" has no setup method, set to %s.',
Expand All @@ -113,7 +114,7 @@ function (array $subscriptions) use ($skipBooting): Result {
$skipBooting ? $subscription->active() : $subscription->booting();
}

$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->debug(sprintf(
'Subscription Engine: For Subscriber "%s" for "%s" the setup method has been executed, set to %s.',
Expand Down Expand Up @@ -164,7 +165,7 @@ public function boot(
$this->discoverNewSubscriptions();
$this->retrySubscriptions($criteria);

return $this->findForUpdate(
return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand Down Expand Up @@ -276,11 +277,7 @@ function ($subscriptions) use ($limit): ProcessedResult {
$errors[] = $error;
}

if (!$subscription->isBooting()) {
continue; // why?
}

$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);
}
}
}
Expand All @@ -294,7 +291,7 @@ function ($subscriptions) use ($limit): ProcessedResult {

if ($subscription->runMode() === RunMode::Once) {
$subscription->finished();
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscription "%s" run only once and has been set to finished.',
Expand All @@ -305,7 +302,7 @@ function ($subscriptions) use ($limit): ProcessedResult {
}

$subscription->active();
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscription "%s" has been set to active after booting.',
Expand Down Expand Up @@ -346,7 +343,7 @@ public function run(
$this->markDetachedSubscriptions($criteria);
$this->retrySubscriptions($criteria);

return $this->findForUpdate(
return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand Down Expand Up @@ -455,7 +452,7 @@ function (array $subscriptions) use ($limit): ProcessedResult {
continue; // why?
}

$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);
}
}
}
Expand All @@ -470,7 +467,7 @@ function (array $subscriptions) use ($limit): ProcessedResult {
}

$subscription->finished();
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscription "%s" run only once and has been set to finished.',
Expand Down Expand Up @@ -501,7 +498,7 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): Resu

$this->logger?->info('Subscription Engine: Start teardown detached subscriptions.');

return $this->findForUpdate(
return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand All @@ -528,7 +525,7 @@ function (array $subscriptions): Result {
$teardownMethod = $subscriber->teardownMethod();

if (!$teardownMethod) {
$this->subscriptionStore->remove($subscription);
$this->subscriptionManager->remove($subscription);

$this->logger?->info(
sprintf(
Expand Down Expand Up @@ -568,7 +565,7 @@ function (array $subscriptions): Result {
continue;
}

$this->subscriptionStore->remove($subscription);
$this->subscriptionManager->remove($subscription);

$this->logger?->info(
sprintf(
Expand All @@ -591,7 +588,7 @@ public function remove(SubscriptionEngineCriteria|null $criteria = null): Result

$this->discoverNewSubscriptions();

return $this->findForUpdate(
return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand All @@ -604,7 +601,7 @@ function (array $subscriptions): Result {
$subscriber = $this->subscriber($subscription->id());

if (!$subscriber) {
$this->subscriptionStore->remove($subscription);
$this->subscriptionManager->remove($subscription);

$this->logger?->info(
sprintf(
Expand All @@ -619,7 +616,7 @@ function (array $subscriptions): Result {
$teardownMethod = $subscriber->teardownMethod();

if (!$teardownMethod) {
$this->subscriptionStore->remove($subscription);
$this->subscriptionManager->remove($subscription);

$this->logger?->info(
sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()),
Expand All @@ -646,7 +643,7 @@ function (array $subscriptions): Result {
);
}

$this->subscriptionStore->remove($subscription);
$this->subscriptionManager->remove($subscription);

$this->logger?->info(
sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()),
Expand All @@ -664,7 +661,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Re

$this->discoverNewSubscriptions();

return $this->findForUpdate(
return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand Down Expand Up @@ -696,7 +693,7 @@ function (array $subscriptions): Result {
$subscription->doRetry();
$subscription->resetRetry();

$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscriber "%s" for "%s" is reactivated.',
Expand All @@ -708,7 +705,7 @@ function (array $subscriptions): Result {
}

$subscription->active();
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscriber "%s" for "%s" is reactivated.',
Expand All @@ -728,7 +725,7 @@ public function pause(SubscriptionEngineCriteria|null $criteria = null): Result

$this->discoverNewSubscriptions();

return $this->findForUpdate(
return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand All @@ -755,7 +752,7 @@ function (array $subscriptions): Result {
}

$subscription->pause();
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscriber "%s" for "%s" is paused.',
Expand All @@ -776,7 +773,7 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):

$this->discoverNewSubscriptions();

return $this->subscriptionStore->find(
return $this->subscriptionManager->find(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand Down Expand Up @@ -866,7 +863,7 @@ private function subscriber(string $subscriberId): SubscriberAccessor|null

private function markDetachedSubscriptions(SubscriptionEngineCriteria $criteria): void
{
$this->findForUpdate(
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand All @@ -881,7 +878,7 @@ function (array $subscriptions): void {
}

$subscription->detached();
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(
sprintf(
Expand All @@ -896,7 +893,7 @@ function (array $subscriptions): void {

private function retrySubscriptions(SubscriptionEngineCriteria $criteria): void
{
$this->findForUpdate(
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
Expand Down Expand Up @@ -926,7 +923,7 @@ function (array $subscriptions): void {
}

$subscription->doRetry();
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

$this->logger?->info(
sprintf(
Expand All @@ -943,7 +940,7 @@ function (array $subscriptions): void {

private function discoverNewSubscriptions(): void
{
$this->findForUpdate(
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(),
function (array $subscriptions): void {
$latestIndex = null;
Expand All @@ -970,7 +967,7 @@ function (array $subscriptions): void {
$subscription->active();
}

$this->subscriptionStore->add($subscription);
$this->subscriptionManager->add($subscription);

$this->logger?->info(
sprintf(
Expand Down Expand Up @@ -1010,33 +1007,10 @@ private function lowestSubscriptionPosition(array $subscriptions): int
return $min;
}

/**
* @param Closure(list<Subscription>):T $closure
*
* @return T
*
* @template T
*/
private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure): mixed
{
if (!$this->subscriptionStore instanceof LockableSubscriptionStore) {
return $closure($this->subscriptionStore->find($criteria));
}

return $this->subscriptionStore->inLock(
/** @return T */
function () use ($closure, $criteria): mixed {
$subscriptions = $this->subscriptionStore->find($criteria);

return $closure($subscriptions);
},
);
}

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
$this->subscriptionStore->update($subscription);
$this->subscriptionManager->update($subscription);

if (!isset($this->batching[$subscription])) {
return;
Expand Down
Loading

0 comments on commit 7be9347

Please sign in to comment.