From f213c3831045ce2fae8408b516ca1961b1b67d12 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Fri, 24 Nov 2023 12:27:31 +0000 Subject: [PATCH] wip --- src/Channels/Channel.php | 28 ++++++++----------- src/Contracts/Connection.php | 5 ---- src/Jobs/PruneStaleConnections.php | 2 -- src/Output.php | 19 ------------- src/Server.php | 18 ++---------- .../ApiGateway/Jobs/SendToConnection.php | 4 +-- .../Reverb/Console/Commands/StartServer.php | 5 +--- src/ServiceProvider.php | 4 --- 8 files changed, 16 insertions(+), 69 deletions(-) delete mode 100644 src/Output.php diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 7d951e06..fd6af2bb 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -6,7 +6,6 @@ use Illuminate\Support\Arr; use Laravel\Reverb\Contracts\ChannelConnectionManager; use Laravel\Reverb\Contracts\Connection; -use React\Promise\Promise; class Channel { @@ -90,24 +89,21 @@ public function broadcast(array $payload, Connection $except = null): void $chunks = array_chunk($this->connections(), 100); foreach ($chunks as $connections) { - new Promise(function ($resolve, $reject) use ($connections, $message, $except) { + foreach ($connections as $connection) { + if ($except && $except->id() === $connection->connection()->id()) { + break; + } + + if (isset($payload['except']) && $payload['except'] === $connection->connection()->id()) { + break; + } + try { - foreach ($connections as $connection) { - if ($except && $except->id() === $connection->connection()->id()) { - break; - } - - if (isset($payload['except']) && $payload['except'] === $connection->connection()->id()) { - break; - } - - $connection->send($message); - $resolve(); - } + $connection->send($message); } catch (Exception $e) { - $reject($e); + // } - }); + } } } diff --git a/src/Contracts/Connection.php b/src/Contracts/Connection.php index 502dc6be..7a6cd8fe 100644 --- a/src/Contracts/Connection.php +++ b/src/Contracts/Connection.php @@ -3,8 +3,6 @@ namespace Laravel\Reverb\Contracts; use Laravel\Reverb\Application; -use Laravel\Reverb\Output; -use Laravel\Reverb\Pusher\Event as PusherEvent; abstract class Connection { @@ -27,7 +25,6 @@ public function __construct( protected ?string $origin ) { $this->lastSeenAt = time(); - // $this->pusher = new PusherEvent; } /** @@ -74,8 +71,6 @@ public function ping(): void $this->hasBeenPinged = true; // $this->pusher->ping($this); - - // Output::info('Connection Pinged', $this->id()); } /** diff --git a/src/Jobs/PruneStaleConnections.php b/src/Jobs/PruneStaleConnections.php index 2bb68773..1384ae42 100644 --- a/src/Jobs/PruneStaleConnections.php +++ b/src/Jobs/PruneStaleConnections.php @@ -5,7 +5,6 @@ use Illuminate\Foundation\Bus\Dispatchable; use Laravel\Reverb\Contracts\ApplicationProvider; use Laravel\Reverb\Contracts\ChannelManager; -use Laravel\Reverb\Output; class PruneStaleConnections { @@ -33,7 +32,6 @@ public function handle(ChannelManager $channels): void ])); $connection->disconnect(); - // Output::info('Connection Pruned', $connection->id()); } }); } diff --git a/src/Output.php b/src/Output.php deleted file mode 100644 index f13ad174..00000000 --- a/src/Output.php +++ /dev/null @@ -1,19 +0,0 @@ -touch(); $this->pusher->handle($connection, 'pusher:connection_established'); - - // Output::info('Connection Established', $connection->id()); } catch (Exception $e) { $this->error($connection, $e); } @@ -40,11 +38,6 @@ public function open(Connection $connection): void */ public function message(Connection $from, string $message): void { - // Output::info('Message Received', $from->id()); - // Output::message($message); - - $from->touch(); - $event = json_decode($message, true); try { @@ -58,10 +51,11 @@ public function message(Connection $from, string $message): void default => ClientEvent::handle($from, $event) }; - // Output::info('Message Handled', $from->id()); } catch (Exception $e) { $this->error($from, $e); } + + $from->touch(); } /** @@ -74,8 +68,6 @@ public function close(Connection $connection): void ->unsubscribeFromAll($connection); $connection->disconnect(); - - // Output::info('Connection Closed', $connection->id()); } /** @@ -86,9 +78,6 @@ public function error(Connection $connection, Exception $exception): void if ($exception instanceof PusherException) { $connection->send(json_encode($exception->payload())); - // Output::error('Message from '.$connection->id().' resulted in a pusher error'); - // Output::info($exception->getMessage()); - return; } @@ -99,9 +88,6 @@ public function error(Connection $connection, Exception $exception): void 'message' => 'Invalid message format', ]), ])); - - // Output::error('Message from '.$connection->id().' resulted in an unknown error'); - // Output::info($exception->getMessage()); } /** diff --git a/src/Servers/ApiGateway/Jobs/SendToConnection.php b/src/Servers/ApiGateway/Jobs/SendToConnection.php index 451e1962..37f39e7c 100644 --- a/src/Servers/ApiGateway/Jobs/SendToConnection.php +++ b/src/Servers/ApiGateway/Jobs/SendToConnection.php @@ -6,7 +6,6 @@ use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Support\Facades\Config; -use Laravel\Reverb\Output; use Throwable; class SendToConnection implements ShouldQueue @@ -37,8 +36,7 @@ public function handle(): void 'Data' => $this->message, ]); } catch (Throwable $e) { - // Output::error('Unable to send message.'); - // Output::info($e->getMessage()); + // } } } diff --git a/src/Servers/Reverb/Console/Commands/StartServer.php b/src/Servers/Reverb/Console/Commands/StartServer.php index 155f5931..5768699d 100644 --- a/src/Servers/Reverb/Console/Commands/StartServer.php +++ b/src/Servers/Reverb/Console/Commands/StartServer.php @@ -6,7 +6,6 @@ use Laravel\Reverb\Concerns\InteractsWithAsyncRedis; use Laravel\Reverb\Jobs\PingInactiveConnections; use Laravel\Reverb\Jobs\PruneStaleConnections; -use Laravel\Reverb\Output; use Laravel\Reverb\Servers\Reverb\Factory as ServerFactory; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; @@ -58,11 +57,9 @@ public function handle(): void */ protected function scheduleCleanup(LoopInterface $loop): void { - $loop->addPeriodicTimer(60, function () { - // Output::info('Pruning Stale Connections'); + $loop->addPeriodicTimer(300, function () { PruneStaleConnections::dispatch(); - // Output::info('Pinging Inactive Connections'); PingInactiveConnections::dispatch(); }); } diff --git a/src/ServiceProvider.php b/src/ServiceProvider.php index 5850046c..64d9bc85 100644 --- a/src/ServiceProvider.php +++ b/src/ServiceProvider.php @@ -5,9 +5,7 @@ use Illuminate\Support\ServiceProvider as BaseServiceProvider; use Laravel\Reverb\Contracts\ChannelConnectionManager; use Laravel\Reverb\Contracts\ChannelManager; -use Laravel\Reverb\Contracts\Logger; use Laravel\Reverb\Contracts\ServerProvider; -use Laravel\Reverb\Loggers\NullLogger; class ServiceProvider extends BaseServiceProvider { @@ -56,7 +54,5 @@ public function registerServer() ChannelConnectionManager::class, fn () => $server->buildChannelConnectionManager() ); - - $this->app->instance(Logger::class, new NullLogger); } }