Skip to content

Commit

Permalink
use stream name everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 18, 2024
1 parent 13d9b9c commit fc2706c
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 96 deletions.
2 changes: 1 addition & 1 deletion src/Aggregate/AggregateHeader.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ public function __construct(

public function streamName(): string
{
return StreamNameTranslator::streamName($this->aggregateName, $this->aggregateId);
return $this->aggregateName . '-' . $this->aggregateId;
}
}
40 changes: 0 additions & 40 deletions src/Aggregate/StreamNameTranslator.php

This file was deleted.

20 changes: 12 additions & 8 deletions src/Console/Command/ShowAggregateCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator;
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadataFactory;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootMetadataFactory;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
Expand Down Expand Up @@ -38,6 +39,7 @@ public function __construct(
private readonly EventSerializer $eventSerializer,
private readonly HeadersSerializer $headersSerializer,
private readonly AggregateRootRegistry $aggregateRootRegistry,
private readonly AggregateRootMetadataFactory $aggregateRootMetadataFactory = new AttributeAggregateRootMetadataFactory(),
) {
parent::__construct();
}
Expand Down Expand Up @@ -65,10 +67,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}

$id = InputHelper::nullableString($input->getArgument('id'));
if ($id === null) {
$question = new Question('Enter the aggregate id');
$id = InputHelper::string($console->askQuestion($question));
}

if (!$this->aggregateRootRegistry->hasAggregateName($aggregate)) {
$console->error(sprintf('aggregate type "%s" not exists', $aggregate));
Expand All @@ -77,14 +75,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}

