Skip to content
This repository has been archived by the owner on Sep 10, 2024. It is now read-only.

Commit

Permalink
Merge pull request #230 from swooletw/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
albertcht authored Mar 3, 2019
2 parents e0b9167 + 4b1d510 commit 9184fea
Show file tree
Hide file tree
Showing 11 changed files with 510 additions and 359 deletions.
6 changes: 2 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ sudo: false

matrix:
include:
- php: 7.2
env: FRAMEWORK_VERSION=laravel/framework:5.3.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/framework:5.4.*
- php: 7.2
Expand All @@ -14,8 +12,8 @@ matrix:
env: FRAMEWORK_VERSION=laravel/framework:5.6.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/framework:5.7.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/lumen-framework:5.3.*
# - php: 7.2
# env: FRAMEWORK_VERSION=laravel/framework:5.8.*
- php: 7.2
env: FRAMEWORK_VERSION=laravel/lumen-framework:5.4.*
- php: 7.2
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This package provides a high performance HTTP server to speed up your Laravel/Lu

| PHP | Laravel | Lumen | Swoole |
|:-------:|:-------:|:-----:|:-------:|
| >=7.2 | ~5.3 | ~5.3 | >=4.0.0 |
| >=7.2 | ~5.4 | ~5.4 | >=4.0.0 |

## Features

