Skip to content

Commit

Permalink
cleanup delay logic
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jan 15, 2025
1 parent 41eb0c6 commit 1b67b23
Showing 1 changed file with 23 additions and 26 deletions.
49 changes: 23 additions & 26 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Psr\Log\LoggerInterface;
use Throwable;

use function array_key_exists;
use function count;
use function sprintf;

Expand All @@ -43,9 +42,6 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine

private readonly MessageLoader $messageLoader;

/** @var array<string, bool> */
private array $delaying = [];

public function __construct(
Store|MessageLoader $messageStore,
SubscriptionStore $subscriptionStore,
Expand Down Expand Up @@ -172,7 +168,6 @@ public function boot(

$this->processing = true;
$this->batching = [];
$this->delaying = [];

try {
$criteria ??= new SubscriptionEngineCriteria();
Expand Down Expand Up @@ -238,12 +233,27 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
}

if ($this->shouldDelay($subscription, $message)) {
$this->logger?->debug(
sprintf(
'Subscription Engine: Subscription "%s" is delayed, skip processing.',
$subscriptions->remove($subscription);

if ($subscription->runMode() === RunMode::Once) {
$subscription->finished();
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscription "%s" reached delayed message, set to finished.',
$subscription->id(),
),
);
));

continue;
}

$subscription->active();
$this->subscriptionManager->update($subscription);

$this->logger?->info(sprintf(
'Subscription Engine: Subscription "%s" reached delayed message, set to active.',
$subscription->id(),
));

continue;
}
Expand Down Expand Up @@ -356,7 +366,6 @@ public function run(

$this->processing = true;
$this->batching = [];
$this->delaying = [];

try {
$criteria ??= new SubscriptionEngineCriteria();
Expand Down Expand Up @@ -421,6 +430,8 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
}

if ($this->shouldDelay($subscription, $message)) {
$subscriptions->remove($subscription);

$this->logger?->debug(
sprintf(
'Subscription Engine: Subscription "%s" is delayed, skip processing.',
Expand Down Expand Up @@ -1148,23 +1159,15 @@ private function shouldCommitBatch(Subscription $subscription): bool

private function shouldDelay(Subscription $subscription, Message $message): bool
{
if (array_key_exists($subscription->id(), $this->delaying)) {
return $this->delaying[$subscription->id()];
}

$subscriber = $this->subscriber($subscription->id());

if (!$subscriber instanceof MetadataSubscriberAccessor) {
$this->delaying[$subscription->id()] = false;

return false;
}

$delay = $subscriber->metadata()->delay;

if ($delay === null) {
$this->delaying[$subscription->id()] = false;

return false;
}

Expand All @@ -1176,13 +1179,7 @@ private function shouldDelay(Subscription $subscription, Message $message): bool

$threshold = $this->clock->now()->sub($delay);

if ($recordedOn > $threshold) {
return false;
}

$this->delaying[$subscription->id()] = true;

return true;
return $recordedOn <= $threshold;
}

private function recordedOn(Message $message): DateTimeImmutable|null

Check failure on line 1185 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedInferredReturnType

src/Subscription/Engine/DefaultSubscriptionEngine.php:1185:52: MixedInferredReturnType: Could not verify return type 'DateTimeImmutable|null' for Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine::recordedOn (see https://psalm.dev/047)
Expand Down

0 comments on commit 1b67b23

Please sign in to comment.