Skip to content

Commit

Permalink
prevent reconnecting for controlled disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
joedixon committed Dec 7, 2024
1 parent 84365ae commit 82d87b3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
25 changes: 17 additions & 8 deletions src/Servers/Reverb/Publishing/RedisClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ class RedisClient
*/
protected int $clientReconnectionTimer = 0;

/**
* Determine if the client should attempt to reconnect when disconnected from the server.
*/
protected bool $shouldReconnect = true;

/**
* Subscription events queued during while disconnected from Redis.
*/
Expand Down Expand Up @@ -63,7 +68,9 @@ function (Client $client) {
$this->client = $client;
$this->clientReconnectionTimer = 0;
$this->configureClientErrorHandler();
$this->onConnect && call_user_func($this->onConnect, $client);
if ($this->onConnect) {
call_user_func($this->onConnect, $client);
}

Log::info("Redis connection to [{$this->name}] successful");
},
Expand All @@ -80,6 +87,10 @@ function (Exception $e) {
*/
public function reconnect(): void
{
if (! $this->shouldReconnect) {
return;
}

$this->loop->addTimer(1, function () {
$this->clientReconnectionTimer++;
if ($this->clientReconnectionTimer >= $this->reconnectionTimeout()) {
Expand All @@ -97,6 +108,8 @@ public function reconnect(): void
*/
public function disconnect(): void
{
$this->shouldReconnect = false;

$this->client?->close();
}

Expand All @@ -106,7 +119,7 @@ public function disconnect(): void
public function subscribe(): void
{
if (! $this->isConnected($this->client)) {
$this->queueSubscriptionEvent();
$this->queueSubscriptionEvent('subscribe', []);

return;
}
Expand Down Expand Up @@ -165,9 +178,9 @@ protected function configureClientErrorHandler(): void
/**
* Queue the given subscription event.
*/
protected function queueSubscriptionEvent(): void
protected function queueSubscriptionEvent($event, $payload): void
{
$this->queuedSubscriptionEvents['subscribe'] = true;
$this->queuedSubscriptionEvents[$event] = $payload;
}

/**
Expand All @@ -183,7 +196,6 @@ protected function queuePublishEvent(array $payload): void
*/
protected function processQueuedSubscriptionEvents(): void
{
dump($this->queuedSubscriptionEvents);
foreach ($this->queuedSubscriptionEvents as $event => $args) {
match ($event) {
'subscribe' => $this->subscribe(),
Expand All @@ -200,7 +212,6 @@ protected function processQueuedSubscriptionEvents(): void
*/
protected function processQueuedPublishEvents(): void
{
dump($this->queuedPublishEvents);
foreach ($this->queuedPublishEvents as $event) {
$this->publish($event);
}
Expand Down Expand Up @@ -248,8 +259,6 @@ protected function redisUrl(): string

/**
* Determine the configured reconnection timeout.
*
* @return void
*/
protected function reconnectionTimeout(): int
{
Expand Down
3 changes: 2 additions & 1 deletion src/Servers/Reverb/Publishing/RedisPubSubProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public function connect(LoopInterface $loop): void
$this->channel,
'subscriber',
$this->server,
fn () => $this->subscribe());
fn () => $this->subscribe()
);
$this->subscribingClient->connect();

$this->publishingClient = new RedisClient(
Expand Down

0 comments on commit 82d87b3

Please sign in to comment.