From 6481ec1c92b67122721f411f938089ba0729dbd0 Mon Sep 17 00:00:00 2001 From: kingIZZZY <logic.cpp@gmail.com> Date: Mon, 23 Dec 2024 01:22:52 -0500 Subject: [PATCH 1/5] enable websockets for swoole --- bin/createSwooleServer.php | 4 +- bin/swoole-server | 63 +++++++++++++++++-- .../Concerns/InteractsWithServers.php | 2 +- src/Concerns/ProvidesConcurrencySupport.php | 6 +- src/Events/WebSocketDisconnectReceived.php | 17 +++++ src/Events/WebSocketMessageReceived.php | 17 +++++ src/Octane.php | 4 +- src/OctaneServiceProvider.php | 5 +- ...onvertSwooleRequestToIlluminateRequest.php | 2 +- ...sureRequestsDontExceedMaxExecutionTime.php | 7 ++- src/Swoole/Handlers/OnServerStart.php | 2 +- src/Swoole/Handlers/OnWorkerStart.php | 12 ++-- src/Swoole/SwooleTaskDispatcher.php | 11 ++-- src/Worker.php | 42 +++++++++++++ 14 files changed, 165 insertions(+), 29 deletions(-) create mode 100644 src/Events/WebSocketDisconnectReceived.php create mode 100644 src/Events/WebSocketMessageReceived.php diff --git a/bin/createSwooleServer.php b/bin/createSwooleServer.php index b94bc0d91..e8cb1c976 100644 --- a/bin/createSwooleServer.php +++ b/bin/createSwooleServer.php @@ -7,7 +7,9 @@ $sock = filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) ? SWOOLE_SOCK_TCP : SWOOLE_SOCK_TCP6; - $server = new Swoole\Http\Server( + $serverClass = ($config['swoole']['enableWebSockets'] ?? false) ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; + + $server = new $serverClass( $host, $serverState['port'] ?? 8000, $config['swoole']['mode'] ?? SWOOLE_PROCESS, diff --git a/bin/swoole-server b/bin/swoole-server index 3fa19c249..dc4b72623 100755 --- a/bin/swoole-server +++ b/bin/swoole-server @@ -9,6 +9,7 @@ use Laravel\Octane\Swoole\ServerStateFile; use Laravel\Octane\Swoole\SwooleExtension; use Laravel\Octane\Swoole\WorkerState; use Swoole\Http\Server; +use Swoole\WebSocket\Server as WServer; use Swoole\Timer; ini_set('display_errors', 'stderr'); @@ -49,7 +50,7 @@ $timerTable = require __DIR__.'/createSwooleTimerTable.php'; | */ -$server->on('start', fn (Server $server) => $bootstrap($serverState) && (new OnServerStart( +$server->on('start', fn (Server|WServer $server) => $bootstrap($serverState) && (new OnServerStart( new ServerStateFile($serverStateFile), new SwooleExtension, $serverState['appName'], @@ -91,7 +92,7 @@ $workerState->cacheTable = require __DIR__.'/createSwooleCacheTable.php'; $workerState->timerTable = $timerTable; $workerState->tables = require __DIR__.'/createSwooleTables.php'; -$server->on('workerstart', fn (Server $server, $workerId) => +$server->on('workerstart', fn (Server|WServer $server, $workerId) => (fn ($basePath) => (new OnWorkerStart( new SwooleExtension, $basePath, $serverState, $workerState ))($server, $workerId))($bootstrap($serverState)) @@ -142,13 +143,67 @@ $server->on('request', function ($request, $response) use ($server, $workerState | */ -$server->on('task', fn (Server $server, int $taskId, int $fromWorkerId, $data) => +$server->on('task', fn (Server|WServer $server, int $taskId, int $fromWorkerId, $data) => $data === 'octane-tick' ? $workerState->worker->handleTick() : $workerState->worker->handleTask($data) ); -$server->on('finish', fn (Server $server, int $taskId, $result) => $result); +$server->on('finish', fn (Server|WServer $server, int $taskId, $result) => $result); + + +if($serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false){ + + /* + |-------------------------------------------------------------------------- + | Handle Incoming WebSocket Connections + |-------------------------------------------------------------------------- + */ + $server->on('handshake', function ($request, $response) use ($server, $workerState, $serverState) { + $workerState->lastRequestTime = microtime(true); + + if ($workerState->timerTable) { + $workerState->timerTable->set($workerState->workerId, [ + 'worker_pid' => $workerState->workerPid, + 'time' => time(), + 'fd' => $request->fd, + ]); + } + + $workerState->worker->handle(...$workerState->client->marshalRequest(new RequestContext([ + 'swooleRequest' => $request, + 'swooleResponse' => $response, + 'publicPath' => $serverState['publicPath'], + 'octaneConfig' => $serverState['octaneConfig'], + ]))); + + if ($workerState->timerTable) { + $workerState->timerTable->del($workerState->workerId); + } + }); + + + /* + |-------------------------------------------------------------------------- + | Handle Incoming WebSocket Messages + |-------------------------------------------------------------------------- + */ + + $server->on('message', function (Server $server, Frame $frame) use ($workerState) { + $workerState->worker->handleWebSocketMessage($server, $frame); + }); + + /* + |-------------------------------------------------------------------------- + | Handle Closed WebSocket Connections + |-------------------------------------------------------------------------- + */ + + $server->on('close', function (Server $server, int $fd) use ($workerState) { + $workerState->worker->handleWebSocketDisconnect($server, $fd); + }); + +} /* |-------------------------------------------------------------------------- diff --git a/src/Commands/Concerns/InteractsWithServers.php b/src/Commands/Concerns/InteractsWithServers.php index d74e3f8b6..673a3be6b 100644 --- a/src/Commands/Concerns/InteractsWithServers.php +++ b/src/Commands/Concerns/InteractsWithServers.php @@ -105,7 +105,7 @@ protected function writeServerRunningMessage() $this->output->writeln([ '', ' Local: <fg=white;options=bold>'.($this->hasOption('https') && $this->option('https') ? 'https://' : 'http://').$this->getHost().':'.$this->getPort().' </>', - '', + config('octane.swoole.enableWebSockets') ? (' <fg=white;options=bold>'.($this->hasOption('https') && $this->option('https') ? 'wss://' : 'ws://').$this->getHost().':'.$this->getPort()." </>\n") : '', ' <fg=yellow>Press Ctrl+C to stop the server</>', '', ]); diff --git a/src/Concerns/ProvidesConcurrencySupport.php b/src/Concerns/ProvidesConcurrencySupport.php index 134f601f6..545bb40ac 100644 --- a/src/Concerns/ProvidesConcurrencySupport.php +++ b/src/Concerns/ProvidesConcurrencySupport.php @@ -7,7 +7,6 @@ use Laravel\Octane\Swoole\ServerStateFile; use Laravel\Octane\Swoole\SwooleHttpTaskDispatcher; use Laravel\Octane\Swoole\SwooleTaskDispatcher; -use Swoole\Http\Server; trait ProvidesConcurrencySupport { @@ -33,10 +32,11 @@ public function concurrently(array $tasks, int $waitMilliseconds = 3000) */ public function tasks() { + $serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; return match (true) { app()->bound(DispatchesTasks::class) => app(DispatchesTasks::class), - app()->bound(Server::class) => new SwooleTaskDispatcher, - class_exists(Server::class) => (fn (array $serverState) => new SwooleHttpTaskDispatcher( + app()->bound($serverClass) => new SwooleTaskDispatcher, + class_exists($serverClass) => (fn (array $serverState) => new SwooleHttpTaskDispatcher( $serverState['state']['host'] ?? '127.0.0.1', $serverState['state']['port'] ?? '8000', new SequentialTaskDispatcher diff --git a/src/Events/WebSocketDisconnectReceived.php b/src/Events/WebSocketDisconnectReceived.php new file mode 100644 index 000000000..5bc758cee --- /dev/null +++ b/src/Events/WebSocketDisconnectReceived.php @@ -0,0 +1,17 @@ +<?php + +namespace Laravel\Octane\Events; + +use Illuminate\Foundation\Application; +use Swoole\WebSocket\Server; + +class WebSocketDisconnectReceived +{ + public function __construct( + public Application $app, + public Application $sandbox, + public Server $server, + public int $fd + ) { + } +} diff --git a/src/Events/WebSocketMessageReceived.php b/src/Events/WebSocketMessageReceived.php new file mode 100644 index 000000000..239b8e23f --- /dev/null +++ b/src/Events/WebSocketMessageReceived.php @@ -0,0 +1,17 @@ +<?php + +namespace Laravel\Octane\Events; + +use Illuminate\Foundation\Application; +use Swoole\WebSocket\Server; + +class WebSocketMessageReceived +{ + public function __construct( + public Application $app, + public Application $sandbox, + public Server $server, + public Frame $frame + ) { + } +} diff --git a/src/Octane.php b/src/Octane.php index 11028a3a3..8d00ecc8d 100644 --- a/src/Octane.php +++ b/src/Octane.php @@ -4,7 +4,6 @@ use Exception; use Laravel\Octane\Swoole\WorkerState; -use Swoole\Http\Server; use Swoole\Table; use Throwable; @@ -20,7 +19,8 @@ class Octane */ public function table(string $table): Table { - if (! app()->bound(Server::class)) { + $serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; + if (! app()->bound($serverClass)) { throw new Exception('Tables may only be accessed when using the Swoole server.'); } diff --git a/src/OctaneServiceProvider.php b/src/OctaneServiceProvider.php index 206e32768..3c5b1e496 100644 --- a/src/OctaneServiceProvider.php +++ b/src/OctaneServiceProvider.php @@ -86,9 +86,10 @@ public function register() )); }); + $serverClass = $app['config']->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; $this->app->bind(DispatchesCoroutines::class, function ($app) { - return class_exists('Swoole\Http\Server') - ? new SwooleCoroutineDispatcher($app->bound('Swoole\Http\Server')) + return class_exists($serverClass) + ? new SwooleCoroutineDispatcher($app->bound($serverClass)) : $app->make(SequentialCoroutineDispatcher::class); }); } diff --git a/src/Swoole/Actions/ConvertSwooleRequestToIlluminateRequest.php b/src/Swoole/Actions/ConvertSwooleRequestToIlluminateRequest.php index 41f3f5889..c7097d22c 100644 --- a/src/Swoole/Actions/ConvertSwooleRequestToIlluminateRequest.php +++ b/src/Swoole/Actions/ConvertSwooleRequestToIlluminateRequest.php @@ -24,7 +24,7 @@ public function __invoke($swooleRequest, string $phpSapi): Request $request = new SymfonyRequest( $swooleRequest->get ?? [], $swooleRequest->post ?? [], - [], + ['fd' => $swooleRequest->fd], $swooleRequest->cookie ?? [], $swooleRequest->files ?? [], $serverVariables, diff --git a/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php b/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php index 8437ab362..da126a3ba 100644 --- a/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php +++ b/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php @@ -5,6 +5,7 @@ use Laravel\Octane\Swoole\SwooleExtension; use Swoole\Http\Response; use Swoole\Http\Server; +use Swoole\WebSocket\Server as WServer; class EnsureRequestsDontExceedMaxExecutionTime { @@ -12,7 +13,7 @@ public function __construct( protected SwooleExtension $extension, protected $timerTable, protected $maxExecutionTime, - protected ?Server $server = null + protected Server|WServer|null $server = null ) { } @@ -38,13 +39,13 @@ public function __invoke() $this->timerTable->del($workerId); - if ($this->server instanceof Server && ! $this->server->exists($row['fd'])) { + if (($this->server instanceof Server || $this->server instanceof WServer) && ! $this->server->exists($row['fd'])) { continue; } $this->extension->dispatchProcessSignal($row['worker_pid'], SIGKILL); - if ($this->server instanceof Server) { + if ($this->server instanceof Server || $this->server instanceof WServer) { $response = Response::create($this->server, $row['fd']); if ($response) { diff --git a/src/Swoole/Handlers/OnServerStart.php b/src/Swoole/Handlers/OnServerStart.php index 64943571d..d90f10653 100644 --- a/src/Swoole/Handlers/OnServerStart.php +++ b/src/Swoole/Handlers/OnServerStart.php @@ -23,7 +23,7 @@ public function __construct( /** * Handle the "start" Swoole event. * - * @param \Swoole\Http\Server $server + * @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server * @return void */ public function __invoke($server) diff --git a/src/Swoole/Handlers/OnWorkerStart.php b/src/Swoole/Handlers/OnWorkerStart.php index 0d928d79b..abda8f662 100644 --- a/src/Swoole/Handlers/OnWorkerStart.php +++ b/src/Swoole/Handlers/OnWorkerStart.php @@ -8,7 +8,6 @@ use Laravel\Octane\Swoole\SwooleExtension; use Laravel\Octane\Swoole\WorkerState; use Laravel\Octane\Worker; -use Swoole\Http\Server; use Throwable; class OnWorkerStart @@ -25,7 +24,7 @@ public function __construct( /** * Handle the "workerstart" Swoole event. * - * @param \Swoole\Http\Server $server + * @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server * @return void */ public function __invoke($server, int $workerId) @@ -53,18 +52,19 @@ public function __invoke($server, int $workerId) /** * Boot the Octane worker and application. * - * @param \Swoole\Http\Server $server + * @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server * @return \Laravel\Octane\Worker|null */ protected function bootWorker($server) { try { + $serverClass = ($this->serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false) ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; return tap(new Worker( new ApplicationFactory($this->basePath), $this->workerState->client = new SwooleClient ))->boot([ 'octane.cacheTable' => $this->workerState->cacheTable, - Server::class => $server, + $serverClass => $server, WorkerState::class => $this->workerState, ]); } catch (Throwable $e) { @@ -77,7 +77,7 @@ protected function bootWorker($server) /** * Start the Octane server tick to dispatch the tick task every second. * - * @param \Swoole\Http\Server $server + * @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server * @return void */ protected function dispatchServerTickTaskEverySecond($server) @@ -88,7 +88,7 @@ protected function dispatchServerTickTaskEverySecond($server) /** * Register the request handled listener that will output request information per request. * - * @param \Swoole\Http\Server $server + * @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server * @return void */ protected function streamRequestsToConsole($server) diff --git a/src/Swoole/SwooleTaskDispatcher.php b/src/Swoole/SwooleTaskDispatcher.php index 2d7910875..490181727 100644 --- a/src/Swoole/SwooleTaskDispatcher.php +++ b/src/Swoole/SwooleTaskDispatcher.php @@ -8,7 +8,6 @@ use Laravel\Octane\Exceptions\TaskExceptionResult; use Laravel\Octane\Exceptions\TaskTimeoutException; use Laravel\SerializableClosure\SerializableClosure; -use Swoole\Http\Server; class SwooleTaskDispatcher implements DispatchesTasks { @@ -23,11 +22,12 @@ class SwooleTaskDispatcher implements DispatchesTasks */ public function resolve(array $tasks, int $waitMilliseconds = 3000): array { - if (! app()->bound(Server::class)) { + $serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; + if (! app()->bound($serverClass)) { throw new InvalidArgumentException('Tasks can only be resolved within a Swoole server context / web request.'); } - $results = app(Server::class)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) { + $results = app($serverClass)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) { return [$key => $task instanceof Closure ? new SerializableClosure($task) : $task, ]; @@ -61,11 +61,12 @@ public function resolve(array $tasks, int $waitMilliseconds = 3000): array */ public function dispatch(array $tasks): void { - if (! app()->bound(Server::class)) { + $serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; + if (! app()->bound($serverClass)) { throw new InvalidArgumentException('Tasks can only be dispatched within a Swoole server context / web request.'); } - $server = app(Server::class); + $server = app($serverClass); collect($tasks)->each(function ($task) use ($server) { $server->task($task instanceof Closure ? new SerializableClosure($task) : $task); diff --git a/src/Worker.php b/src/Worker.php index 0a87ca4b1..5fb5a5776 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -12,6 +12,8 @@ use Laravel\Octane\Events\TaskTerminated; use Laravel\Octane\Events\TickReceived; use Laravel\Octane\Events\TickTerminated; +use Laravel\Octane\Events\WebSocketDisconnectReceived; +use Laravel\Octane\Events\WebSocketMessageReceived; use Laravel\Octane\Events\WorkerErrorOccurred; use Laravel\Octane\Events\WorkerStarting; use Laravel\Octane\Events\WorkerStopping; @@ -241,4 +243,44 @@ public function terminate(): void { $this->dispatchEvent($this->app, new WorkerStopping($this->app)); } + + /** + * Handle Incoming WebSocket Messages + */ + public function handleWebSocketMessage(\Swoole\WebSocket\Server $server, \Swoole\WebSocket\Frame $frame) + { + \Laravel\Octane\CurrentApplication::set($sandbox = clone $this->app); + + try { + $this->dispatchEvent($sandbox, new WebSocketMessageReceived($this->app, $sandbox, $server, $frame)); + } catch (Throwable $e) { + $this->dispatchEvent($sandbox, new WorkerErrorOccurred($e, $sandbox)); + } finally { + $sandbox->flush(); + + unset($sandbox); + + \Laravel\Octane\CurrentApplication::set($this->app); + } + } + + /** + * Handle Closed WebSocket Connections + */ + public function handleWebSocketDisconnect(\Swoole\WebSocket\Server $server, int $fd) + { + \Laravel\Octane\CurrentApplication::set($sandbox = clone $this->app); + + try { + $this->dispatchEvent($sandbox, new WebSocketDisconnectReceived($this->app, $sandbox, $server, $fd)); + } catch (Throwable $e) { + $this->dispatchEvent($sandbox, new WorkerErrorOccurred($e, $sandbox)); + } finally { + $sandbox->flush(); + + unset($sandbox); + + \Laravel\Octane\CurrentApplication::set($this->app); + } + } } From d1c95df4f4217836ed2f352f35d5eb66805457d6 Mon Sep 17 00:00:00 2001 From: kingIZZZY <logic.cpp@gmail.com> Date: Mon, 23 Dec 2024 01:43:01 -0500 Subject: [PATCH 2/5] fix "Undefined variable $app" --- src/OctaneServiceProvider.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OctaneServiceProvider.php b/src/OctaneServiceProvider.php index 3c5b1e496..3eaec69f3 100644 --- a/src/OctaneServiceProvider.php +++ b/src/OctaneServiceProvider.php @@ -86,8 +86,8 @@ public function register() )); }); - $serverClass = $app['config']->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; $this->app->bind(DispatchesCoroutines::class, function ($app) { + $serverClass = $app['config']->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server'; return class_exists($serverClass) ? new SwooleCoroutineDispatcher($app->bound($serverClass)) : $app->make(SequentialCoroutineDispatcher::class); From 50779a97fa48f0baac26f36930e5682f64874a67 Mon Sep 17 00:00:00 2001 From: kingIZZZY <logic.cpp@gmail.com> Date: Mon, 23 Dec 2024 01:49:07 -0500 Subject: [PATCH 3/5] WIP websocket listeners to prepareApplicationForNextOperation() --- config/octane.php | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/config/octane.php b/config/octane.php index 8cfba0114..b5db6112a 100644 --- a/config/octane.php +++ b/config/octane.php @@ -8,6 +8,8 @@ use Laravel\Octane\Events\TaskTerminated; use Laravel\Octane\Events\TickReceived; use Laravel\Octane\Events\TickTerminated; +use Laravel\Octane\Events\WebSocketMessageReceived; +use Laravel\Octane\Events\WebSocketDisconnectReceived; use Laravel\Octane\Events\WorkerErrorOccurred; use Laravel\Octane\Events\WorkerStarting; use Laravel\Octane\Events\WorkerStopping; @@ -102,6 +104,16 @@ // ], + WebSocketMessageReceived::class => [ + ...Octane::prepareApplicationForNextOperation(), + // + ], + + WebSocketDisconnectReceived::class => [ + ...Octane::prepareApplicationForNextOperation(), + // + ], + OperationTerminated::class => [ FlushOnce::class, FlushTemporaryContainerInstances::class, From 3b7027008b2628b18a7bae15487b2d97a2b102b6 Mon Sep 17 00:00:00 2001 From: kingIZZZY <logic.cpp@gmail.com> Date: Mon, 23 Dec 2024 02:00:16 -0500 Subject: [PATCH 4/5] WIP fix "must be of type Frame, Swoole\WebSocket\Frame given" --- bin/swoole-server | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/swoole-server b/bin/swoole-server index dc4b72623..47b8d1d0b 100755 --- a/bin/swoole-server +++ b/bin/swoole-server @@ -189,7 +189,7 @@ if($serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false){ |-------------------------------------------------------------------------- */ - $server->on('message', function (Server $server, Frame $frame) use ($workerState) { + $server->on('message', function (WServer $server, Swoole\WebSocket\Frame $frame) use ($workerState) { $workerState->worker->handleWebSocketMessage($server, $frame); }); @@ -199,7 +199,7 @@ if($serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false){ |-------------------------------------------------------------------------- */ - $server->on('close', function (Server $server, int $fd) use ($workerState) { + $server->on('close', function (WServer $server, int $fd) use ($workerState) { $workerState->worker->handleWebSocketDisconnect($server, $fd); }); From dcf265cf3fdc985e904cebc750578c0ad45d5648 Mon Sep 17 00:00:00 2001 From: kingIZZZY <logic.cpp@gmail.com> Date: Mon, 23 Dec 2024 02:02:39 -0500 Subject: [PATCH 5/5] WIP fix "must be of type Laravel\Octane\Events\Frame, Swoole\WebSocket\Frame given" --- src/Events/WebSocketMessageReceived.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Events/WebSocketMessageReceived.php b/src/Events/WebSocketMessageReceived.php index 239b8e23f..4860bad1e 100644 --- a/src/Events/WebSocketMessageReceived.php +++ b/src/Events/WebSocketMessageReceived.php @@ -3,6 +3,7 @@ namespace Laravel\Octane\Events; use Illuminate\Foundation\Application; +use Swoole\WebSocket\Frame; use Swoole\WebSocket\Server; class WebSocketMessageReceived