diff --git a/composer.json b/composer.json index 29999d9e..8d2723cd 100644 --- a/composer.json +++ b/composer.json @@ -16,9 +16,9 @@ "ext-swoole": ">=4.0.3", "easyswoole/component": "^1.0", "easyswoole/spl": "^1.0", - "easyswoole/core": "^1.0", "easyswoole/http": "^1.0", - "easyswoole/utility": "^1.0" + "easyswoole/utility": "^1.0", + "jeremeamia/SuperClosure": "^2.4" }, "require-dev": { "easyswoole/swoole-ide-helper": "dev-master" diff --git a/src/AbstractInterface/Event.php b/src/AbstractInterface/Event.php index 99e01b5a..bbe6fcfc 100644 --- a/src/AbstractInterface/Event.php +++ b/src/AbstractInterface/Event.php @@ -9,7 +9,7 @@ namespace EasySwoole\EasySwoole\AbstractInterface; -use EasySwoole\Core\EventRegister; +use EasySwoole\EasySwoole\Swoole\EventRegister; use EasySwoole\Http\Request; use EasySwoole\Http\Response; diff --git a/src/Core.php b/src/Core.php index d73ed9b9..93ec2bf0 100644 --- a/src/Core.php +++ b/src/Core.php @@ -11,10 +11,9 @@ use EasySwoole\Component\Di; use EasySwoole\Component\Singleton; -use EasySwoole\Core\EventHelper; -use EasySwoole\Core\EventRegister; -use EasySwoole\Core\ServerManager; use EasySwoole\EasySwoole\AbstractInterface\Event; +use EasySwoole\EasySwoole\Swoole\EventHelper; +use EasySwoole\EasySwoole\Swoole\EventRegister; use EasySwoole\Http\Request; use EasySwoole\Http\Response; use EasySwoole\Http\WebService; diff --git a/src/Resource/Config.tpl b/src/Resource/Config.tpl index 8db04ad9..def77f54 100755 --- a/src/Resource/Config.tpl +++ b/src/Resource/Config.tpl @@ -4,7 +4,7 @@ return [ 'MAIN_SERVER'=>[ 'HOST'=>'0.0.0.0', 'PORT'=>9501, - 'SERVER_TYPE'=>\EasySwoole\Core\ServerManager::TYPE_WEB_SERVER, + 'SERVER_TYPE'=>\EasySwoole\EasySwoole\ServerManager::TYPE_WEB_SERVER, 'SOCK_TYPE'=>SWOOLE_TCP,//该配置项当为SERVER_TYPE值为TYPE_SERVER时有效 'RUN_MODEL'=>SWOOLE_PROCESS, 'SETTING'=>[ diff --git a/src/Resource/EasySwooleEvent.tpl b/src/Resource/EasySwooleEvent.tpl index 853038dc..492596db 100644 --- a/src/Resource/EasySwooleEvent.tpl +++ b/src/Resource/EasySwooleEvent.tpl @@ -9,7 +9,7 @@ namespace EasySwoole\EasySwoole; -use EasySwoole\Core\EventRegister; +use EasySwoole\EasySwoole\Swoole\EventRegister; use EasySwoole\EasySwoole\AbstractInterface\Event; use EasySwoole\Http\Request; use EasySwoole\Http\Response; diff --git a/src/ServerManager.php b/src/ServerManager.php new file mode 100644 index 00000000..63418871 --- /dev/null +++ b/src/ServerManager.php @@ -0,0 +1,210 @@ +mainServerEventRegister = new EventRegister(); + } + /** + * @param string $serverName + * @return null|\swoole_server|\swoole_server_port + */ + function getSwooleServer(string $serverName = null) + { + if($serverName === null){ + return $this->swooleServer; + }else{ + if(isset($this->swooleServer[$serverName])){ + return $this->swooleServer[$serverName]; + } + return null; + } + } + + function createSwooleServer($port,$type = self::TYPE_SERVER,$address = '0.0.0.0',array $setting = [],...$args):bool + { + switch ($type){ + case self::TYPE_SERVER:{ + $this->swooleServer = new \swoole_server($address,$port,...$args); + break; + } + case self::TYPE_WEB_SERVER:{ + $this->swooleServer = new \swoole_http_server($address,$port,...$args); + break; + } + case self::TYPE_WEB_SOCKET_SERVER:{ + $this->swooleServer = new \swoole_websocket_server($address,$port,...$args); + break; + } + default:{ + Trigger::error('"unknown server type :{$type}"'); + return false; + } + } + if($this->swooleServer){ + $this->swooleServer->set($setting); + } + $this->registerDefault($this->swooleServer); + return true; + } + + + public function addServer(string $serverName,int $port,int $type = SWOOLE_TCP,string $host = '0.0.0.0',array $setting = [ + "open_eof_check"=>false, + ]):EventRegister + { + $eventRegister = new EventRegister(); + $this->subServerRegister[$serverName] = [ + 'port'=>$port, + 'host'=>$host, + 'type'=>$type, + 'setting'=>$setting, + 'eventRegister'=>$eventRegister + ]; + return $eventRegister; + } + + function getMainEventRegister():EventRegister + { + return $this->mainServerEventRegister; + } + + function start() + { + $events = $this->getMainEventRegister()->all(); + foreach ($events as $event => $callback){ + $this->getSwooleServer()->on($event, function (...$args) use ($callback) { + foreach ($callback as $item) { + call_user_func($item,...$args); + } + }); + } + $this->attachListener(); + $this->getSwooleServer()->start(); + } + + private function attachListener():void + { + foreach ($this->subServerRegister as $serverName => $server){ + $subPort = $this->getSwooleServer()->addlistener($server['host'],$server['port'],$server['type']); + if($subPort){ + $this->subServer[$serverName] = $subPort; + if(is_array($server['setting'])){ + $subPort->set($server['setting']); + } + $events = $server['eventRegister']->all(); + foreach ($events as $event => $callback){ + $subPort->on($event, function (...$args) use ($callback) { + foreach ($callback as $item) { + call_user_func($item,...$args); + } + }); + } + }else{ + Trigger::throwable(new \Exception("addListener with server name:{$serverName} at host:{$server['host']} port:{$server['port']} fail")); + } + } + } + + + private function registerDefault(\swoole_server $server) + { + //注册默认的on task,finish 不经过 event register。因为on task需要返回值。不建议重写onTask,否则es自带的异步任务事件失效 + EventHelper::on($server,EventRegister::onTask,function (\swoole_server $server, $taskId, $fromWorkerId,$taskObj){ + if(is_string($taskObj) && class_exists($taskObj)){ + $taskObj = new $taskObj; + } + if($taskObj instanceof AbstractAsyncTask){ + try{ + $ret = $taskObj->run($taskObj->getData(),$taskId,$fromWorkerId); + //在有return或者设置了结果的时候 说明需要执行结束回调 + $ret = is_null($ret) ? $taskObj->getResult() : $ret; + if(!is_null($ret)){ + $taskObj->setResult($ret); + return $taskObj; + } + }catch (\Throwable $throwable){ + $taskObj->onException($throwable); + } + }else if($taskObj instanceof SuperClosure){ + try{ + return $taskObj( $server, $taskId, $fromWorkerId); + }catch (\Throwable $throwable){ + Trigger::throwable($throwable); + } + } + return null; + }); + EventHelper::on($server,EventRegister::onFinish,function (\swoole_server $server, $taskId, $taskObj){ + //finish 在仅仅对AbstractAsyncTask做处理,其余处理无意义。 + if($taskObj instanceof AbstractAsyncTask){ + try{ + $taskObj->finish($taskObj->getResult(),$taskId); + }catch (\Throwable $throwable){ + $taskObj->onException($throwable); + } + } + }); + + //注册默认的pipe通讯 + OnCommand::getInstance()->set('TASK',function ($fromId,$taskObj){ + if(is_string($taskObj) && class_exists($taskObj)){ + $taskObj = new $taskObj; + } + if($taskObj instanceof AbstractAsyncTask){ + try{ + $taskObj->run($taskObj->getData(),ServerManager::getInstance()->getSwooleServer()->worker_id,$fromId); + }catch (\Throwable $throwable){ + $taskObj->onException($throwable); + } + }else if($taskObj instanceof SuperClosure){ + try{ + $taskObj(); + }catch (\Throwable $throwable){ + Trigger::throwable($throwable); + } + } + }); + + EventHelper::on($server,EventRegister::onPipeMessage,function (\swoole_server $server,$fromWorkerId,$data){ + $message = \swoole_serialize::unpack($data); + if($message instanceof Message){ + OnCommand::getInstance()->hook($message->getCommand(),$fromWorkerId,$message->getData()); + }else{ + Trigger::error("data :{$data} not packet by swoole_serialize or not a Message Instance"); + } + }); + } +} \ No newline at end of file diff --git a/src/Swoole/EventHelper.php b/src/Swoole/EventHelper.php new file mode 100644 index 00000000..cfbc1498 --- /dev/null +++ b/src/Swoole/EventHelper.php @@ -0,0 +1,28 @@ +set($event,$callback); + } + + public static function registerWithAdd(EventRegister $register,string $event,callable $callback):void + { + $register->add($event,$callback); + } + + public static function on(\swoole_server $server,string $event,callable $callback) + { + $server->on($event,$callback); + } +} \ No newline at end of file diff --git a/src/Swoole/EventRegister.php b/src/Swoole/EventRegister.php new file mode 100644 index 00000000..d6404a9a --- /dev/null +++ b/src/Swoole/EventRegister.php @@ -0,0 +1,48 @@ +list[$name])){ + $a = new Atomic($int); + $this->list[$name] = $a; + } + } + + function addLong($name,int $int = 0) + { + if(!isset($this->listForLong[$name])){ + $a = new Long($int); + $this->listForLong[$name] = $a; + } + } + + function getLong($name):?Long + { + if(!isset($this->listForLong[$name])){ + return $this->listForLong[$name]; + }else{ + return null; + } + } + + function get($name):?Atomic + { + if(isset($this->list[$name])){ + return $this->list[$name]; + }else{ + return null; + } + } +} \ No newline at end of file diff --git a/src/Swoole/Memory/ChannelManager.php b/src/Swoole/Memory/ChannelManager.php new file mode 100644 index 00000000..538a4a50 --- /dev/null +++ b/src/Swoole/Memory/ChannelManager.php @@ -0,0 +1,36 @@ +list[$name])){ + $chan = new Channel($size); + $this->list[$name] = $chan; + } + } + + function get($name):?Channel + { + if(isset($this->list[$name])){ + return $this->list[$name]; + }else{ + return null; + } + } +} \ No newline at end of file diff --git a/src/Swoole/Memory/TableManager.php b/src/Swoole/Memory/TableManager.php new file mode 100644 index 00000000..2909a540 --- /dev/null +++ b/src/Swoole/Memory/TableManager.php @@ -0,0 +1,47 @@ +['type'=>Table::TYPE_STRING,'size'=>1]] + * @param int $size + */ + public function add($name,array $columns,$size = 1024):void + { + if(!isset($this->list[$name])){ + $table = new Table($size); + foreach ($columns as $column => $item){ + $table->column($column,$item['type'],$item['size']); + } + $table->create(); + $this->list[$name] = $table; + } + } + + public function get($name):?Table + { + if(isset($this->list[$name])){ + return $this->list[$name]; + }else{ + return null; + } + } +} \ No newline at end of file diff --git a/src/Swoole/PipeMessage/Message.php b/src/Swoole/PipeMessage/Message.php new file mode 100644 index 00000000..f43e1829 --- /dev/null +++ b/src/Swoole/PipeMessage/Message.php @@ -0,0 +1,48 @@ +command; + } + + /** + * @param mixed $command + */ + public function setCommand($command): void + { + $this->command = $command; + } + + /** + * @return mixed + */ + public function getData() + { + return $this->data; + } + + /** + * @param mixed $data + */ + public function setData($data): void + { + $this->data = $data; + } +} \ No newline at end of file diff --git a/src/Swoole/PipeMessage/OnCommand.php b/src/Swoole/PipeMessage/OnCommand.php new file mode 100644 index 00000000..cdcfc49c --- /dev/null +++ b/src/Swoole/PipeMessage/OnCommand.php @@ -0,0 +1,17 @@ +data = $data; + } + + /** + * @return null + */ + public function getData() + { + return $this->data; + } + + /** + * @return mixed + */ + public function getResult() + { + return $this->result; + } + + public function setResult($data):void + { + $this->result = $data; + } + + abstract function run($taskData,$taskId,$fromWorkerId); + + abstract function finish($result,$task_id); + + public function onException(\Throwable $throwable):void + { + throw $throwable; + } +} \ No newline at end of file diff --git a/src/Swoole/Task/SuperClosure.php b/src/Swoole/Task/SuperClosure.php new file mode 100644 index 00000000..341cf6e6 --- /dev/null +++ b/src/Swoole/Task/SuperClosure.php @@ -0,0 +1,47 @@ +closure = $closure; + } + + final public function __sleep() + { + $serializer = new Serializer(); + $this->serialized = $serializer->serialize($this->closure); + unset($this->closure); + return ['serialized']; + } + + final public function __wakeup() + { + $serializer = new Serializer(); + $this->closure = $serializer->unserialize($this->serialized); + } + + final public function __invoke() + { + // TODO: Implement __invoke() method. + $args = func_get_args(); + return call_user_func($this->closure,...$args); + } + + final function call(...$args) + { + return call_user_func($this->closure,...$args); + } +} \ No newline at end of file diff --git a/src/Swoole/Task/TaskManager.php b/src/Swoole/Task/TaskManager.php new file mode 100644 index 00000000..79bac355 --- /dev/null +++ b/src/Swoole/Task/TaskManager.php @@ -0,0 +1,72 @@ +getSwooleServer()->task($task,$taskWorkerId,$finishCallback); + } + + + public static function sync($task,$timeout = 0.5,$taskWorkerId = -1) + { + if($task instanceof \Closure){ + try{ + $task = new SuperClosure($task); + }catch (\Throwable $throwable){ + Trigger::throwable($throwable); + return false; + } + } + return ServerManager::getInstance()->getSwooleServer()->taskwait($task,$timeout,$taskWorkerId); + } + + public static function barrier(array $taskList,$timeout = 0.5) + { + $temp =[]; + $map = []; + $result = []; + foreach ($taskList as $name => $task){ + if($task instanceof \Closure){ + try{ + $task = new SuperClosure($task); + }catch (\Throwable $throwable){ + Trigger::throwable($throwable); + return false; + } + } + $temp[] = $task; + $map[] = $name; + } + if(!empty($temp)){ + $ret = ServerManager::getInstance()->getSwooleServer()->taskWaitMulti($temp,$timeout); + if(!empty($ret)){ + //极端情况下 所有任务都超时 + foreach ($ret as $index => $subRet){ + $result[$map[$index]] = $subRet; + } + } + } + return $result; + } +} \ No newline at end of file diff --git a/src/Swoole/Time/Time.php b/src/Swoole/Time/Time.php new file mode 100644 index 00000000..6ed814e7 --- /dev/null +++ b/src/Swoole/Time/Time.php @@ -0,0 +1,44 @@ +getSwooleServer()->tick($microSeconds,$new,$args); + } + + public static function delay($microSeconds,callable $func,$args = null){ + $new = function (...$args)use($func){ + try{ + call_user_func($func,...$args); + }catch (\Throwable $throwable){ + Trigger::throwable($throwable); + } + }; + return ServerManager::getInstance()->getSwooleServer()->after($microSeconds,$new,$args); + } + /* + * @param $timerId + */ + public static function clear($timerId){ + ServerManager::getInstance()->getSwooleServer()->clearTimer($timerId); + } +} \ No newline at end of file