Skip to content

Commit

Permalink
array store
Browse files Browse the repository at this point in the history
  • Loading branch information
joedixon committed Nov 15, 2023
1 parent 55807aa commit 9003a64
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 90 deletions.
4 changes: 1 addition & 3 deletions src/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ public function unsubscribe(Connection $connection): void
*/
public function broadcast(Application $app, array $payload, Connection $except = null): void
{
App::make(ChannelManager::class)
->for($app)
->connections($this)
collect(App::make(ChannelManager::class)->for($app)->connections($this))
->each(function ($connection) use ($payload, $except) {
if ($except && $except->identifier() === $connection->identifier()) {
return;
Expand Down
9 changes: 2 additions & 7 deletions src/Contracts/ChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,12 @@ public function all(): Collection;
*/
public function unsubscribeFromAll(Connection $connection): void;

/**
* Get all connection keys for the given channel.
*/
public function connectionKeys(Channel $channel): Collection;

/**
* Get all connections for the given channel.
*
* @return \Laravel\Reverb\Managers\Connections|\Laravel\Reverb\Connection[]|string[]
* @return <array string, \Laravel\Reverb\Connection>
*/
public function connections(Channel $channel): Collection;
public function connections(Channel $channel): array;

/**
* Flush the channel manager repository.
Expand Down
89 changes: 14 additions & 75 deletions src/Managers/ChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,25 @@
use Laravel\Reverb\Connection;
use Laravel\Reverb\Contracts\ApplicationProvider;
use Laravel\Reverb\Contracts\ChannelManager as ChannelManagerInterface;
use Laravel\Reverb\Contracts\ConnectionManager;

class ChannelManager implements ChannelManagerInterface
{
use EnsuresIntegrity, InteractsWithApplications;

/**
* Connection store.
*
* @var array<string, array<string, array<string, \Laravel\Reverb\Connection>>>
*/
protected $connections = [];

/**
* The appliation instance.
*
* @var \Laravel\Reverb\Application
*/
protected $application;

public function __construct(
protected Repository $repository,
protected ConnectionManager $connections,
protected $prefix = 'reverb'
) {
}

/**
* Get the application instance.
*/
Expand All @@ -46,25 +45,15 @@ public function app(): ?Application
*/
public function subscribe(Channel $channel, Connection $connection, $data = []): void
{
$this->mutex(function () use ($channel, $connection, $data) {
$connections = $this->connectionKeys($channel)
->put($connection->identifier(), $data);

$this->syncConnections($channel, $connections);
});
$this->connections[$this->application->id()][$channel->name()][$connection->identifier()] = $connection;
}

/**
* Unsubscribe from a channel.
*/
public function unsubscribe(Channel $channel, Connection $connection): void
{
$this->mutex(function () use ($channel, $connection) {
$connections = $this->connectionKeys($channel)
->reject(fn ($data, $identifier) => (string) $identifier === $connection->identifier());

$this->syncConnections($channel, $connections);
});
unset($this->connections[$this->application->id()][$channel->name()][$connection->identifier()]);
}

/**
Expand All @@ -87,51 +76,14 @@ public function unsubscribeFromAll(Connection $connection): void
});
}

/**
* Get all connection keys for the given channel.
*/
public function connectionKeys(Channel $channel): Collection
{
return $this->channel($channel);
}

/**
* Get all connections for the given channel.
*
* @return \Laravel\Reverb\Managers\Connections|\Laravel\Reverb\Connection[]|string[]
* @return <array string, \Laravel\Reverb\Connection>
*/
public function connections(Channel $channel): Collection
public function connections(Channel $channel): array
{
return collect($this->connections->for($this->application)->all())
->intersectByKeys(
$this->connectionKeys($channel)
);
}

/**
* Sync the connections for a channel.
*/
protected function syncConnections(Channel $channel, Collection $connections): void
{
$channels = $this->channels();

$channels[$channel->name()] = $connections;

$this->repository->forever($this->key(), $channels);
}

/**
* Get the key for the channels.
*/
protected function key(): string
{
$key = $this->prefix;

if ($this->application) {
$key .= ":{$this->application->id()}";
}

return $key.':channels';
return $this->connections[$this->application->id()][$channel->name()] ?? [];
}

/**
Expand All @@ -147,7 +99,7 @@ protected function channel(Channel $channel): Collection
*/
protected function channels(Channel $channel = null): Collection
{
$channels = $this->repository->get($this->key(), []);
$channels = $this->connections[$this->application->id()];

if ($channel) {
return collect($channels[$channel->name()] ?? []);
Expand All @@ -156,18 +108,6 @@ protected function channels(Channel $channel = null): Collection
return collect($channels ?: []);
}

/**
* Get the data stored for a connection.
*/
public function data(Channel $channel, Connection $connection): array
{
if (! $data = $this->connectionKeys($channel)->get($connection->identifier())) {
return [];
}

return (array) $data;
}

/**
* Flush the channel manager repository.
*/
Expand All @@ -176,8 +116,7 @@ public function flush(): void
App::make(ApplicationProvider::class)
->all()
->each(function (Application $application) {
$this->for($application);
$this->repository->forget($this->key());
$this->connections[$application->id()] = [];
});
}
}
6 changes: 1 addition & 5 deletions src/Servers/Reverb/ReverbProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ public function buildConnectionManager(): ConnectionManagerInterface
*/
public function buildChannelManager(): ChannelManagerInterface
{
return new ChannelManager(
$this->app['cache']->store('array'),
$this->app->make(ConnectionManagerInterface::class),
$this->config['connection_manager']['prefix'] ?? 'reverb'
);
return new ChannelManager;
}
}

0 comments on commit 9003a64

Please sign in to comment.