From 1c75d78cba61658ab9e10474ee8a7f575586e839 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Mon, 4 Dec 2023 13:54:41 +0000 Subject: [PATCH] Adds cache managers for use with API Gateway (#26) * implement cache manager * implement contract * Fix code styling * wip * wip --- phpunit.xml.dist | 2 +- src/Channels/Channel.php | 5 +- src/Concerns/SerializesChannels.php | 27 + src/Contracts/ChannelConnectionManager.php | 5 + src/Contracts/Connection.php | 22 - src/Contracts/ConnectionManager.php | 36 ++ .../ArrayChannelConnectionManager.php | 12 + .../CacheChannelConnectionManager.php | 111 ++++ src/Managers/CacheChannelManager.php | 123 +++++ src/Managers/CacheConnectionManager.php | 103 ++++ src/Servers/ApiGateway/ApiGatewayProvider.php | 30 +- src/Servers/ApiGateway/Connection.php | 33 +- src/Servers/ApiGateway/Server.php | 24 +- tests/ApiGatewayTestCase.php | 8 +- tests/Feature/ApiGateway/ServerTest.php | 515 +++++++++--------- tests/Pest.php | 9 + tests/Unit/Channels/CacheChannelTest.php | 2 + tests/Unit/Channels/ChannelTest.php | 2 + .../Channels/PresenceCacheChannelTest.php | 2 + tests/Unit/Channels/PresenceChannelTest.php | 2 + .../Unit/Channels/PrivateCacheChannelTest.php | 2 + tests/Unit/Channels/PrivateChannelTest.php | 2 + tests/Unit/ClientEventTest.php | 2 + tests/Unit/EventTest.php | 4 + 24 files changed, 792 insertions(+), 291 deletions(-) create mode 100644 src/Concerns/SerializesChannels.php create mode 100644 src/Contracts/ConnectionManager.php create mode 100644 src/Managers/CacheChannelConnectionManager.php create mode 100644 src/Managers/CacheChannelManager.php create mode 100644 src/Managers/CacheConnectionManager.php diff --git a/phpunit.xml.dist b/phpunit.xml.dist index a453a02e..91b178c8 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -14,7 +14,7 @@ - + diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 753fc44b..3d4d1b6e 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -2,11 +2,14 @@ namespace Laravel\Reverb\Channels; +use Laravel\Reverb\Concerns\SerializesChannels; use Laravel\Reverb\Contracts\ChannelConnectionManager; use Laravel\Reverb\Contracts\Connection; class Channel { + use SerializesChannels; + /** * The channel connections. * @@ -16,7 +19,7 @@ class Channel public function __construct(protected string $name) { - $this->connections = app(ChannelConnectionManager::class); + $this->connections = app(ChannelConnectionManager::class)->for($this->name); } /** diff --git a/src/Concerns/SerializesChannels.php b/src/Concerns/SerializesChannels.php new file mode 100644 index 00000000..3e19516d --- /dev/null +++ b/src/Concerns/SerializesChannels.php @@ -0,0 +1,27 @@ + $this->name, + ]; + } + + /** + * Restore the connection after serialization. + */ + public function __unserialize(array $values): void + { + $this->name = $values['name']; + $this->connections = app(ChannelConnectionManager::class)->for($this->name); + } +} diff --git a/src/Contracts/ChannelConnectionManager.php b/src/Contracts/ChannelConnectionManager.php index 70bbee7d..65f23884 100644 --- a/src/Contracts/ChannelConnectionManager.php +++ b/src/Contracts/ChannelConnectionManager.php @@ -6,6 +6,11 @@ interface ChannelConnectionManager { + /** + * The channel name. + */ + public function for(string $name): ChannelConnectionManager; + /** * Add a connection. */ diff --git a/src/Contracts/Connection.php b/src/Contracts/Connection.php index dce500ae..295fe50f 100644 --- a/src/Contracts/Connection.php +++ b/src/Contracts/Connection.php @@ -16,8 +16,6 @@ abstract class Connection */ protected $hasBeenPinged = false; - protected $pusher; - public function __construct( protected Application $application, protected ?string $origin @@ -128,24 +126,4 @@ public function isStale(): bool { return $this->isInactive() && $this->hasBeenPinged; } - - /** - * Hydrate a serialized connection. - */ - public static function hydrate(Connection|string $connection): Connection - { - return is_object($connection) - ? $connection - : unserialize($connection); - } - - /** - * Hydrate a serialized connection. - */ - public static function dehydrate(Connection $connection): Connection|string - { - return $connection instanceof SerializableConnection - ? serialize($connection) - : $connection; - } } diff --git a/src/Contracts/ConnectionManager.php b/src/Contracts/ConnectionManager.php new file mode 100644 index 00000000..91800121 --- /dev/null +++ b/src/Contracts/ConnectionManager.php @@ -0,0 +1,36 @@ +name = $name; + + return $this; + } + /** * Add a connection. */ diff --git a/src/Managers/CacheChannelConnectionManager.php b/src/Managers/CacheChannelConnectionManager.php new file mode 100644 index 00000000..82bb1e31 --- /dev/null +++ b/src/Managers/CacheChannelConnectionManager.php @@ -0,0 +1,111 @@ +name = $name; + + return $this; + } + + /** + * Get the key for the channels. + */ + protected function key(): string + { + return "{$this->prefix}:{$this->name}"; + } + + /** + * Add a connection. + */ + public function add(Connection $connection, array $data): void + { + $connections = $this->repository->get($this->key(), []); + + $connections[$connection->identifier()] = $data; + + $this->repository->put($this->key(), $connections); + } + + /** + * Remove a connection. + */ + public function remove(Connection $connection): void + { + $connections = $this->repository->get($this->key()); + + unset($connections[$connection->identifier()]); + + $this->repository->put($this->key(), $connections); + } + + /** + * Find a connection in the set. + */ + public function find(Connection $connection): ?ChannelConnection + { + return $this->findById($connection->identifier()); + } + + /** + * Find a connection in the set by its ID. + */ + public function findById(string $id): ?ChannelConnection + { + $connection = $this->connections->find($id); + + if (! $connection) { + return null; + } + + return new ChannelConnection( + $connection, + $this->repository->get($this->key())[$id] ?? [] + ); + } + + /** + * Get all the connections. + */ + public function all(): array + { + $connections = $this->connections->all(); + $channelConnections = $this->repository->get($this->key(), []); + $allConnections = array_intersect_key($connections, $channelConnections); + + return array_map(function ($connection) use ($channelConnections) { + return new ChannelConnection($connection, $channelConnections[$connection->identifier()]); + }, $allConnections); + } + + /** + * Flush the channel connection manager. + */ + public function flush(): void + { + $this->repository->put($this->key(), []); + } +} diff --git a/src/Managers/CacheChannelManager.php b/src/Managers/CacheChannelManager.php new file mode 100644 index 00000000..fc97678d --- /dev/null +++ b/src/Managers/CacheChannelManager.php @@ -0,0 +1,123 @@ +application; + } + + /** + * Get all the channels. + */ + public function all(): array + { + return $this->channels(); + } + + /** + * Find the given channel + */ + public function find(string $channel): Channel + { + return $this->channels($channel); + } + + /** + * Get all the connections for the given channels. + * + * @return array + */ + public function connections(string $channel = null): array + { + $channels = Arr::wrap($this->channels($channel)); + + return array_reduce($channels, function ($carry, $channel) { + return $carry + $channel->connections(); + }, []); + } + + /** + * Unsubscribe from all channels. + */ + public function unsubscribeFromAll(Connection $connection): void + { + foreach ($this->channels() as $channel) { + $channel->unsubscribe($connection); + } + } + + /** + * Get the given channel. + */ + public function channel(string $channel): Channel + { + return $this->channels($channel); + } + + /** + * Get the channels. + * + * @return \Laravel\Reverb\Channels\Channel|array + */ + public function channels(string $channel = null): Channel|array + { + $channels = $this->repository->get($this->prefix, []); + + if (! isset($channels[$this->application->id()])) { + $channels[$this->application->id()] = []; + } + + if ($channel) { + if (! isset($channels[$this->application->id()][$channel])) { + $channel = ChannelBroker::create($channel); + $channels[$this->application->id()][$channel->name()] = serialize($channel); + $this->repository->forever($this->prefix, $channels); + + return $channel; + } + + return unserialize($channels[$this->application->id()][$channel]); + } + + return array_map('unserialize', $channels[$this->application->id()] ?: []); + } + + /** + * Flush the channel manager repository. + */ + public function flush(): void + { + $this->repository->forever($this->prefix, []); + } +} diff --git a/src/Managers/CacheConnectionManager.php b/src/Managers/CacheConnectionManager.php new file mode 100644 index 00000000..73a3b13d --- /dev/null +++ b/src/Managers/CacheConnectionManager.php @@ -0,0 +1,103 @@ +prefix}:connections"; + } + + /** + * Add a new connection. + */ + public function connect(Connection $connection): void + { + $connections = $this->get(); + + $connections[$connection->identifier()] = serialize($connection); + + $this->persist($connections); + } + + /** + * Find a connection. + */ + public function find(string $id): ?Connection + { + $connections = $this->get(); + + if (! isset($connections[$id])) { + return null; + } + + return unserialize($connections[$id]); + } + + /** + * Get all the connections. + */ + public function all(): array + { + return array_map('unserialize', $this->get()); + } + + /** + * Update the state of a connection. + */ + public function update(Connection $connection): void + { + $this->connect($connection); + } + + /** + * Forget a connection. + */ + public function forget(Connection $connection): void + { + $connections = $this->get(); + + unset($connections[$connection->identifier()]); + + $this->persist($connections); + } + + /** + * Flush all connections. + */ + public function flush(): void + { + $this->repository->forget($this->key()); + } + + /** + * Get all the connections from the store. + */ + protected function get(): array + { + return $this->repository->get($this->key(), []); + } + + /** + * Persist the connections to the store. + */ + protected function persist(array $connections): void + { + $this->repository->put($this->key(), $connections); + } +} diff --git a/src/Servers/ApiGateway/ApiGatewayProvider.php b/src/Servers/ApiGateway/ApiGatewayProvider.php index acd90ca3..659feb0c 100644 --- a/src/Servers/ApiGateway/ApiGatewayProvider.php +++ b/src/Servers/ApiGateway/ApiGatewayProvider.php @@ -10,12 +10,14 @@ use Laravel\Reverb\Contracts\ApplicationProvider; use Laravel\Reverb\Contracts\ChannelConnectionManager; use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Contracts\ServerProvider; use Laravel\Reverb\Event; use Laravel\Reverb\Jobs\PingInactiveConnections; use Laravel\Reverb\Jobs\PruneStaleConnections; -use Laravel\Reverb\Managers\ArrayChannelConnectionManager; -use Laravel\Reverb\Managers\ArrayChannelManager; +use Laravel\Reverb\Managers\CacheChannelConnectionManager; +use Laravel\Reverb\Managers\CacheChannelManager; +use Laravel\Reverb\Managers\CacheConnectionManager; class ApiGatewayProvider extends ServerProvider { @@ -50,6 +52,15 @@ public function register(): void return new JsonResponse((object) []); }); + + $this->app->singleton(ConnectionManager::class, function () { + return new CacheConnectionManager( + $this->app['cache']->store( + $this->config['connection_manager']['store'] + ), + $this->config['connection_manager']['prefix'] + ); + }); } /** @@ -57,7 +68,12 @@ public function register(): void */ public function buildChannelManager(): ChannelManager { - return new ArrayChannelManager; + return new CacheChannelManager( + $this->app['cache']->store( + $this->config['connection_manager']['store'] + ), + $this->config['connection_manager']['prefix'] + ); } /** @@ -65,6 +81,12 @@ public function buildChannelManager(): ChannelManager */ public function buildChannelConnectionManager(): ChannelConnectionManager { - return new ArrayChannelConnectionManager; + return new CacheChannelConnectionManager( + $this->app['cache']->store( + $this->config['connection_manager']['store'] + ), + app(ConnectionManager::class), + $this->config['connection_manager']['prefix'] + ); } } diff --git a/src/Servers/ApiGateway/Connection.php b/src/Servers/ApiGateway/Connection.php index c49b6882..a0e3b727 100644 --- a/src/Servers/ApiGateway/Connection.php +++ b/src/Servers/ApiGateway/Connection.php @@ -6,6 +6,7 @@ use Laravel\Reverb\Concerns\GeneratesPusherIdentifiers; use Laravel\Reverb\Concerns\SerializesConnections; use Laravel\Reverb\Contracts\Connection as BaseConnection; +use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Contracts\SerializableConnection; use Laravel\Reverb\Servers\ApiGateway\Jobs\SendToConnection; @@ -61,6 +62,36 @@ public function send(string $message): void */ public function terminate(): void { - // + app(ConnectionManager::class)->forget($this); + } + + /** + * Ping the connection to ensure it is still active. + */ + public function ping(): void + { + parent::ping(); + + $this->save(); + } + + /** + * Touch the connection last seen at timestamp. + */ + public function touch(): Connection + { + parent::touch(); + + $this->save(); + + return $this; + } + + /** + * Persist the state change to the connection manager. + */ + public function save(): void + { + app(ConnectionManager::class)->update($this); } } diff --git a/src/Servers/ApiGateway/Server.php b/src/Servers/ApiGateway/Server.php index 92b5d38b..8b624af9 100644 --- a/src/Servers/ApiGateway/Server.php +++ b/src/Servers/ApiGateway/Server.php @@ -4,16 +4,20 @@ use Laravel\Reverb\Application; use Laravel\Reverb\Contracts\ApplicationProvider; +use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Exceptions\InvalidApplication; use Laravel\Reverb\Server as ReverbServer; use Laravel\Reverb\Servers\ApiGateway\Jobs\SendToConnection; class Server { + protected ConnectionManager $connections; + public function __construct( protected ReverbServer $server, protected ApplicationProvider $applications, ) { + $this->connections = app(ConnectionManager::class); } /** @@ -27,7 +31,7 @@ public function handle(Request $request): void $this->connect($request) ), 'DISCONNECT' => $this->server->close( - $this->connect($request) + $this->connect($request), ), 'MESSAGE' => $this->server->message( $this->connect($request), @@ -52,11 +56,21 @@ public function handle(Request $request): void */ protected function connect(Request $request): Connection { - return new Connection( - $request->connectionId(), - $this->application($request), - $request->headers['origin'] ?? null + $connection = $this->connections->find($request->connectionId()); + + if ($connection) { + return $connection; + } + + $this->connections->connect( + $connection = new Connection( + $request->connectionId(), + $this->application($request), + $request->headers['origin'] ?? null + ) ); + + return $connection; } /** diff --git a/tests/ApiGatewayTestCase.php b/tests/ApiGatewayTestCase.php index 69fd6d10..e2f63a98 100644 --- a/tests/ApiGatewayTestCase.php +++ b/tests/ApiGatewayTestCase.php @@ -2,6 +2,7 @@ namespace Laravel\Reverb\Tests; +use Illuminate\Support\Arr; use Illuminate\Support\Facades\App; use Illuminate\Support\Facades\Bus; use Illuminate\Support\Str; @@ -64,6 +65,7 @@ protected function defineEnvironment($app) 'capacity' => null, 'allowed_origins' => ['*'], 'ping_interval' => 10, + 'max_message_size' => 1000000, ]); } @@ -124,7 +126,7 @@ public function subscribe(string $channel, ?array $data = [], string $auth = nul if (! $auth && Str::startsWith($channel, ['private-', 'presence-'])) { $this->connect($connectionId, $appKey); $managed = $this->managedConnection(); - $auth = validAuth($managed, $channel, $data); + $auth = validAuth($managed->id(), $channel, $data); } $this->send([ @@ -179,6 +181,8 @@ public function assertSent(string $connectionId = null, mixed $message = null, i */ public function managedConnection(): ?Connection { - return Connection::hydrate(connectionManager()->all()->last()); + $connection = Arr::last(connectionManager()->all()); + + return connectionManager()->find($connection->identifier()); } } diff --git a/tests/Feature/ApiGateway/ServerTest.php b/tests/Feature/ApiGateway/ServerTest.php index e2f8b65a..d2c52518 100644 --- a/tests/Feature/ApiGateway/ServerTest.php +++ b/tests/Feature/ApiGateway/ServerTest.php @@ -1,10 +1,8 @@ flush(); -// channelManager()->flush(); -// }); +afterEach(function () { + channelManager()->flush(); + connectionManager()->flush(); +}); -// it('can handle a new connection', function () { -// $this->connect(); +it('can handle a new connection', function () { + $this->connect(); -// $this->assertCount(1, connectionManager()->all()); -// })->skip(); + $this->assertCount(1, connectionManager()->all()); +}); -// it('can handle multiple new connections', function () { -// $this->connect(); -// $this->connect('def-456'); +it('can handle multiple new connections', function () { + $this->connect(); + $this->connect('def-456'); -// $this->assertCount(2, connectionManager()->all()); -// })->skip(); + $this->assertCount(2, connectionManager()->all()); +}); -// it('can handle connections to different applications', function () { -// $this->connect(); -// $this->connect(appKey: 'pusher-key-2'); +it('can handle connections to different applications', function () { + $this->connect(); + $this->connect('def-456', appKey: 'pusher-key-2'); -// foreach (App::make(ApplicationProvider::class)->all() as $app) { -// $this->assertCount(1, connectionManager()->for($app)->all()); -// } -// })->skip(); + $connections = connectionManager()->all(); -// it('can subscribe to a channel', function () { -// $this->subscribe('test-channel'); + expect(Arr::first($connections)->identifier())->toBe('abc-123'); + expect(Arr::first($connections)->app()->id())->toBe('123456'); + expect(Arr::last($connections)->identifier())->toBe('def-456'); + expect(Arr::last($connections)->app()->id())->toBe('654321'); +}); -// $this->assertCount(1, connectionManager()->all()); +it('can subscribe to a channel', function () { + $this->subscribe('test-channel'); -// $this->assertCount(1, channelManager()->connectionKeys(ChannelBroker::create('test-channel'))); + expect(connectionManager()->all())->toHaveCount(1); -// $this->assertSent('abc-123', '{"event":"pusher_internal:subscription_succeeded","channel":"test-channel"}'); -// })->skip(); + expect(channelManager()->find('test-channel')->connections())->toHaveCount(1); -// it('can subscribe to a private channel', function () { -// $this->subscribe('private-test-channel'); + $this->assertSent('abc-123', '{"event":"pusher_internal:subscription_succeeded","channel":"test-channel"}'); +}); -// $this->assertSent('abc-123', '{"event":"pusher_internal:subscription_succeeded","channel":"private-test-channel"}'); -// })->skip(); +it('can subscribe to a private channel', function () { + $this->subscribe('private-test-channel'); -// it('can subscribe to a presence channel', function () { -// $data = ['user_id' => 1, 'user_info' => ['name' => 'Test User']]; -// $this->subscribe('presence-test-channel', data: $data); + $this->assertSent('abc-123', '{"event":"pusher_internal:subscription_succeeded","channel":"private-test-channel"}'); +}); -// $this->assertSent('abc-123', [ -// 'pusher_internal:subscription_succeeded', -// '"hash\":{\"1\":{\"name\":\"Test User\"}}', -// ]); -// })->skip(); +it('can subscribe to a presence channel', function () { + $data = ['user_id' => 1, 'user_info' => ['name' => 'Test User']]; + $this->subscribe('presence-test-channel', data: $data); -// it('can notify other subscribers of a presence channel when a new member joins', function () { -// $data = ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]; -// $this->subscribe('presence-test-channel', data: $data); + $this->assertSent('abc-123', [ + 'pusher_internal:subscription_succeeded', + '"hash\":{\"1\":{\"name\":\"Test User\"}}', + ]); +}); -// $data = ['user_id' => 2, 'user_info' => ['name' => 'Test User 2']]; -// $this->subscribe('presence-test-channel', data: $data, connectionId: 'def-456'); -// $this->assertSent('abc-123', '{"event":"pusher_internal:member_added","data":{"user_id":2,"user_info":{"name":"Test User 2"}},"channel":"presence-test-channel"}'); +it('can notify other subscribers of a presence channel when a new member joins', function () { + $data = ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]; + $this->subscribe('presence-test-channel', data: $data); -// $data = ['user_id' => 3, 'user_info' => ['name' => 'Test User 3']]; -// $this->subscribe('presence-test-channel', data: $data, connectionId: 'ghi-789'); -// $this->assertSent('def-456', '{"event":"pusher_internal:member_added","data":{"user_id":3,"user_info":{"name":"Test User 3"}},"channel":"presence-test-channel"}'); -// })->skip(); + $data = ['user_id' => 2, 'user_info' => ['name' => 'Test User 2']]; + $this->subscribe('presence-test-channel', data: $data, connectionId: 'def-456'); + $this->assertSent('abc-123', '{"event":"pusher_internal:member_added","data":{"user_id":2,"user_info":{"name":"Test User 2"}},"channel":"presence-test-channel"}'); -// it('can notify other subscribers of a presence channel when a member leaves', function () { -// $this->withoutExceptionHandling(); -// $data = ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]; -// $this->subscribe('presence-test-channel', data: $data); + $data = ['user_id' => 3, 'user_info' => ['name' => 'Test User 3']]; + $this->subscribe('presence-test-channel', data: $data, connectionId: 'ghi-789'); + $this->assertSent('def-456', '{"event":"pusher_internal:member_added","data":{"user_id":3,"user_info":{"name":"Test User 3"}},"channel":"presence-test-channel"}'); +}); -// $data = ['user_id' => 2, 'user_info' => ['name' => 'Test User 2']]; -// $this->subscribe('presence-test-channel', data: $data, connectionId: 'def-456'); -// $this->assertSent('abc-123', '{"event":"pusher_internal:member_added","data":{"user_id":2,"user_info":{"name":"Test User 2"}},"channel":"presence-test-channel"}'); +it('can notify other subscribers of a presence channel when a member leaves', function () { + $data = ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]; + $this->subscribe('presence-test-channel', data: $data); -// $data = ['user_id' => 3, 'user_info' => ['name' => 'Test User 3']]; -// $this->subscribe('presence-test-channel', data: $data, connectionId: 'ghi-789'); -// $this->assertSent('def-456', '{"event":"pusher_internal:member_added","data":{"user_id":3,"user_info":{"name":"Test User 3"}},"channel":"presence-test-channel"}'); + $data = ['user_id' => 2, 'user_info' => ['name' => 'Test User 2']]; + $this->subscribe('presence-test-channel', data: $data, connectionId: 'def-456'); + $this->assertSent('abc-123', '{"event":"pusher_internal:member_added","data":{"user_id":2,"user_info":{"name":"Test User 2"}},"channel":"presence-test-channel"}'); -// $this->disconnect('ghi-789'); + $data = ['user_id' => 3, 'user_info' => ['name' => 'Test User 3']]; + $this->subscribe('presence-test-channel', data: $data, connectionId: 'ghi-789'); + $this->assertSent('def-456', '{"event":"pusher_internal:member_added","data":{"user_id":3,"user_info":{"name":"Test User 3"}},"channel":"presence-test-channel"}'); -// $this->assertSent( -// message: '{"event":"pusher_internal:member_removed","data":{"user_id":3},"channel":"presence-test-channel"}', -// times: 2 -// ); -// })->skip(); + expect(connectionManager()->all())->toHaveCount(3); -// it('can receive a message broadcast from the server', function () { -// $this->subscribe('test-channel'); -// $this->subscribe('test-channel', connectionId: 'def-456'); -// $this->subscribe('test-channel', connectionId: 'ghi789'); + $this->disconnect('ghi-789'); -// $this->post('apps/123456/events', [ -// 'name' => 'App\\Events\\TestEvent', -// 'channel' => 'test-channel', -// 'data' => ['foo' => 'bar'], -// ])->assertOk(); + expect(connectionManager()->all())->toHaveCount(2); -// $this->assertSent(message: '{"event":"App\\\\Events\\\\TestEvent","channel":"test-channel","data":{"foo":"bar"}}'); -// })->skip(); + $this->assertSent( + message: '{"event":"pusher_internal:member_removed","data":{"user_id":3},"channel":"presence-test-channel"}', + times: 2 + ); +}); -// it('can handle an event', function () { -// $this->subscribe('presence-test-channel', data: ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]); +it('can receive a message broadcast from the server', function () { + $this->subscribe('test-channel'); + $this->subscribe('test-channel', connectionId: 'def-456'); + $this->subscribe('test-channel', connectionId: 'ghi789'); -// $this->post('apps/123456/events', [ -// 'name' => 'App\\Events\\TestEvent', -// 'channel' => 'presence-test-channel', -// 'data' => ['foo' => 'bar'], -// ])->assertOk(); + $this->post('apps/123456/events', [ + 'name' => 'App\\Events\\TestEvent', + 'channel' => 'test-channel', + 'data' => ['foo' => 'bar'], + ])->assertOk(); -// $this->assertSent('abc-123', message: '{"event":"App\\\\Events\\\\TestEvent","channel":"presence-test-channel","data":{"foo":"bar"}}'); -// })->skip(); + $this->assertSent(message: '{"event":"App\\\\Events\\\\TestEvent","channel":"test-channel","data":{"foo":"bar"}}'); +}); -// it('can respond to a ping', function () { -// $this->send(['event' => 'pusher:ping']); +it('can handle an event', function () { + $this->subscribe('presence-test-channel', data: ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]); -// $this->assertSent('abc-123', '{"event":"pusher:pong"}', 1); -// })->skip(); + $this->post('apps/123456/events', [ + 'name' => 'App\\Events\\TestEvent', + 'channel' => 'presence-test-channel', + 'data' => ['foo' => 'bar'], + ])->assertOk(); -// it('it can ping inactive subscribers', function () { -// $this->connect(); + $this->assertSent('abc-123', message: '{"event":"App\\\\Events\\\\TestEvent","channel":"presence-test-channel","data":{"foo":"bar"}}'); +}); -// Carbon::setTestNow(now()->addMinutes(10)); +it('can respond to a ping', function () { + $this->send(['event' => 'pusher:ping']); -// (new PingInactiveConnections)->handle( -// connectionManager() -// ); + $this->assertSent('abc-123', '{"event":"pusher:pong"}', 1); +}); -// $this->assertSent('abc-123', '{"event":"pusher:ping"}', 1); -// })->skip(); +it('it can ping inactive subscribers', function () { + $this->connect(); + $this->subscribe('test-channel'); + $this->assertSent('abc-123', 'subscription_succeeded', 1); -// it('it can disconnect inactive subscribers', function () { -// $this->subscribe('test-channel'); + $connection = Arr::first(connectionManager()->all()); + $connection->setLastSeenAt(time() - 60 * 10); + connectionManager()->connect($connection); -// expect(connectionManager()->all())->toHaveCount(1); -// expect(channelManager()->connectionKeys(ChannelBroker::create('test-channel')))->toHaveCount(1); - -// Carbon::setTestNow(now()->addMinutes(10)); - -// (new PingInactiveConnections)->handle( -// connectionManager() -// ); -// $this->assertSent('abc-123', '{"event":"pusher:ping"}'); - -// (new PruneStaleConnections)->handle( -// connectionManager(), -// channelManager() -// ); - -// expect(connectionManager()->all())->toHaveCount(0); -// expect(channelManager()->connectionKeys(ChannelBroker::create('test-channel')))->toHaveCount(0); - -// $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4201,\"message\":\"Pong reply not received in time\"}"}', 1); -// })->skip(); - -// it('can handle a client whisper', function () { -// $this->subscribe('test-channel'); - -// $this->subscribe('test-channel', connectionId: 'def-456'); - -// $this->send([ -// 'event' => 'client-start-typing', -// 'channel' => 'test-channel', -// 'data' => [ -// 'id' => 123, -// 'name' => 'Joe Dixon', -// ], -// ], 'abc-123'); - -// $this->assertSent('def-456', '{"event":"client-start-typing","channel":"test-channel","data":{"id":123,"name":"Joe Dixon"}}', 1); -// })->skip(); - -// it('can subscribe a connection to multiple channels', function () { -// $this->subscribe('test-channel'); -// $this->subscribe('test-channel-2'); -// $this->subscribe('private-test-channel-3', data: ['foo' => 'bar']); -// $this->subscribe('presence-test-channel-4', data: ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]); - -// expect(connectionManager()->all())->toHaveCount(1); -// expect(channelManager()->all())->toHaveCount(4); - -// $connection = $this->managedConnection(); - -// channelManager()->all()->each(function ($channel) use ($connection) { -// expect(channelManager()->connectionKeys($channel))->toHaveCount(1); -// expect(channelManager()->connectionKeys($channel)->map(fn ($conn, $index) => (string) $index))->toContain($connection->identifier()); -// })->skip(); -// })->skip(); - -// it('can subscribe multiple connections to multiple channels', function () { -// $this->subscribe('test-channel'); -// $this->subscribe('test-channel-2'); -// $this->subscribe('private-test-channel-3', data: ['foo' => 'bar']); -// $this->subscribe('presence-test-channel-4', data: ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]); - -// $connection = $this->connect(); -// $this->subscribe('test-channel', connectionId: 'def-456'); -// $this->subscribe('private-test-channel-3', connectionId: 'def-456', data: ['foo' => 'bar']); - -// expect(connectionManager()->all())->toHaveCount(2); -// expect(channelManager()->all())->toHaveCount(4); - -// expect(channelManager()->connectionKeys(ChannelBroker::create('test-channel')))->toHaveCount(2); -// expect(channelManager()->connectionKeys(ChannelBroker::create('test-channel-2')))->toHaveCount(1); -// expect(channelManager()->connectionKeys(ChannelBroker::create('private-test-channel-3')))->toHaveCount(2); -// expect(channelManager()->connectionKeys(ChannelBroker::create('presence-test-channel-4')))->toHaveCount(1); -// })->skip(); - -// it('fails to subscribe to a private channel with invalid auth signature', function () { -// $this->subscribe('private-test-channel', auth: 'invalid-signature'); - -// $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4009,\"message\":\"Connection is unauthorized\"}"}'); -// })->skip(); - -// it('fails to subscribe to a presence channel with invalid auth signature', function () { -// $this->subscribe('presence-test-channel', auth: 'invalid-signature'); - -// $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4009,\"message\":\"Connection is unauthorized\"}"}'); -// })->skip(); - -// it('fails to connect when an invalid application is provided', function () { -// App::make(Server::class) -// ->handle(Request::fromLambdaEvent( -// [ -// 'requestContext' => [ -// 'eventType' => 'CONNECT', -// 'connectionId' => 'abc-123', -// ], -// 'queryStringParameters' => [ -// 'appId' => 'invalid-app-id', -// ], -// ] -// )); - -// $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4001,\"message\":\"Application does not exist\"}"}'); -// })->skip(); - -// it('cannot connect from an invalid origin', function () { -// $this->app['config']->set('reverb.apps.apps.0.allowed_origins', ['https://laravel.com']); - -// App::make(Server::class) -// ->handle(Request::fromLambdaEvent( -// [ -// 'requestContext' => [ -// 'eventType' => 'CONNECT', -// 'connectionId' => 'abc-123', -// ], -// 'queryStringParameters' => [ -// 'appId' => 'pusher-key', -// ], -// ] -// )); - -// $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4009,\"message\":\"Origin not allowed\"}"}', 1); -// })->skip(); - -// it('can connect from a valid origin', function () { -// $this->app['config']->set('reverb.apps.0.allowed_origins', ['laravel.com']); - -// App::make(Server::class) -// ->handle(Request::fromLambdaEvent( -// [ -// 'requestContext' => [ -// 'eventType' => 'CONNECT', -// 'connectionId' => 'abc-123', -// ], -// 'queryStringParameters' => [ -// 'appId' => 'pusher-key', -// ], -// 'headers' => [ -// 'origin' => 'https://laravel.com', -// ], -// ] -// )); - -// $this->assertSent('abc-123', 'connection_established', 1); -// })->skip(); - -// it('clears application state between requests', function () { -// $this->subscribe('test-channel'); - -// expect($this->app->make(ConnectionManager::class)->app())->toBeNull(); -// expect($this->app->make(ChannelManager::class)->app())->toBeNull(); -// })->skip(); + (new PingInactiveConnections)->handle( + channelManager() + ); + + $this->assertSent('abc-123', '{"event":"pusher:ping"}', 1); +}); + +it('it can disconnect inactive subscribers', function () { + $this->subscribe('test-channel'); + + $connection = Arr::first(connectionManager()->all()); + $connection->setLastSeenAt(time() - 60 * 10); + connectionManager()->connect($connection); + + (new PingInactiveConnections)->handle( + channelManager() + ); + $this->assertSent('abc-123', '{"event":"pusher:ping"}'); + + (new PruneStaleConnections)->handle( + channelManager() + ); + + // expect(connectionManager()->all())->toHaveCount(0); + expect(channelManager()->find('test-channel')->connections())->toHaveCount(0); + + $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4201,\"message\":\"Pong reply not received in time\"}"}', 1); +}); + +it('can handle a client whisper', function () { + $this->subscribe('test-channel'); + + $this->subscribe('test-channel', connectionId: 'def-456'); + + $this->send([ + 'event' => 'client-start-typing', + 'channel' => 'test-channel', + 'data' => [ + 'id' => 123, + 'name' => 'Joe Dixon', + ], + ], 'abc-123'); + + $this->assertSent('def-456', '{"event":"client-start-typing","channel":"test-channel","data":{"id":123,"name":"Joe Dixon"}}', 1); +}); + +it('can subscribe a connection to multiple channels', function () { + $this->subscribe('test-channel'); + $this->subscribe('test-channel-2'); + $this->subscribe('private-test-channel-3', data: ['foo' => 'bar']); + $this->subscribe('presence-test-channel-4', data: ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]); + + expect(connectionManager()->all())->toHaveCount(1); + expect(channelManager()->all())->toHaveCount(4); + + $connection = $this->managedConnection(); + + collect(channelManager()->all())->each(function ($channel) use ($connection) { + expect($channel->connections())->toHaveCount(1); + expect(collect($channel->connections())->map(fn ($conn, $index) => (string) $index))->toContain($connection->identifier()); + }); +}); + +it('can subscribe multiple connections to multiple channels', function () { + $this->subscribe('test-channel'); + $this->subscribe('test-channel-2'); + $this->subscribe('private-test-channel-3', data: ['foo' => 'bar']); + $this->subscribe('presence-test-channel-4', data: ['user_id' => 1, 'user_info' => ['name' => 'Test User 1']]); + + $this->connect(); + $this->subscribe('test-channel', connectionId: 'def-456'); + $this->subscribe('private-test-channel-3', connectionId: 'def-456', data: ['foo' => 'bar']); + + expect(connectionManager()->all())->toHaveCount(2); + expect(channelManager()->all())->toHaveCount(4); + + expect(channelManager()->find('test-channel')->connections())->toHaveCount(2); + expect(channelManager()->find('test-channel-2')->connections())->toHaveCount(1); + expect(channelManager()->find('private-test-channel-3')->connections())->toHaveCount(2); + expect(channelManager()->find('presence-test-channel-4')->connections())->toHaveCount(1); +}); + +it('fails to subscribe to a private channel with invalid auth signature', function () { + $this->subscribe('private-test-channel', auth: 'invalid-signature'); + + $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4009,\"message\":\"Connection is unauthorized\"}"}'); +}); + +it('fails to subscribe to a presence channel with invalid auth signature', function () { + $this->subscribe('presence-test-channel', auth: 'invalid-signature'); + + $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4009,\"message\":\"Connection is unauthorized\"}"}'); +}); + +it('fails to connect when an invalid application is provided', function () { + App::make(Server::class) + ->handle(Request::fromLambdaEvent( + [ + 'requestContext' => [ + 'eventType' => 'CONNECT', + 'connectionId' => 'abc-123', + ], + 'queryStringParameters' => [ + 'appId' => 'invalid-app-id', + ], + ] + )); + + $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4001,\"message\":\"Application does not exist\"}"}'); +}); + +it('cannot connect from an invalid origin', function () { + $this->app['config']->set('reverb.apps.apps.0.allowed_origins', ['https://laravel.com']); + + App::make(Server::class) + ->handle(Request::fromLambdaEvent( + [ + 'requestContext' => [ + 'eventType' => 'CONNECT', + 'connectionId' => 'abc-123', + ], + 'queryStringParameters' => [ + 'appId' => 'pusher-key', + ], + ] + )); + + $this->assertSent('abc-123', '{"event":"pusher:error","data":"{\"code\":4009,\"message\":\"Origin not allowed\"}"}', 1); +}); + +it('can connect from a valid origin', function () { + $this->app['config']->set('reverb.apps.0.allowed_origins', ['laravel.com']); + + App::make(Server::class) + ->handle(Request::fromLambdaEvent( + [ + 'requestContext' => [ + 'eventType' => 'CONNECT', + 'connectionId' => 'abc-123', + ], + 'queryStringParameters' => [ + 'appId' => 'pusher-key', + ], + 'headers' => [ + 'origin' => 'https://laravel.com', + ], + ] + )); + + $this->assertSent('abc-123', 'connection_established', 1); +}); + +it('clears application state between requests', function () { + $this->subscribe('test-channel'); + + expect($this->app->make(ChannelManager::class)->app())->toBeNull(); +})->todo(); diff --git a/tests/Pest.php b/tests/Pest.php index 0f3b627d..c5690b6e 100644 --- a/tests/Pest.php +++ b/tests/Pest.php @@ -5,6 +5,7 @@ use Laravel\Reverb\Application; use Laravel\Reverb\Contracts\ApplicationProvider; use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Managers\Connections; use Laravel\Reverb\Servers\Reverb\ChannelConnection; use Laravel\Reverb\Tests\Connection; @@ -49,6 +50,14 @@ function validAuth(string $connectionId, string $channel, string $data = null): /** * Return the connection manager. */ +function connectionManager(): ConnectionManager +{ + return App::make(ConnectionManager::class); +} + +/** + * Return the channel manager. + */ function channelManager(Application $app = null): ChannelManager { return App::make(ChannelManager::class) diff --git a/tests/Unit/Channels/CacheChannelTest.php b/tests/Unit/Channels/CacheChannelTest.php index bf2a26e9..31d949bf 100644 --- a/tests/Unit/Channels/CacheChannelTest.php +++ b/tests/Unit/Channels/CacheChannelTest.php @@ -8,6 +8,8 @@ beforeEach(function () { $this->connection = new Connection(); $this->channelConnectionManager = Mockery::spy(ChannelConnectionManager::class); + $this->channelConnectionManager->shouldReceive('for') + ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); }); diff --git a/tests/Unit/Channels/ChannelTest.php b/tests/Unit/Channels/ChannelTest.php index 42483501..fd65492f 100644 --- a/tests/Unit/Channels/ChannelTest.php +++ b/tests/Unit/Channels/ChannelTest.php @@ -7,6 +7,8 @@ beforeEach(function () { $this->connection = new Connection(); $this->channelConnectionManager = Mockery::spy(ChannelConnectionManager::class); + $this->channelConnectionManager->shouldReceive('for') + ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); }); diff --git a/tests/Unit/Channels/PresenceCacheChannelTest.php b/tests/Unit/Channels/PresenceCacheChannelTest.php index f81363c8..845bc735 100644 --- a/tests/Unit/Channels/PresenceCacheChannelTest.php +++ b/tests/Unit/Channels/PresenceCacheChannelTest.php @@ -9,6 +9,8 @@ beforeEach(function () { $this->connection = new Connection(); $this->channelConnectionManager = Mockery::spy(ChannelConnectionManager::class); + $this->channelConnectionManager->shouldReceive('for') + ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); }); diff --git a/tests/Unit/Channels/PresenceChannelTest.php b/tests/Unit/Channels/PresenceChannelTest.php index 74fd0162..c2cf7e10 100644 --- a/tests/Unit/Channels/PresenceChannelTest.php +++ b/tests/Unit/Channels/PresenceChannelTest.php @@ -9,6 +9,8 @@ beforeEach(function () { $this->connection = new Connection(); $this->channelConnectionManager = Mockery::spy(ChannelConnectionManager::class); + $this->channelConnectionManager->shouldReceive('for') + ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); }); diff --git a/tests/Unit/Channels/PrivateCacheChannelTest.php b/tests/Unit/Channels/PrivateCacheChannelTest.php index d68f4d7e..ee491c93 100644 --- a/tests/Unit/Channels/PrivateCacheChannelTest.php +++ b/tests/Unit/Channels/PrivateCacheChannelTest.php @@ -8,6 +8,8 @@ beforeEach(function () { $this->connection = new Connection(); $this->channelConnectionManager = Mockery::spy(ChannelConnectionManager::class); + $this->channelConnectionManager->shouldReceive('for') + ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); }); diff --git a/tests/Unit/Channels/PrivateChannelTest.php b/tests/Unit/Channels/PrivateChannelTest.php index 7f6a836c..5720ab9b 100644 --- a/tests/Unit/Channels/PrivateChannelTest.php +++ b/tests/Unit/Channels/PrivateChannelTest.php @@ -8,6 +8,8 @@ beforeEach(function () { $this->connection = new Connection(); $this->channelConnectionManager = Mockery::spy(ChannelConnectionManager::class); + $this->channelConnectionManager->shouldReceive('for') + ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); }); diff --git a/tests/Unit/ClientEventTest.php b/tests/Unit/ClientEventTest.php index fac3eb51..d3c2b80f 100644 --- a/tests/Unit/ClientEventTest.php +++ b/tests/Unit/ClientEventTest.php @@ -8,6 +8,8 @@ beforeEach(function () { $this->connection = new Connection; $this->channelConnectionManager = Mockery::spy(ChannelConnectionManager::class); + $this->channelConnectionManager->shouldReceive('for') + ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); }); diff --git a/tests/Unit/EventTest.php b/tests/Unit/EventTest.php index 0f2a7fad..099bd4af 100644 --- a/tests/Unit/EventTest.php +++ b/tests/Unit/EventTest.php @@ -21,6 +21,8 @@ it('can broadcast an event directly when publishing disabled', function () { $channelConnectionManager = Mockery::mock(ChannelConnectionManager::class); + $channelConnectionManager->shouldReceive('for') + ->andReturn($channelConnectionManager); $channelConnectionManager->shouldReceive('all')->once() ->andReturn([]); @@ -31,6 +33,8 @@ it('can broadcast an event for multiple channels', function () { $channelConnectionManager = Mockery::mock(ChannelConnectionManager::class); + $channelConnectionManager->shouldReceive('for') + ->andReturn($channelConnectionManager); $channelConnectionManager->shouldReceive('all')->twice() ->andReturn([]);