if ($this->store instanceof StreamStore) {
$aggregateClass = $this->aggregateRootRegistry->aggregateClass($aggregate);
$streamName = $this->aggregateRootMetadataFactory->metadata($aggregateClass)->streamName($id);

$stream = $this->store->load(
new Criteria(
new StreamCriterion(
StreamNameTranslator::streamName($aggregate, $id),
),
new StreamCriterion($streamName),
),
);
} else {
if ($id === null) {
$question = new Question('Enter the aggregate id');
$id = InputHelper::string($console->askQuestion($question));
}

$stream = $this->store->load(
new Criteria(
new AggregateNameCriterion($aggregate),
Expand Down
23 changes: 3 additions & 20 deletions src/Console/Command/WatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator;
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\StreamStore;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
Expand Down Expand Up @@ -88,24 +86,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$criteriaBuilder = new CriteriaBuilder();

if ($stream !== null) {
$criteriaBuilder->streamName($stream);
}

if ($this->store instanceof StreamStore) {
if ($aggregate !== null || $aggregateId !== null) {
if ($aggregate === null || $aggregateId === null) {
$console->error('You must provide both aggregate and aggregate-id or none of them');

return 1;
}

$criteriaBuilder->streamName(StreamNameTranslator::streamName($aggregate, $aggregateId));
}
} else {
$criteriaBuilder->aggregateName($aggregate);
$criteriaBuilder->aggregateId($aggregateId);
}
$criteriaBuilder->streamName($stream);
$criteriaBuilder->aggregateName($aggregate);
$criteriaBuilder->aggregateId($aggregateId);

$criteria = $criteriaBuilder->build();

Expand Down
21 changes: 20 additions & 1 deletion src/Metadata/AggregateRoot/AggregateRootMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

use function str_contains;
use function str_replace;

/** @template T of AggregateRoot */
final class AggregateRootMetadata
{
public readonly string $streamName;

public function __construct(
/** @var class-string<T> */
public readonly string $className,
Expand All @@ -22,7 +27,21 @@ public function __construct(
public readonly Snapshot|null $snapshot,
/** @var list<string> */
public readonly array $childAggregates = [],
public readonly string|null $streamName = null,
string|null $streamName = null,
) {
$this->streamName = $streamName ?? $this->name . '-{id}';
}

public function streamName(string|null $aggregateId = null): string
{
if ($aggregateId === null) {
if (str_contains($this->streamName, '{id}')) {
throw new AggregateIdMissing($this->className);

Check failure on line 39 in src/Metadata/AggregateRoot/AggregateRootMetadata.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Instantiated class Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateIdMissing not found.

Check failure on line 39 in src/Metadata/AggregateRoot/AggregateRootMetadata.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Throwing object of an unknown class Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateIdMissing.

Check failure on line 39 in src/Metadata/AggregateRoot/AggregateRootMetadata.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

UndefinedClass

src/Metadata/AggregateRoot/AggregateRootMetadata.php:39:27: UndefinedClass: Class, interface or enum named Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateIdMissing does not exist (see https://psalm.dev/019)
}

return $this->streamName;
}

return str_replace('{id}', $aggregateId, $this->streamName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public function metadata(string $aggregate): AggregateRootMetadata
[$suppressEvents, $suppressAll] = $this->findSuppressMissingApply($reflectionClass);
$applyMethods = $this->findApplyMethods($reflectionClass, $aggregate, $childAggregates);
$snapshot = $this->findSnapshot($reflectionClass);
$streamName = $this->findStreamName($reflectionClass);

$metadata = new AggregateRootMetadata(
$aggregate,
Expand All @@ -62,7 +61,7 @@ public function metadata(string $aggregate): AggregateRootMetadata
$suppressAll,
$snapshot,
array_map(static fn (array $list) => $list[0], $childAggregates),
$streamName ?? $aggregateName . '-{id}',
$this->findStreamName($reflectionClass),
);

$this->aggregateMetadata[$aggregate] = $metadata;
Expand Down
9 changes: 4 additions & 5 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Message;
Expand Down Expand Up @@ -113,7 +112,7 @@ public function load(AggregateRootId $id): AggregateRoot

if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->streamName($this->metadata->streamName($id->toString()))
->archived(false)
->build();
} else {
Expand Down Expand Up @@ -178,7 +177,7 @@ public function has(AggregateRootId $id): bool
{
if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->streamName($this->metadata->streamName($id->toString()))
->build();
} else {
$criteria = (new CriteriaBuilder())
Expand Down Expand Up @@ -242,7 +241,7 @@ public function save(AggregateRoot $aggregate): void
$clock = $this->clock;

$aggregateName = $this->metadata->name;
$streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($this->metadata, $aggregateId) : null;
$streamName = $this->useStreamHeader ? $this->metadata->streamName($aggregateId) : null;

$messages = array_map(
static function (object $event) use (
Expand Down Expand Up @@ -337,7 +336,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id):

if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->streamName($this->metadata->streamName($id->toString()))
->fromPlayhead($aggregate->playhead())
->build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;

use function class_exists;
use function is_a;
Expand All @@ -21,19 +18,9 @@ public function resolve(ArgumentMetadata $argument, Message $message): Aggregate
{
/** @var class-string<AggregateRootId> $class */
$class = $argument->type;
$id = $message->header(AggregateHeader::class)->aggregateId;

try {
$id = $message->header(AggregateHeader::class)->aggregateId;

return $class::fromString($id);
} catch (HeaderNotFound) {
// do nothing
}

$stream = $message->header(StreamNameHeader::class)->streamName;
$aggregateId = StreamNameTranslator::aggregateId($stream);

return $class::fromString($aggregateId);
return $class::fromString($id);
}

public function support(ArgumentMetadata $argument, string $eventClass): bool
Expand Down
2 changes: 2 additions & 0 deletions tests/Integration/ChildAggregate/Events/NameChanged.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events;

use Patchlevel\EventSourcing\Attribute\Event;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId;

#[Event('profile.name_changed')]
final class NameChanged
{
public function __construct(
public ProfileId $profileId,
public string $name,
) {
}
Expand Down
2 changes: 1 addition & 1 deletion tests/Integration/ChildAggregate/PersonalInformation.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public function name(): string

public function changeName(string $name): void
{
$this->recordThat(new NameChanged($name));
$this->recordThat(new NameChanged($this->id, $name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId;

#[Projector('profile-1')]
final class ProfileProjector
Expand Down Expand Up @@ -52,12 +51,12 @@ public function handleProfileCreated(ProfileCreated $profileCreated): void
}

#[Subscribe(NameChanged::class)]
public function handleNameChanged(NameChanged $nameChanged, ProfileId $profileId): void
public function handleNameChanged(NameChanged $nameChanged): void
{
$this->connection->update(
'projection_profile',
['name' => $nameChanged->name],
['id' => $profileId->toString()],
['id' => $nameChanged->profileId->toString()],
);
}
}

0 comments on commit fc2706c

Please sign in to comment.