Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add early out in subscription engine #650

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ cs: vendor

.PHONY: phpstan
phpstan: vendor ## run phpstan static code analyser
vendor/bin/phpstan analyse
php -d memory_limit=312M vendor/bin/phpstan analyse

.PHONY: phpstan-baseline
phpstan-baseline: vendor ## run phpstan static code analyser
vendor/bin/phpstan analyse --generate-baseline
php -d memory_limit=312M vendor/bin/phpstan analyse --generate-baseline

.PHONY: psalm
psalm: vendor ## run psalm static code analyser
Expand Down
7 changes: 7 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@
<code><![CDATA[$update1]]></code>
</PossiblyUndefinedArrayOffset>
</file>
<file src="tests/Unit/Subscription/Engine/SubscriptionManagerTest.php">
<InvalidArgument>
<code><![CDATA[$result]]></code>
<code><![CDATA[$subscriptions]]></code>
<code><![CDATA[$subscriptions]]></code>
</InvalidArgument>
</file>
<file src="tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php">
<DeprecatedMethod>
<code><![CDATA[group]]></code>
Expand Down
92 changes: 40 additions & 52 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
groups: $criteria->groups,
status: [Status::New],
),
function (array $subscriptions) use ($skipBooting): Result {
function (SubscriptionCollection $subscriptions) use ($skipBooting): Result {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.');

Expand Down Expand Up @@ -171,14 +171,14 @@
groups: $criteria->groups,
status: [Status::Booting],
),
function ($subscriptions) use ($limit): ProcessedResult {
function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.');

return new ProcessedResult(0, true);
}

$startIndex = $this->lowestSubscriptionPosition($subscriptions);
$startIndex = $subscriptions->lowestPosition();

$this->logger?->debug(
sprintf(
Expand All @@ -198,17 +198,15 @@
);

foreach ($stream as $message) {
$messageCounter++;

$index = $stream->index();

if ($index === null) {
throw new UnexpectedError('Stream index is null, this should not happen.');
}

foreach ($subscriptions as $subscription) {
if (!$subscription->isBooting()) {
continue;
}

if ($subscription->position() >= $index) {
$this->logger?->debug(
sprintf(
Expand All @@ -229,9 +227,17 @@
}

$errors[] = $error;
}

$messageCounter++;
$subscriptions->remove($subscription);

if (count($subscriptions) === 0) {

Check warning on line 233 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Identical": --- Original +++ New @@ @@ } $errors[] = $error; $subscriptions->remove($subscription); - if (count($subscriptions) === 0) { + if (count($subscriptions) !== 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); break 2; }
$this->logger?->info(
'Subscription Engine: No subscriptions in booting status, finish booting.',
);

break 2;

Check warning on line 238 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + break 1; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position for booting: %s', $index));

Check warning on line 238 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Break_": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + continue; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position for booting: %s', $index));
}
}

$this->logger?->debug(
sprintf(
Expand Down Expand Up @@ -265,6 +271,8 @@

if ($error) {
$errors[] = $error;

$subscriptions->remove($subscription);
}

$this->subscriptionManager->update($subscription);
Expand All @@ -275,10 +283,6 @@
$this->logger?->debug('Subscription Engine: End of stream for booting has been reached.');

foreach ($subscriptions as $subscription) {
if (!$subscription->isBooting()) {
continue;
}

if ($subscription->runMode() === RunMode::Once) {
$subscription->finished();
$this->subscriptionManager->update($subscription);
Expand Down Expand Up @@ -340,14 +344,14 @@
groups: $criteria->groups,
status: [Status::Active],
),
function (array $subscriptions) use ($limit): ProcessedResult {
function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.');

return new ProcessedResult(0, true);
}

$startIndex = $this->lowestSubscriptionPosition($subscriptions);
$startIndex = $subscriptions->lowestPosition();

$this->logger?->debug(
sprintf(
Expand All @@ -366,17 +370,15 @@
$stream = $this->messageStore->load($criteria);

foreach ($stream as $message) {
$messageCounter++;

$index = $stream->index();

if ($index === null) {
throw new UnexpectedError('Stream index is null, this should not happen.');
}

foreach ($subscriptions as $subscription) {
if (!$subscription->isActive()) {
continue;
}

if ($subscription->position() >= $index) {
$this->logger?->debug(
sprintf(
Expand All @@ -397,9 +399,17 @@
}

$errors[] = $error;
}

$messageCounter++;
$subscriptions->remove($subscription);

Check warning on line 403 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ continue; } $errors[] = $error; - $subscriptions->remove($subscription); + if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); break 2;

if (count($subscriptions) === 0) {

Check warning on line 405 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Identical": --- Original +++ New @@ @@ } $errors[] = $error; $subscriptions->remove($subscription); - if (count($subscriptions) === 0) { + if (count($subscriptions) !== 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); break 2; }
$this->logger?->info(
'Subscription Engine: No subscriptions in booting status, finish booting.',
);

break 2;

Check warning on line 410 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + break 1; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $index));

Check warning on line 410 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Break_": --- Original +++ New @@ @@ $subscriptions->remove($subscription); if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - break 2; + continue; } } $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $index));
}
}

$this->logger?->debug(sprintf(
'Subscription Engine: Current event stream position: %s',
Expand Down Expand Up @@ -427,6 +437,8 @@

if ($error) {
$errors[] = $error;

$subscriptions->remove($subscription);

Check warning on line 441 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ $error = $this->ensureCommitBatch($subscription, $endIndex); if ($error) { $errors[] = $error; - $subscriptions->remove($subscription); + } $this->subscriptionManager->update($subscription); }
}

$this->subscriptionManager->update($subscription);
Expand All @@ -435,10 +447,6 @@
}

foreach ($subscriptions as $subscription) {
if (!$subscription->isActive()) {
continue;
}

if ($subscription->runMode() !== RunMode::Once) {
continue;
}
Expand Down Expand Up @@ -481,7 +489,7 @@
groups: $criteria->groups,
status: [Status::Detached],
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
/** @var list<Error> $errors */
$errors = [];

Expand Down Expand Up @@ -570,7 +578,7 @@
ids: $criteria->ids,
groups: $criteria->groups,
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
/** @var list<Error> $errors */
$errors = [];

Expand Down Expand Up @@ -662,7 +670,7 @@
Status::Finished,
],
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());

Expand Down Expand Up @@ -725,7 +733,7 @@
Status::Error,
],
),
function (array $subscriptions): Result {
function (SubscriptionCollection $subscriptions): Result {
/** @var Subscription $subscription */
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
Expand Down Expand Up @@ -868,7 +876,7 @@
groups: $criteria->groups,
status: [Status::Active, Status::Paused, Status::Finished],
),
function (array $subscriptions): void {
function (SubscriptionCollection $subscriptions): void {
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());

Expand Down Expand Up @@ -898,7 +906,7 @@
groups: $criteria->groups,
status: [Status::Error],
),
function (array $subscriptions): void {
function (SubscriptionCollection $subscriptions): void {
/** @var Subscription $subscription */
foreach ($subscriptions as $subscription) {
$error = $subscription->subscriptionError();
Expand Down Expand Up @@ -941,7 +949,7 @@
{
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(),
function (array $subscriptions): void {
function (SubscriptionCollection $subscriptions): void {
$latestIndex = null;

foreach ($this->subscriberRepository->all() as $subscriber) {
Expand Down Expand Up @@ -986,26 +994,6 @@
return $stream->index() ?: 0;
}

/** @param list<Subscription> $subscriptions */
private function lowestSubscriptionPosition(array $subscriptions): int
{
$min = null;

foreach ($subscriptions as $subscription) {
if ($min !== null && $subscription->position() >= $min) {
continue;
}

$min = $subscription->position();
}

if ($min === null) {
return 0;
}

return $min;
}

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
Expand Down
67 changes: 67 additions & 0 deletions src/Subscription/Engine/SubscriptionCollection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Countable;
use IteratorAggregate;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Traversable;

use function array_filter;
use function array_values;
use function count;

/**
* @interal
* @implements IteratorAggregate<Subscription>
*/
final class SubscriptionCollection implements IteratorAggregate, Countable
{
/** @param list<Subscription> $subscriptions */
public function __construct(
private array $subscriptions = [],
) {
}

/** @return Traversable<Subscription> */
public function getIterator(): Traversable
{
yield from $this->subscriptions;
}

public function remove(Subscription $subscription): void
{
$this->subscriptions = array_values(
array_filter(
$this->subscriptions,
static fn (Subscription $s) => $s !== $subscription,
),
);
}

public function count(): int
{
return count($this->subscriptions);
}

public function lowestPosition(): int
{
$min = null;

foreach ($this->subscriptions as $subscription) {
if ($min !== null && $subscription->position() >= $min) {

Check warning on line 54 in src/Subscription/Engine/SubscriptionCollection.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "GreaterThanOrEqualTo": --- Original +++ New @@ @@ { $min = null; foreach ($this->subscriptions as $subscription) { - if ($min !== null && $subscription->position() >= $min) { + if ($min !== null && $subscription->position() > $min) { continue; } $min = $subscription->position();
continue;

Check warning on line 55 in src/Subscription/Engine/SubscriptionCollection.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "Continue_": --- Original +++ New @@ @@ $min = null; foreach ($this->subscriptions as $subscription) { if ($min !== null && $subscription->position() >= $min) { - continue; + break; } $min = $subscription->position(); }
}

$min = $subscription->position();
}

if ($min === null) {
return 0;
}

return $min;
}
}
14 changes: 11 additions & 3 deletions src/Subscription/Engine/SubscriptionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public function __construct(
}

/**
* @param Closure(list<Subscription>):T $closure
* @param Closure(SubscriptionCollection):T $closure
*
* @return T
*
Expand All @@ -42,7 +42,11 @@ public function findForUpdate(SubscriptionCriteria $criteria, Closure $closure):
{
if (!$this->subscriptionStore instanceof LockableSubscriptionStore) {
try {
return $closure($this->subscriptionStore->find($criteria));
return $closure(
new SubscriptionCollection(
$this->subscriptionStore->find($criteria),
),
);
} finally {
$this->flush();
}
Expand All @@ -52,7 +56,11 @@ public function findForUpdate(SubscriptionCriteria $criteria, Closure $closure):
/** @return T */
function () use ($closure, $criteria): mixed {
try {
return $closure($this->subscriptionStore->find($criteria));
return $closure(
new SubscriptionCollection(
$this->subscriptionStore->find($criteria),
),
);
} finally {
$this->flush();
}
Expand Down
Loading
Loading