Skip to content

Commit

Permalink
Cleans up channels (#30)
Browse files Browse the repository at this point in the history
* clean up channels

* Fix code styling

* revert
  • Loading branch information
joedixon authored Dec 6, 2023
1 parent 8a06931 commit 0988ce0
Show file tree
Hide file tree
Showing 26 changed files with 139 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/Channels/CacheChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class CacheChannel extends Channel
/**
* Send a message to all connections subscribed to the channel.
*/
public function broadcast(array $payload, Connection $except = null): void
public function broadcast(array $payload, ?Connection $except = null): void
{
$this->payload = $payload;

Expand All @@ -24,7 +24,7 @@ public function broadcast(array $payload, Connection $except = null): void
/**
* Broadcast a message triggered from an internal source.
*/
public function broadcastInternally(array $payload, Connection $except = null): void
public function broadcastInternally(array $payload, ?Connection $except = null): void
{
parent::broadcast($payload, $except);
}
Expand Down
11 changes: 8 additions & 3 deletions src/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Laravel\Reverb\Concerns\SerializesChannels;
use Laravel\Reverb\Contracts\ChannelConnectionManager;
use Laravel\Reverb\Contracts\ChannelManager;
use Laravel\Reverb\Contracts\Connection;

class Channel
Expand Down Expand Up @@ -57,7 +58,7 @@ public function findById(string $id): ?Connection
/**
* Subscribe to the given channel.
*/
public function subscribe(Connection $connection, string $auth = null, string $data = null): void
public function subscribe(Connection $connection, ?string $auth = null, ?string $data = null): void
{
$this->connections->add($connection, $data ? json_decode($data, true) : []);
}
Expand All @@ -68,6 +69,10 @@ public function subscribe(Connection $connection, string $auth = null, string $d
public function unsubscribe(Connection $connection): void
{
$this->connections->remove($connection);

if ($this->connections->isEmpty()) {
app(ChannelManager::class)->for($connection->app())->remove($this);
}
}

/**
Expand All @@ -81,7 +86,7 @@ public function subscribed(Connection $connection): bool
/**
* Send a message to all connections subscribed to the channel.
*/
public function broadcast(array $payload, Connection $except = null): void
public function broadcast(array $payload, ?Connection $except = null): void
{
if ($except === null) {
$this->broadcastToAll($payload);
Expand Down Expand Up @@ -115,7 +120,7 @@ public function broadcastToAll(array $payload): void
/**
* Broadcast a message triggered from an internal source.
*/
public function broadcastInternally(array $payload, Connection $except = null): void
public function broadcastInternally(array $payload, ?Connection $except = null): void
{
$this->broadcast($payload, $except);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Channels/Concerns/InteractsWithPresenceChannels.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait InteractsWithPresenceChannels
/**
* Subscribe to the given channel.
*/
public function subscribe(Connection $connection, string $auth = null, string $data = null): void
public function subscribe(Connection $connection, ?string $auth = null, ?string $data = null): void
{
$this->verify($connection, $auth, $data);

Expand Down
4 changes: 2 additions & 2 deletions src/Channels/Concerns/InteractsWithPrivateChannels.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait InteractsWithPrivateChannels
/**
* Subscribe to the given channel.
*/
public function subscribe(Connection $connection, string $auth = null, string $data = null): void
public function subscribe(Connection $connection, ?string $auth = null, ?string $data = null): void
{
$this->verify($connection, $auth, $data);

Expand All @@ -21,7 +21,7 @@ public function subscribe(Connection $connection, string $auth = null, string $d
/**
* Deteremine whether the given auth token is valid.
*/
protected function verify(Connection $connection, string $auth, string $data = null): bool
protected function verify(Connection $connection, string $auth, ?string $data = null): bool
{
$signature = "{$connection->id()}:{$this->name()}";

Expand Down
5 changes: 5 additions & 0 deletions src/Contracts/ChannelConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public function findById(string $id): ?ChannelConnection;
*/
public function all(): array;

/**
* Determine whether any connections remain on the channel.
*/
public function isEmpty(): bool;

/**
* Flush the channel connection manager.
*/
Expand Down
7 changes: 6 additions & 1 deletion src/Contracts/ChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,18 @@ public function find(string $channel): Channel;
/**
* Get all the connections for the given channels.
*/
public function connections(string $channel = null): array;
public function connections(?string $channel = null): array;

/**
* Unsubscribe from all channels.
*/
public function unsubscribeFromAll(Connection $connection): void;

/**
* Remove the given channel.
*/
public function remove(Channel $channel): void;

/**
* Flush the channel manager repository.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/Contracts/Logger.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ interface Logger
/**
* Log an infomational message.
*/
public function info(string $title, string $message = null): void;
public function info(string $title, ?string $message = null): void;

/**
* Log an error message.
Expand Down
4 changes: 2 additions & 2 deletions src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Event
/**
* Dispatch a message to a channel.
*/
public static function dispatch(Application $app, array $payload, Connection $connection = null): void
public static function dispatch(Application $app, array $payload, ?Connection $connection = null): void
{
$server = app(ServerManager::class);

Expand All @@ -30,7 +30,7 @@ public static function dispatch(Application $app, array $payload, Connection $co
/**
* Notify all connections subscribed to the given channel.
*/
public static function dispatchSynchronously(Application $app, array $payload, Connection $connection = null): void
public static function dispatchSynchronously(Application $app, array $payload, ?Connection $connection = null): void
{
$channels = Arr::wrap($payload['channels'] ?? $payload['channel'] ?? []);

Expand Down
2 changes: 1 addition & 1 deletion src/Loggers/CliLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __construct(protected OutputStyle $output)
/**
* Log an infomational message.
*/
public function info(string $title, string $message = null): void
public function info(string $title, ?string $message = null): void
{
$this->components->twoColumnDetail($title, $message);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Loggers/NullLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class NullLogger implements Logger
/**
* Log an infomational message.
*/
public function info(string $title, string $message = null): void
public function info(string $title, ?string $message = null): void
{
//
}
Expand Down
2 changes: 1 addition & 1 deletion src/Loggers/StandardLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class StandardLogger implements Logger
/**
* Log an infomational message
*/
public function info(string $title, string $message = null): void
public function info(string $title, ?string $message = null): void
{
$output = $title;

Expand Down
8 changes: 8 additions & 0 deletions src/Managers/ArrayChannelConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public function all(): array
return $this->connections;
}

/**
* Determine whether any connections remain on the channel.
*/
public function isEmpty(): bool
{
return empty($this->connections);
}

/**
* Flush the channel connection manager.
*/
Expand Down
12 changes: 10 additions & 2 deletions src/Managers/ArrayChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function find(string $channel): Channel
*
* @return array<string, \Laravel\Reverb\Servers\Reverb\ChannelConnection>
*/
public function connections(string $channel = null): array
public function connections(?string $channel = null): array
{
$channels = Arr::wrap($this->channels($channel));

Expand All @@ -77,6 +77,14 @@ public function unsubscribeFromAll(Connection $connection): void
}
}

/**
* Remove the given channel.
*/
public function remove(Channel $channel): void
{
unset($this->applications[$this->application->id()][$channel->name()]);
}

/**
* Get the given channel.
*/
Expand All @@ -90,7 +98,7 @@ public function channel(string $channel): Channel
*
* @return \Laravel\Reverb\Channels\Channel|array<string, \Laravel\Reverb\Channels\Channel>
*/
public function channels(string $channel = null): Channel|array
public function channels(?string $channel = null): Channel|array
{
if (! isset($this->applications[$this->application->id()])) {
$this->applications[$this->application->id()] = [];
Expand Down
8 changes: 8 additions & 0 deletions src/Managers/CacheChannelConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ public function findById(string $id): ?ChannelConnection
);
}

/**
* Determine whether any connections remain on the channel.
*/
public function isEmpty(): bool
{
return empty($this->repository->get($this->key(), []));
}

/**
* Get all the connections.
*/
Expand Down
16 changes: 14 additions & 2 deletions src/Managers/CacheChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function find(string $channel): Channel
*
* @return array<string, \Laravel\Reverb\Servers\Reverb\ChannelConnection>
*/
public function connections(string $channel = null): array
public function connections(?string $channel = null): array
{
$channels = Arr::wrap($this->channels($channel));

Expand All @@ -77,6 +77,18 @@ public function unsubscribeFromAll(Connection $connection): void
}
}

/**
* Remove the given channel.
*/
public function remove(Channel $channel): void
{
$channels = $this->channels();

unset($channels[$channel->name()]);

$this->repository->forever($this->prefix, $channels);
}

/**
* Get the given channel.
*/
Expand All @@ -90,7 +102,7 @@ public function channel(string $channel): Channel
*
* @return \Laravel\Reverb\Channels\Channel|array<string, \Laravel\Reverb\Channels\Channel>
*/
public function channels(string $channel = null): Channel|array
public function channels(?string $channel = null): Channel|array
{
$channels = $this->repository->get($this->prefix, []);

Expand Down
8 changes: 4 additions & 4 deletions src/Pusher/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function acknowledge(Connection $connection): void
/**
* Subscribe to the given channel.
*/
public function subscribe(Connection $connection, string $channel, string $auth = null, string $data = null): void
public function subscribe(Connection $connection, string $channel, ?string $auth = null, ?string $data = null): void
{
$channel = $this->channels
->for($connection->app())
Expand Down Expand Up @@ -122,7 +122,7 @@ public function ping(Connection $connection): void
/**
* Send a response to the given connection.
*/
public function send(Connection $connection, string $event, array $data = [], string $channel = null): void
public function send(Connection $connection, string $event, array $data = [], ?string $channel = null): void
{
$connection->send(
static::formatPayload($event, $data, $channel)
Expand All @@ -132,7 +132,7 @@ public function send(Connection $connection, string $event, array $data = [], st
/**
* Send an internal response to the given connection.
*/
public function sendInternally(Connection $connection, string $event, array $data = [], string $channel = null): void
public function sendInternally(Connection $connection, string $event, array $data = [], ?string $channel = null): void
{
$connection->send(
static::formatInternalPayload($event, $data, $channel)
Expand All @@ -142,7 +142,7 @@ public function sendInternally(Connection $connection, string $event, array $dat
/**
* Format the payload for the given event.
*/
public function formatPayload(string $event, array $data = [], string $channel = null, string $prefix = 'pusher:'): string|false
public function formatPayload(string $event, array $data = [], ?string $channel = null, string $prefix = 'pusher:'): string|false
{
return json_encode(
array_filter([
Expand Down
2 changes: 1 addition & 1 deletion src/Servers/Reverb/ChannelConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function connection(): Connection
/**
* Get the connection data.
*/
public function data(string $key = null): mixed
public function data(?string $key = null): mixed
{
if ($key) {
return Arr::get($this->data, $key);
Expand Down
2 changes: 1 addition & 1 deletion src/Servers/Reverb/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Connection extends ConnectionContract
*/
protected $hasBeenPinged = false;

public function __construct(protected WsConnection $connection, Application $application, string $origin = null)
public function __construct(protected WsConnection $connection, Application $application, ?string $origin = null)
{
parent::__construct($application, $origin);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Servers/Reverb/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Factory
/**
* Create a new WebSocket server instance.
*/
public static function make(string $host = '0.0.0.0', string $port = '8080', LoopInterface $loop = null)
public static function make(string $host = '0.0.0.0', string $port = '8080', ?LoopInterface $loop = null)
{
$loop = $loop ?: Loop::get();
$socket = new SocketServer("{$host}:{$port}", [], $loop);
Expand Down
2 changes: 1 addition & 1 deletion src/WebSockets/WsConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public function withMaxMessageSize(int $size): void
/**
* Close the connection.
*/
public function close(FrameInterface $frame = null): void
public function close(?FrameInterface $frame = null): void
{
if ($frame) {
$this->send($frame);
Expand Down
4 changes: 2 additions & 2 deletions tests/ApiGatewayTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public function send(array $message, ?string $connectionId = 'abc-123', $appKey
*
* @param string $appKey
*/
public function subscribe(string $channel, ?array $data = [], string $auth = null, ?string $connectionId = 'abc-123', $appKey = 'pusher-key'): void
public function subscribe(string $channel, ?array $data = [], ?string $auth = null, ?string $connectionId = 'abc-123', $appKey = 'pusher-key'): void
{
$data = ! empty($data) ? json_encode($data) : null;

Expand Down Expand Up @@ -161,7 +161,7 @@ public function disconnect($connectionId = 'abc-123'): void
*
* @return void
*/
public function assertSent(string $connectionId = null, mixed $message = null, int $times = null)
public function assertSent(?string $connectionId = null, mixed $message = null, ?int $times = null)
{
Bus::assertDispatched(SendToConnection::class, function ($job) use ($connectionId, $message) {
return ($connectionId ? $job->connectionId === $connectionId : true)
Expand Down
2 changes: 1 addition & 1 deletion tests/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Connection extends BaseConnection

public $id;

public function __construct(string $identifier = null)
public function __construct(?string $identifier = null)
{
if ($identifier) {
$this->identifier = $identifier;
Expand Down
11 changes: 11 additions & 0 deletions tests/Feature/Reverb/ServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,17 @@
expect($message)->toBe('Maximum message size exceeded');
});

it('removes a channel when no subscribers remain', function () {
$connection = $this->connect();
$this->subscribe('test-channel', connection: $connection);

expect(channelManager()->all())->toHaveCount(1);

$this->unsubscribe('test-channel', $connection);

expect(channelManager()->all())->toHaveCount(0);
});

it('clears application state between requests', function () {
$this->subscribe('test-channel');

Expand Down
Loading

0 comments on commit 0988ce0

Please sign in to comment.