diff --git a/composer.json b/composer.json index 8edf0e8a..6122cb67 100644 --- a/composer.json +++ b/composer.json @@ -17,22 +17,24 @@ "require": { "php": "^8.2", "aws/aws-sdk-php": "^3.241", - "cboden/ratchet": "^0.4.4", "clue/redis-react": "^2.6", + "guzzlehttp/psr7": "^2.6", "illuminate/cache": "^10.0", "illuminate/console": "^10.0", "illuminate/http": "^10.0", "illuminate/redis": "^10.0", "illuminate/support": "^10.0", - "nesbot/carbon": "^2.64" + "nesbot/carbon": "^2.64", + "ratchet/rfc6455": "^0.3.1", + "react/socket": "^1.14", + "symfony/http-foundation": "^6.3" }, "require-dev": { "orchestra/testbench": "^8.0", "pestphp/pest": "^2.0", "phpstan/phpstan": "^1.10", "ratchet/pawl": "^0.4.1", - "react/async": "^4.0", - "react/http": "^1.8" + "react/async": "^4.0" }, "autoload": { "psr-4": { @@ -66,4 +68,4 @@ }, "minimum-stability": "dev", "prefer-stable": true -} \ No newline at end of file +} diff --git a/config/reverb.php b/config/reverb.php index 3d7b8f27..ae77f839 100644 --- a/config/reverb.php +++ b/config/reverb.php @@ -11,10 +11,10 @@ | messages received from and when sending messages to connected | clients. You must specify one of the servers listed below. | - | Supported: "ratchet", "api_gateway" + | Supported: "reverb", "api_gateway" */ - 'default' => env('REVERB_SERVER', 'ratchet'), + 'default' => env('REVERB_SERVER', 'reverb'), /* |-------------------------------------------------------------------------- @@ -28,15 +28,15 @@ 'servers' => [ - 'ratchet' => [ - 'host' => env('REVERB_RATCHET_HOST', '127.0.0.1'), - 'port' => env('REVERB_RATCHET_PORT', 8080), + 'reverb' => [ + 'host' => env('REVERB_HOST', '127.0.0.1'), + 'port' => env('REVERB_PORT', 8080), 'connection_manager' => [ - 'prefix' => env('REVERB_RATCHET_CONNECTION_CACHE_PREFIX', 'reverb'), + 'prefix' => env('REVERB_CONNECTION_CACHE_PREFIX', 'reverb'), ], 'publish_events' => [ - 'enabled' => env('REVERB_RATCHET_SCALING_ENABLED', false), - 'channel' => env('REVERB_RATCHET_SCALING_CHANNEL', 'reverb'), + 'enabled' => env('REVERB_SCALING_ENABLED', false), + 'channel' => env('REVERB_SCALING_CHANNEL', 'reverb'), ], ], diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index 8b0fae42..a247f3a6 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -6,8 +6,8 @@ use Illuminate\Support\Arr; use Illuminate\Support\Facades\App; use Laravel\Reverb\Application; -use Laravel\Reverb\Connection; use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Output; class Channel @@ -49,9 +49,7 @@ public function unsubscribe(Connection $connection): void */ public function broadcast(Application $app, array $payload, Connection $except = null): void { - App::make(ChannelManager::class) - ->for($app) - ->connections($this) + collect(App::make(ChannelManager::class)->for($app)->connections($this)) ->each(function ($connection) use ($payload, $except) { if ($except && $except->identifier() === $connection->identifier()) { return; diff --git a/src/Channels/PresenceChannel.php b/src/Channels/PresenceChannel.php index 84835813..6c686b57 100644 --- a/src/Channels/PresenceChannel.php +++ b/src/Channels/PresenceChannel.php @@ -4,8 +4,8 @@ use Illuminate\Support\Facades\App; use Laravel\Reverb\Application; -use Laravel\Reverb\Connection; use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\Connection; class PresenceChannel extends PrivateChannel { diff --git a/src/Channels/PrivateChannel.php b/src/Channels/PrivateChannel.php index 5cfd6ade..6aa33427 100644 --- a/src/Channels/PrivateChannel.php +++ b/src/Channels/PrivateChannel.php @@ -3,7 +3,7 @@ namespace Laravel\Reverb\Channels; use Illuminate\Support\Str; -use Laravel\Reverb\Connection; +use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Exceptions\ConnectionUnauthorized; class PrivateChannel extends Channel diff --git a/src/ClientEvent.php b/src/ClientEvent.php index 3a123b57..52caebf5 100644 --- a/src/ClientEvent.php +++ b/src/ClientEvent.php @@ -3,6 +3,7 @@ namespace Laravel\Reverb; use Illuminate\Support\Str; +use Laravel\Reverb\Contracts\Connection; class ClientEvent { diff --git a/src/Concerns/ClosesConnections.php b/src/Concerns/ClosesConnections.php new file mode 100644 index 00000000..e51a4f08 --- /dev/null +++ b/src/Concerns/ClosesConnections.php @@ -0,0 +1,21 @@ +send(Message::toString($response)); + $connection->close(); + } +} diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php deleted file mode 100644 index 2c837100..00000000 --- a/src/Console/Commands/StartServer.php +++ /dev/null @@ -1,51 +0,0 @@ -option('server') ?: config('reverb.default'); - - return match ($server) { - 'ratchet' => $this->call('ratchet:start', [ - '--host' => $this->option('host'), - '--port' => $this->option('port'), - ]), - default => $this->invalidServer($server), - }; - } - - /** - * Inform the user that the server type is invalid. - */ - protected function invalidServer(string $server): int - { - $this->error("Invalid server: {$server}."); - - return 1; - } -} diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index 1a23c7c3..4b08447a 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -5,7 +5,6 @@ use Illuminate\Support\Collection; use Laravel\Reverb\Application; use Laravel\Reverb\Channels\Channel; -use Laravel\Reverb\Connection; use Laravel\Reverb\Managers\Connections; interface ChannelManager @@ -40,17 +39,12 @@ public function all(): Collection; */ public function unsubscribeFromAll(Connection $connection): void; - /** - * Get all connection keys for the given channel. - */ - public function connectionKeys(Channel $channel): Collection; - /** * Get all connections for the given channel. * - * @return \Laravel\Reverb\Managers\Connections|\Laravel\Reverb\Connection[]|string[] + * @return */ - public function connections(Channel $channel): Connections; + public function connections(Channel $channel): array; /** * Flush the channel manager repository. diff --git a/src/Connection.php b/src/Contracts/Connection.php similarity index 95% rename from src/Connection.php rename to src/Contracts/Connection.php index e24bb4d7..ff73ba5b 100644 --- a/src/Connection.php +++ b/src/Contracts/Connection.php @@ -1,12 +1,12 @@ */ - public function all(): Connections; - - /** - * Synchronize the connections with the manager. - * - * @param \Laravel\Reverb\Managers\Connections|\Laravel\Reverb\Connection[]|string[] $connections - */ - public function sync(Connections $connections): void; + public function all(): array; /** * Synchronize a connection with the manager. */ - public function syncConnection(Connection $connection): void; + public function save(Connection $connection): void; /** * Flush the channel manager repository. diff --git a/src/Event.php b/src/Event.php index 5da9f7ae..1f83213a 100644 --- a/src/Event.php +++ b/src/Event.php @@ -4,6 +4,7 @@ use Illuminate\Support\Facades\App; use Laravel\Reverb\Channels\ChannelBroker; +use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Contracts\ServerProvider; class Event diff --git a/src/Http/Connection.php b/src/Http/Connection.php new file mode 100644 index 00000000..fe7988b0 --- /dev/null +++ b/src/Http/Connection.php @@ -0,0 +1,125 @@ +id = (int) $connection->stream; + } + + /** + * Return the connection ID. + */ + public function id(): int + { + return $this->id; + } + + /** + * Mark the connection as connected. + */ + public function connect(): void + { + $this->connected = true; + } + + /** + * Determine whether the connection is connected. + */ + public function isConnected(): bool + { + return $this->connected; + } + + /** + * Get the HTTP message buffer. + */ + public function buffer(): string + { + return $this->buffer; + } + + /** + * Determine whether the connection has an HTTP message buffer set. + */ + public function hasBuffer() + { + return $this->buffer !== ''; + } + + /** + * Return the HTTP message buffer length. + */ + public function bufferLength() + { + return strlen($this->buffer); + } + + /** + * Append to the HTTP message buffer. + */ + public function appendToBuffer($message) + { + $this->buffer .= $message; + } + + /** + * Clear the HTTP message buffer. + */ + public function clearBuffer() + { + $this->buffer = ''; + } + + /** + * Send a message to the connection. + */ + public function send($data) + { + $this->connection->write($data); + + return $this; + } + + /** + * Close the connection. + */ + public function close() + { + $this->connection->end(); + + return $this; + } + + /** + * Dynamically proxy method calls to the underlying connection. + */ + public function __call($method, $parameters) + { + if (! method_exists($this->connection, $method)) { + throw new BadMethodCallException("Method [{$method}] does not exist on [".get_class($this->connection).'].'); + } + + return $this->connection->{$method}(...$parameters); + } +} diff --git a/src/Http/Controllers/EventController.php b/src/Http/Controllers/EventController.php deleted file mode 100644 index 1a15941e..00000000 --- a/src/Http/Controllers/EventController.php +++ /dev/null @@ -1,66 +0,0 @@ -getBody()->getContents(), true); - - Event::dispatch($this->application($request), [ - 'event' => $payload['name'], - 'channel' => $payload['channel'], - 'data' => $payload['data'], - ]); - - tap($conn)->send(new JsonResponse((object) []))->close(); - } - - /** - * Handle a new message received by the connected client. - * - * @param string $message - */ - public function onMessage(ConnectionInterface $from, $message): void - { - // - } - - /** - * Handle a client disconnection. - */ - public function onClose(ConnectionInterface $connection): void - { - // - } - - /** - * Handle an error. - */ - public function onError(ConnectionInterface $connection, \Exception $e): void - { - // - } - - /** - * Get the application instance for the request. - */ - protected function application(RequestInterface $request): Application - { - parse_str($request->getUri()->getQuery(), $queryString); - - return app(ApplicationProvider::class)->findById($queryString['appId']); - } -} diff --git a/src/Http/Controllers/StatsController.php b/src/Http/Controllers/StatsController.php deleted file mode 100644 index 8bf1ad4c..00000000 --- a/src/Http/Controllers/StatsController.php +++ /dev/null @@ -1,49 +0,0 @@ -getUri()->getQuery(), $queryString); - $app = App::make(ApplicationProvider::class)->findById($queryString['appId']); - - tap($conn)->send(new JsonResponse((object) [ - 'connections' => App::make(ConnectionManager::class)->for($app)->all()->count(), - 'channels' => App::make(ChannelManager::class)->for($app)->all()->map(function ($channel) use ($app) { - return [ - 'name' => $channel->name(), - 'connections' => App::make(ChannelManager::class) - ->for($app) - ->connectionKeys($channel) - ->count(), - ]; - }), - ]))->close(); - } - - public function onMessage(ConnectionInterface $from, $message) - { - // - } - - public function onClose(ConnectionInterface $connection) - { - // - } - - public function onError(ConnectionInterface $connection, \Exception $e) - { - // - } -} diff --git a/src/Http/Request.php b/src/Http/Request.php new file mode 100644 index 00000000..fd2b6077 --- /dev/null +++ b/src/Http/Request.php @@ -0,0 +1,49 @@ +appendToBuffer($message); + + if ($connection->bufferLength() > static::MAX_SIZE) { + throw new OverflowException('Maximum HTTP buffer size of '.static::MAX_SIZE.'exceeded.'); + } + + if (static::isEndOfMessage($buffer = $connection->buffer())) { + $connection->clearBuffer(); + + return Message::parseRequest($buffer); + } + + return null; + } + + /** + * Determine if the message has been buffered as per the HTTP specification + */ + protected static function isEndOfMessage(string $message): bool + { + return (bool) strpos($message, static::EOM); + } +} diff --git a/src/Http/Route.php b/src/Http/Route.php new file mode 100644 index 00000000..e98c462c --- /dev/null +++ b/src/Http/Route.php @@ -0,0 +1,101 @@ +route = new BaseRoute($path); + } + + /** + * Create a new `GET` route. + */ + public static function get(string $path, callable $action): BaseRoute + { + return static::route($path, 'GET', $action); + } + + /** + * Create a new `POST` route. + */ + public static function post($path, callable $action) + { + return static::route($path, 'POST', $action); + } + + /** + * Create a new `PUT` route. + */ + public static function put($path, callable $action) + { + return static::route($path, 'PUT', $action); + } + + /** + * Create a new `PATCH` route. + */ + public static function patch($path, callable $action) + { + return static::route($path, 'PATCH', $action); + } + + /** + * Create a new `DELETE` route. + */ + public static function delete($path, callable $action) + { + return static::route($path, 'DELETE', $action); + } + + /** + * Create a new `HEAD` route. + */ + public static function head($path, callable $action) + { + return static::route($path, 'HEAD', $action); + } + + /** + * Create a new `CONNECT` route. + */ + public static function connect($path, callable $action) + { + return static::route($path, 'CONNECT', $action); + } + + /** + * Create a new `OPTIONS` route. + */ + public static function options($path, callable $action) + { + return static::route($path, 'OPTIONS', $action); + } + + /** + * Create a new `TRACE` route. + */ + public static function trace($path, callable $action) + { + return static::route($path, 'TRACE', $action); + } + + /** + * Create a new route. + */ + protected static function route(string $path, string|array $methods, callable $action) + { + $route = (new static($path)) + ->methods(Arr::wrap($methods)) + ->controller($action); + + return $route->route; + } +} diff --git a/src/Http/Router.php b/src/Http/Router.php new file mode 100644 index 00000000..2fa7160d --- /dev/null +++ b/src/Http/Router.php @@ -0,0 +1,69 @@ +getUri(); + $context = $this->matcher->getContext(); + $context->setMethod($request->getMethod()); + $context->setHost($uri->getHost()); + + if ($this->isWebSocketRequest($request)) { + $connection = $this->attemptUpgrade($request, $connection); + } + + try { + $route = $this->matcher->match($uri->getPath()); + } catch (MethodNotAllowedException $e) { + return $this->close($connection, 405, ['Allow' => $e->getAllowedMethods()]); + } catch (ResourceNotFoundException $e) { + return $this->close($connection, 404); + } + + return $route['_controller']($request, $connection, ...Arr::except($route, ['_controller', '_route'])); + } + + /** + * Determine whether the request is for a WebSocket connection. + */ + protected function isWebSocketRequest(RequestInterface $request): bool + { + return $request->getHeader('Upgrade')[0] ?? null === 'websocket'; + } + + /** + * Negotiate the WebSocket connection upgrade. + */ + protected function attemptUpgrade(RequestInterface $request, Connection $connection): WsConnection + { + $negotiator = new ServerNegotiator(new RequestVerifier); + $response = $negotiator->handshake($request); + + $connection->write(Message::toString($response)); + + return new WsConnection($connection); + } +} diff --git a/src/Http/Server.php b/src/Http/Server.php new file mode 100644 index 00000000..efa1666b --- /dev/null +++ b/src/Http/Server.php @@ -0,0 +1,77 @@ +loop = $loop ?: Loop::get(); + + $socket->on('connection', $this); + } + + public function __invoke(ConnectionInterface $connection) + { + $connection = new Connection($connection); + + $connection->on('data', function ($data) use ($connection) { + $this->handleRequest($data, $connection); + }); + // $connection->on('close', function () use ($connection) { + // $this->handleEnd($conn); + // }); + // $conn->on('error', function (\Exception $e) use ($conn) { + // $this->handleError($e, $conn); + // }); + } + + /** + * Start the Http server + */ + public function start(): void + { + $this->loop->run(); + } + + /** + * Handle an incoming request. + */ + protected function handleRequest(string $message, Connection $connection): void + { + if ($connection->isConnected()) { + return; + } + + if (($request = $this->createRequest($message, $connection)) === null) { + return; + } + + $connection->connect(); + + $this->router->dispatch($request, $connection); + } + + /** + * Create a Psr7 request from the incoming message. + */ + protected function createRequest(string $message, Connection $connection): RequestInterface + { + try { + return Request::from($message, $connection); + } catch (OverflowException $e) { + $this->close($connection, 413); + } + } +} diff --git a/src/Jobs/PingInactiveConnections.php b/src/Jobs/PingInactiveConnections.php index f52cf9a0..a2ec881c 100644 --- a/src/Jobs/PingInactiveConnections.php +++ b/src/Jobs/PingInactiveConnections.php @@ -18,16 +18,13 @@ public function handle(ConnectionManager $connections): void app(ApplicationProvider::class) ->all() ->each(function ($application) use ($connections) { - $connections - ->for($application) - ->all() - ->each(function ($connection) use ($connections) { + collect($connections->for($application)->all()) + ->each(function ($connection) { if ($connection->isActive()) { return; } $connection->ping(); - $connections->syncConnection($connection); }); }); } diff --git a/src/Jobs/PruneStaleConnections.php b/src/Jobs/PruneStaleConnections.php index 784e92f9..81963629 100644 --- a/src/Jobs/PruneStaleConnections.php +++ b/src/Jobs/PruneStaleConnections.php @@ -19,9 +19,7 @@ public function handle(ConnectionManager $connections): void app(ApplicationProvider::class) ->all() ->each(function ($application) use ($connections) { - $connections - ->for($application) - ->all() + collect($connections->for($application)->all()) ->each(function ($connection) { if (! $connection->isStale()) { return; diff --git a/src/Managers/ChannelManager.php b/src/Managers/ChannelManager.php index da7be7d1..5e1d9bc0 100644 --- a/src/Managers/ChannelManager.php +++ b/src/Managers/ChannelManager.php @@ -10,15 +10,21 @@ use Laravel\Reverb\Channels\ChannelBroker; use Laravel\Reverb\Concerns\EnsuresIntegrity; use Laravel\Reverb\Concerns\InteractsWithApplications; -use Laravel\Reverb\Connection; use Laravel\Reverb\Contracts\ApplicationProvider; use Laravel\Reverb\Contracts\ChannelManager as ChannelManagerInterface; -use Laravel\Reverb\Contracts\ConnectionManager; +use Laravel\Reverb\Contracts\Connection; class ChannelManager implements ChannelManagerInterface { use EnsuresIntegrity, InteractsWithApplications; + /** + * Connection store. + * + * @var array>> + */ + protected $connections = []; + /** * The appliation instance. * @@ -26,13 +32,6 @@ class ChannelManager implements ChannelManagerInterface */ protected $application; - public function __construct( - protected Repository $repository, - protected ConnectionManager $connections, - protected $prefix = 'reverb' - ) { - } - /** * Get the application instance. */ @@ -46,12 +45,7 @@ public function app(): ?Application */ public function subscribe(Channel $channel, Connection $connection, $data = []): void { - $this->mutex(function () use ($channel, $connection, $data) { - $connections = $this->connectionKeys($channel) - ->put($connection->identifier(), $data); - - $this->syncConnections($channel, $connections); - }); + $this->connections[$this->application->id()][$channel->name()][$connection->identifier()] = $connection; } /** @@ -59,12 +53,7 @@ public function subscribe(Channel $channel, Connection $connection, $data = []): */ public function unsubscribe(Channel $channel, Connection $connection): void { - $this->mutex(function () use ($channel, $connection) { - $connections = $this->connectionKeys($channel) - ->reject(fn ($data, $identifier) => (string) $identifier === $connection->identifier()); - - $this->syncConnections($channel, $connections); - }); + unset($this->connections[$this->application->id()][$channel->name()][$connection->identifier()]); } /** @@ -87,53 +76,14 @@ public function unsubscribeFromAll(Connection $connection): void }); } - /** - * Get all connection keys for the given channel. - */ - public function connectionKeys(Channel $channel): Collection - { - return $this->channel($channel); - } - /** * Get all connections for the given channel. * - * @return \Laravel\Reverb\Managers\Connections|\Laravel\Reverb\Connection[]|string[] - */ - public function connections(Channel $channel): Connections - { - return $this->connections - ->for($this->application) - ->all() - ->intersectByKeys( - $this->connectionKeys($channel) - ); - } - - /** - * Sync the connections for a channel. + * @return */ - protected function syncConnections(Channel $channel, Collection $connections): void + public function connections(Channel $channel): array { - $channels = $this->channels(); - - $channels[$channel->name()] = $connections; - - $this->repository->forever($this->key(), $channels); - } - - /** - * Get the key for the channels. - */ - protected function key(): string - { - $key = $this->prefix; - - if ($this->application) { - $key .= ":{$this->application->id()}"; - } - - return $key.':channels'; + return $this->connections[$this->application->id()][$channel->name()] ?? []; } /** @@ -149,7 +99,7 @@ protected function channel(Channel $channel): Collection */ protected function channels(Channel $channel = null): Collection { - $channels = $this->repository->get($this->key(), []); + $channels = $this->connections[$this->application->id()]; if ($channel) { return collect($channels[$channel->name()] ?? []); @@ -158,18 +108,6 @@ protected function channels(Channel $channel = null): Collection return collect($channels ?: []); } - /** - * Get the data stored for a connection. - */ - public function data(Channel $channel, Connection $connection): array - { - if (! $data = $this->connectionKeys($channel)->get($connection->identifier())) { - return []; - } - - return (array) $data; - } - /** * Flush the channel manager repository. */ @@ -178,8 +116,7 @@ public function flush(): void App::make(ApplicationProvider::class) ->all() ->each(function (Application $application) { - $this->for($application); - $this->repository->forget($this->key()); + $this->connections[$application->id()] = []; }); } } diff --git a/src/Managers/ConnectionManager.php b/src/Managers/ConnectionManager.php index 9fac3877..429c97a7 100644 --- a/src/Managers/ConnectionManager.php +++ b/src/Managers/ConnectionManager.php @@ -6,17 +6,21 @@ use Illuminate\Contracts\Cache\Repository; use Illuminate\Support\Facades\App; use Laravel\Reverb\Application; -use Laravel\Reverb\Concerns\EnsuresIntegrity; use Laravel\Reverb\Concerns\InteractsWithApplications; -use Laravel\Reverb\Connection; use Laravel\Reverb\Contracts\ApplicationProvider; +use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Contracts\ConnectionManager as ConnectionManagerInterface; class ConnectionManager implements ConnectionManagerInterface { - use EnsuresIntegrity, InteractsWithApplications; + use InteractsWithApplications; - protected $connections; + /** + * Connection store. + * + * @var array> + */ + protected $connections = []; /** * The appliation instance. @@ -25,12 +29,6 @@ class ConnectionManager implements ConnectionManagerInterface */ protected $application; - public function __construct( - protected Repository $repository, - protected $prefix = 'reverb' - ) { - } - /** * Get the application instance. */ @@ -46,7 +44,7 @@ public function connect(Connection $connection): Connection { $connection->touch(); - $this->syncConnection($connection); + $this->save($connection); return $connection; } @@ -68,11 +66,7 @@ public function reconnect(string $identifier): ?Connection */ public function disconnect(string $identifier): void { - $connections = $this->all(); - - $this->sync( - $connections->reject(fn ($connection, $id) => (string) $id === $identifier) - ); + unset($this->connections[$this->application->id()][$identifier]); } /** @@ -92,62 +86,25 @@ public function resolve(string $identifier, Closure $newConnection): Connection */ public function find(string $identifier): ?Connection { - if ($connection = $this->all()->find($identifier)) { - return $connection; - } - - return null; + return $this->connections[$this->application->id()][$identifier] ?? null; } /** * Get all of the connections from the cache. * - * @return \Laravel\Reverb\Managers\Connections|\Laravel\Reverb\Connection[]|string[] - */ - public function all(): Connections - { - return $this->mutex(function () { - return $this->repository->get($this->key()) ?: new Connections; - }); - } - - /** - * Synchronize the connections with the manager. - * - * @param \Laravel\Reverb\Managers\Connections|\Laravel\Reverb\Connection[]|string[] $connections + * @return array */ - public function sync(Connections $connections): void + public function all(): array { - $this->mutex(function () use ($connections) { - $this->repository->forever($this->key(), $connections); - }); + return $this->connections[$this->application->id()] ?? []; } /** * Synchronize a connection with the manager. */ - public function syncConnection(Connection $connection): void - { - $connections = $this->all()->put( - $connection->identifier(), - Connection::dehydrate($connection) - ); - - $this->sync($connections); - } - - /** - * Get the key for the channels. - */ - protected function key(): string + public function save(Connection $connection): void { - $key = $this->prefix; - - if ($this->application) { - $key .= ":{$this->application->id()}"; - } - - return $key .= ':connections'; + $this->connections[$this->application->id()][$connection->identifier()] = $connection; } /** @@ -158,8 +115,7 @@ public function flush(): void App::make(ApplicationProvider::class) ->all() ->each(function (Application $application) { - $this->for($application); - $this->repository->forget($this->key()); + $this->connections[$application->id()] = []; }); } } diff --git a/src/Managers/Connections.php b/src/Managers/Connections.php index b7e0436b..e6701855 100644 --- a/src/Managers/Connections.php +++ b/src/Managers/Connections.php @@ -3,7 +3,7 @@ namespace Laravel\Reverb\Managers; use Illuminate\Support\Collection; -use Laravel\Reverb\Connection; +use Laravel\Reverb\Contracts\Connection; class Connections extends Collection { diff --git a/src/PusherEvent.php b/src/PusherEvent.php index 113e5913..1a1f2967 100644 --- a/src/PusherEvent.php +++ b/src/PusherEvent.php @@ -5,6 +5,7 @@ use Exception; use Illuminate\Support\Str; use Laravel\Reverb\Channels\ChannelBroker; +use Laravel\Reverb\Contracts\Connection; class PusherEvent { diff --git a/src/Server.php b/src/Server.php index 0a34fc6f..d39978f9 100644 --- a/src/Server.php +++ b/src/Server.php @@ -5,6 +5,7 @@ use Exception; use Illuminate\Support\Str; use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Exceptions\InvalidOrigin; use Laravel\Reverb\Exceptions\PusherException; diff --git a/src/ServerManager.php b/src/ServerManager.php index 02867638..a5747f8f 100644 --- a/src/ServerManager.php +++ b/src/ServerManager.php @@ -5,7 +5,7 @@ use Illuminate\Contracts\Foundation\Application; use Illuminate\Support\Manager; use Laravel\Reverb\Servers\ApiGateway\ApiGatewayProvider; -use Laravel\Reverb\Servers\Ratchet\RatchetProvider; +use Laravel\Reverb\Servers\Reverb\ReverbProvider; class ServerManager extends Manager { @@ -19,17 +19,17 @@ public function __construct(protected Application $app) */ public function getDefaultDriver(): string { - return $this->config->get('reverb.default', 'ratchet'); + return $this->config->get('reverb.default', 'reverb'); } /** - * Creates the Ratchet driver. + * Creates the Reverb driver. */ - public function createRatchetDriver(): RatchetProvider + public function createReverbDriver(): ReverbProvider { - return new RatchetProvider( + return new ReverbProvider( $this->app, - $this->config->get('reverb.servers.ratchet', []) + $this->config->get('reverb.servers.reverb', []) ); } diff --git a/src/Servers/ApiGateway/Connection.php b/src/Servers/ApiGateway/Connection.php index dbc21ba7..c49b6882 100644 --- a/src/Servers/ApiGateway/Connection.php +++ b/src/Servers/ApiGateway/Connection.php @@ -5,7 +5,7 @@ use Laravel\Reverb\Application; use Laravel\Reverb\Concerns\GeneratesPusherIdentifiers; use Laravel\Reverb\Concerns\SerializesConnections; -use Laravel\Reverb\Connection as BaseConnection; +use Laravel\Reverb\Contracts\Connection as BaseConnection; use Laravel\Reverb\Contracts\SerializableConnection; use Laravel\Reverb\Servers\ApiGateway\Jobs\SendToConnection; diff --git a/src/Servers/Ratchet/Connection.php b/src/Servers/Ratchet/Connection.php deleted file mode 100644 index 5f9da735..00000000 --- a/src/Servers/Ratchet/Connection.php +++ /dev/null @@ -1,82 +0,0 @@ -connection->resourceId; - } - - /** - * Get the normalized socket ID. - */ - public function id(): string - { - if (! $this->id) { - $this->id = $this->generateId(); - } - - return $this->id; - } - - /** - * Get the origin of the connection. - */ - public function origin(): string - { - return $this->origin; - } - - /** - * Send a message to the connection. - */ - public function send(string $message): void - { - try { - $this->connection->send($message); - - Output::info('Message Sent', $this->id()); - Output::message($message); - } catch (Throwable $e) { - Output::error('Unable to send message.'); - Output::info($e->getMessage()); - } - } - - /** - * Terminate a connection. - */ - public function terminate(): void - { - $this->connection->close(); - } -} diff --git a/src/Servers/Ratchet/Factory.php b/src/Servers/Ratchet/Factory.php deleted file mode 100644 index f145fe8f..00000000 --- a/src/Servers/Ratchet/Factory.php +++ /dev/null @@ -1,64 +0,0 @@ -add('sockets', new Route('/app/{appId}', ['_controller' => static::handler()], [], [], null, [], ['GET'])); - $routes->add('events', new Route('/apps/{appId}/events', ['_controller' => EventController::class], [], [], null, [], ['POST'])); - $routes->add('stats', new Route('/stats', ['_controller' => StatsController::class], [], [], null, [], ['GET'])); - - return $routes; - } - - /** - * Build the WebSocket server. - */ - protected static function handler(): WsServer - { - return new WsServer( - App::make(Server::class) - ); - } -} diff --git a/src/Servers/Ratchet/Server.php b/src/Servers/Ratchet/Server.php deleted file mode 100644 index 14bb46c9..00000000 --- a/src/Servers/Ratchet/Server.php +++ /dev/null @@ -1,104 +0,0 @@ -server->open( - $this->connection($connection) - ); - } - - /** - * Handle a new message received by the connected client. - * - * @param string $message - */ - public function onMessage(ConnectionInterface $from, $message): void - { - $this->server->message( - $this->connection($from), - $message - ); - } - - /** - * Handle a client disconnection. - */ - public function onClose(ConnectionInterface $connection): void - { - $this->server->close( - $this->connection($connection) - ); - } - - /** - * Handle an error. - */ - public function onError(ConnectionInterface $connection, Exception $e): void - { - if ($e instanceof InvalidApplication) { - $connection->send( - $e->message() - ); - - return; - } - - $this->server->error( - $this->connection($connection), - $e - ); - } - - /** - * Get a Reverb connection from a Ratchet connection. - */ - protected function connection(ConnectionInterface $connection): Connection - { - $application = $this->application($connection); - - return $this->connections - ->for($application) - ->resolve( - $connection->resourceId, - fn () => new Connection( - $connection, - $application, - $connection->httpRequest->getHeader('Origin')[0] ?? null - ) - ); - } - - /** - * Get the application instance for the request. - */ - protected function application(ConnectionInterface $connection): Application - { - $request = $connection->httpRequest; - parse_str($request->getUri()->getQuery(), $queryString); - - return $this->applications->findByKey($queryString['appId']); - } -} diff --git a/src/Servers/Reverb/Connection.php b/src/Servers/Reverb/Connection.php new file mode 100644 index 00000000..37b387c2 --- /dev/null +++ b/src/Servers/Reverb/Connection.php @@ -0,0 +1,79 @@ +connection->id(); + } + + /** + * Get the normalized socket ID. + */ + public function id(): string + { + if (! $this->id) { + $this->id = $this->generateId(); + } + + return $this->id; + } + + /** + * Create a new connection instance. + */ + public static function make(WsConnection $connection, Application $application, string $origin): Connection + { + return new static($connection, $application, $origin); + } + + /** + * Send a message to the connection. + */ + public function send(string $message): void + { + $this->connection->send($message); + } + + /** + * Terminate a connection. + */ + public function terminate(): void + { + $this->connection->close(); + } +} diff --git a/src/Servers/Ratchet/Console/Commands/StartServer.php b/src/Servers/Reverb/Console/Commands/StartServer.php similarity index 76% rename from src/Servers/Ratchet/Console/Commands/StartServer.php rename to src/Servers/Reverb/Console/Commands/StartServer.php index 54548512..dee06e39 100644 --- a/src/Servers/Ratchet/Console/Commands/StartServer.php +++ b/src/Servers/Reverb/Console/Commands/StartServer.php @@ -1,15 +1,13 @@ laravel->instance(Logger::class, new CliLogger($this->output)); - - $config = $this->laravel['config']['reverb.servers.ratchet']; + $config = $this->laravel['config']['reverb.servers.reverb']; $host = $this->option('host') ?: $config['host']; $port = $this->option('port') ?: $config['port']; @@ -54,7 +50,7 @@ public function handle(): void $this->components->info("Starting server on {$host}:{$port}"); - $server->run(); + $server->start(); } /** diff --git a/src/Servers/Reverb/Controller.php b/src/Servers/Reverb/Controller.php new file mode 100644 index 00000000..4b5f03e8 --- /dev/null +++ b/src/Servers/Reverb/Controller.php @@ -0,0 +1,43 @@ +connection($request, $connection, $key); + + $server = app(Server::class); + $server->open($reverbConnection); + + $connection->on('message', fn (string $message) => $server->message($reverbConnection, $message)); + $connection->on('close', fn () => $server->close($reverbConnection)); + } + + /** + * Get the Reverb connection instance for the request. + */ + protected function connection(RequestInterface $request, WsConnection $connection, string $key): ReverbConnection + { + return app(ConnectionManager::class) + ->for($application = app(ApplicationProvider::class)->findByKey($key)) + ->connect( + new ReverbConnection( + $connection, + $application, + $request->getHeader('Origin')[0] ?? null + ) + ); + } +} diff --git a/src/Servers/Reverb/Factory.php b/src/Servers/Reverb/Factory.php new file mode 100644 index 00000000..eca76a45 --- /dev/null +++ b/src/Servers/Reverb/Factory.php @@ -0,0 +1,40 @@ +add('sockets', Route::get('/app/{key}', new Controller)); + + return $routes; + } +} diff --git a/src/Servers/Ratchet/RatchetProvider.php b/src/Servers/Reverb/ReverbProvider.php similarity index 83% rename from src/Servers/Ratchet/RatchetProvider.php rename to src/Servers/Reverb/ReverbProvider.php index 9eece882..1c485da0 100644 --- a/src/Servers/Ratchet/RatchetProvider.php +++ b/src/Servers/Reverb/ReverbProvider.php @@ -1,6 +1,6 @@ app['cache']->store('array'), - $this->config['connection_manager']['prefix'] ?? 'reverb' - ); + return new ConnectionManager; } /** @@ -110,10 +107,6 @@ public function buildConnectionManager(): ConnectionManagerInterface */ public function buildChannelManager(): ChannelManagerInterface { - return new ChannelManager( - $this->app['cache']->store('array'), - $this->app->make(ConnectionManagerInterface::class), - $this->config['connection_manager']['prefix'] ?? 'reverb' - ); + return new ChannelManager; } } diff --git a/src/ServiceProvider.php b/src/ServiceProvider.php index c89ecad4..917c4bc0 100644 --- a/src/ServiceProvider.php +++ b/src/ServiceProvider.php @@ -3,12 +3,11 @@ namespace Laravel\Reverb; use Illuminate\Support\ServiceProvider as BaseServiceProvider; -use Laravel\Reverb\Console\Commands\StartServer; use Laravel\Reverb\Contracts\ChannelManager; use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Contracts\Logger; use Laravel\Reverb\Contracts\ServerProvider; -use Laravel\Reverb\Loggers\StandardLogger; +use Laravel\Reverb\Loggers\NullLogger; class ServiceProvider extends BaseServiceProvider { @@ -17,12 +16,6 @@ class ServiceProvider extends BaseServiceProvider */ public function boot() { - if ($this->app->runningInConsole()) { - $this->commands([ - StartServer::class, - ]); - } - $this->publishes([ __DIR__.'/../config/reverb.php' => config_path('reverb.php'), ]); @@ -54,16 +47,16 @@ public function registerServer() $server->register(); - $this->app->bind( + $this->app->singleton( ConnectionManager::class, fn () => $server->buildConnectionManager() ); - $this->app->bind( + $this->app->singleton( ChannelManager::class, fn () => $server->buildChannelManager() ); - $this->app->instance(Logger::class, new StandardLogger); + $this->app->instance(Logger::class, new NullLogger); } } diff --git a/src/WebSockets/WsConnection.php b/src/WebSockets/WsConnection.php new file mode 100644 index 00000000..9e214635 --- /dev/null +++ b/src/WebSockets/WsConnection.php @@ -0,0 +1,48 @@ +buffer = new MessageBuffer( + new CloseFrameChecker, + onMessage: fn (Message $message) => $this->emit('message', [$message->getPayload()]), + onControl: fn () => $this->close(), + sender: [$connection, 'send'] + ); + + $connection->on('data', [$this->buffer, 'onData']); + $connection->on('close', fn () => $this->emit('close')); + } + + /** + * Send a message to the connection. + */ + public function send(string $message): void + { + $this->buffer->sendMessage($message); + } + + /** + * Close the connection. + */ + public function close(): void + { + $this->connection->close(); + } + + public function id() + { + return $this->connection->id(); + } +} diff --git a/tests/ApiGatewayTestCase.php b/tests/ApiGatewayTestCase.php index 81fb748e..69fd6d10 100644 --- a/tests/ApiGatewayTestCase.php +++ b/tests/ApiGatewayTestCase.php @@ -5,7 +5,7 @@ use Illuminate\Support\Facades\App; use Illuminate\Support\Facades\Bus; use Illuminate\Support\Str; -use Laravel\Reverb\Connection; +use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Contracts\Logger; use Laravel\Reverb\Loggers\NullLogger; use Laravel\Reverb\ManagerProvider; diff --git a/tests/Connection.php b/tests/Connection.php index 37d4b1c7..6d6aa97e 100644 --- a/tests/Connection.php +++ b/tests/Connection.php @@ -5,8 +5,8 @@ use Carbon\Carbon; use Illuminate\Testing\Assert; use Laravel\Reverb\Application; -use Laravel\Reverb\Connection as BaseConnection; use Laravel\Reverb\Contracts\ApplicationProvider; +use Laravel\Reverb\Contracts\Connection as BaseConnection; class Connection extends BaseConnection { diff --git a/tests/Pest.php b/tests/Pest.php index 9692b6c5..fbfda262 100644 --- a/tests/Pest.php +++ b/tests/Pest.php @@ -2,9 +2,9 @@ use Illuminate\Support\Facades\App; use Laravel\Reverb\Application; -use Laravel\Reverb\Connection as ReverbConnection; use Laravel\Reverb\Contracts\ApplicationProvider; use Laravel\Reverb\Contracts\ChannelManager; +use Laravel\Reverb\Contracts\Connection as ReverbConnection; use Laravel\Reverb\Contracts\ConnectionManager; use Laravel\Reverb\Managers\Connections; use Laravel\Reverb\Tests\Connection; diff --git a/tests/RatchetTestCase.php b/tests/RatchetTestCase.php index 5ae060e9..2bf162bc 100644 --- a/tests/RatchetTestCase.php +++ b/tests/RatchetTestCase.php @@ -5,7 +5,7 @@ use Clue\React\Redis\Client; use Illuminate\Support\Str; use Laravel\Reverb\Concerns\InteractsWithAsyncRedis; -use Laravel\Reverb\Connection; +use Laravel\Reverb\Contracts\Connection; use Laravel\Reverb\Contracts\Logger; use Laravel\Reverb\Contracts\ServerProvider; use Laravel\Reverb\Event;