diff --git a/src/Servers/Reverb/Publishing/RedisClient.php b/src/Servers/Reverb/Publishing/RedisClient.php index fa98f31..02b1031 100644 --- a/src/Servers/Reverb/Publishing/RedisClient.php +++ b/src/Servers/Reverb/Publishing/RedisClient.php @@ -27,6 +27,11 @@ class RedisClient */ protected int $clientReconnectionTimer = 0; + /** + * Determine if the client should attempt to reconnect when disconnected from the server. + */ + protected bool $shouldReconnect = true; + /** * Subscription events queued during while disconnected from Redis. */ @@ -63,7 +68,9 @@ function (Client $client) { $this->client = $client; $this->clientReconnectionTimer = 0; $this->configureClientErrorHandler(); - $this->onConnect && call_user_func($this->onConnect, $client); + if ($this->onConnect) { + call_user_func($this->onConnect, $client); + } Log::info("Redis connection to [{$this->name}] successful"); }, @@ -80,6 +87,10 @@ function (Exception $e) { */ public function reconnect(): void { + if (! $this->shouldReconnect) { + return; + } + $this->loop->addTimer(1, function () { $this->clientReconnectionTimer++; if ($this->clientReconnectionTimer >= $this->reconnectionTimeout()) { @@ -97,6 +108,8 @@ public function reconnect(): void */ public function disconnect(): void { + $this->shouldReconnect = false; + $this->client?->close(); } @@ -106,7 +119,7 @@ public function disconnect(): void public function subscribe(): void { if (! $this->isConnected($this->client)) { - $this->queueSubscriptionEvent(); + $this->queueSubscriptionEvent('subscribe', []); return; } @@ -165,9 +178,9 @@ protected function configureClientErrorHandler(): void /** * Queue the given subscription event. */ - protected function queueSubscriptionEvent(): void + protected function queueSubscriptionEvent($event, $payload): void { - $this->queuedSubscriptionEvents['subscribe'] = true; + $this->queuedSubscriptionEvents[$event] = $payload; } /** @@ -183,7 +196,6 @@ protected function queuePublishEvent(array $payload): void */ protected function processQueuedSubscriptionEvents(): void { - dump($this->queuedSubscriptionEvents); foreach ($this->queuedSubscriptionEvents as $event => $args) { match ($event) { 'subscribe' => $this->subscribe(), @@ -200,7 +212,6 @@ protected function processQueuedSubscriptionEvents(): void */ protected function processQueuedPublishEvents(): void { - dump($this->queuedPublishEvents); foreach ($this->queuedPublishEvents as $event) { $this->publish($event); } @@ -248,8 +259,6 @@ protected function redisUrl(): string /** * Determine the configured reconnection timeout. - * - * @return void */ protected function reconnectionTimeout(): int { diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index ae03e4b..e1ff94d 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -33,7 +33,8 @@ public function connect(LoopInterface $loop): void $this->channel, 'subscriber', $this->server, - fn () => $this->subscribe()); + fn () => $this->subscribe() + ); $this->subscribingClient->connect(); $this->publishingClient = new RedisClient(