diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index aa9d80cb..c4d9d2de 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -4,16 +4,23 @@ use Exception; use Illuminate\Support\Arr; -use Illuminate\Support\Facades\App; use Laravel\Reverb\Application; -use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\ChannelConnectionManager; use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Output; class Channel { + /** + * The channel connections. + * + * @var \Laravel\Reverb\Contracts\ChannelConnectionManager + */ + protected $connections; + public function __construct(protected string $name) { + $this->connections = app(ChannelConnectionManager::class); } /** @@ -24,14 +31,20 @@ public function name(): string return $this->name; } + /** + * Get all connections for the channel. + */ + public function connections(): array + { + return $this->connections->all(); + } + /** * Subscribe to the given channel. */ public function subscribe(Connection $connection, string $auth = null, string $data = null): void { - App::make(ChannelManager::class) - ->for($connection->app()) - ->subscribe($this, $connection, $data ? json_decode($data, true) : []); + $this->connections->add($connection, $data ? json_decode($data, true) : []); } /** @@ -39,9 +52,7 @@ public function subscribe(Connection $connection, string $auth = null, string $d */ public function unsubscribe(Connection $connection): void { - App::make(ChannelManager::class) - ->for($connection->app()) - ->unsubscribe($this, $connection); + $this->connections->remove($connection); } /** @@ -49,7 +60,7 @@ public function unsubscribe(Connection $connection): void */ public function broadcast(Application $app, array $payload, Connection $except = null): void { - collect(App::make(ChannelManager::class)->for($app)->connections($this)) + collect($this->connections()) ->each(function ($connection) use ($payload, $except) { if ($except && $except->id() === $connection->id()) { return; diff --git a/src/Concerns/EnsuresIntegrity.php b/src/Concerns/EnsuresIntegrity.php deleted file mode 100644 index 19a44b7a..00000000 --- a/src/Concerns/EnsuresIntegrity.php +++ /dev/null @@ -1,45 +0,0 @@ -repository->getStore(), 'lock')) { - return $callback(); - } - - return $this->repository->lock($this->mutexKey(), $timeout) - ->block($timeout, function () use ($callback) { - return $callback(); - }); - } - - /** - * Get the mutex category key. - */ - protected function key(): string - { - return 'mutex'; - } - - /** - * Get the mutex key. - */ - protected function mutexKey(): string - { - return "{$this->key()}:mutex"; - } -} diff --git a/src/Contracts/ChannelConnectionManager.php b/src/Contracts/ChannelConnectionManager.php new file mode 100644 index 00000000..0c670775 --- /dev/null +++ b/src/Contracts/ChannelConnectionManager.php @@ -0,0 +1,26 @@ + + * Unsubscribe from all channels. */ - public function connections(Channel $channel): array; + public function unsubscribeFromAll(Connection $connection): void; /** * Flush the channel manager repository. diff --git a/src/Event.php b/src/Event.php index 78d71cad..3d40cf8e 100644 --- a/src/Event.php +++ b/src/Event.php @@ -4,7 +4,7 @@ use Illuminate\Support\Arr; use Illuminate\Support\Facades\App; -use Laravel\Reverb\Channels\ChannelBroker; +use Laravel\Reverb\Contracts\ChannelManager; use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Contracts\ServerProvider; @@ -38,7 +38,7 @@ public static function dispatchSynchronously(Application $app, array $payload, C foreach ($channels as $channel) { unset($payload['channels']); - $channel = ChannelBroker::create($channel); + $channel = app(ChannelManager::class)->find($channel); $payload['channel'] = $channel->name(); $channel->broadcast($app, $payload, $connection); diff --git a/src/Managers/ArrayChannelConnectionManager.php b/src/Managers/ArrayChannelConnectionManager.php new file mode 100644 index 00000000..5118653c --- /dev/null +++ b/src/Managers/ArrayChannelConnectionManager.php @@ -0,0 +1,48 @@ +> + */ + protected $connections = []; + + /** + * Add a connection. + */ + public function add(Connection $connection): void + { + $this->connections[$connection->identifier()] = $connection; + } + + /** + * Remove a connection. + */ + public function remove(Connection $connection): void + { + unset($this->connections[$connection->identifier()]); + } + + /** + * Get all the connections. + */ + public function all(): array + { + return $this->connections; + } + + /** + * Flush the channel connection manager. + */ + public function flush(): void + { + $this->connections = []; + } +} diff --git a/src/Managers/ArrayChannelManager.php b/src/Managers/ArrayChannelManager.php new file mode 100644 index 00000000..003e6fb9 --- /dev/null +++ b/src/Managers/ArrayChannelManager.php @@ -0,0 +1,108 @@ +>> + */ + protected $applications = []; + + /** + * The appliation instance. + * + * @var \Laravel\Reverb\Application + */ + protected $application; + + /** + * Get the application instance. + */ + public function app(): ?Application + { + return $this->application; + } + + /** + * Get all the channels. + */ + public function all(): Collection + { + return collect($this->channels()); + } + + /** + * Find the given channel + */ + public function find(string $channel): Channel + { + return $this->channels($channel); + } + + /** + * Unsubscribe from all channels. + */ + public function unsubscribeFromAll(Connection $connection): void + { + foreach ($this->channels() as $channel) { + $channel->unsubscribe($connection); + } + } + + /** + * Get the given channel. + */ + public function channel(string $channel): array + { + return $this->channels($channel); + } + + /** + * Get the channels. + */ + public function channels(string $channel = null): array|Channel + { + if (! isset($this->applications[$this->application->id()])) { + $this->applications[$this->application->id()] = []; + } + + $channels = $this->applications[$this->application->id()]; + + if ($channel) { + if (! isset($channels[$channel])) { + $this->applications[$this->application->id()][$channel] = ChannelBroker::create($channel); + } + + return $this->applications[$this->application->id()][$channel]; + } + + return $channels ?: []; + } + + /** + * Flush the channel manager repository. + */ + public function flush(): void + { + App::make(ApplicationProvider::class) + ->all() + ->each(function (Application $application) { + $this->applications[$application->id()] = []; + }); + } +} diff --git a/src/Managers/ConnectionManager.php b/src/Managers/ArrayConnectionManager.php similarity index 96% rename from src/Managers/ConnectionManager.php rename to src/Managers/ArrayConnectionManager.php index e20d57de..23c4828f 100644 --- a/src/Managers/ConnectionManager.php +++ b/src/Managers/ArrayConnectionManager.php @@ -3,7 +3,6 @@ namespace Laravel\Reverb\Managers; use Closure; -use Illuminate\Contracts\Cache\Repository; use Illuminate\Support\Facades\App; use Laravel\Reverb\Application; use Laravel\Reverb\Concerns\InteractsWithApplications; @@ -11,7 +10,7 @@ use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Contracts\ConnectionManager as ConnectionManagerInterface; -class ConnectionManager implements ConnectionManagerInterface +class ArrayConnectionManager implements ConnectionManagerInterface { use InteractsWithApplications; diff --git a/src/Managers/ChannelManager.php b/src/Managers/ChannelManager.php deleted file mode 100644 index 6b57a03b..00000000 --- a/src/Managers/ChannelManager.php +++ /dev/null @@ -1,126 +0,0 @@ ->> - */ - protected $connections = []; - - /** - * The appliation instance. - * - * @var \Laravel\Reverb\Application - */ - protected $application; - - /** - * Get the application instance. - */ - public function app(): ?Application - { - return $this->application; - } - - /** - * Subscribe to a channel. - */ - public function subscribe(Channel $channel, Connection $connection, $data = []): void - { - $this->connections[$this->application->id()][$channel->name()][$connection->id()] = $connection; - } - - /** - * Unsubscribe from a channel. - */ - public function unsubscribe(Channel $channel, Connection $connection): void - { - unset($this->connections[$this->application->id()][$channel->name()][$connection->id()]); - } - - /** - * Get all the channels. - */ - public function all(): Collection - { - return $this->channels()->map(function ($connections, $name) { - return ChannelBroker::create($name); - }); - } - - /** - * Unsubscribe from all channels. - */ - public function unsubscribeFromAll(Connection $connection): void - { - $this->channels()->each(function ($connections, $name) use ($connection) { - ChannelBroker::create($name)->unsubscribe($connection); - }); - } - - /** - * Get all connections for the given channel. - * - * @return - */ - public function connections(Channel $channel): array - { - return $this->connections[$this->application->id()][$channel->name()] ?? []; - } - - /** - * Get the given channel from the cache. - */ - public function channel(Channel $channel): Collection - { - return $this->channels($channel); - } - - /** - * Get the channels from the cache. - */ - public function channels(Channel $channel = null): Collection - { - if (! isset($this->connections[$this->application->id()])) { - $this->connections[$this->application->id()] = []; - } - - $channels = $this->connections[$this->application->id()]; - - if ($channel) { - return collect($channels[$channel->name()] ?? []); - } - - return collect($channels ?: []); - } - - /** - * Flush the channel manager repository. - */ - public function flush(): void - { - App::make(ApplicationProvider::class) - ->all() - ->each(function (Application $application) { - $this->connections[$application->id()] = []; - }); - } -} diff --git a/src/Pusher/Http/Controllers/ChannelController.php b/src/Pusher/Http/Controllers/ChannelController.php index b9ff126b..5bb54024 100644 --- a/src/Pusher/Http/Controllers/ChannelController.php +++ b/src/Pusher/Http/Controllers/ChannelController.php @@ -2,7 +2,6 @@ namespace Laravel\Reverb\Pusher\Http\Controllers; -use Laravel\Reverb\Channels\ChannelBroker; use Laravel\Reverb\Http\Connection; use Psr\Http\Message\RequestInterface; use Symfony\Component\HttpFoundation\JsonResponse; @@ -16,7 +15,7 @@ class ChannelController extends Controller public function handle(RequestInterface $request, Connection $connection, ...$args): Response { $info = explode(',', $this->query['info'] ?? ''); - $connections = $this->channels->channel(ChannelBroker::create($args['channel'])); + $connections = $this->channels->find($args['channel'])->connections(); $totalConnections = count($connections); return new JsonResponse((object) array_filter([ diff --git a/src/Pusher/Http/Controllers/ChannelUsersController.php b/src/Pusher/Http/Controllers/ChannelUsersController.php index d0db13cf..5d4115fe 100644 --- a/src/Pusher/Http/Controllers/ChannelUsersController.php +++ b/src/Pusher/Http/Controllers/ChannelUsersController.php @@ -2,7 +2,6 @@ namespace Laravel\Reverb\Pusher\Http\Controllers; -use Laravel\Reverb\Channels\ChannelBroker; use Laravel\Reverb\Channels\PresenceChannel; use Laravel\Reverb\Http\Connection; use Psr\Http\Message\RequestInterface; @@ -16,7 +15,7 @@ class ChannelUsersController extends Controller */ public function handle(RequestInterface $request, Connection $connection, ...$args): Response { - $channel = ChannelBroker::create($args['channel']); + $channel = $this->channels->find($args['channel']); if (! $channel instanceof PresenceChannel) { return new JsonResponse((object) [], 400); diff --git a/src/Pusher/Http/Controllers/ChannelsController.php b/src/Pusher/Http/Controllers/ChannelsController.php index 04c5b3d1..334b98d6 100644 --- a/src/Pusher/Http/Controllers/ChannelsController.php +++ b/src/Pusher/Http/Controllers/ChannelsController.php @@ -15,15 +15,15 @@ class ChannelsController extends Controller */ public function handle(RequestInterface $request, Connection $connection, ...$args): Response { - $channels = $this->channels->channels(); + $channels = $this->channels->all(); $info = explode(',', $this->query['info'] ?? ''); if (isset($this->query['filter_by_prefix'])) { - $channels = $channels->filter(fn ($connections, $name) => Str::startsWith($name, $this->query['filter_by_prefix'])); + $channels = $channels->filter(fn ($channel) => Str::startsWith($channel->name(), $this->query['filter_by_prefix'])); } - $channels = $channels->mapWithKeys(function ($connections, $name) use ($info) { - return [$name => array_filter(['user_count' => in_array('user_count', $info) ? count($connections) : null])]; + $channels = $channels->mapWithKeys(function ($channel) use ($info) { + return [$channel->name() => array_filter(['user_count' => in_array('user_count', $info) ? count($channel->connections()) : null])]; }); return new JsonResponse((object) ['channels' => $channels]); diff --git a/src/Pusher/Http/Controllers/EventsBatchController.php b/src/Pusher/Http/Controllers/EventsBatchController.php index 523a89d2..7fe357c8 100644 --- a/src/Pusher/Http/Controllers/EventsBatchController.php +++ b/src/Pusher/Http/Controllers/EventsBatchController.php @@ -2,7 +2,6 @@ namespace Laravel\Reverb\Pusher\Http\Controllers; -use Laravel\Reverb\Channels\ChannelBroker; use Laravel\Reverb\Event; use Laravel\Reverb\Http\Connection; use Psr\Http\Message\RequestInterface; @@ -46,7 +45,7 @@ public function handle(RequestInterface $request, Connection $connection, ...$ar protected function getInfo(string $channel, string $info): array { $info = explode(',', $info); - $count = count($this->channels->connections(ChannelBroker::create($channel))); + $count = count($this->channels->find($channel)->connections()); $info = [ 'user_count' => in_array('user_count', $info) ? $count : null, 'subscription_count' => in_array('subscription_count', $info) ? $count : null, diff --git a/src/Pusher/Http/Controllers/EventsController.php b/src/Pusher/Http/Controllers/EventsController.php index 79474e30..36057731 100644 --- a/src/Pusher/Http/Controllers/EventsController.php +++ b/src/Pusher/Http/Controllers/EventsController.php @@ -3,7 +3,6 @@ namespace Laravel\Reverb\Pusher\Http\Controllers; use Illuminate\Support\Arr; -use Laravel\Reverb\Channels\ChannelBroker; use Laravel\Reverb\Event; use Laravel\Reverb\Http\Connection; use Psr\Http\Message\RequestInterface; @@ -50,7 +49,7 @@ protected function getInfo(array $channels, string $info): array $info = explode(',', $info); $channels = collect($channels)->mapWithKeys(function ($channel) use ($info) { - $count = count($this->channels->connections(ChannelBroker::create($channel))); + $count = count($this->channels->find($channel)->connections()); $info = [ 'user_count' => in_array('user_count', $info) ? $count : null, 'subscription_count' => in_array('subscription_count', $info) ? $count : null, diff --git a/src/PusherEvent.php b/src/PusherEvent.php index 1a1f2967..0c9510b0 100644 --- a/src/PusherEvent.php +++ b/src/PusherEvent.php @@ -4,7 +4,7 @@ use Exception; use Illuminate\Support\Str; -use Laravel\Reverb\Channels\ChannelBroker; +use Laravel\Reverb\Contracts\ChannelManager; use Laravel\Reverb\Contracts\Connection; class PusherEvent @@ -45,7 +45,9 @@ public static function acknowledge(Connection $connection): void */ public static function subscribe(Connection $connection, string $channel, string $auth = null, string $data = null): void { - $channel = ChannelBroker::create($channel); + $channel = app(ChannelManager::class) + ->for($connection->app()) + ->find($channel); $channel->subscribe($connection, $auth, $data); @@ -57,7 +59,9 @@ public static function subscribe(Connection $connection, string $channel, string */ public static function unsubscribe(Connection $connection, string $channel): void { - ChannelBroker::create($channel) + $channel = app(ChannelManager::class) + ->for($connection->app()) + ->find($channel) ->unsubscribe($connection); } diff --git a/src/Server.php b/src/Server.php index d39978f9..463a00ad 100644 --- a/src/Server.php +++ b/src/Server.php @@ -4,20 +4,12 @@ use Exception; use Illuminate\Support\Str; -use Laravel\Reverb\Contracts\ChannelManager; use Laravel\Reverb\Contracts\Connection; -use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Exceptions\InvalidOrigin; use Laravel\Reverb\Exceptions\PusherException; class Server { - public function __construct( - protected ConnectionManager $connections, - protected ChannelManager $channels - ) { - } - /** * Handle the a client connection. */ diff --git a/src/Servers/Reverb/ReverbProvider.php b/src/Servers/Reverb/ReverbProvider.php index 1c485da0..99b1b845 100644 --- a/src/Servers/Reverb/ReverbProvider.php +++ b/src/Servers/Reverb/ReverbProvider.php @@ -7,12 +7,14 @@ use Illuminate\Console\Application as Artisan; use Illuminate\Contracts\Foundation\Application; use Laravel\Reverb\Concerns\InteractsWithAsyncRedis; -use Laravel\Reverb\Contracts\ChannelManager as ChannelManagerInterface; -use Laravel\Reverb\Contracts\ConnectionManager as ConnectionManagerInterface; +use Laravel\Reverb\Contracts\ChannelConnectionManager; +use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Contracts\ServerProvider; use Laravel\Reverb\Event; -use Laravel\Reverb\Managers\ChannelManager; -use Laravel\Reverb\Managers\ConnectionManager; +use Laravel\Reverb\Managers\ArrayChannelConnectionManager; +use Laravel\Reverb\Managers\ArrayChannelManager; +use Laravel\Reverb\Managers\ArrayConnectionManager; use Laravel\Reverb\Servers\Reverb\Console\Commands\StartServer; use React\EventLoop\LoopInterface; @@ -97,16 +99,24 @@ public function withPublishing(): void /** * Build the connection manager for the server. */ - public function buildConnectionManager(): ConnectionManagerInterface + public function buildConnectionManager(): ConnectionManager { - return new ConnectionManager; + return new ArrayConnectionManager; } /** * Build the channel manager for the server. */ - public function buildChannelManager(): ChannelManagerInterface + public function buildChannelManager(): ChannelManager { - return new ChannelManager; + return new ArrayChannelManager; + } + + /** + * Build the channel manager for the server. + */ + public function buildChannelConnectionManager(): ChannelConnectionManager + { + return new ArrayChannelConnectionManager; } } diff --git a/src/ServiceProvider.php b/src/ServiceProvider.php index 917c4bc0..22b9f8f1 100644 --- a/src/ServiceProvider.php +++ b/src/ServiceProvider.php @@ -3,6 +3,7 @@ namespace Laravel\Reverb; use Illuminate\Support\ServiceProvider as BaseServiceProvider; +use Laravel\Reverb\Contracts\ChannelConnectionManager; use Laravel\Reverb\Contracts\ChannelManager; use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Contracts\Logger; @@ -57,6 +58,11 @@ public function registerServer() fn () => $server->buildChannelManager() ); + $this->app->bind( + ChannelConnectionManager::class, + fn () => $server->buildChannelConnectionManager() + ); + $this->app->instance(Logger::class, new NullLogger); } } diff --git a/tests/Feature/Ratchet/UsersTerminateControllerTest.php b/tests/Feature/Ratchet/UsersTerminateControllerTest.php index c1a8ab59..45e7b51d 100644 --- a/tests/Feature/Ratchet/UsersTerminateControllerTest.php +++ b/tests/Feature/Ratchet/UsersTerminateControllerTest.php @@ -1,7 +1,6 @@ subscribe('test-channel-two'); expect($connections = connectionManager()->all())->toHaveCount(3); - expect(channelManager()->connections(ChannelBroker::create('test-channel-one')))->toHaveCount(2); - expect(channelManager()->connections(ChannelBroker::create('test-channel-two')))->toHaveCount(2); + expect(channelManager()->all()->get('test-channel-one')->connections())->toHaveCount(2); + expect(channelManager()->all()->get('test-channel-two')->connections())->toHaveCount(2); $connection = Arr::first($connections); @@ -32,6 +31,6 @@ $this->assertSame(200, $response->getStatusCode()); $this->assertSame('{}', $response->getBody()->getContents()); expect($connections = connectionManager()->all())->toHaveCount(2); - expect(channelManager()->connections(ChannelBroker::create('test-channel-one')))->toHaveCount(1); - expect(channelManager()->connections(ChannelBroker::create('test-channel-two')))->toHaveCount(1); + expect(channelManager()->all()->get('test-channel-one')->connections())->toHaveCount(1); + expect(channelManager()->all()->get('test-channel-two')->connections())->toHaveCount(1); });