Skip to content

Commit

Permalink
Optimizations (#21)
Browse files Browse the repository at this point in the history
* resolve higher

* Fix code styling

* remove log

* wip

* wip

* resolve higher

* wip

* wip

* wip

* use time

* use arrays

* tidy

* typo

* Fix code styling

* wip

* wip

* Fix code styling

* wip

* Fix code styling

* wip

* revert

* wip

* Fix code styling

* wip

* Fix code styling

* wip

* wip

* bypass buffer

* wip

* wip

* wip

* Fix code styling

* wip

* handle pings

* Fix code styling

* wip

* update tests

* Fix code styling

* tidy controllers

* Fix code styling

* formatting

* formatting

* wip

* Fix code styling
  • Loading branch information
joedixon authored Nov 26, 2023
1 parent fc6ff02 commit 5e55523
Show file tree
Hide file tree
Showing 41 changed files with 354 additions and 259 deletions.
1 change: 0 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"illuminate/http": "^10.0",
"illuminate/redis": "^10.0",
"illuminate/support": "^10.0",
"nesbot/carbon": "^2.64",
"ratchet/rfc6455": "^0.3.1",
"react/socket": "^1.14",
"symfony/http-foundation": "^6.3"
Expand Down
30 changes: 15 additions & 15 deletions src/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

use Exception;
use Illuminate\Support\Arr;
use Laravel\Reverb\Application;
use Laravel\Reverb\Contracts\ChannelConnectionManager;
use Laravel\Reverb\Contracts\Connection;
use Laravel\Reverb\Output;

class Channel
{
Expand Down Expand Up @@ -82,29 +80,31 @@ public function subscribed(Connection $connection): bool
/**
* Send a message to all connections subscribed to the channel.
*/
public function broadcast(Application $app, array $payload, Connection $except = null): void
public function broadcast(array $payload, Connection $except = null): void
{
collect($this->connections())
->each(function ($connection) use ($payload, $except) {
$message = json_encode(
Arr::except($payload, 'except')
);

$chunks = array_chunk($this->connections(), 100);

foreach ($chunks as $connections) {
foreach ($connections as $connection) {
if ($except && $except->id() === $connection->connection()->id()) {
return;
continue;
}

if (isset($payload['except']) && $payload['except'] === $connection->connection()->id()) {
return;
continue;
}

try {
$connection->send(
json_encode(
Arr::except($payload, 'except')
)
);
$connection->send($message);
} catch (Exception $e) {
Output::error('Broadcasting to '.$connection->id().' resulted in an error');
Output::info($e->getMessage());
//
}
});
}
}
}

/**
Expand Down
2 changes: 0 additions & 2 deletions src/Channels/PresenceChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public function subscribe(Connection $connection, string $auth = null, string $d
parent::subscribe($connection, $auth, $data);

$this->broadcast(
$connection->app(),
[
'event' => 'pusher_internal:member_added',
'data' => $data ? json_decode($data, true) : [],
Expand All @@ -37,7 +36,6 @@ public function unsubscribe(Connection $connection): void

if ($userId = $subscription->data('user_id')) {
$this->broadcast(
$connection->app(),
[
'event' => 'pusher_internal:member_removed',
'data' => ['user_id' => $userId],
Expand Down
1 change: 1 addition & 0 deletions src/Concerns/ClosesConnections.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ protected function close(Connection $connection, int $statusCode = 400, string $
$response = new Response($statusCode, $headers, $message);

$connection->send(Message::toString($response));

$connection->close();
}
}
5 changes: 2 additions & 3 deletions src/Contracts/ChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Laravel\Reverb\Contracts;

use Illuminate\Support\Collection;
use Laravel\Reverb\Application;
use Laravel\Reverb\Channels\Channel;

Expand All @@ -21,7 +20,7 @@ public function for(Application $application): ChannelManager;
/**
* Get all the channels.
*/
public function all(): Collection;
public function all(): array;

/**
* Find the given channel.
Expand All @@ -31,7 +30,7 @@ public function find(string $channel): Channel;
/**
* Get all the connections for the given channels.
*/
public function connections(string $channel = null): Collection;
public function connections(string $channel = null): array;

/**
* Unsubscribe from all channels.
Expand Down
50 changes: 22 additions & 28 deletions src/Contracts/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,27 @@

namespace Laravel\Reverb\Contracts;

use Carbon\Carbon;
use Illuminate\Support\Facades\App;
use Laravel\Reverb\Application;
use Laravel\Reverb\Output;
use Laravel\Reverb\Pusher\Event as PusherEvent;

abstract class Connection
{
/**
* The last time the connection was seen.
*/
protected ?string $lastSeenAt = null;
protected ?int $lastSeenAt;

/**
* Stores the ping state of the connection.
*
* @var \Carbon\Carbon
*/
protected $hasBeenPinged = false;

protected $pusher;

public function __construct(
protected Application $application,
protected ?string $origin
) {
$this->lastSeenAt = time();
}

/**
Expand Down Expand Up @@ -70,53 +67,50 @@ public function origin(): ?string
public function ping(): void
{
$this->hasBeenPinged = true;
}

PusherEvent::ping($this);

Output::info('Connection Pinged', $this->id());
/**
* Get the last time the connection was seen.
*/
public function lastSeenAt(): ?int
{
return $this->lastSeenAt;
}

/**
* Touch the connection last seen at timestamp.
* Set the connection last seen at timestamp.
*/
public function touch(): Connection
public function setLastSeenAt(int $time): Connection
{
$this->lastSeenAt = (string) now();
$this->lastSeenAt = $time;

return $this;
}

/**
* Disconnect and unsubscribe from all channels.
* Touch the connection last seen at timestamp.
*/
public function disconnect(): void
public function touch(): Connection
{
App::make(ChannelManager::class)
->for($this->app())
->unsubscribeFromAll($this);
$this->setLastSeenAt(time());

$this->terminate();
return $this;
}

/**
* Get the last time the connection was seen.
* Disconnect and unsubscribe from all channels.
*/
public function lastSeenAt(): ?Carbon
public function disconnect(): void
{
return $this->lastSeenAt ? Carbon::parse($this->lastSeenAt) : null;
$this->terminate();
}

/**
* Determine whether the connection is still active.
*/
public function isActive(): bool
{
return $this->lastSeenAt() &&
$this->lastSeenAt()->isAfter(
now()->subMinutes(
$this->app()->pingInterval()
)
);
return time() < $this->lastSeenAt + $this->app()->pingInterval();
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static function dispatchSynchronously(Application $app, array $payload, C
$channel = app(ChannelManager::class)->for($app)->find($channel);
$payload['channel'] = $channel->name();

$channel->broadcast($app, $payload, $connection);
$channel->broadcast($payload, $connection);
}
}
}
3 changes: 3 additions & 0 deletions src/Http/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class Request
*/
const MAX_SIZE = 4096;

/**
* Turn the raw message into a Psr7 request.
*/
public static function from(string $message, Connection $connection): ?RequestInterface
{
$connection->appendToBuffer($message);
Expand Down
81 changes: 71 additions & 10 deletions src/Http/Router.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

namespace Laravel\Reverb\Http;

use Closure;
use GuzzleHttp\Psr7\Message;
use Illuminate\Support\Arr;
use Laravel\Reverb\Concerns\ClosesConnections;
use Laravel\Reverb\WebSockets\WsConnection;
use Psr\Http\Message\RequestInterface;
use Ratchet\RFC6455\Handshake\RequestVerifier;
use Ratchet\RFC6455\Handshake\ServerNegotiator;
use ReflectionFunction;
use ReflectionMethod;
use Symfony\Component\Routing\Exception\MethodNotAllowedException;
use Symfony\Component\Routing\Exception\ResourceNotFoundException;
use Symfony\Component\Routing\Matcher\UrlMatcherInterface;
Expand All @@ -17,8 +20,14 @@ class Router
{
use ClosesConnections;

/**
* The server negotiator.
*/
protected ServerNegotiator $negotiator;

public function __construct(protected UrlMatcherInterface $matcher)
{
$this->negotiator = new ServerNegotiator(new RequestVerifier);
}

/**
Expand All @@ -31,10 +40,6 @@ public function dispatch(RequestInterface $request, Connection $connection): mix
$context->setMethod($request->getMethod());
$context->setHost($uri->getHost());

if ($this->isWebSocketRequest($request)) {
$connection = $this->attemptUpgrade($request, $connection);
}

try {
$route = $this->matcher->match($uri->getPath());
} catch (MethodNotAllowedException $e) {
Expand All @@ -43,13 +48,31 @@ public function dispatch(RequestInterface $request, Connection $connection): mix
return $this->close($connection, 404, 'Not found.');
}

$response = $route['_controller']($request, $connection, ...Arr::except($route, ['_controller', '_route']));
$controller = $this->controller($route);

if ($this->isWebSocketRequest($request)) {
$wsConnection = $this->attemptUpgrade($request, $connection);

if (! $this->isWebSocketRequest($request)) {
return $connection->send($response)->close();
return $controller($request, $wsConnection, ...Arr::except($route, ['_controller', '_route']));
}

return null;
$routeParameters = Arr::except($route, ['_controller', '_route']) + ['request' => $request, 'connection' => $connection];

$response = $controller(
...$this->arguments($controller, $routeParameters)
);

return $connection->send($response)->close();
}

/**
* Get the controller callable for the route.
*
* @param array<string, mixed> $route
*/
protected function controller(array $route): callable
{
return $route['_controller'];
}

/**
Expand All @@ -65,11 +88,49 @@ protected function isWebSocketRequest(RequestInterface $request): bool
*/
protected function attemptUpgrade(RequestInterface $request, Connection $connection): WsConnection
{
$negotiator = new ServerNegotiator(new RequestVerifier);
$response = $negotiator->handshake($request);
$response = $this->negotiator->handshake($request);

$connection->write(Message::toString($response));

return new WsConnection($connection);
}

/**
* Find the arguments for the controller.
*
* @return array<int, mixed>
*/
public function arguments(callable $controller, array $routeParameters): array
{
$parameters = $this->parameters($controller);

return array_map(function ($parameter) use ($routeParameters) {
return $routeParameters[$parameter['name']] ?? null;
}, $parameters);
}

/**
* Find the parameters for the controller.
*
* @return array<int, array{ name: string, type: string, position: int }>
*/
public function parameters(mixed $controller): array
{
$method = match (true) {
$controller instanceof Closure => new ReflectionFunction($controller),
is_string($controller) => count($parts = explode('::', $controller)) > 1 ? new ReflectionMethod(...$parts) : new ReflectionFunction($controller),
! is_array($controller) => new ReflectionMethod($controller, '__invoke'),
is_array($controller) => new ReflectionMethod($controller[0], $controller[1]),
};

$parameters = $method->getParameters();

return array_map(function ($parameter) {
return [
'name' => $parameter->getName(),
'type' => $parameter->getType()->getName(),
'position' => $parameter->getPosition(),
];
}, $parameters);
}
}
Loading

0 comments on commit 5e55523

Please sign in to comment.