Expand Down
10 changes: 5 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
],
"require": {
"php": "^7.2",
"illuminate/console": "~5.3",
"illuminate/contracts": "~5.3",
"illuminate/http": "~5.3",
"illuminate/support": "~5.3",
"illuminate/console": "~5.4",
"illuminate/contracts": "~5.4",
"illuminate/http": "~5.4",
"illuminate/support": "~5.4",
"predis/predis": "^1.1"
},
"require-dev": {
"laravel/lumen-framework": "~5.3",
"laravel/lumen-framework": "~5.4",
"phpunit/phpunit": "^7.5",
"phpunit/php-code-coverage": "^6.1",
"php-coveralls/php-coveralls": "^2.1",
Expand Down
76 changes: 12 additions & 64 deletions src/Concerns/InteractsWithWebsocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
namespace SwooleTW\Http\Concerns;

use Throwable;
use Illuminate\Support\Arr;
use Illuminate\Pipeline\Pipeline;
use SwooleTW\Http\Server\Sandbox;
use SwooleTW\Http\Websocket\Push;
use SwooleTW\Http\Websocket\Parser;
use SwooleTW\Http\Websocket\Pusher;
use SwooleTW\Http\Websocket\Websocket;
use SwooleTW\Http\Transformers\Request;
use SwooleTW\Http\Server\Facades\Server;
Expand Down Expand Up @@ -158,7 +157,7 @@ public function onClose($server, $fd, $reactorId)
*/
protected function isWebsocketPushPacket($packet)
{
if ( !is_array($packet)) {
if (! is_array($packet)) {
return false;
}

Expand All @@ -176,25 +175,11 @@ protected function isWebsocketPushPacket($packet)
*/
public function pushMessage($server, array $data)
{
$push = Push::new($data);
$payload = $this->payloadParser->encode($push->getEvent(), $push->getMessage());

// attach sender if not broadcast
if (! $push->isBroadcast() && $push->getSender() && ! $push->hasOwnDescriptor()) {
$push->addDescriptor($push->getSender());
}

// check if to broadcast all clients
if ($push->isBroadcastToAllDescriptors()) {
$push->mergeDescriptor($this->filterWebsocket($server->connections));
}

// push message to designated fds
foreach ($push->getDescriptors() as $descriptor) {
if ($server->exist($descriptor) || ! $push->isBroadcastToDescriptor((int) $descriptor)) {
$server->push($descriptor, $payload, $push->getOpcode());
}
}
$pusher = Pusher::make($data, $server);
$pusher->push($this->payloadParser->encode(
$pusher->getEvent(),
$pusher->getMessage()
));
}

/**
Expand Down Expand Up @@ -244,25 +229,8 @@ protected function prepareWebsocket()
*/
protected function isServerWebsocket(int $fd): bool
{
$info = $this->container->make(Server::class)->connection_info($fd);

return Arr::has($info, 'websocket_status') && Arr::get($info, 'websocket_status');
}

/**
* Returns all descriptors that are websocket
*
* @param array $descriptors
*
* @return array
*/
protected function filterWebsocket(array $descriptors): array
{
$callback = function ($descriptor) {
return $this->isServerWebsocket($descriptor);
};

return collect($descriptors)->filter($callback)->toArray();
return $this->container->make(Server::class)
->connection_info($fd)['websocket_status'] ?? false;
}

/**
Expand Down Expand Up @@ -364,41 +332,21 @@ protected function loadWebsocketRoutes()
return require $routePath;
}

/**
* Normalize data for message push.
*
* @param array $data
*
* @return array
*/
public function normalizePushData(array $data)
{
$opcode = Arr::get($data, 'opcode', 1);
$sender = Arr::get($data, 'sender', 0);
$fds = Arr::get($data, 'fds', []);
$broadcast = Arr::get($data, 'broadcast', false);
$assigned = Arr::get($data, 'assigned', false);
$event = Arr::get($data, 'event', null);
$message = Arr::get($data, 'message', null);

return [$opcode, $sender, $fds, $broadcast, $assigned, $event, $message];
}

/**
* Indicates if the payload is websocket push.
*
* @param mixed $payload
*
* @return boolean
*/
protected function isWebsocketPushPayload($payload): bool
public function isWebsocketPushPayload($payload): bool
{
if (! is_array($payload)) {
return false;
}

return $this->isServerWebsocket
&& array_key_exists('action', $payload)
&& $payload['action'] === Websocket::PUSH_ACTION;
&& ($payload['action'] ?? null) === Websocket::PUSH_ACTION
&& array_key_exists('data', $payload);
}
}
38 changes: 12 additions & 26 deletions src/HttpServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace SwooleTW\Http;

use Illuminate\Support\Arr;
use SwooleTW\Http\Helpers\FW;
use Illuminate\Queue\QueueManager;
use Illuminate\Contracts\Http\Kernel;
Expand Down Expand Up @@ -177,17 +176,17 @@ protected function registerDatabaseDriver()
{
$this->app->extend(DatabaseManager::class, function (DatabaseManager $db) {
$db->extend('mysql-coroutine', function ($config, $name) {
$config = $this->getMergedDatabaseConfig($config, $name);
$config['name'] = $name;

$connection = new MySqlConnection(
$this->getNewMySqlConnection($config),
Arr::get($config, 'database'),
Arr::get($config, 'prefix'),
$this->getNewMySqlConnection($config, 'write'),
$config['database'],
$config['prefix'],
$config
);

if (Arr::has($config, 'read')) {
$connection->setReadPdo($this->getNewMySqlConnection($config));
if (isset($config['read'])) {
$connection->setReadPdo($this->getNewMySqlConnection($config, 'read'));
}

return $connection;
Expand All @@ -197,33 +196,20 @@ protected function registerDatabaseDriver()
});
}

/**
* Get mereged config for coroutine mysql.
*
* @param array $config
* @param string $name
*
* @return array
*/
protected function getMergedDatabaseConfig(array $config, string $name)
{
$newConfig = $config;
$newConfig = Arr::add($newConfig, 'name', $name);
$newConfig = array_merge($newConfig, Arr::get($newConfig, 'read', []));
$newConfig = array_merge($newConfig, Arr::get($newConfig, 'write', []));

return $newConfig;
}

/**
* Get a new mysql connection.
*
* @param array $config
* @param string $connection
*
* @return \PDO
*/
protected function getNewMySqlConnection(array $config)
protected function getNewMySqlConnection(array $config, string $connection = null)
{
if ($connection && isset($config[$connection])) {
$config = array_merge($config, $config[$connection]);
}

return ConnectorFactory::make(FW::version())->connect($config);
}

Expand Down
40 changes: 20 additions & 20 deletions src/Server/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Debug\ExceptionHandler;
use SwooleTW\Http\Concerns\InteractsWithWebsocket;
use Symfony\Component\Console\Output\ConsoleOutput;
use SwooleTW\Http\Concerns\InteractsWithSwooleQueue;
use SwooleTW\Http\Concerns\InteractsWithSwooleTable;
use Symfony\Component\Debug\Exception\FatalThrowableError;
Expand Down Expand Up @@ -122,7 +123,7 @@ protected function setSwooleServerListeners()
foreach ($this->events as $event) {
$listener = Str::camel("on_$event");
$callback = method_exists($this, $listener) ? [$this, $listener] : function () use ($event) {
$this->container->make('events')->fire("swoole.$event", func_get_args());
$this->container->make('events')->dispatch("swoole.$event", func_get_args());
};

$this->container->make(Server::class)->on($event, $callback);
Expand All @@ -137,7 +138,7 @@ public function onStart()
$this->setProcessName('master process');
$this->createPidFile();

$this->container->make('events')->fire('swoole.start', func_get_args());
$this->container->make('events')->dispatch('swoole.start', func_get_args());
}

/**
Expand All @@ -148,7 +149,7 @@ public function onStart()
public function onManagerStart()
{
$this->setProcessName('manager process');
$this->container->make('events')->fire('swoole.managerStart', func_get_args());
$this->container->make('events')->dispatch('swoole.managerStart', func_get_args());
}

/**
Expand All @@ -162,7 +163,7 @@ public function onWorkerStart($server)
{
$this->clearCache();

$this->container->make('events')->fire('swoole.workerStart', func_get_args());
$this->container->make('events')->dispatch('swoole.workerStart', func_get_args());

// don't init laravel app in task workers
if ($server->taskworker) {
Expand Down Expand Up @@ -196,7 +197,7 @@ public function onWorkerStart($server)
*/
public function onRequest($swooleRequest, $swooleResponse)
{
$this->app->make('events')->fire('swoole.request');
$this->app->make('events')->dispatch('swoole.request');

$this->resetOnRequest();
$sandbox = $this->app->make(Sandbox::class);
Expand Down Expand Up @@ -261,21 +262,15 @@ protected function resetOnRequest()
*/
public function onTask($server, ...$args)
{
if ($args[0] instanceof \Swoole\Server\Task && $task = array_shift($args)) {
list($taskId, $srcWorkerId, $data) = [$task->id, $task->worker_id, $task->data];
} else {
list($taskId, $srcWorkerId, $data) = $args;
}

$this->container->make('events')->fire('swoole.task', [$server, $taskId, $srcWorkerId, $data]);
$this->container->make('events')->dispatch('swoole.task', [$server, $args]);

try {
// push websocket message
if ($this->isWebsocketPushPayload($data)) {
$this->pushMessage($server, $data['data'] ?? []);
$this->pushMessage($server, $data['data']);
// push async task to queue
} elseif ($this->isAsyncTaskPayload($data)) {
(new SwooleTaskJob($this->container, $server, $data, $taskId, $srcWorkerId))->fire();
(new SwooleTaskJob($this->container, $server, $data, $taskId, $srcWorkerId))->dispatch();
}
} catch (Throwable $e) {
$this->logServerError($e);
Expand All @@ -292,7 +287,7 @@ public function onTask($server, ...$args)
public function onFinish($server, $taskId, $data)
{
// task worker callback
$this->container->make('events')->fire('swoole.finish', func_get_args());
$this->container->make('events')->dispatch('swoole.finish', func_get_args());

return;
}
Expand Down Expand Up @@ -426,11 +421,16 @@ protected function isInTesting()
*/
public function logServerError(Throwable $e)
{
$this->container
->make(ExceptionHandler::class)
->report(
$this->normalizeException($e)
);
if ($this->isInTesting()) {
return;
}

$exception = $this->normalizeException($e);
$this->container->make(ConsoleOutput::class)
->writeln(sprintf("<error>%s</error>", $exception));

$this->container->make(ExceptionHandler::class)
->report($exception);
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/Transformers/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected function sendInChunk($content)
$this->swooleResponse->write($v);
}
}

$this->swooleResponse->end();
}

Expand Down
Loading

0 comments on commit 9184fea

Please sign in to comment.