Skip to content

Commit

Permalink
enable websockets for swoole
Browse files Browse the repository at this point in the history
  • Loading branch information
kingIZZZY authored Dec 23, 2024
1 parent ee88fe3 commit 6481ec1
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 29 deletions.
4 changes: 3 additions & 1 deletion bin/createSwooleServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

$sock = filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) ? SWOOLE_SOCK_TCP : SWOOLE_SOCK_TCP6;

$server = new Swoole\Http\Server(
$serverClass = ($config['swoole']['enableWebSockets'] ?? false) ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';

$server = new $serverClass(
$host,
$serverState['port'] ?? 8000,
$config['swoole']['mode'] ?? SWOOLE_PROCESS,
Expand Down
63 changes: 59 additions & 4 deletions bin/swoole-server
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use Laravel\Octane\Swoole\ServerStateFile;
use Laravel\Octane\Swoole\SwooleExtension;
use Laravel\Octane\Swoole\WorkerState;
use Swoole\Http\Server;
use Swoole\WebSocket\Server as WServer;
use Swoole\Timer;

ini_set('display_errors', 'stderr');
Expand Down Expand Up @@ -49,7 +50,7 @@ $timerTable = require __DIR__.'/createSwooleTimerTable.php';
|
*/

$server->on('start', fn (Server $server) => $bootstrap($serverState) && (new OnServerStart(
$server->on('start', fn (Server|WServer $server) => $bootstrap($serverState) && (new OnServerStart(
new ServerStateFile($serverStateFile),
new SwooleExtension,
$serverState['appName'],
Expand Down Expand Up @@ -91,7 +92,7 @@ $workerState->cacheTable = require __DIR__.'/createSwooleCacheTable.php';
$workerState->timerTable = $timerTable;
$workerState->tables = require __DIR__.'/createSwooleTables.php';

$server->on('workerstart', fn (Server $server, $workerId) =>
$server->on('workerstart', fn (Server|WServer $server, $workerId) =>
(fn ($basePath) => (new OnWorkerStart(
new SwooleExtension, $basePath, $serverState, $workerState
))($server, $workerId))($bootstrap($serverState))
Expand Down Expand Up @@ -142,13 +143,67 @@ $server->on('request', function ($request, $response) use ($server, $workerState
|
*/

$server->on('task', fn (Server $server, int $taskId, int $fromWorkerId, $data) =>
$server->on('task', fn (Server|WServer $server, int $taskId, int $fromWorkerId, $data) =>
$data === 'octane-tick'
? $workerState->worker->handleTick()
: $workerState->worker->handleTask($data)
);

$server->on('finish', fn (Server $server, int $taskId, $result) => $result);
$server->on('finish', fn (Server|WServer $server, int $taskId, $result) => $result);


if($serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false){

/*
|--------------------------------------------------------------------------
| Handle Incoming WebSocket Connections
|--------------------------------------------------------------------------
*/
$server->on('handshake', function ($request, $response) use ($server, $workerState, $serverState) {
$workerState->lastRequestTime = microtime(true);

if ($workerState->timerTable) {
$workerState->timerTable->set($workerState->workerId, [
'worker_pid' => $workerState->workerPid,
'time' => time(),
'fd' => $request->fd,
]);
}

$workerState->worker->handle(...$workerState->client->marshalRequest(new RequestContext([
'swooleRequest' => $request,
'swooleResponse' => $response,
'publicPath' => $serverState['publicPath'],
'octaneConfig' => $serverState['octaneConfig'],
])));

if ($workerState->timerTable) {
$workerState->timerTable->del($workerState->workerId);
}
});


/*
|--------------------------------------------------------------------------
| Handle Incoming WebSocket Messages
|--------------------------------------------------------------------------
*/

$server->on('message', function (Server $server, Frame $frame) use ($workerState) {
$workerState->worker->handleWebSocketMessage($server, $frame);
});

/*
|--------------------------------------------------------------------------
| Handle Closed WebSocket Connections
|--------------------------------------------------------------------------
*/

$server->on('close', function (Server $server, int $fd) use ($workerState) {
$workerState->worker->handleWebSocketDisconnect($server, $fd);
});

}

/*
|--------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/Commands/Concerns/InteractsWithServers.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected function writeServerRunningMessage()
$this->output->writeln([
'',
' Local: <fg=white;options=bold>'.($this->hasOption('https') && $this->option('https') ? 'https://' : 'http://').$this->getHost().':'.$this->getPort().' </>',
'',
config('octane.swoole.enableWebSockets') ? (' <fg=white;options=bold>'.($this->hasOption('https') && $this->option('https') ? 'wss://' : 'ws://').$this->getHost().':'.$this->getPort()." </>\n") : '',
' <fg=yellow>Press Ctrl+C to stop the server</>',
'',
]);
Expand Down
6 changes: 3 additions & 3 deletions src/Concerns/ProvidesConcurrencySupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Laravel\Octane\Swoole\ServerStateFile;
use Laravel\Octane\Swoole\SwooleHttpTaskDispatcher;
use Laravel\Octane\Swoole\SwooleTaskDispatcher;
use Swoole\Http\Server;

trait ProvidesConcurrencySupport
{
Expand All @@ -33,10 +32,11 @@ public function concurrently(array $tasks, int $waitMilliseconds = 3000)
*/
public function tasks()
{
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
return match (true) {
app()->bound(DispatchesTasks::class) => app(DispatchesTasks::class),
app()->bound(Server::class) => new SwooleTaskDispatcher,
class_exists(Server::class) => (fn (array $serverState) => new SwooleHttpTaskDispatcher(
app()->bound($serverClass) => new SwooleTaskDispatcher,
class_exists($serverClass) => (fn (array $serverState) => new SwooleHttpTaskDispatcher(
$serverState['state']['host'] ?? '127.0.0.1',
$serverState['state']['port'] ?? '8000',
new SequentialTaskDispatcher
Expand Down
17 changes: 17 additions & 0 deletions src/Events/WebSocketDisconnectReceived.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Laravel\Octane\Events;

use Illuminate\Foundation\Application;
use Swoole\WebSocket\Server;

class WebSocketDisconnectReceived
{
public function __construct(
public Application $app,
public Application $sandbox,
public Server $server,
public int $fd
) {
}
}
17 changes: 17 additions & 0 deletions src/Events/WebSocketMessageReceived.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Laravel\Octane\Events;

use Illuminate\Foundation\Application;
use Swoole\WebSocket\Server;

class WebSocketMessageReceived
{
public function __construct(
public Application $app,
public Application $sandbox,
public Server $server,
public Frame $frame
) {
}
}
4 changes: 2 additions & 2 deletions src/Octane.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Exception;
use Laravel\Octane\Swoole\WorkerState;
use Swoole\Http\Server;
use Swoole\Table;
use Throwable;

Expand All @@ -20,7 +19,8 @@ class Octane
*/
public function table(string $table): Table
{
if (! app()->bound(Server::class)) {
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
if (! app()->bound($serverClass)) {
throw new Exception('Tables may only be accessed when using the Swoole server.');
}

Expand Down
5 changes: 3 additions & 2 deletions src/OctaneServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ public function register()
));
});

$serverClass = $app['config']->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
$this->app->bind(DispatchesCoroutines::class, function ($app) {
return class_exists('Swoole\Http\Server')
? new SwooleCoroutineDispatcher($app->bound('Swoole\Http\Server'))
return class_exists($serverClass)
? new SwooleCoroutineDispatcher($app->bound($serverClass))
: $app->make(SequentialCoroutineDispatcher::class);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __invoke($swooleRequest, string $phpSapi): Request
$request = new SymfonyRequest(
$swooleRequest->get ?? [],
$swooleRequest->post ?? [],
[],
['fd' => $swooleRequest->fd],
$swooleRequest->cookie ?? [],
$swooleRequest->files ?? [],
$serverVariables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
use Laravel\Octane\Swoole\SwooleExtension;
use Swoole\Http\Response;
use Swoole\Http\Server;
use Swoole\WebSocket\Server as WServer;

class EnsureRequestsDontExceedMaxExecutionTime
{
public function __construct(
protected SwooleExtension $extension,
protected $timerTable,
protected $maxExecutionTime,
protected ?Server $server = null
protected Server|WServer|null $server = null
) {
}

Expand All @@ -38,13 +39,13 @@ public function __invoke()

$this->timerTable->del($workerId);

if ($this->server instanceof Server && ! $this->server->exists($row['fd'])) {
if (($this->server instanceof Server || $this->server instanceof WServer) && ! $this->server->exists($row['fd'])) {
continue;
}

$this->extension->dispatchProcessSignal($row['worker_pid'], SIGKILL);

if ($this->server instanceof Server) {
if ($this->server instanceof Server || $this->server instanceof WServer) {
$response = Response::create($this->server, $row['fd']);

if ($response) {
Expand Down
2 changes: 1 addition & 1 deletion src/Swoole/Handlers/OnServerStart.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function __construct(
/**
* Handle the "start" Swoole event.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
public function __invoke($server)
Expand Down
12 changes: 6 additions & 6 deletions src/Swoole/Handlers/OnWorkerStart.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Laravel\Octane\Swoole\SwooleExtension;
use Laravel\Octane\Swoole\WorkerState;
use Laravel\Octane\Worker;
use Swoole\Http\Server;
use Throwable;

class OnWorkerStart
Expand All @@ -25,7 +24,7 @@ public function __construct(
/**
* Handle the "workerstart" Swoole event.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
public function __invoke($server, int $workerId)
Expand Down Expand Up @@ -53,18 +52,19 @@ public function __invoke($server, int $workerId)
/**
* Boot the Octane worker and application.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return \Laravel\Octane\Worker|null
*/
protected function bootWorker($server)
{
try {
$serverClass = ($this->serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false) ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
return tap(new Worker(
new ApplicationFactory($this->basePath),
$this->workerState->client = new SwooleClient
))->boot([
'octane.cacheTable' => $this->workerState->cacheTable,
Server::class => $server,
$serverClass => $server,
WorkerState::class => $this->workerState,
]);
} catch (Throwable $e) {
Expand All @@ -77,7 +77,7 @@ protected function bootWorker($server)
/**
* Start the Octane server tick to dispatch the tick task every second.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
protected function dispatchServerTickTaskEverySecond($server)
Expand All @@ -88,7 +88,7 @@ protected function dispatchServerTickTaskEverySecond($server)
/**
* Register the request handled listener that will output request information per request.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
protected function streamRequestsToConsole($server)
Expand Down
11 changes: 6 additions & 5 deletions src/Swoole/SwooleTaskDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Laravel\Octane\Exceptions\TaskExceptionResult;
use Laravel\Octane\Exceptions\TaskTimeoutException;
use Laravel\SerializableClosure\SerializableClosure;
use Swoole\Http\Server;

class SwooleTaskDispatcher implements DispatchesTasks
{
Expand All @@ -23,11 +22,12 @@ class SwooleTaskDispatcher implements DispatchesTasks
*/
public function resolve(array $tasks, int $waitMilliseconds = 3000): array
{
if (! app()->bound(Server::class)) {
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
if (! app()->bound($serverClass)) {
throw new InvalidArgumentException('Tasks can only be resolved within a Swoole server context / web request.');
}

$results = app(Server::class)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) {
$results = app($serverClass)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) {
return [$key => $task instanceof Closure
? new SerializableClosure($task)
: $task, ];
Expand Down Expand Up @@ -61,11 +61,12 @@ public function resolve(array $tasks, int $waitMilliseconds = 3000): array
*/
public function dispatch(array $tasks): void
{
if (! app()->bound(Server::class)) {
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
if (! app()->bound($serverClass)) {
throw new InvalidArgumentException('Tasks can only be dispatched within a Swoole server context / web request.');
}

$server = app(Server::class);
$server = app($serverClass);

collect($tasks)->each(function ($task) use ($server) {
$server->task($task instanceof Closure ? new SerializableClosure($task) : $task);
Expand Down
Loading

0 comments on commit 6481ec1

Please sign in to comment.