Skip to content

Commit

Permalink
3.1.7
Browse files Browse the repository at this point in the history
  • Loading branch information
kiss291323003 committed Jan 10, 2019
1 parent 249f01f commit c838c84
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
15 changes: 11 additions & 4 deletions src/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
use EasySwoole\EasySwoole\Swoole\PipeMessage\OnCommand;
use EasySwoole\EasySwoole\Swoole\Task\AbstractAsyncTask;
use EasySwoole\EasySwoole\Swoole\Task\SuperClosure;
use Swoole\Server\Task;
use Swoole\Server\Task;

////////////////////////////////////////////////////////////////////
// _ooOoo_ //
Expand Down Expand Up @@ -360,11 +360,18 @@ private function registerDefaultCallBack(\swoole_server $server,int $serverType)
//空逻辑
});



//通过pipe通讯也就是processAsync投递的闭包任务是没有taskId信息的因此参数传递默认-1
OnCommand::getInstance()->set('TASK',function (\swoole_server $server,$taskObj,$fromWorkerId){
$server->task($taskObj);
//闭包任务无法再次二次序列化,因此直接执行
if($taskObj instanceof SuperClosure){
try{
call_user_func($taskObj,$server,-1,$fromWorkerId);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
}else{
$server->task($taskObj);
}
});

EventHelper::on($server,EventRegister::onPipeMessage,function (\swoole_server $server,$fromWorkerId,$data){
Expand Down
13 changes: 12 additions & 1 deletion src/Swoole/Task/TaskManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ public static function processAsync($task)
{
$conf = ServerManager::getInstance()->getSwooleServer()->setting;
$workerNum = $conf['worker_num'];
if(!isset($conf['task_worker_num'])){
return false;
}
$taskNum = $conf['task_worker_num'];
$closure = false;
if($task instanceof \Closure){
try{
$task = new SuperClosure($task);
$closure = true;
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
return false;
Expand All @@ -44,7 +50,12 @@ public static function processAsync($task)
$message->setCommand('TASK');
$message->setData($task);
mt_srand();
$workerId = mt_rand(0,$workerNum-1);
//闭包无法再onPipeMessage中再次被序列化因此直接投递给task进程直接执行
if($closure){
$workerId = mt_rand($workerNum,($workerNum+$taskNum)-1);
}else{
$workerId = mt_rand(0,$workerNum -1);
}
ServerManager::getInstance()->getSwooleServer()->sendMessage(serialize($message),$workerId);
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/SysConst.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class SysConst
{
const EASYSWOOLE_VERSION = '3.1.6';
const EASYSWOOLE_VERSION = '3.1.7';
const ERROR_HANDLER = 'ERROR_HANDLER';
const SHUTDOWN_FUNCTION = 'SHUTDOWN_FUNCTION';

Expand Down

0 comments on commit c838c84

Please sign in to comment.