diff --git a/src/Servers/Reverb/Controller.php b/src/Servers/Reverb/Controller.php index f4428a12..ddea4c95 100644 --- a/src/Servers/Reverb/Controller.php +++ b/src/Servers/Reverb/Controller.php @@ -25,10 +25,14 @@ public function __invoke(RequestInterface $request, WsConnection $connection, st return; } + + $connection->onMessage(fn (string $message) => $this->server->message($reverbConnection, $message)); + $connection->onClose(fn () => $this->server->close($reverbConnection)); + $connection->openStream(); + $this->server->open($reverbConnection); - - $connection->on('message', fn (string $message) => $this->server->message($reverbConnection, $message)); - $connection->on('close', fn () => $this->server->close($reverbConnection)); + // $connection->on('message', fn (string $message) => $this->server->message($reverbConnection, $message)); + // $connection->on('close', fn () => $this->server->close($reverbConnection)); } /** diff --git a/src/WebSockets/WsConnection.php b/src/WebSockets/WsConnection.php index 7faf9df2..fc8ecd91 100644 --- a/src/WebSockets/WsConnection.php +++ b/src/WebSockets/WsConnection.php @@ -14,17 +14,26 @@ class WsConnection extends EventEmitter { protected $buffer; + protected $onMessage; + + protected $onClose; + public function __construct(public Connection $connection) + { + // + } + + public function openStream() { $this->buffer = new MessageBuffer( new CloseFrameChecker, - onMessage: fn (Message $message) => $this->emit('message', [$message->getPayload()]), + onMessage: $this->onMessage ?: fn () => null, onControl: fn (FrameInterface $message) => $this->control($message), - sender: [$connection, 'send'] + sender: [$this->connection, 'send'] ); - $connection->on('data', [$this->buffer, 'onData']); - $connection->on('close', fn () => $this->emit('close')); + $this->connection->on('data', [$this->buffer, 'onData']); + $this->connection->on('close', $this->onClose ?: fn () => null); } /** @@ -46,6 +55,16 @@ public function control(FrameInterface $message): void }; } + public function onMessage(callable $callback): void + { + $this->onMessage = $callback; + } + + public function onClose(callable $callback): void + { + $this->onClose = $callback; + } + /** * Close the connection. */