Skip to content
This repository has been archived by the owner on Sep 10, 2024. It is now read-only.

Develop #230

Merged
merged 15 commits into from
Mar 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ sudo: false

matrix:
include:
- php: 7.2
env: FRAMEWORK_VERSION=laravel/framework:5.3.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/framework:5.4.*
- php: 7.2
Expand All @@ -14,8 +12,8 @@ matrix:
env: FRAMEWORK_VERSION=laravel/framework:5.6.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/framework:5.7.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/lumen-framework:5.3.*
# - php: 7.2
# env: FRAMEWORK_VERSION=laravel/framework:5.8.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/lumen-framework:5.4.*
- php: 7.2
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This package provides a high performance HTTP server to speed up your Laravel/Lu

| PHP | Laravel | Lumen | Swoole |
|:-------:|:-------:|:-----:|:-------:|
| >=7.2 | ~5.3 | ~5.3 | >=4.0.0 |
| >=7.2 | ~5.4 | ~5.4 | >=4.0.0 |

## Features

Expand Down
10 changes: 5 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
],
"require": {
"php": "^7.2",
"illuminate/console": "~5.3",
"illuminate/contracts": "~5.3",
"illuminate/http": "~5.3",
"illuminate/support": "~5.3",
"illuminate/console": "~5.4",
"illuminate/contracts": "~5.4",
"illuminate/http": "~5.4",
"illuminate/support": "~5.4",
"predis/predis": "^1.1"
},
"require-dev": {
"laravel/lumen-framework": "~5.3",
"laravel/lumen-framework": "~5.4",
"phpunit/phpunit": "^7.5",
"phpunit/php-code-coverage": "^6.1",
"php-coveralls/php-coveralls": "^2.1",
Expand Down
76 changes: 12 additions & 64 deletions src/Concerns/InteractsWithWebsocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
namespace SwooleTW\Http\Concerns;

use Throwable;
use Illuminate\Support\Arr;
use Illuminate\Pipeline\Pipeline;
use SwooleTW\Http\Server\Sandbox;
use SwooleTW\Http\Websocket\Push;
use SwooleTW\Http\Websocket\Parser;
use SwooleTW\Http\Websocket\Pusher;
use SwooleTW\Http\Websocket\Websocket;
use SwooleTW\Http\Transformers\Request;
use SwooleTW\Http\Server\Facades\Server;
Expand Down Expand Up @@ -158,7 +157,7 @@ public function onClose($server, $fd, $reactorId)
*/
protected function isWebsocketPushPacket($packet)
{
if ( !is_array($packet)) {
if (! is_array($packet)) {
return false;
}

Expand All @@ -176,25 +175,11 @@ protected function isWebsocketPushPacket($packet)
*/
public function pushMessage($server, array $data)
{
$push = Push::new($data);
$payload = $this->payloadParser->encode($push->getEvent(), $push->getMessage());

// attach sender if not broadcast
if (! $push->isBroadcast() && $push->getSender() && ! $push->hasOwnDescriptor()) {
$push->addDescriptor($push->getSender());
}

// check if to broadcast all clients
if ($push->isBroadcastToAllDescriptors()) {
$push->mergeDescriptor($this->filterWebsocket($server->connections));
}

// push message to designated fds
foreach ($push->getDescriptors() as $descriptor) {
if ($server->exist($descriptor) || ! $push->isBroadcastToDescriptor((int) $descriptor)) {
$server->push($descriptor, $payload, $push->getOpcode());
}
}
$pusher = Pusher::make($data, $server);
$pusher->push($this->payloadParser->encode(
$pusher->getEvent(),
$pusher->getMessage()
));
}

/**
Expand Down Expand Up @@ -244,25 +229,8 @@ protected function prepareWebsocket()
*/
protected function isServerWebsocket(int $fd): bool
{
$info = $this->container->make(Server::class)->connection_info($fd);

return Arr::has($info, 'websocket_status') && Arr::get($info, 'websocket_status');
}

/**
* Returns all descriptors that are websocket
*
* @param array $descriptors
*
* @return array
*/
protected function filterWebsocket(array $descriptors): array
{
$callback = function ($descriptor) {
return $this->isServerWebsocket($descriptor);
};

return collect($descriptors)->filter($callback)->toArray();
return $this->container->make(Server::class)
->connection_info($fd)['websocket_status'] ?? false;
}

/**
Expand Down Expand Up @@ -364,41 +332,21 @@ protected function loadWebsocketRoutes()
return require $routePath;
}

/**
* Normalize data for message push.
*
* @param array $data
*
* @return array
*/
public function normalizePushData(array $data)
{
$opcode = Arr::get($data, 'opcode', 1);
$sender = Arr::get($data, 'sender', 0);
$fds = Arr::get($data, 'fds', []);
$broadcast = Arr::get($data, 'broadcast', false);
$assigned = Arr::get($data, 'assigned', false);
$event = Arr::get($data, 'event', null);
$message = Arr::get($data, 'message', null);

return [$opcode, $sender, $fds, $broadcast, $assigned, $event, $message];
}

/**
* Indicates if the payload is websocket push.
*
* @param mixed $payload
*
* @return boolean
*/
protected function isWebsocketPushPayload($payload): bool
public function isWebsocketPushPayload($payload): bool
{
if (! is_array($payload)) {
return false;
}

return $this->isServerWebsocket
&& array_key_exists('action', $payload)
&& $payload['action'] === Websocket::PUSH_ACTION;
&& ($payload['action'] ?? null) === Websocket::PUSH_ACTION
&& array_key_exists('data', $payload);
}
}
38 changes: 12 additions & 26 deletions src/HttpServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace SwooleTW\Http;

use Illuminate\Support\Arr;
use SwooleTW\Http\Helpers\FW;
use Illuminate\Queue\QueueManager;
use Illuminate\Contracts\Http\Kernel;
Expand Down Expand Up @@ -177,17 +176,17 @@ protected function registerDatabaseDriver()
{
$this->app->extend(DatabaseManager::class, function (DatabaseManager $db) {
$db->extend('mysql-coroutine', function ($config, $name) {
$config = $this->getMergedDatabaseConfig($config, $name);
$config['name'] = $name;

$connection = new MySqlConnection(
$this->getNewMySqlConnection($config),
Arr::get($config, 'database'),
Arr::get($config, 'prefix'),
$this->getNewMySqlConnection($config, 'write'),
$config['database'],
$config['prefix'],
$config
);

if (Arr::has($config, 'read')) {
$connection->setReadPdo($this->getNewMySqlConnection($config));
if (isset($config['read'])) {
$connection->setReadPdo($this->getNewMySqlConnection($config, 'read'));
}

return $connection;
Expand All @@ -197,33 +196,20 @@ protected function registerDatabaseDriver()
});
}

/**
* Get mereged config for coroutine mysql.
*
* @param array $config
* @param string $name
*
* @return array
*/
protected function getMergedDatabaseConfig(array $config, string $name)
{
$newConfig = $config;
$newConfig = Arr::add($newConfig, 'name', $name);
$newConfig = array_merge($newConfig, Arr::get($newConfig, 'read', []));
$newConfig = array_merge($newConfig, Arr::get($newConfig, 'write', []));

return $newConfig;
}

/**
* Get a new mysql connection.
*
* @param array $config
* @param string $connection
*
* @return \PDO
*/
protected function getNewMySqlConnection(array $config)
protected function getNewMySqlConnection(array $config, string $connection = null)
{
if ($connection && isset($config[$connection])) {
$config = array_merge($config, $config[$connection]);
}

return ConnectorFactory::make(FW::version())->connect($config);
}

Expand Down
40 changes: 20 additions & 20 deletions src/Server/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Debug\ExceptionHandler;
use SwooleTW\Http\Concerns\InteractsWithWebsocket;
use Symfony\Component\Console\Output\ConsoleOutput;
use SwooleTW\Http\Concerns\InteractsWithSwooleQueue;
use SwooleTW\Http\Concerns\InteractsWithSwooleTable;
use Symfony\Component\Debug\Exception\FatalThrowableError;
Expand Down Expand Up @@ -122,7 +123,7 @@ protected function setSwooleServerListeners()
foreach ($this->events as $event) {
$listener = Str::camel("on_$event");
$callback = method_exists($this, $listener) ? [$this, $listener] : function () use ($event) {
$this->container->make('events')->fire("swoole.$event", func_get_args());
$this->container->make('events')->dispatch("swoole.$event", func_get_args());
};

$this->container->make(Server::class)->on($event, $callback);
Expand All @@ -137,7 +138,7 @@ public function onStart()
$this->setProcessName('master process');
$this->createPidFile();

$this->container->make('events')->fire('swoole.start', func_get_args());
$this->container->make('events')->dispatch('swoole.start', func_get_args());
}

/**
Expand All @@ -148,7 +149,7 @@ public function onStart()
public function onManagerStart()
{
$this->setProcessName('manager process');
$this->container->make('events')->fire('swoole.managerStart', func_get_args());
$this->container->make('events')->dispatch('swoole.managerStart', func_get_args());
}

/**
Expand All @@ -162,7 +163,7 @@ public function onWorkerStart($server)
{
$this->clearCache();

$this->container->make('events')->fire('swoole.workerStart', func_get_args());
$this->container->make('events')->dispatch('swoole.workerStart', func_get_args());

// don't init laravel app in task workers
if ($server->taskworker) {
Expand Down Expand Up @@ -196,7 +197,7 @@ public function onWorkerStart($server)
*/
public function onRequest($swooleRequest, $swooleResponse)
{
$this->app->make('events')->fire('swoole.request');
$this->app->make('events')->dispatch('swoole.request');

$this->resetOnRequest();
$sandbox = $this->app->make(Sandbox::class);
Expand Down Expand Up @@ -261,21 +262,15 @@ protected function resetOnRequest()
*/
public function onTask($server, ...$args)
{
if ($args[0] instanceof \Swoole\Server\Task && $task = array_shift($args)) {
list($taskId, $srcWorkerId, $data) = [$task->id, $task->worker_id, $task->data];
} else {
list($taskId, $srcWorkerId, $data) = $args;
}

$this->container->make('events')->fire('swoole.task', [$server, $taskId, $srcWorkerId, $data]);
$this->container->make('events')->dispatch('swoole.task', [$server, $args]);

try {
// push websocket message
if ($this->isWebsocketPushPayload($data)) {
$this->pushMessage($server, $data['data'] ?? []);
$this->pushMessage($server, $data['data']);
// push async task to queue
} elseif ($this->isAsyncTaskPayload($data)) {
(new SwooleTaskJob($this->container, $server, $data, $taskId, $srcWorkerId))->fire();
(new SwooleTaskJob($this->container, $server, $data, $taskId, $srcWorkerId))->dispatch();
}
} catch (Throwable $e) {
$this->logServerError($e);
Expand All @@ -292,7 +287,7 @@ public function onTask($server, ...$args)
public function onFinish($server, $taskId, $data)
{
// task worker callback
$this->container->make('events')->fire('swoole.finish', func_get_args());
$this->container->make('events')->dispatch('swoole.finish', func_get_args());

return;
}
Expand Down Expand Up @@ -426,11 +421,16 @@ protected function isInTesting()
*/
public function logServerError(Throwable $e)
{
$this->container
->make(ExceptionHandler::class)
->report(
$this->normalizeException($e)
);
if ($this->isInTesting()) {
return;
}

$exception = $this->normalizeException($e);
$this->container->make(ConsoleOutput::class)
->writeln(sprintf("<error>%s</error>", $exception));

$this->container->make(ExceptionHandler::class)
->report($exception);
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/Transformers/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected function sendInChunk($content)
$this->swooleResponse->write($v);
}
}

$this->swooleResponse->end();
}

Expand Down
Loading