Skip to content

Commit

Permalink
Support cancellation of pending connection attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Feb 24, 2019
1 parent 2a37657 commit 228c16c
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 6 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ $factory = new Factory($loop, $connector);

#### createClient()

The `createClient($redisUri)` method can be used to create a new [`Client`](#client).
The `createClient($redisUri): PromiseInterface<Client,Exception>` method can be used to
create a new [`Client`](#client).

It helps with establishing a plain TCP/IP or secure TLS connection to Redis
and optionally authenticating (AUTH) and selecting the right database (SELECT).

Expand All @@ -115,6 +117,24 @@ $factory->createClient('redis://localhost:6379')->then(
);
```

The method returns a [Promise](https://github.com/reactphp/promise) that
will resolve with a [`Client`](#client)
instance on success or will reject with an `Exception` if the URL is
invalid or the connection or authentication fails.

The returned Promise is implemented in such a way that it can be
cancelled when it is still pending. Cancelling a pending promise will
reject its value with an Exception and will cancel the underlying TCP/IP
connection attempt and/or Redis authentication.

```php
$promise = $factory->createConnection($redisUri);

$loop->addTimer(3.0, function () use ($promise) {
$promise->cancel();
});
```

The `$redisUri` can be given in the
[standard](https://www.iana.org/assignments/uri-schemes/prov/redis) form
`[redis[s]://][:auth@]host[:port][/db]`.
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"evenement/evenement": "^3.0 || ^2.0 || ^1.0",
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3",
"react/promise": "^2.0 || ^1.1",
"react/socket": "^1.0 || ^0.8.3"
"react/socket": "^1.1"
},
"autoload": {
"psr-4": { "Clue\\React\\Redis\\": "src/" }
Expand Down
21 changes: 17 additions & 4 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

namespace Clue\React\Redis;

use Clue\React\Redis\StreamingClient;
use Clue\Redis\Protocol\Factory as ProtocolFactory;
use React\EventLoop\LoopInterface;
use React\Promise;
use React\Promise\Deferred;
use React\Socket\ConnectionInterface;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;
Expand Down Expand Up @@ -50,9 +50,20 @@ public function createClient($target)
return Promise\reject($e);
}

$protocol = $this->protocol;
$connecting = $this->connector->connect($parts['authority']);
$deferred = new Deferred(function ($_, $reject) use ($connecting) {
// connection cancelled, start with rejecting attempt, then clean up
$reject(new \RuntimeException('Connection to database server cancelled'));

// either close successful connection or cancel pending connection attempt
$connecting->then(function (ConnectionInterface $connection) {
$connection->close();
});
$connecting->cancel();
});

$promise = $this->connector->connect($parts['authority'])->then(function (ConnectionInterface $stream) use ($protocol) {
$protocol = $this->protocol;
$promise = $connecting->then(function (ConnectionInterface $stream) use ($protocol) {
return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer());
});

Expand Down Expand Up @@ -84,7 +95,9 @@ function ($error) use ($client) {
});
}

return $promise;
$promise->then(array($deferred, 'resolve'), array($deferred, 'reject'));

return $deferred->promise();
}

/**
Expand Down
33 changes: 33 additions & 0 deletions tests/FactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Clue\React\Redis\Factory;
use React\Promise;
use React\Promise\Deferred;

class FactoryTest extends TestCase
{
Expand Down Expand Up @@ -153,4 +154,36 @@ public function testWillRejectIfTargetIsInvalid()

$this->expectPromiseReject($promise);
}

public function testCancelWillRejectPromise()
{
$promise = new \React\Promise\Promise(function () { });
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn($promise);

$promise = $this->factory->createClient('redis://127.0.0.1:2');
$promise->cancel();

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
}

public function testCancelWillCancelConnectorWhenConnectionIsPending()
{
$deferred = new Deferred($this->expectCallableOnce());
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn($deferred->promise());

$promise = $this->factory->createClient('redis://127.0.0.1:2');
$promise->cancel();
}

public function testCancelWillCloseConnectionWhenConnectionWaitsForSelect()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->once())->method('write');
$stream->expects($this->once())->method('close');

$this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream));

$promise = $this->factory->createClient('redis://127.0.0.1:2/123');
$promise->cancel();
}
}

0 comments on commit 228c16c

Please sign in to comment.