diff --git a/src/tcp-server/src/Response.php b/src/tcp-server/src/Response.php index e31abf9a6..bb0029246 100644 --- a/src/tcp-server/src/Response.php +++ b/src/tcp-server/src/Response.php @@ -10,7 +10,6 @@ use Swoft\Tcp\Server\Exception\TcpResponseException; use Swoft\Tcp\Response as TcpResponse; use Swoole\Server; -use function bean; /** * Class Response @@ -49,7 +48,7 @@ class Response extends TcpResponse implements ResponseInterface public static function new(int $fd = -1): TcpResponse { /** @var self $self */ - $self = bean('tcpResponse'); + $self = Swoft::getBean('tcpResponse'); // Set properties $self->fd = $fd; @@ -67,12 +66,14 @@ public static function new(int $fd = -1): TcpResponse */ public function send(Server $server = null): int { + // Deny repeat call send. + // But if you want send again, you can call `setSent(false)` before call it. if ($this->sent) { return 0; } /** @var Protocol $protocol */ - $protocol = bean('tcpServerProtocol'); + $protocol = Swoft::getBean('tcpServerProtocol'); // Content is empty, skip send if (!$content = $protocol->packResponse($this)) { diff --git a/src/websocket-server/src/Connection.php b/src/websocket-server/src/Connection.php index e1625c3a7..21713fb59 100644 --- a/src/websocket-server/src/Connection.php +++ b/src/websocket-server/src/Connection.php @@ -41,6 +41,11 @@ class Connection implements SessionInterface */ // private $module; + /** + * @var WebSocketServer + */ + private $server; + /** * @var Request|ServerRequestInterface */ @@ -63,18 +68,19 @@ class Connection implements SessionInterface private $moduleInfo; /** - * @param int $fd - * @param Request $request - * @param Response $response + * @param WebSocketServer $server + * @param Request $request + * @param Response $response * * @return Connection */ - public static function new(int $fd, Request $request, Response $response): self + public static function new(WebSocketServer $server, Request $request, Response $response): self { /** @var self $sess */ $sess = Swoft::getBean(self::class); - $sess->fd = $fd; + $sess->fd = $fd = $request->getFd(); + $sess->server = $server; // Init meta info $sess->buildMetadata($fd, $request->getUriPath()); @@ -92,7 +98,7 @@ public static function new(int $fd, Request $request, Response $response): self */ private function buildMetadata(int $fd, string $path): void { - $info = server()->getClientInfo($fd); + $info = $this->server->getClientInfo($fd); server()->log("Handshake: conn#{$fd} send handshake request to {$path}, client info: ", $info, 'debug'); @@ -115,7 +121,7 @@ private function buildMetadata(int $fd, string $path): void */ public function push(string $data, int $opcode = WEBSOCKET_OPCODE_TEXT, bool $finish = true): bool { - return server()->push($this->fd, $data, $opcode, $finish); + return $this->server->push($this->fd, $data, $opcode, $finish); } /** @@ -224,4 +230,12 @@ public function setModuleInfo(array $moduleInfo): void { $this->moduleInfo = $moduleInfo; } + + /** + * @return WebSocketServer + */ + public function getServer(): WebSocketServer + { + return $this->server; + } } diff --git a/src/websocket-server/src/Message/Extension/NullExtension.php b/src/websocket-server/src/Message/Extension/NullExtension.php new file mode 100644 index 000000000..2da1cc313 --- /dev/null +++ b/src/websocket-server/src/Message/Extension/NullExtension.php @@ -0,0 +1,13 @@ +sent) { return 0; } - $server = bean('wsServer'); + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $conn = $conn ?: Session::mustGet(); + $server = $conn->getServer(); $pageSize = $this->pageSize; $content = $this->formatContent($conn); + // Trigger event before push message content to client + Swoft::trigger(WsServerEvent::MESSAGE_PUSH, $server, $content, $this); + // To all users if ($this->sendToAll) { return $server->sendToAll($content, $this->sender, $pageSize, $this->opcode); @@ -221,26 +229,38 @@ public function send(Connection $conn = null): int * @return string * @throws SwoftException */ - protected function formatContent(Connection $conn = null): string + protected function formatContent(Connection $conn): string { // Content for response $content = $this->content; - if ($content === '') { - /** @noinspection CallableParameterUseCaseInTypeContextInspection */ - $conn = $conn ?: Session::mustGet(); + if ($content !== '') { + return $content; + } - /** @var WsMessageContext $context */ - $context = Context::get(true); - $parser = $conn->getParser(); + /** @var WsMessageContext $context */ + $context = Context::get(true); + $parser = $conn->getParser(); + $message = null; - if (is_object($this->data) && $this->data instanceof Message) { + $cmdId = $context->getMessage()->getCmd(); + if (is_object($this->data)) { + if ($this->data instanceof Message) { $message = $this->data; + } elseif ($this->data instanceof Frame) { + $this->setFd($this->data->fd); + $this->setFinish($this->data->finish); + $this->setOpcode($this->data->opcode); + + $content = $this->data->data; } else { - $cmdId = $context->getMessage()->getCmd(); $message = Message::new($cmdId, $this->data, $this->ext); } + } else { + $message = Message::new($cmdId, $this->data, $this->ext); + } + if ($message) { $content = $parser->encode($message); } @@ -262,7 +282,10 @@ public function getFd(): int */ public function setFd(int $fd): self { - $this->fd = $fd; + if ($fd > 0) { + $this->fd = $fd; + } + return $this; } diff --git a/src/websocket-server/src/Swoole/HandshakeListener.php b/src/websocket-server/src/Swoole/HandshakeListener.php index 73f228de8..ba59a8efb 100644 --- a/src/websocket-server/src/Swoole/HandshakeListener.php +++ b/src/websocket-server/src/Swoole/HandshakeListener.php @@ -59,12 +59,13 @@ public function onHandshake(Request $request, Response $response): bool } // Initialize psr7 Request and Response - $psr7Req = Psr7Request::new($request); - $psr7Res = Psr7Response::new($response); + $psr7Req = Psr7Request::new($request); + $psr7Res = Psr7Response::new($response); + $wsServer = Swoft::getBean('wsServer'); // Initialize connection session and context $ctx = WsHandshakeContext::new($psr7Req, $psr7Res); - $conn = Connection::new($fd, $psr7Req, $psr7Res); + $conn = Connection::new($wsServer, $psr7Req, $psr7Res); // Bind connection and bind cid => sid(fd) Session::set($sid, $conn); @@ -80,7 +81,7 @@ public function onHandshake(Request $request, Response $response): bool /** @var Psr7Response $psr7Res */ [$status, $psr7Res] = $dispatcher->handshake($psr7Req, $psr7Res); if (true !== $status) { - server()->log("Handshake: conn#$fd handshake check failed"); + $wsServer->log("Handshake: conn#$fd handshake check failed"); $psr7Res->quickSend(); // NOTICE: Rejecting a handshake still triggers a close event. @@ -96,9 +97,13 @@ public function onHandshake(Request $request, Response $response): bool // Response handshake successfully $meta = $conn->getMetadata(); $conn->setHandshake(true); + + // Swoft::trigger(WsServerEvent::HANDSHAKE_SUCCESS, $fd, $request, $response); + + // Response handshake $psr7Res->quickSend(); - server()->log("Handshake: conn#{$fd} handshake successful! meta:", $meta, 'debug'); + $wsServer->log("Handshake: conn#{$fd} handshake successful! meta:", $meta, 'debug'); Swoft::trigger(WsServerEvent::HANDSHAKE_SUCCESS, $fd, $request, $response); // Handshaking successful, Manually triggering the open event diff --git a/src/websocket-server/src/WsMessageDispatcher.php b/src/websocket-server/src/WsMessageDispatcher.php index beec8a349..5bcc871f0 100644 --- a/src/websocket-server/src/WsMessageDispatcher.php +++ b/src/websocket-server/src/WsMessageDispatcher.php @@ -99,7 +99,7 @@ public function dispatch(Server $server, Request $request, Response $response): $response->setOpcode((int)$route['opcode']); } - // Before send message + // Before call $response send message Swoft::trigger(WsServerEvent::MESSAGE_SEND, $response); // Do send response diff --git a/src/websocket-server/src/WsServerEvent.php b/src/websocket-server/src/WsServerEvent.php index ef091e9c1..8ff8e2099 100644 --- a/src/websocket-server/src/WsServerEvent.php +++ b/src/websocket-server/src/WsServerEvent.php @@ -9,36 +9,45 @@ */ final class WsServerEvent { - public const REGISTER_ROUTE = 'swoft.ws.server.router.register'; + public const REGISTER_ROUTE = 'swoft.ws.server.router.register'; public const HANDSHAKE_BEFORE = 'swoft.ws.server.handshake.before'; public const HANDSHAKE_SUCCESS = 'swoft.ws.server.handshake.ok'; public const HANDSHAKE_ERROR = 'swoft.ws.server.handshake.error'; - public const OPEN_AFTER = 'swoft.ws.server.open.after'; - public const OPEN_ERROR = 'swoft.ws.server.open.error'; + public const OPEN_AFTER = 'swoft.ws.server.open.after'; + public const OPEN_ERROR = 'swoft.ws.server.open.error'; /** * @deprecated Please use MESSAGE_RECEIVE instead. */ - public const MESSAGE_BEFORE = 'swoft.ws.server.message.receive'; + public const MESSAGE_BEFORE = 'swoft.ws.server.message.receive'; /** * On message receive, before handle message */ - public const MESSAGE_RECEIVE = 'swoft.ws.server.message.receive'; + public const MESSAGE_RECEIVE = 'swoft.ws.server.message.receive'; /** - * On before send message + * On before call response->send() */ - public const MESSAGE_SEND = 'swoft.ws.server.message.send'; + public const MESSAGE_SEND = 'swoft.ws.server.message.send'; - // On after handle message - public const MESSAGE_ERROR = 'swoft.ws.server.message.error'; + /** + * On before push message content to client + */ + public const MESSAGE_PUSH = 'swoft.ws.server.message.push'; - // On after handle message - public const MESSAGE_AFTER = 'swoft.ws.server.message.after'; + /** + * On handle message dispatch error + */ + public const MESSAGE_ERROR = 'swoft.ws.server.message.error'; + + /** + * On after dispatch message(after push message) + */ + public const MESSAGE_AFTER = 'swoft.ws.server.message.after'; - public const AFTER_CLOSE = 'swoft.ws.server.close.after'; - public const CLOSE_ERROR = 'swoft.ws.server.close.error'; + public const AFTER_CLOSE = 'swoft.ws.server.close.after'; + public const CLOSE_ERROR = 'swoft.ws.server.close.error'; }