Skip to content

Commit

Permalink
Merge pull request #36 from php-service-bus/connection_status
Browse files Browse the repository at this point in the history
Changing connection status after a caught exception
  • Loading branch information
mmasiukevich authored Jan 30, 2022
2 parents 01d4983 + bcfd2ae commit 5d8155b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 24 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
},
"require-dev": {
"phpunit/phpunit": "v9.5.*",
"vimeo/psalm": "v4.18.*",
"vimeo/psalm": "v4.19.*",
"phpstan/phpstan": "v1.4.*"
},
"prefer-stable": true,
Expand Down
59 changes: 36 additions & 23 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace PHPinnacle\Ridge;

use Amp\Loop;
use PHPinnacle\Ridge\Exception\ConnectionException;
use function Amp\asyncCall;
use function Amp\call;
use Amp\Deferred;
Expand Down Expand Up @@ -148,6 +149,8 @@ function () {
function(): void
{
if($this->connection->connected() === false) {
$this->state = self::STATE_NOT_CONNECTED;

throw Exception\ClientException::disconnected();
}
}
Expand All @@ -167,37 +170,41 @@ public function disconnect(int $code = 0, string $reason = ''): Promise

return call(
function () use ($code, $reason) {
if (\in_array($this->state, [self::STATE_NOT_CONNECTED, self::STATE_DISCONNECTING])) {
return;
}

if ($this->state !== self::STATE_CONNECTED) {
throw Exception\ClientException::notConnected();
}
try {
if (\in_array($this->state, [self::STATE_NOT_CONNECTED, self::STATE_DISCONNECTING])) {
return;
}

if($this->connectionMonitorWatcherId !== null){
Loop::cancel($this->connectionMonitorWatcherId);
if ($this->state !== self::STATE_CONNECTED) {
throw Exception\ClientException::notConnected();
}

$this->connectionMonitorWatcherId = null;
}
if($this->connectionMonitorWatcherId !== null){
Loop::cancel($this->connectionMonitorWatcherId);

$this->state = self::STATE_DISCONNECTING;
$this->connectionMonitorWatcherId = null;
}

if ($code === 0) {
$promises = [];
$this->state = self::STATE_DISCONNECTING;

foreach ($this->channels as $channel) {
$promises[] = $channel->close($code, $reason);
}
if ($code === 0) {
$promises = [];

yield $promises;
}
foreach ($this->channels as $channel) {
$promises[] = $channel->close($code, $reason);
}

yield $this->connectionClose($code, $reason);
yield $promises;
}

$this->connection->close();
yield $this->connectionClose($code, $reason);

$this->state = self::STATE_NOT_CONNECTED;
$this->connection->close();
}
finally
{
$this->state = self::STATE_NOT_CONNECTED;
}
}
);
}
Expand Down Expand Up @@ -250,7 +257,13 @@ function () {
});

return $channel;
} catch (\Throwable $error) {
}
catch(ConnectionException $exception) {
$this->state = self::STATE_NOT_CONNECTED;

throw $exception;
}
catch (\Throwable $error) {
throw Exception\ClientException::unexpectedResponse($error);
}
}
Expand Down

0 comments on commit 5d8155b

Please sign in to comment.