Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
joedixon committed Nov 24, 2023
1 parent f9802ad commit a57abb0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
10 changes: 7 additions & 3 deletions src/Servers/Reverb/Controller.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ public function __invoke(RequestInterface $request, WsConnection $connection, st
return;
}


$connection->onMessage(fn (string $message) => $this->server->message($reverbConnection, $message));
$connection->onClose(fn () => $this->server->close($reverbConnection));
$connection->openStream();

$this->server->open($reverbConnection);

$connection->on('message', fn (string $message) => $this->server->message($reverbConnection, $message));
$connection->on('close', fn () => $this->server->close($reverbConnection));
// $connection->on('message', fn (string $message) => $this->server->message($reverbConnection, $message));
// $connection->on('close', fn () => $this->server->close($reverbConnection));
}

/**
Expand Down
27 changes: 23 additions & 4 deletions src/WebSockets/WsConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,26 @@ class WsConnection extends EventEmitter
{
protected $buffer;

protected $onMessage;

protected $onClose;

public function __construct(public Connection $connection)
{
//
}

public function openStream()
{
$this->buffer = new MessageBuffer(
new CloseFrameChecker,
onMessage: fn (Message $message) => $this->emit('message', [$message->getPayload()]),
onMessage: $this->onMessage ?: fn () => null,
onControl: fn (FrameInterface $message) => $this->control($message),
sender: [$connection, 'send']
sender: [$this->connection, 'send']
);

$connection->on('data', [$this->buffer, 'onData']);
$connection->on('close', fn () => $this->emit('close'));
$this->connection->on('data', [$this->buffer, 'onData']);
$this->connection->on('close', $this->onClose ?: fn () => null);
}

/**
Expand All @@ -46,6 +55,16 @@ public function control(FrameInterface $message): void
};
}

public function onMessage(callable $callback): void
{
$this->onMessage = $callback;
}

public function onClose(callable $callback): void
{
$this->onClose = $callback;
}

/**
* Close the connection.
*/
Expand Down

0 comments on commit a57abb0

Please sign in to comment.