Skip to content

Commit

Permalink
add more event for ws server message dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
inhere committed Aug 9, 2019
1 parent 3c81001 commit e1b0583
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 42 deletions.
7 changes: 4 additions & 3 deletions src/tcp-server/src/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Swoft\Tcp\Server\Exception\TcpResponseException;
use Swoft\Tcp\Response as TcpResponse;
use Swoole\Server;
use function bean;

/**
* Class Response
Expand Down Expand Up @@ -49,7 +48,7 @@ class Response extends TcpResponse implements ResponseInterface
public static function new(int $fd = -1): TcpResponse
{
/** @var self $self */
$self = bean('tcpResponse');
$self = Swoft::getBean('tcpResponse');

// Set properties
$self->fd = $fd;
Expand All @@ -67,12 +66,14 @@ public static function new(int $fd = -1): TcpResponse
*/
public function send(Server $server = null): int
{
// Deny repeat call send.
// But if you want send again, you can call `setSent(false)` before call it.
if ($this->sent) {
return 0;
}

/** @var Protocol $protocol */
$protocol = bean('tcpServerProtocol');
$protocol = Swoft::getBean('tcpServerProtocol');

// Content is empty, skip send
if (!$content = $protocol->packResponse($this)) {
Expand Down
28 changes: 21 additions & 7 deletions src/websocket-server/src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class Connection implements SessionInterface
*/
// private $module;

/**
* @var WebSocketServer
*/
private $server;

/**
* @var Request|ServerRequestInterface
*/
Expand All @@ -63,18 +68,19 @@ class Connection implements SessionInterface
private $moduleInfo;

/**
* @param int $fd
* @param Request $request
* @param Response $response
* @param WebSocketServer $server
* @param Request $request
* @param Response $response
*
* @return Connection
*/
public static function new(int $fd, Request $request, Response $response): self
public static function new(WebSocketServer $server, Request $request, Response $response): self
{
/** @var self $sess */
$sess = Swoft::getBean(self::class);

$sess->fd = $fd;
$sess->fd = $fd = $request->getFd();
$sess->server = $server;

// Init meta info
$sess->buildMetadata($fd, $request->getUriPath());
Expand All @@ -92,7 +98,7 @@ public static function new(int $fd, Request $request, Response $response): self
*/
private function buildMetadata(int $fd, string $path): void
{
$info = server()->getClientInfo($fd);
$info = $this->server->getClientInfo($fd);

server()->log("Handshake: conn#{$fd} send handshake request to {$path}, client info: ", $info, 'debug');

Expand All @@ -115,7 +121,7 @@ private function buildMetadata(int $fd, string $path): void
*/
public function push(string $data, int $opcode = WEBSOCKET_OPCODE_TEXT, bool $finish = true): bool
{
return server()->push($this->fd, $data, $opcode, $finish);
return $this->server->push($this->fd, $data, $opcode, $finish);
}

/**
Expand Down Expand Up @@ -224,4 +230,12 @@ public function setModuleInfo(array $moduleInfo): void
{
$this->moduleInfo = $moduleInfo;
}

/**
* @return WebSocketServer
*/
public function getServer(): WebSocketServer
{
return $this->server;
}
}
13 changes: 13 additions & 0 deletions src/websocket-server/src/Message/Extension/NullExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php declare(strict_types=1);

namespace Swoft\WebSocket\Server\Message;

/**
* Class NullExtension
*
* @since 2.0.6
*/
class NullExtension
{

}
49 changes: 36 additions & 13 deletions src/websocket-server/src/Message/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Swoft\WebSocket\Server\Message;

use Swoft;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Context\Context;
use Swoft\Exception\SwoftException;
Expand All @@ -13,6 +14,8 @@
use Swoft\WebSocket\Server\Contract\ResponseInterface;
use function bean;
use function is_object;
use Swoft\WebSocket\Server\WsServerEvent;
use Swoole\WebSocket\Frame;
use const WEBSOCKET_OPCODE_TEXT;

/**
Expand Down Expand Up @@ -177,17 +180,22 @@ public function toAll(bool $yes = true): ResponseInterface
*/
public function send(Connection $conn = null): int
{
// Deny repeat send.
// But if you want send again, you can call `setSent(false)` before send.
// Deny repeat call send.
// But if you want send again, you can call `setSent(false)` before call it.
if ($this->sent) {
return 0;
}

$server = bean('wsServer');
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$conn = $conn ?: Session::mustGet();
$server = $conn->getServer();

$pageSize = $this->pageSize;
$content = $this->formatContent($conn);

// Trigger event before push message content to client
Swoft::trigger(WsServerEvent::MESSAGE_PUSH, $server, $content, $this);

// To all users
if ($this->sendToAll) {
return $server->sendToAll($content, $this->sender, $pageSize, $this->opcode);
Expand Down Expand Up @@ -221,26 +229,38 @@ public function send(Connection $conn = null): int
* @return string
* @throws SwoftException
*/
protected function formatContent(Connection $conn = null): string
protected function formatContent(Connection $conn): string
{
// Content for response
$content = $this->content;

if ($content === '') {
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$conn = $conn ?: Session::mustGet();
if ($content !== '') {
return $content;
}

/** @var WsMessageContext $context */
$context = Context::get(true);
$parser = $conn->getParser();
/** @var WsMessageContext $context */
$context = Context::get(true);
$parser = $conn->getParser();
$message = null;

if (is_object($this->data) && $this->data instanceof Message) {
$cmdId = $context->getMessage()->getCmd();
if (is_object($this->data)) {
if ($this->data instanceof Message) {
$message = $this->data;
} elseif ($this->data instanceof Frame) {
$this->setFd($this->data->fd);
$this->setFinish($this->data->finish);
$this->setOpcode($this->data->opcode);

$content = $this->data->data;
} else {
$cmdId = $context->getMessage()->getCmd();
$message = Message::new($cmdId, $this->data, $this->ext);
}
} else {
$message = Message::new($cmdId, $this->data, $this->ext);
}

if ($message) {
$content = $parser->encode($message);
}

Expand All @@ -262,7 +282,10 @@ public function getFd(): int
*/
public function setFd(int $fd): self
{
$this->fd = $fd;
if ($fd > 0) {
$this->fd = $fd;
}

return $this;
}

Expand Down
15 changes: 10 additions & 5 deletions src/websocket-server/src/Swoole/HandshakeListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ public function onHandshake(Request $request, Response $response): bool
}

// Initialize psr7 Request and Response
$psr7Req = Psr7Request::new($request);
$psr7Res = Psr7Response::new($response);
$psr7Req = Psr7Request::new($request);
$psr7Res = Psr7Response::new($response);
$wsServer = Swoft::getBean('wsServer');

// Initialize connection session and context
$ctx = WsHandshakeContext::new($psr7Req, $psr7Res);
$conn = Connection::new($fd, $psr7Req, $psr7Res);
$conn = Connection::new($wsServer, $psr7Req, $psr7Res);

// Bind connection and bind cid => sid(fd)
Session::set($sid, $conn);
Expand All @@ -80,7 +81,7 @@ public function onHandshake(Request $request, Response $response): bool
/** @var Psr7Response $psr7Res */
[$status, $psr7Res] = $dispatcher->handshake($psr7Req, $psr7Res);
if (true !== $status) {
server()->log("Handshake: conn#$fd handshake check failed");
$wsServer->log("Handshake: conn#$fd handshake check failed");
$psr7Res->quickSend();

// NOTICE: Rejecting a handshake still triggers a close event.
Expand All @@ -96,9 +97,13 @@ public function onHandshake(Request $request, Response $response): bool
// Response handshake successfully
$meta = $conn->getMetadata();
$conn->setHandshake(true);

// Swoft::trigger(WsServerEvent::HANDSHAKE_SUCCESS, $fd, $request, $response);

// Response handshake
$psr7Res->quickSend();

server()->log("Handshake: conn#{$fd} handshake successful! meta:", $meta, 'debug');
$wsServer->log("Handshake: conn#{$fd} handshake successful! meta:", $meta, 'debug');
Swoft::trigger(WsServerEvent::HANDSHAKE_SUCCESS, $fd, $request, $response);

// Handshaking successful, Manually triggering the open event
Expand Down
2 changes: 1 addition & 1 deletion src/websocket-server/src/WsMessageDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public function dispatch(Server $server, Request $request, Response $response):
$response->setOpcode((int)$route['opcode']);
}

// Before send message
// Before call $response send message
Swoft::trigger(WsServerEvent::MESSAGE_SEND, $response);

// Do send response
Expand Down
35 changes: 22 additions & 13 deletions src/websocket-server/src/WsServerEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,45 @@
*/
final class WsServerEvent
{
public const REGISTER_ROUTE = 'swoft.ws.server.router.register';
public const REGISTER_ROUTE = 'swoft.ws.server.router.register';

public const HANDSHAKE_BEFORE = 'swoft.ws.server.handshake.before';
public const HANDSHAKE_SUCCESS = 'swoft.ws.server.handshake.ok';
public const HANDSHAKE_ERROR = 'swoft.ws.server.handshake.error';

public const OPEN_AFTER = 'swoft.ws.server.open.after';
public const OPEN_ERROR = 'swoft.ws.server.open.error';
public const OPEN_AFTER = 'swoft.ws.server.open.after';
public const OPEN_ERROR = 'swoft.ws.server.open.error';

/**
* @deprecated Please use MESSAGE_RECEIVE instead.
*/
public const MESSAGE_BEFORE = 'swoft.ws.server.message.receive';
public const MESSAGE_BEFORE = 'swoft.ws.server.message.receive';

/**
* On message receive, before handle message
*/
public const MESSAGE_RECEIVE = 'swoft.ws.server.message.receive';
public const MESSAGE_RECEIVE = 'swoft.ws.server.message.receive';

/**
* On before send message
* On before call response->send()
*/
public const MESSAGE_SEND = 'swoft.ws.server.message.send';
public const MESSAGE_SEND = 'swoft.ws.server.message.send';

// On after handle message
public const MESSAGE_ERROR = 'swoft.ws.server.message.error';
/**
* On before push message content to client
*/
public const MESSAGE_PUSH = 'swoft.ws.server.message.push';

// On after handle message
public const MESSAGE_AFTER = 'swoft.ws.server.message.after';
/**
* On handle message dispatch error
*/
public const MESSAGE_ERROR = 'swoft.ws.server.message.error';

/**
* On after dispatch message(after push message)
*/
public const MESSAGE_AFTER = 'swoft.ws.server.message.after';

public const AFTER_CLOSE = 'swoft.ws.server.close.after';
public const CLOSE_ERROR = 'swoft.ws.server.close.error';
public const AFTER_CLOSE = 'swoft.ws.server.close.after';
public const CLOSE_ERROR = 'swoft.ws.server.close.error';
}

0 comments on commit e1b0583

Please sign in to comment.