Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
joedixon committed May 8, 2024
1 parent d0401b2 commit 6d86fd2
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
4 changes: 4 additions & 0 deletions src/Protocols/Pusher/MetricsHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,15 @@ protected function gatherMetricsFromSubscribers(Application $application, string
*/
protected function requestMetricsFromSubscribers(Application $application, string $key, string $type, ?array $options): void
{
dump('Requesting metrics from subscribers using '.$key);

$this->pubSubProvider->publish([
'type' => 'metrics',
'key' => $key,
'application' => serialize($application),
'payload' => ['type' => $type, 'options' => $options],
])->then(function ($total) {
dump('Total subscribers: '.$total);
$this->subscribers = $total;
});
}
Expand Down Expand Up @@ -211,6 +214,7 @@ protected function listenForMetrics(string $key): Deferred
$deferred = new Deferred;

$this->pubSubProvider->on('metrics-retrieved', function ($payload) use ($key, $deferred) {
dump('Metrics received for '.$key);
if ($payload['key'] !== $key) {
return;
}
Expand Down
12 changes: 6 additions & 6 deletions src/Servers/Reverb/Publishing/RedisPubSubProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ public function subscribe(): void
{
$this->ensureConnected();

$this->subscribingClient->subscribe($this->channel)
->then(function () {
$this->subscribingClient->on('message', function (string $channel, string $payload) {
$this->messageHandler->handle($payload);
});
});
$this->subscribingClient->subscribe($this->channel);

$this->subscribingClient->on('message', function (string $channel, string $payload) {
$this->messageHandler->handle($payload);
});
}

/**
Expand All @@ -65,6 +64,7 @@ public function subscribe(): void
public function on(string $event, callable $callback): void
{
$this->subscribingClient->on('message', function (string $channel, string $payload) use ($event, $callback) {
dump($payload);
$payload = json_decode($payload, associative: true, flags: JSON_THROW_ON_ERROR);

if (($payload['type'] ?? null) === $event) {
Expand Down
2 changes: 1 addition & 1 deletion tests/ReverbTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected function resetFiber(): void
*/
public function stopServer(): void
{
rescue(fn () => app(PubSubProvider::class)->disconnect());
app(PubSubProvider::class)->disconnect();


if ($this->server) {
Expand Down

0 comments on commit 6d86fd2

Please sign in to comment.