From 58b7c3ef67bf673e260de4a5c46e97a0d1b0d3ee Mon Sep 17 00:00:00 2001 From: Chris Date: Sun, 25 Sep 2022 22:08:14 +1300 Subject: [PATCH] Implement EventStoreManagement and extract interface (#11) --- src/DynamoDbEventStore.php | 18 +++++++++-- src/DynamoDbEventStoreInterface.php | 15 +++++++++ src/InputBuilder.php | 8 +++++ tests/DynamoDbEventStoreManagementTest.php | 37 ++++++++++++++++++++++ 4 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 src/DynamoDbEventStoreInterface.php create mode 100644 tests/DynamoDbEventStoreManagementTest.php diff --git a/src/DynamoDbEventStore.php b/src/DynamoDbEventStore.php index d2b372a..7ca5bc6 100644 --- a/src/DynamoDbEventStore.php +++ b/src/DynamoDbEventStore.php @@ -8,11 +8,12 @@ use AsyncAws\DynamoDb\Exception\ConditionalCheckFailedException; use Broadway\Domain\DomainEventStream; use Broadway\Domain\DomainMessage; -use Broadway\EventStore\EventStore; use Broadway\EventStore\EventStreamNotFoundException; +use Broadway\EventStore\EventVisitor; use Broadway\EventStore\Exception\DuplicatePlayheadException; +use Broadway\EventStore\Management\Criteria; -final class DynamoDbEventStore implements EventStore +final class DynamoDbEventStore implements DynamoDbEventStoreInterface { public function __construct( private readonly DynamoDbClient $client, @@ -93,4 +94,17 @@ public function createTable(): void $this->client->tableExists($this->inputBuilder->buildDescribeTableInput($this->table))->wait(); } + + public function visitEvents(Criteria $criteria, EventVisitor $eventVisitor): void + { + $result = $this->client->scan($this->inputBuilder->buildScanInput($this->table)); + + foreach ($result->getItems() as $normalizedDomainMessage) { + $domainMessage = $this->domainMessageNormalizer->denormalize($normalizedDomainMessage); + + if ($criteria->isMatchedBy($domainMessage)) { + $eventVisitor->doWithEvent($domainMessage); + } + } + } } diff --git a/src/DynamoDbEventStoreInterface.php b/src/DynamoDbEventStoreInterface.php new file mode 100644 index 0000000..3ed040a --- /dev/null +++ b/src/DynamoDbEventStoreInterface.php @@ -0,0 +1,15 @@ + $tableName, + ]); + } + public function buildDescribeTableInput(string $tableName): DescribeTableInput { return new DescribeTableInput([ diff --git a/tests/DynamoDbEventStoreManagementTest.php b/tests/DynamoDbEventStoreManagementTest.php new file mode 100644 index 0000000..788de77 --- /dev/null +++ b/tests/DynamoDbEventStoreManagementTest.php @@ -0,0 +1,37 @@ + 'http://dynamodb-local:8000', + 'accessKeyId' => '', + 'accessKeySecret' => '', + ])); + $inputBuilder = new InputBuilder(); + $domainMessageNormalizer = new DomainMessageNormalizer(new SimpleInterfaceSerializer(), new SimpleInterfaceSerializer(), new JsonEncoder(), new JsonDecoder()); + + $eventStore = new DynamoDbEventStore($client, $inputBuilder, $domainMessageNormalizer, 'table'); + + $eventStore->deleteTable(); + $eventStore->createTable(); + + return $eventStore; + } +}