diff --git a/src/Hub/Controller/SubscribeController.php b/src/Hub/Controller/SubscribeController.php index 6f606c7..1cffadb 100644 --- a/src/Hub/Controller/SubscribeController.php +++ b/src/Hub/Controller/SubscribeController.php @@ -8,6 +8,7 @@ use Freddie\Hub\HubControllerInterface; use Freddie\Hub\HubInterface; use Freddie\Message\Update; +use Freddie\Subscription\Subscriber; use Lcobucci\JWT\UnencryptedToken; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; @@ -57,6 +58,8 @@ public function __invoke( $allowedTopics = $this->extractAllowedTopics($request); $lastEventId = extract_last_event_id($request); + $subscriber = new Subscriber($subscribedTopics); + if (null !== $lastEventId) { async( function () use ($lastEventId, $stream, $subscribedTopics, $allowedTopics) { @@ -68,15 +71,16 @@ function () use ($lastEventId, $stream, $subscribedTopics, $allowedTopics) { } async( - function () use ($stream, $subscribedTopics, $allowedTopics) { + function () use ($stream, $subscribedTopics, $allowedTopics, $subscriber) { $callback = fn(Update $update) => $this->sendUpdate( $update, $stream, $subscribedTopics, $allowedTopics ); - $this->hub->subscribe($callback); - $stream->on('close', fn() => $this->hub->unsubscribe($callback)); + $subscriber->setCallback($callback); + $this->hub->subscribe($subscriber); + $stream->on('close', fn() => $this->hub->unsubscribe($subscriber)); } )(); diff --git a/src/Hub/Hub.php b/src/Hub/Hub.php index b23c9f7..0cadef0 100644 --- a/src/Hub/Hub.php +++ b/src/Hub/Hub.php @@ -9,6 +9,7 @@ use Freddie\Hub\Transport\PHP\PHPTransport; use Freddie\Hub\Transport\TransportInterface; use Freddie\Message\Update; +use Freddie\Subscription\Subscriber; use Generator; use InvalidArgumentException; use React\EventLoop\Loop; @@ -75,14 +76,14 @@ public function publish(Update $update): PromiseInterface }); } - public function subscribe(callable $callback): void + public function subscribe(Subscriber $subscriber): void { - $this->transport->subscribe($callback); + $this->transport->subscribe($subscriber); } - public function unsubscribe(callable $callback): void + public function unsubscribe(Subscriber $subscriber): void { - $this->transport->unsubscribe($callback); + $this->transport->unsubscribe($subscriber); } public function reconciliate(string $lastEventID): Generator diff --git a/src/Hub/HubInterface.php b/src/Hub/HubInterface.php index ac68b82..154974d 100644 --- a/src/Hub/HubInterface.php +++ b/src/Hub/HubInterface.php @@ -5,6 +5,7 @@ namespace Freddie\Hub; use Freddie\Message\Update; +use Freddie\Subscription\Subscriber; use Generator; use React\Promise\PromiseInterface; @@ -17,9 +18,9 @@ public function getOption(string $name): mixed; */ public function publish(Update $update): PromiseInterface; - public function subscribe(callable $callback): void; + public function subscribe(Subscriber $subscriber): void; - public function unsubscribe(callable $callback): void; + public function unsubscribe(Subscriber $subscriber): void; /** * @param string $lastEventID diff --git a/src/Message/Message.php b/src/Message/Message.php index 7ee6938..8030436 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -4,6 +4,8 @@ namespace Freddie\Message; +use Symfony\Component\Uid\Ulid; + use function explode; use function str_contains; @@ -18,6 +20,7 @@ public function __construct( public ?string $event = null, public ?int $retry = null, ) { + $this->id ??= (string) new Ulid(); } public function __toString(): string diff --git a/src/Message/Update.php b/src/Message/Update.php index 2397575..cf724ae 100644 --- a/src/Message/Update.php +++ b/src/Message/Update.php @@ -7,16 +7,23 @@ use Freddie\Helper\TopicHelper; use function Freddie\topic; +use function is_string; final class Update { + /** + * @var string[] + */ + public array $topics; + /** * @param string[] $topics */ public function __construct( - public array $topics, + array|string $topics, public Message $message, ) { + $this->topics = is_string($topics) ? [$topics] : $topics; } /** diff --git a/src/Subscription/Subscriber.php b/src/Subscription/Subscriber.php new file mode 100644 index 0000000..ff78f6a --- /dev/null +++ b/src/Subscription/Subscriber.php @@ -0,0 +1,50 @@ +subscriptions = array_map( + fn(string $topic) => new Subscription($this, $topic), + $this->topics, + ); + } + + public function setCallback(callable $callback): void + { + $this->callback = $callback; + } + + /** + * @param mixed ...$args + */ + public function __invoke(...$args): void + { + ($this->callback)(...$args); + } +} diff --git a/src/Subscription/Subscription.php b/src/Subscription/Subscription.php new file mode 100644 index 0000000..f3d0cd7 --- /dev/null +++ b/src/Subscription/Subscription.php @@ -0,0 +1,14 @@ + 'bar'; + $subscriber = new Subscriber(['foo']); + $subscriber->setCallback($subscribeFn); $lastEventId = Ulid::generate(); // When $hub->publish($update); - $hub->subscribe($subscribeFn); + $hub->subscribe($subscriber); iterator_to_array($hub->reconciliate($lastEventId)); - $hub->unsubscribe($subscribeFn); + $hub->unsubscribe($subscriber); // Then expect($transport->called['publish'])->toBe([$update]); - expect($transport->called['subscribe'])->toBe([$subscribeFn]); + expect($transport->called['subscribe'])->toBe([$subscriber]); expect($transport->called['reconciliate'])->toBe([$lastEventId]); - expect($transport->called['unsubscribe'])->toBe([$subscribeFn]); + expect($transport->called['unsubscribe'])->toBe([$subscriber]); }); it('complains when requesting an unrecognized option', function () { diff --git a/tests/Unit/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php index 21c2e82..39908cd 100644 --- a/tests/Unit/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -5,6 +5,7 @@ namespace Freddie\Tests\Unit\Message; use Freddie\Message\Message; +use Symfony\Component\Uid\Ulid; it('stringifies messages', function (Message $message, string $expected) { expect((string) $message)->toBe($expected); @@ -34,3 +35,9 @@ "id:1\nevent:message\ndata:foo\ndata:bar\n\n", ]; }); + +it('has a default id', function () { + $message = new Message(); + expect($message->id)->not()->toBeNull(); + expect(Ulid::isValid($message->id))->toBeTrue(); +}); diff --git a/tests/Unit/Message/UpdateTest.php b/tests/Unit/Message/UpdateTest.php index 54f4b0a..6b02c3b 100644 --- a/tests/Unit/Message/UpdateTest.php +++ b/tests/Unit/Message/UpdateTest.php @@ -103,3 +103,8 @@ 'expected' => true, // Private update allowed on all ]; }); + +it('accepts a single topic', function () { + $update = new Update('/foo', new Message()); + expect($update->topics)->toBe(['/foo']); +});