Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Subscription events #14

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"lcobucci/jwt": "^4.1",
"nyholm/dsn": "^2.0",
"phpdocumentor/reflection-docblock": "^5.3",
"react/async": "4.x-dev#ff11a7aa9eea7104af8f05bafbc85422dac4b8ab",
"react/async": "4.x-dev#cfd52ac426a765a593a3a5c03fceb5a31c9d9edc",
"rize/uri-template": "^0.3.4",
"symfony/console": "6.0.*",
"symfony/dotenv": "6.0.*",
Expand Down
70 changes: 48 additions & 22 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@
<directory>src/DependencyInjection</directory>
</exclude>
</coverage>

<extensions>
<extension class="Freddie\Tests\LoopExtension" />
</extensions>
</phpunit>
14 changes: 13 additions & 1 deletion src/Hub/Controller/SubscribeController.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ public function __invoke(
): ResponseInterface {
$subscribedTopics = $this->extractSubscribedTopics($request);
$allowedTopics = $this->extractAllowedTopics($request);
$payload = $this->extractPayload($request);
$lastEventId = extract_last_event_id($request);

$subscriber = new Subscriber($subscribedTopics);
$subscriber = new Subscriber($subscribedTopics, $payload);

if (null !== $lastEventId) {
async(
Expand Down Expand Up @@ -138,4 +139,15 @@ private function extractAllowedTopics(ServerRequestInterface $request): ?array

return $jwt->claims()->get('mercure')['subscribe'] ?? null;
}

private function extractPayload(ServerRequestInterface $request): mixed
{
/** @var UnencryptedToken|null $jwt */
$jwt = $request->getAttribute('token');
if (null === $jwt) {
return null;
}

return $jwt->claims()->get('mercure')['payload'] ?? null;
}
}
26 changes: 26 additions & 0 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Freddie\Hub;

use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use FrameworkX\App;
use Freddie\Hub\Middleware\HttpExceptionConverterMiddleware;
use Freddie\Hub\Transport\PHP\PHPTransport;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Message;
use Freddie\Message\Update;
use Freddie\Subscription\Subscriber;
use Generator;
Expand All @@ -23,6 +26,7 @@ final class Hub implements HubInterface
{
public const DEFAULT_OPTIONS = [
'allow_anonymous' => true,
'enable_subscription_events' => true,
];

/**
Expand All @@ -40,19 +44,26 @@ final class Hub implements HubInterface
public function __construct(
private App $app = new App(new HttpExceptionConverterMiddleware()),
private TransportInterface $transport = new PHPTransport(),
private EventEmitterInterface $eventEmitter = new EventEmitter(),
array $options = [],
iterable $controllers = [],
) {
$resolver = new OptionsResolver();
$resolver->setDefaults(self::DEFAULT_OPTIONS);
$resolver->setAllowedTypes('allow_anonymous', 'bool');
$resolver->setAllowedTypes('enable_subscription_events', 'bool');
$this->options = $resolver->resolve($options);
foreach ($controllers as $controller) {
$controller->setHub($this);
$method = $controller->getMethod();
$route = $controller->getRoute();
$this->app->{$method}($route, $controller);
}

if (true === $this->getOption('enable_subscription_events')) {
$eventEmitter->on('subscribe', fn(Subscriber $subscriber) => $this->notify($subscriber));
$eventEmitter->on('unsubscribe', fn(Subscriber $subscriber) => $this->notify($subscriber));
}
}

/**
Expand All @@ -66,6 +77,7 @@ public function run(): void

public function publish(Update $update): PromiseInterface
{
$this->eventEmitter->emit('publish', [$update]);
return $this->transport->publish($update)
->then(function (Update $update) {
if (false === $this->started) {
Expand All @@ -78,14 +90,28 @@ public function publish(Update $update): PromiseInterface

public function subscribe(Subscriber $subscriber): void
{
$this->eventEmitter->emit('subscribe', [$subscriber]);
$this->transport->subscribe($subscriber);
}

public function unsubscribe(Subscriber $subscriber): void
{
$subscriber->active = false;
$this->eventEmitter->emit('unsubscribe', [$subscriber]);
$this->transport->unsubscribe($subscriber);
}

private function notify(Subscriber $subscriber): void
{
foreach ($subscriber->subscriptions as $subscription) {
$update = new Update(
$subscription->id,
new Message(data: (string) json_encode($subscription), private: true)
);
$this->publish($update);
}
}

public function reconciliate(string $lastEventID): Generator
{
return $this->transport->reconciliate($lastEventID);
Expand Down
34 changes: 33 additions & 1 deletion src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,43 @@

namespace Freddie\Subscription;

final class Subscription
use JsonSerializable;

use function sprintf;
use function urlencode;

final class Subscription implements JsonSerializable
{
public readonly string $id;

public function __construct(
public readonly Subscriber $subscriber,
public readonly string $topic,
) {
$this->id = sprintf(
'/.well-known/mercure/subscriptions/%s/%s',
urlencode($this->topic),
urlencode((string) $subscriber->id)
);
}

/**
* @return array<string, mixed>
*/
public function jsonSerialize(): array
{
$output = [
'id' => $this->id,
'type' => 'Subscription',
'subscriber' => (string) $this->subscriber->id,
'topic' => $this->topic,
'active' => $this->subscriber->active,
];

if (null !== $this->subscriber->payload) {
$output['payload'] = $this->subscriber->payload;
}

return $output;
}
}
35 changes: 27 additions & 8 deletions tests/Integration/HubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use function sprintf;

it('works 🎉🥳', function () {
$bobsMessages = $alicesMessages = [];
$env = ['X_LISTEN' => $_ENV['X_LISTEN'] ?? '127.0.0.1:8080'];
foreach (explode(',', $_ENV['SYMFONY_DOTENV_VARS'] ?? '') as $key) {
$value = $_ENV[$key] ?? null;
Expand All @@ -30,12 +31,18 @@

usleep(1500000); // Wait for process to actually start

$messages = [];
Loop::addTimer(0.0, function () use ($endpoint, &$messages) {
$listener = new EventSource(sprintf('%s?topic=*', $endpoint));
$listener->on('message', function (MessageEvent $event) use (&$messages) {
$messages[] = $event->data;
Loop::stop();
// Given
Loop::addTimer(0.0, function () use ($endpoint, &$bobsMessages) {
$bob = new EventSource(sprintf('%s?topic=*', $endpoint));
$bob->on('message', function (MessageEvent $event) use (&$bobsMessages) {
$bobsMessages[] = $event->data;
});
});
Loop::addTimer(0.0, function () use ($endpoint, &$alicesMessages) {
$alice = new EventSource(sprintf('%s?topic=/foo', $endpoint));
$alice->on('message', function (MessageEvent $event) use (&$alicesMessages, $alice) {
$alicesMessages[] = $event->data;
$alice->close();
});
});
Loop::addTimer(0.05, function () use ($endpoint) {
Expand All @@ -49,7 +56,19 @@
],
]);
});
Loop::addTimer(1, fn() => Loop::stop());
Loop::addTimer(0.2, fn() => Loop::stop());

// When
Loop::run();
expect($messages[0] ?? null)->toBe('itworks');

// Then
expect($alicesMessages)->toHaveCount(1);
expect($alicesMessages[0])->toBe('itworks');

expect($bobsMessages)->toHaveCount(3);
expect(json_decode($bobsMessages[0], true)['type'] ?? null)->toBe('Subscription');
expect(json_decode($bobsMessages[0], true)['active'] ?? null)->toBe(true);
expect($bobsMessages[1])->toBe('itworks');
expect(json_decode($bobsMessages[2], true)['type'] ?? null)->toBe('Subscription');
expect(json_decode($bobsMessages[2], true)['active'] ?? null)->toBe(false);
});
Loading