Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dependencies #15

Merged
merged 35 commits into from
Nov 16, 2023
Merged
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
joedixon committed Nov 14, 2023
commit 50bcde0dbbaa8b409c931cf72b21edc65b0a90f1
55 changes: 53 additions & 2 deletions src/Http/Middleware/WebSocketMiddleware.php
Original file line number Diff line number Diff line change
@@ -3,9 +3,15 @@
namespace Laravel\Reverb\Http\Middleware;

use Closure;
use Laravel\Reverb\Application;
use Laravel\Reverb\Contracts\ApplicationProvider;
use Laravel\Reverb\Contracts\ConnectionManager;
use Laravel\Reverb\Server;
use Laravel\Reverb\WebSockets\Connection;
use Laravel\Reverb\WebSockets\Request as WebSocketRequest;
use Laravel\Reverb\WebSockets\WsConnection;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Message\Response;

class WebSocketMiddleware
{
@@ -14,14 +20,59 @@ public function __construct(protected Server $server)

}

public function __invoke(ServerRequestInterface $request, Closure $next)
/**
* Invoke the WebSocket middleware.
*/
public function __invoke(ServerRequestInterface $request, Closure $next): ServerRequestInterface|Response
{
$wsRequest = new WebSocketRequest($request);

if (! $wsRequest->isWebSocketRequest()) {
return $next($request);
}

return $wsRequest->negotiate($request);
$connection = $this->connection($request, $ws = $wsRequest->connect());

$this->server->open($connection);

$ws->on('message', fn (string $message) => $this->server->message($connection, $message));
$ws->on('close', fn () => $this->server->close($connection));

return $wsRequest->respond();
}

/**
* Get the application from the request.
*/
protected function application(ServerRequestInterface $request): Application
{
parse_str($request->getUri()->getQuery(), $queryString);

return app(ApplicationProvider::class)->findByKey($queryString['appId']);
}

/**
* Get the origin from the request.
*/
protected function origin(ServerRequestInterface $request): ?string
{
return $request->getHeader('Origin')[0] ?? null;
}

/**
* Get a Reverb connection from a Ratchet connection.
*/
protected function connection(ServerRequestInterface $request, WsConnection $connection): Connection
{
return app(ConnectionManager::class)
->for($app = $this->application($request))
->resolve(
$connection->resourceId,
fn () => new Connection(
$connection,
$app,
$this->origin($request)
)
);
}
}
40 changes: 12 additions & 28 deletions src/WebSockets/Connection.php
Original file line number Diff line number Diff line change
@@ -2,15 +2,9 @@

namespace Laravel\Reverb\WebSockets;

use Illuminate\Support\Facades\App;
use Laravel\Reverb\Application;
use Laravel\Reverb\Concerns\GeneratesPusherIdentifiers;
use Laravel\Reverb\Connection as ReverbConnection;
use Laravel\Reverb\Contracts\ConnectionManager;
use Laravel\Reverb\Server;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Message;
use Ratchet\RFC6455\Messaging\MessageBuffer;

class Connection extends ReverbConnection
{
@@ -28,25 +22,6 @@ class Connection extends ReverbConnection
public function __construct(protected WsConnection $connection, Application $application, string $origin = null)
{
parent::__construct($application, $origin);

$this->buffer = new MessageBuffer(
new CloseFrameChecker,
onMessage: function (Message $message) {
App::make(Server::class)->message($this, $message->getPayload());
},
sender: [$connection->stream, 'write']
);

App::make(ConnectionManager::class)->for($application)->resolve(
$connection->resourceId,
fn () => $this
);

App::make(Server::class)->open($this);
$connection->stream->on('data', [$this->buffer, 'onData']);
$connection->stream->on('close', function () {
App::make(Server::class)->close($this);
});
}

/**
@@ -69,18 +44,27 @@ public function id(): string
return $this->id;
}

public static function make(WsConnection $connection, $application, $origin)
/**
* Create a new connection instance.
*/
public static function make(WsConnection $connection, Application $application, string $origin): Connection
{
return new static($connection, $application, $origin);
}

/**
* Send a message to the connection.
*/
public function send(string $message): void
{
$this->buffer->sendMessage($message);
$this->connection->send($message);
}

/**
* Terminate a connection.
*/
public function terminate(): void
{
$this->connection->stream->close();
$this->connection->close();
}
}
72 changes: 28 additions & 44 deletions src/WebSockets/Request.php
Original file line number Diff line number Diff line change
@@ -2,8 +2,6 @@

