Skip to content

Commit

Permalink
add subscriber delay
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 5, 2024
1 parent 13b647a commit 8275581
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/Attribute/Delay.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;
use DateInterval;
use InvalidArgumentException;

#[Attribute(Attribute::TARGET_CLASS)]
final class Delay
{
public readonly DateInterval $delay;

public function __construct(
string $dateString,
) {
$interval = DateInterval::createFromDateString($dateString);

if ($interval === false) {
throw new InvalidArgumentException('Invalid date string');
}

$this->delay = $interval;
}
}
16 changes: 16 additions & 0 deletions src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Patchlevel\EventSourcing\Metadata\Subscriber;

use DateInterval;
use Patchlevel\EventSourcing\Attribute\Delay;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
Expand Down Expand Up @@ -87,6 +89,7 @@ public function metadata(string $subscriber): SubscriberMetadata
$subscribeMethods,
$setupMethod,
$teardownMethod,
$this->delay($reflector),
);

$this->subscriberMetadata[$subscriber] = $metadata;
Expand Down Expand Up @@ -128,4 +131,17 @@ private function subscribeMethod(ReflectionMethod $method): SubscribeMethodMetad
$arguments,
);
}

private function delay(ReflectionClass $reflector): DateInterval|null
{
$attributes = $reflector->getAttributes(Delay::class);

if ($attributes === []) {
return null;
}

$instance = $attributes[0]->newInstance();

return $instance->delay;
}
}
2 changes: 2 additions & 0 deletions src/Metadata/Subscriber/SubscriberMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Patchlevel\EventSourcing\Metadata\Subscriber;

use DateInterval;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Subscription;

Expand All @@ -17,6 +18,7 @@ public function __construct(
public readonly array $subscribeMethods = [],
public readonly string|null $setupMethod = null,
public readonly string|null $teardownMethod = null,
public readonly DateInterval|null $delay = null,
) {
}
}
84 changes: 84 additions & 0 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@

namespace Patchlevel\EventSourcing\Subscription\Engine;

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Status;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\RealSubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
use Throwable;

use function array_key_exists;
use function count;
use function in_array;
use function sprintf;
Expand All @@ -35,12 +43,16 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine
/** @var array<string, BatchableSubscriber> */
private array $batching = [];

/** @var array<string, bool> */
private array $delaying = [];

public function __construct(
private readonly Store $messageStore,
SubscriptionStore $subscriptionStore,
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly LoggerInterface|null $logger = null,
private readonly ClockInterface $clock = new SystemClock(),
) {
$this->subscriptionManager = new SubscriptionManager($subscriptionStore);
}
Expand Down Expand Up @@ -154,6 +166,7 @@ public function boot(

$this->processing = true;
$this->batching = [];
$this->delaying = [];

try {
$criteria ??= new SubscriptionEngineCriteria();
Expand Down Expand Up @@ -222,6 +235,17 @@ function ($subscriptions) use ($limit): ProcessedResult {
continue;
}

if ($this->shouldDelay($subscription, $message)) {
$this->logger?->debug(
sprintf(
'Subscription Engine: Subscription "%s" is delayed, skip processing.',
$subscription->id(),
),
);

continue;
}

$error = $this->handleMessage($index, $message, $subscription);

if (!$error) {
Expand Down Expand Up @@ -324,6 +348,7 @@ public function run(

$this->processing = true;
$this->batching = [];
$this->delaying = [];

try {
$criteria ??= new SubscriptionEngineCriteria();
Expand Down Expand Up @@ -390,6 +415,17 @@ function (array $subscriptions) use ($limit): ProcessedResult {
continue;
}

if ($this->shouldDelay($subscription, $message)) {
$this->logger?->debug(
sprintf(
'Subscription Engine: Subscription "%s" is delayed, skip processing.',
$subscription->id(),
),
);

continue;
}

$error = $this->handleMessage($index, $message, $subscription);

if (!$error) {
Expand Down Expand Up @@ -1126,4 +1162,52 @@ private function shouldCommitBatch(Subscription $subscription): bool

return $this->batching[$subscription->id()]->forceCommit();
}

private function shouldDelay(Subscription $subscription, Message $message): bool
{
if (array_key_exists($subscription->id(), $this->delaying)) {
return $this->delaying[$subscription->id()];
}

$subscriber = $this->subscriber($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
$this->delaying[$subscription->id()] = false;

return false;
}

$delay = $subscriber->metadata()->delay;

if ($delay === null) {
$this->delaying[$subscription->id()] = false;

return false;
}

$recordedOn = $this->recordedOn($message);

if ($recordedOn === null) {
return false;
}

$threshold = $this->clock->now()->sub($delay);

if ($recordedOn > $threshold) {
return false;
}

$this->delaying[$subscription->id()] = true;

return true;
}

private function recordedOn(Message $message): DateTimeImmutable|null
{
try {
return $message->header(AggregateHeader::class)->recordedOn;
} catch (HeaderNotFound) {
return $message->header(StreamHeader::class)->recordedOn;
}
}
}

0 comments on commit 8275581

Please sign in to comment.