From 963b42268929ef734adbbe7d5168eaa83f7f379e Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Wed, 6 Dec 2023 14:46:55 +0000 Subject: [PATCH 1/3] wip --- src/Contracts/ChannelManager.php | 7 +++++- src/Event.php | 5 ++++- src/Managers/ArrayChannelManager.php | 26 ++++++++++++++++------ src/Managers/CacheChannelManager.php | 33 ++++++++++++++++++---------- src/Pusher/Event.php | 4 ++-- 5 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index bdfdf243..e916d8b1 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -25,7 +25,12 @@ public function all(): array; /** * Find the given channel. */ - public function find(string $channel): Channel; + public function find(string $channel): ?Channel; + + /** + * Find the given channel or create it if it doesn't exist. + */ + public function findOrCreate(string $channel): Channel; /** * Get all the connections for the given channels. diff --git a/src/Event.php b/src/Event.php index 4aec1af8..32438586 100644 --- a/src/Event.php +++ b/src/Event.php @@ -36,7 +36,10 @@ public static function dispatchSynchronously(Application $app, array $payload, ? foreach ($channels as $channel) { unset($payload['channels']); - $channel = app(ChannelManager::class)->for($app)->find($channel); + if (! $channel = app(ChannelManager::class)->for($app)->find($channel)) { + continue; + } + $payload['channel'] = $channel->name(); $channel->broadcast($payload, $connection); diff --git a/src/Managers/ArrayChannelManager.php b/src/Managers/ArrayChannelManager.php index 5dc279ab..a7aa101f 100644 --- a/src/Managers/ArrayChannelManager.php +++ b/src/Managers/ArrayChannelManager.php @@ -48,11 +48,27 @@ public function all(): array /** * Find the given channel */ - public function find(string $channel): Channel + public function find(string $channel): ?Channel { return $this->channels($channel); } + /** + * Find the given channel or create it if it doesn't exist. + */ + public function findOrCreate(string $channelName): Channel + { + if ($channel = $this->find($channelName)) { + return $channel; + } + + $channel = ChannelBroker::create($channelName); + + $this->applications[$this->application->id()][$channel->name()] = $channel; + + return $channel; + } + /** * Get all the connections for the given channels. * @@ -98,18 +114,14 @@ public function channel(string $channel): Channel * * @return \Laravel\Reverb\Channels\Channel|array */ - public function channels(?string $channel = null): Channel|array + public function channels(?string $channel = null): Channel|array|null { if (! isset($this->applications[$this->application->id()])) { $this->applications[$this->application->id()] = []; } if ($channel) { - if (! isset($this->applications[$this->application->id()][$channel])) { - $this->applications[$this->application->id()][$channel] = ChannelBroker::create($channel); - } - - return $this->applications[$this->application->id()][$channel]; + return $this->applications[$this->application->id()][$channel] ?? null; } return $this->applications[$this->application->id()]; diff --git a/src/Managers/CacheChannelManager.php b/src/Managers/CacheChannelManager.php index 5b31bb5b..7fc059f9 100644 --- a/src/Managers/CacheChannelManager.php +++ b/src/Managers/CacheChannelManager.php @@ -48,11 +48,28 @@ public function all(): array /** * Find the given channel */ - public function find(string $channel): Channel + public function find(string $channel): ?Channel { return $this->channels($channel); } + /** + * Find the given channel or create it if it doesn't exist. + */ + public function findOrCreate(string $channelName): Channel + { + if ($channel = $this->channels($channelName)) { + return $channel; + } + + $channels = $this->repository->get($this->prefix, []); + $channel = ChannelBroker::create($channelName); + $channels[$this->application->id()][$$channel->name()] = serialize($channel); + $this->repository->forever($this->prefix, $channels); + + return $channel; + } + /** * Get all the connections for the given channels. * @@ -102,7 +119,7 @@ public function channel(string $channel): Channel * * @return \Laravel\Reverb\Channels\Channel|array */ - public function channels(?string $channel = null): Channel|array + public function channels(?string $channel = null): Channel|array|null { $channels = $this->repository->get($this->prefix, []); @@ -111,15 +128,9 @@ public function channels(?string $channel = null): Channel|array } if ($channel) { - if (! isset($channels[$this->application->id()][$channel])) { - $channel = ChannelBroker::create($channel); - $channels[$this->application->id()][$channel->name()] = serialize($channel); - $this->repository->forever($this->prefix, $channels); - - return $channel; - } - - return unserialize($channels[$this->application->id()][$channel]); + return isset($channels[$this->application->id()][$channel]) + ? unserialize($channels[$this->application->id()][$channel]) + : null; } return array_map('unserialize', $channels[$this->application->id()] ?: []); diff --git a/src/Pusher/Event.php b/src/Pusher/Event.php index 297bb1f6..5a45136b 100644 --- a/src/Pusher/Event.php +++ b/src/Pusher/Event.php @@ -54,7 +54,7 @@ public function subscribe(Connection $connection, string $channel, ?string $auth { $channel = $this->channels ->for($connection->app()) - ->find($channel); + ->findOrCreate($channel); $channel->subscribe($connection, $auth, $data); @@ -69,7 +69,7 @@ public function unsubscribe(Connection $connection, string $channel): void $channel = $this->channels ->for($connection->app()) ->find($channel) - ->unsubscribe($connection); + ?->unsubscribe($connection); } /** From 64480bd4a35430a480cd1d87d35897659a1f4526 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Wed, 6 Dec 2023 16:09:42 +0000 Subject: [PATCH 2/3] keep channels lean --- src/Managers/CacheChannelManager.php | 2 +- .../InteractsWithChannelInformation.php | 26 +++++++++++++++---- .../Controllers/ChannelUsersController.php | 6 ++++- .../Controllers/EventsBatchController.php | 2 +- .../Feature/Reverb/ChannelControllerTest.php | 2 +- .../Reverb/ChannelUsersControllerTest.php | 7 ++++- .../Reverb/EventsBatchControllerTest.php | 8 ++++-- tests/Feature/Reverb/ServerTest.php | 5 +++- tests/Unit/ClientEventTest.php | 1 + tests/Unit/EventTest.php | 5 ++++ tests/Unit/Managers/ChannelManagerTest.php | 14 +++++----- 11 files changed, 58 insertions(+), 20 deletions(-) diff --git a/src/Managers/CacheChannelManager.php b/src/Managers/CacheChannelManager.php index 7fc059f9..c98c04fb 100644 --- a/src/Managers/CacheChannelManager.php +++ b/src/Managers/CacheChannelManager.php @@ -64,7 +64,7 @@ public function findOrCreate(string $channelName): Channel $channels = $this->repository->get($this->prefix, []); $channel = ChannelBroker::create($channelName); - $channels[$this->application->id()][$$channel->name()] = serialize($channel); + $channels[$this->application->id()][$channel->name()] = serialize($channel); $this->repository->forever($this->prefix, $channels); return $channel; diff --git a/src/Pusher/Concerns/InteractsWithChannelInformation.php b/src/Pusher/Concerns/InteractsWithChannelInformation.php index e056197f..a430187a 100644 --- a/src/Pusher/Concerns/InteractsWithChannelInformation.php +++ b/src/Pusher/Concerns/InteractsWithChannelInformation.php @@ -30,21 +30,37 @@ protected function infoForChannels(array $channels, string $info): array protected function info(string $channel, string $info): array { $info = explode(',', $info); + $channel = app(ChannelManager::class)->find($channel); - if (! $channel = app(ChannelManager::class)->find($channel)) { - return []; - } + return array_filter( + $channel ? $this->occupiedInfo($channel, $info) : $this->unoccupiedInfo($info), + fn ($item) => $item !== null + ); + } + /** + * Get the channel information for the given occupied channel. + */ + protected function occupiedInfo(Channel $channel, array $info): array + { $count = count($channel->connections()); - $info = [ + return [ 'occupied' => in_array('occupied', $info) ? $count > 0 : null, 'user_count' => in_array('user_count', $info) && $this->isPresenceChannel($channel) ? $count : null, 'subscription_count' => in_array('subscription_count', $info) && ! $this->isPresenceChannel($channel) ? $count : null, 'cache' => in_array('cache', $info) && $this->isCacheChannel($channel) ? $channel->cachedPayload() : null, ]; + } - return array_filter($info, fn ($item) => $item !== null); + /** + * Get the channel information for the given unoccupied channel. + */ + protected function unoccupiedInfo(array $info): array + { + return [ + 'occupied' => in_array('occupied', $info) ? false : null, + ]; } /** diff --git a/src/Pusher/Http/Controllers/ChannelUsersController.php b/src/Pusher/Http/Controllers/ChannelUsersController.php index 075872cc..36c50a1d 100644 --- a/src/Pusher/Http/Controllers/ChannelUsersController.php +++ b/src/Pusher/Http/Controllers/ChannelUsersController.php @@ -21,6 +21,10 @@ public function __invoke(RequestInterface $request, Connection $connection, stri $channel = $this->channels->find($channel); + if(! $channel) { + return new JsonResponse((object) [], 404); + } + if (! $this->isPresenceChannel($channel)) { return new JsonResponse((object) [], 400); } @@ -30,6 +34,6 @@ public function __invoke(RequestInterface $request, Connection $connection, stri ->map(fn ($data) => ['id' => $data['user_id']]) ->values(); - return new JsonResponse((object) ['users' => $connections]); + return new JsonResponse(['users' => $connections]); } } diff --git a/src/Pusher/Http/Controllers/EventsBatchController.php b/src/Pusher/Http/Controllers/EventsBatchController.php index 3d746611..c713aed8 100644 --- a/src/Pusher/Http/Controllers/EventsBatchController.php +++ b/src/Pusher/Http/Controllers/EventsBatchController.php @@ -48,7 +48,7 @@ public function __invoke(RequestInterface $request, Connection $connection, stri if ($info->some(fn ($item) => count($item) > 0)) { return new JsonResponse( - ['batch' => $info->each(fn ($item) => (object) $item)->all()] + ['batch' => $info->map(fn ($item) => (object) $item)->all()] ); } diff --git a/tests/Feature/Reverb/ChannelControllerTest.php b/tests/Feature/Reverb/ChannelControllerTest.php index 8a26271c..32f30765 100644 --- a/tests/Feature/Reverb/ChannelControllerTest.php +++ b/tests/Feature/Reverb/ChannelControllerTest.php @@ -20,7 +20,7 @@ $response = await($this->signedRequest('channels/test-channel-one?info=user_count,subscription_count,cache')); expect($response->getStatusCode())->toBe(200); - expect($response->getBody()->getContents())->toBe('{"occupied":false,"subscription_count":0}'); + expect($response->getBody()->getContents())->toBe('{"occupied":false}'); }); it('can return cache channel attributes', function () { diff --git a/tests/Feature/Reverb/ChannelUsersControllerTest.php b/tests/Feature/Reverb/ChannelUsersControllerTest.php index d9cbbc56..60d21125 100644 --- a/tests/Feature/Reverb/ChannelUsersControllerTest.php +++ b/tests/Feature/Reverb/ChannelUsersControllerTest.php @@ -11,13 +11,18 @@ uses(ReverbTestCase::class); it('returns an error when presence channel not provided', function () { + subscribe('test-channel'); await($this->signedRequest('channels/test-channel/users')); })->throws(ResponseException::class); +it('returns an error when unoccupied channel provided', function () { + await($this->signedRequest('channels/presence-test-channel/users')); +})->throws(ResponseException::class); + it('returns the user data', function () { $channel = app(ChannelManager::class) ->for(app()->make(ApplicationProvider::class)->findByKey('pusher-key')) - ->find('presence-test-channel'); + ->findOrCreate('presence-test-channel'); $channel->subscribe($connection = new FakeConnection('test-connection-one'), validAuth($connection->id(), 'presence-test-channel', $data = json_encode(['user_id' => 1, 'user_info' => ['name' => 'Taylor']])), $data); $channel->subscribe($connection = new FakeConnection('test-connection-two'), validAuth($connection->id(), 'presence-test-channel', $data = json_encode(['user_id' => 2, 'user_info' => ['name' => 'Joe']])), $data); $channel->subscribe($connection = new FakeConnection('test-connection-three'), validAuth($connection->id(), 'presence-test-channel', $data = json_encode(['user_id' => 3, 'user_info' => ['name' => 'Jess']])), $data); diff --git a/tests/Feature/Reverb/EventsBatchControllerTest.php b/tests/Feature/Reverb/EventsBatchControllerTest.php index 9b6ebcee..33ef7a92 100644 --- a/tests/Feature/Reverb/EventsBatchControllerTest.php +++ b/tests/Feature/Reverb/EventsBatchControllerTest.php @@ -38,6 +38,9 @@ }); it('can receive an event batch trigger with multiple events and return info for each', function () { + subscribe('presence-test-channel'); + subscribe('test-channel-two'); + subscribe('test-channel-three'); $response = await($this->signedPostRequest('batch_events', ['batch' => [ [ 'name' => 'NewEvent', @@ -60,10 +63,11 @@ ]])); expect($response->getStatusCode())->toBe(200); - expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":0},{"subscription_count":0},{"subscription_count":0}]}'); + expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":1},{"subscription_count":1},{"subscription_count":1}]}'); }); it('can receive an event batch trigger with multiple events and return info for some', function () { + subscribe('presence-test-channel'); $response = await($this->signedPostRequest('batch_events', ['batch' => [ [ 'name' => 'NewEvent', @@ -79,5 +83,5 @@ ]])); expect($response->getStatusCode())->toBe(200); - expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":0},[]]}'); + expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":1},{}]}'); }); diff --git a/tests/Feature/Reverb/ServerTest.php b/tests/Feature/Reverb/ServerTest.php index ba3c5430..3e825bc2 100644 --- a/tests/Feature/Reverb/ServerTest.php +++ b/tests/Feature/Reverb/ServerTest.php @@ -98,6 +98,7 @@ it('can receive a cached message when joining a cache channel', function () { $connection = connect(); + subscribe('cache-test-channel'); $this->triggerEvent( 'cache-test-channel', @@ -112,6 +113,7 @@ it('can receive a cached message when joining a private cache channel', function () { $connection = connect(); + subscribe('private-cache-test-channel'); $this->triggerEvent( 'private-cache-test-channel', @@ -126,6 +128,7 @@ it('can receive a cached message when joining a presence cache channel', function () { $connection = connect(); + subscribe('presence-cache-test-channel'); $this->triggerEvent( 'presence-cache-test-channel', @@ -224,7 +227,7 @@ (new PruneStaleConnections)->handle(channels()); - expect(channels()->find('test-channel')->connections())->toHaveCount(0); + expect(channels()->find('test-channel'))->toBeNull(); $connection->assertReceived('{"event":"pusher:ping"}'); $connection->assertReceived('{"event":"pusher:error","data":"{\"code\":4201,\"message\":\"Pong reply not received in time\"}"}'); diff --git a/tests/Unit/ClientEventTest.php b/tests/Unit/ClientEventTest.php index 9700b58f..bdf2355b 100644 --- a/tests/Unit/ClientEventTest.php +++ b/tests/Unit/ClientEventTest.php @@ -11,6 +11,7 @@ $this->channelConnectionManager->shouldReceive('for') ->andReturn($this->channelConnectionManager); $this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager); + channels()->findOrCreate('test-channel'); }); it('can forward a client message', function () { diff --git a/tests/Unit/EventTest.php b/tests/Unit/EventTest.php index 892bc711..30e85d4a 100644 --- a/tests/Unit/EventTest.php +++ b/tests/Unit/EventTest.php @@ -27,6 +27,8 @@ $this->app->instance(ChannelConnectionManager::class, $channelConnectionManager); + channels()->findOrCreate('test-channel'); + Event::dispatch(app(ApplicationProvider::class)->findByKey('pusher-key'), ['channel' => 'test-channel']); }); @@ -39,5 +41,8 @@ $this->app->instance(ChannelConnectionManager::class, $channelConnectionManager); + channels()->findOrCreate('test-channel-one'); + channels()->findOrCreate('test-channel-two'); + Event::dispatch(app(ApplicationProvider::class)->findByKey('pusher-key'), ['channels' => ['test-channel-one', 'test-channel-two']]); }); diff --git a/tests/Unit/Managers/ChannelManagerTest.php b/tests/Unit/Managers/ChannelManagerTest.php index 8b7642a6..6a740339 100644 --- a/tests/Unit/Managers/ChannelManagerTest.php +++ b/tests/Unit/Managers/ChannelManagerTest.php @@ -7,7 +7,7 @@ $this->connection = new FakeConnection; $this->channelManager = $this->app->make(ChannelManager::class) ->for($this->connection->app()); - $this->channel = $this->channelManager->find('test-channel-0'); + $this->channel = $this->channelManager->findOrCreate('test-channel-0'); }); it('can subscribe to a channel', function () { @@ -29,7 +29,7 @@ it('can get all channels', function () { $channels = collect(['test-channel-1', 'test-channel-2', 'test-channel-3']); - $channels->each(fn ($channel) => $this->channelManager->find($channel)->subscribe($this->connection)); + $channels->each(fn ($channel) => $this->channelManager->findOrCreate($channel)->subscribe($this->connection)); foreach ($this->channelManager->all() as $index => $channel) { expect($channel->name())->toBe($index); @@ -46,10 +46,10 @@ ->toBeIn(array_keys($this->channel->connections()))); }); -it('can unsubscribe a connection for all channels', function () { +it('can unsubscribe a connection from all channels', function () { $channels = collect(['test-channel-0', 'test-channel-1', 'test-channel-2']); - $channels->each(fn ($channel) => $this->channelManager->find($channel)->subscribe($this->connection)); + $channels->each(fn ($channel) => $this->channelManager->findOrCreate($channel)->subscribe($this->connection)); collect($this->channelManager->all())->each(fn ($channel) => expect($channel->connections())->toHaveCount(1)); @@ -72,9 +72,9 @@ it('can get all connections for all channels', function () { $connections = factory(12); - $channelOne = $this->channelManager->find('test-channel-0'); - $channelTwo = $this->channelManager->find('test-channel-1'); - $channelThree = $this->channelManager->find('test-channel-2'); + $channelOne = $this->channelManager->findOrCreate('test-channel-0'); + $channelTwo = $this->channelManager->findOrCreate('test-channel-1'); + $channelThree = $this->channelManager->findOrCreate('test-channel-2'); $connections = collect($connections)->split(3); From 2265a9a1f4931135acf7736acf31750cc8650c71 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Wed, 6 Dec 2023 16:09:53 +0000 Subject: [PATCH 3/3] formatting --- src/Pusher/Http/Controllers/ChannelUsersController.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Pusher/Http/Controllers/ChannelUsersController.php b/src/Pusher/Http/Controllers/ChannelUsersController.php index 36c50a1d..97618e03 100644 --- a/src/Pusher/Http/Controllers/ChannelUsersController.php +++ b/src/Pusher/Http/Controllers/ChannelUsersController.php @@ -21,7 +21,7 @@ public function __invoke(RequestInterface $request, Connection $connection, stri $channel = $this->channels->find($channel); - if(! $channel) { + if (! $channel) { return new JsonResponse((object) [], 404); }