namespace Laravel\Reverb\WebSockets;

use Illuminate\Support\Facades\App;
use Laravel\Reverb\Contracts\ApplicationProvider;
use Psr\Http\Message\ServerRequestInterface;
use Ratchet\RFC6455\Handshake\RequestVerifier;
use Ratchet\RFC6455\Handshake\ServerNegotiator;
@@ -15,62 +13,48 @@ class Request
{
protected $connection;

public function __construct(protected ServerRequestInterface $request)
{
protected $input;

}
protected $output;

public function isWebSocketRequest()
{
$upgrade = $this->request->getHeader('Upgrade')[0] ?? null;
protected $stream;

return $upgrade === 'websocket';
}
protected $response;

public function negotiate()
public function __construct(protected ServerRequestInterface $request)
{
$negotiator = new ServerNegotiator(new RequestVerifier);
$response = $negotiator->handshake($this->request);

if ($response->getStatusCode() != '101') {
return false;
}

$inStream = new ThroughStream();
$outStream = new ThroughStream();

$this->connect($inStream, $outStream);

return new Response(
$response->getStatusCode(),
$response->getHeaders(),
new CompositeStream($outStream, $inStream)
);
$this->response = $negotiator->handshake($this->request);
$this->input = new ThroughStream;
$this->output = new ThroughStream;
$this->stream = new CompositeStream($this->input, $this->output);
}

public function connect($inStream, $outStream)
/**
* Determine whether thee request is a WebSocket request.
*/
public function isWebSocketRequest(): bool
{
return $this->connection = Connection::make(
new WsConnection(new CompositeStream($inStream, $outStream)),
$this->application(),
$this->origin(),
);
return $this->response->getStatusCode() === 101;
}

public function connection()
/**
* Generate the response to the WebSocket request.
*/
public function respond(): Response
{
return $this->connection;
}

protected function application()
{
parse_str($this->request->getUri()->getQuery(), $queryString);

return App::make(ApplicationProvider::class)->findByKey($queryString['appId']);
return new Response(
$this->response->getStatusCode(),
$this->response->getHeaders(),
new CompositeStream($this->output, $this->input)
);
}

protected function origin()
/**
* Generate a WebSocket connection from the request.
*/
public function connect(): WsConnection
{
return $this->request->getHeader('Origin')[0] ?? null;
return new WsConnection($this->stream);
}
}
37 changes: 34 additions & 3 deletions src/WebSockets/WsConnection.php
Original file line number Diff line number Diff line change
@@ -2,16 +2,47 @@

namespace Laravel\Reverb\WebSockets;

use React\Stream\CompositeStream;
use Evenement\EventEmitter;
use Illuminate\Support\Str;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Message;
use Ratchet\RFC6455\Messaging\MessageBuffer;
use React\Stream\DuplexStreamInterface;

class WsConnection
class WsConnection extends EventEmitter
{
public string $resourceId;

protected $buffer;

public function __construct(public DuplexStreamInterface $stream)
{
$this->resourceId = (string) Str::uuid();

$this->buffer = new MessageBuffer(
new CloseFrameChecker,
onMessage: fn (Message $message) => $this->emit('message', [$message->getPayload()]),
onControl: fn () => $this->close(),
sender: [$stream, 'write']
);

$stream->on('data', [$this->buffer, 'onData']);
$stream->on('close', fn () => $this->emit('close'));
}

/**
* Send a message to the connection.
*/
public function send(string $message): void
{
$this->buffer->sendMessage($message);
}

/**
* Close the connection.
*/
public function close(): void
{
$this->stream->close();
}
}
}