diff --git a/src/Aggregate/AggregateHeader.php b/src/Aggregate/AggregateHeader.php index 9fe5f1ec3..6b23f1deb 100644 --- a/src/Aggregate/AggregateHeader.php +++ b/src/Aggregate/AggregateHeader.php @@ -22,6 +22,6 @@ public function __construct( public function streamName(): string { - return StreamNameTranslator::streamName($this->aggregateName, $this->aggregateId); + return $this->aggregateName . '-' . $this->aggregateId; } } diff --git a/src/Aggregate/StreamNameTranslator.php b/src/Aggregate/StreamNameTranslator.php deleted file mode 100644 index bb35f6a90..000000000 --- a/src/Aggregate/StreamNameTranslator.php +++ /dev/null @@ -1,40 +0,0 @@ -streamName !== null) { - return str_replace('{id}', $aggregateId, $aggregate->streamName); - } - - return $aggregate . '-' . $aggregateId; - } - - public static function aggregateId(string $stream): string - { - $pos = strpos($stream, '-'); - - if ($pos === false) { - throw new InvalidAggregateStreamName($stream); - } - - return substr($stream, $pos + 1); - } -} diff --git a/src/Console/Command/ShowAggregateCommand.php b/src/Console/Command/ShowAggregateCommand.php index 1b71c30ce..6cb5fe716 100644 --- a/src/Console/Command/ShowAggregateCommand.php +++ b/src/Console/Command/ShowAggregateCommand.php @@ -4,11 +4,12 @@ namespace Patchlevel\EventSourcing\Console\Command; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; +use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadataFactory; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; +use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootMetadataFactory; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion; use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion; @@ -38,6 +39,7 @@ public function __construct( private readonly EventSerializer $eventSerializer, private readonly HeadersSerializer $headersSerializer, private readonly AggregateRootRegistry $aggregateRootRegistry, + private readonly AggregateRootMetadataFactory $aggregateRootMetadataFactory = new AttributeAggregateRootMetadataFactory(), ) { parent::__construct(); } @@ -65,10 +67,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int } $id = InputHelper::nullableString($input->getArgument('id')); - if ($id === null) { - $question = new Question('Enter the aggregate id'); - $id = InputHelper::string($console->askQuestion($question)); - } if (!$this->aggregateRootRegistry->hasAggregateName($aggregate)) { $console->error(sprintf('aggregate type "%s" not exists', $aggregate)); @@ -77,14 +75,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int } if ($this->store instanceof StreamStore) { + $aggregateClass = $this->aggregateRootRegistry->aggregateClass($aggregate); + $streamName = $this->aggregateRootMetadataFactory->metadata($aggregateClass)->streamName($id); + $stream = $this->store->load( new Criteria( - new StreamCriterion( - StreamNameTranslator::streamName($aggregate, $id), - ), + new StreamCriterion($streamName), ), ); } else { + if ($id === null) { + $question = new Question('Enter the aggregate id'); + $id = InputHelper::string($console->askQuestion($question)); + } + $stream = $this->store->load( new Criteria( new AggregateNameCriterion($aggregate), diff --git a/src/Console/Command/WatchCommand.php b/src/Console/Command/WatchCommand.php index e7a82af6e..91fc67af8 100644 --- a/src/Console/Command/WatchCommand.php +++ b/src/Console/Command/WatchCommand.php @@ -4,7 +4,6 @@ namespace Patchlevel\EventSourcing\Console\Command; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; @@ -12,7 +11,6 @@ use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\StreamStore; use Patchlevel\EventSourcing\Store\SubscriptionStore; use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; @@ -88,24 +86,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int $criteriaBuilder = new CriteriaBuilder(); - if ($stream !== null) { - $criteriaBuilder->streamName($stream); - } - - if ($this->store instanceof StreamStore) { - if ($aggregate !== null || $aggregateId !== null) { - if ($aggregate === null || $aggregateId === null) { - $console->error('You must provide both aggregate and aggregate-id or none of them'); - - return 1; - } - - $criteriaBuilder->streamName(StreamNameTranslator::streamName($aggregate, $aggregateId)); - } - } else { - $criteriaBuilder->aggregateName($aggregate); - $criteriaBuilder->aggregateId($aggregateId); - } + $criteriaBuilder->streamName($stream); + $criteriaBuilder->aggregateName($aggregate); + $criteriaBuilder->aggregateId($aggregateId); $criteria = $criteriaBuilder->build(); diff --git a/src/Metadata/AggregateRoot/AggregateRootMetadata.php b/src/Metadata/AggregateRoot/AggregateRootMetadata.php index 45ada1f0d..4072e75f5 100644 --- a/src/Metadata/AggregateRoot/AggregateRootMetadata.php +++ b/src/Metadata/AggregateRoot/AggregateRootMetadata.php @@ -6,9 +6,14 @@ use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use function str_contains; +use function str_replace; + /** @template T of AggregateRoot */ final class AggregateRootMetadata { + public readonly string $streamName; + public function __construct( /** @var class-string */ public readonly string $className, @@ -22,7 +27,21 @@ public function __construct( public readonly Snapshot|null $snapshot, /** @var list */ public readonly array $childAggregates = [], - public readonly string|null $streamName = null, + string|null $streamName = null, ) { + $this->streamName = $streamName ?? $this->name . '-{id}'; + } + + public function streamName(string|null $aggregateId = null): string + { + if ($aggregateId === null) { + if (str_contains($this->streamName, '{id}')) { + throw new AggregateIdMissing($this->className); + } + + return $this->streamName; + } + + return str_replace('{id}', $aggregateId, $this->streamName); } } diff --git a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php index 0d3ab327f..e622b4910 100644 --- a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php +++ b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php @@ -51,7 +51,6 @@ public function metadata(string $aggregate): AggregateRootMetadata [$suppressEvents, $suppressAll] = $this->findSuppressMissingApply($reflectionClass); $applyMethods = $this->findApplyMethods($reflectionClass, $aggregate, $childAggregates); $snapshot = $this->findSnapshot($reflectionClass); - $streamName = $this->findStreamName($reflectionClass); $metadata = new AggregateRootMetadata( $aggregate, @@ -62,7 +61,7 @@ public function metadata(string $aggregate): AggregateRootMetadata $suppressAll, $snapshot, array_map(static fn (array $list) => $list[0], $childAggregates), - $streamName ?? $aggregateName . '-{id}', + $this->findStreamName($reflectionClass), ); $this->aggregateMetadata[$aggregate] = $metadata; diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index 9971f5e9a..c17b3c4c9 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -7,7 +7,6 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Aggregate\AggregateRoot; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Message\Message; @@ -113,7 +112,7 @@ public function load(AggregateRootId $id): AggregateRoot if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) + ->streamName($this->metadata->streamName($id->toString())) ->archived(false) ->build(); } else { @@ -178,7 +177,7 @@ public function has(AggregateRootId $id): bool { if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) + ->streamName($this->metadata->streamName($id->toString())) ->build(); } else { $criteria = (new CriteriaBuilder()) @@ -242,7 +241,7 @@ public function save(AggregateRoot $aggregate): void $clock = $this->clock; $aggregateName = $this->metadata->name; - $streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($this->metadata, $aggregateId) : null; + $streamName = $this->useStreamHeader ? $this->metadata->streamName($aggregateId) : null; $messages = array_map( static function (object $event) use ( @@ -337,7 +336,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id): if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) + ->streamName($this->metadata->streamName($id->toString())) ->fromPlayhead($aggregate->playhead()) ->build(); } else { diff --git a/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php index ef56019c4..2ed963cfd 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php @@ -6,11 +6,8 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; -use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; -use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use function class_exists; use function is_a; @@ -21,19 +18,9 @@ public function resolve(ArgumentMetadata $argument, Message $message): Aggregate { /** @var class-string $class */ $class = $argument->type; + $id = $message->header(AggregateHeader::class)->aggregateId; - try { - $id = $message->header(AggregateHeader::class)->aggregateId; - - return $class::fromString($id); - } catch (HeaderNotFound) { - // do nothing - } - - $stream = $message->header(StreamNameHeader::class)->streamName; - $aggregateId = StreamNameTranslator::aggregateId($stream); - - return $class::fromString($aggregateId); + return $class::fromString($id); } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/tests/Integration/ChildAggregate/Events/NameChanged.php b/tests/Integration/ChildAggregate/Events/NameChanged.php index 75c85e6f5..3e5c665d5 100644 --- a/tests/Integration/ChildAggregate/Events/NameChanged.php +++ b/tests/Integration/ChildAggregate/Events/NameChanged.php @@ -5,11 +5,13 @@ namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events; use Patchlevel\EventSourcing\Attribute\Event; +use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId; #[Event('profile.name_changed')] final class NameChanged { public function __construct( + public ProfileId $profileId, public string $name, ) { } diff --git a/tests/Integration/ChildAggregate/PersonalInformation.php b/tests/Integration/ChildAggregate/PersonalInformation.php index d1e83c520..dfee9bf16 100644 --- a/tests/Integration/ChildAggregate/PersonalInformation.php +++ b/tests/Integration/ChildAggregate/PersonalInformation.php @@ -41,6 +41,6 @@ public function name(): string public function changeName(string $name): void { - $this->recordThat(new NameChanged($name)); + $this->recordThat(new NameChanged($this->id, $name)); } } diff --git a/tests/Integration/ChildAggregate/Projection/ProfileProjector.php b/tests/Integration/ChildAggregate/Projection/ProfileProjector.php index 57dd12331..7ef232458 100644 --- a/tests/Integration/ChildAggregate/Projection/ProfileProjector.php +++ b/tests/Integration/ChildAggregate/Projection/ProfileProjector.php @@ -12,7 +12,6 @@ use Patchlevel\EventSourcing\Attribute\Teardown; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; -use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId; #[Projector('profile-1')] final class ProfileProjector @@ -52,12 +51,12 @@ public function handleProfileCreated(ProfileCreated $profileCreated): void } #[Subscribe(NameChanged::class)] - public function handleNameChanged(NameChanged $nameChanged, ProfileId $profileId): void + public function handleNameChanged(NameChanged $nameChanged): void { $this->connection->update( 'projection_profile', ['name' => $nameChanged->name], - ['id' => $profileId->toString()], + ['id' => $nameChanged->profileId->toString()], ); } }