Skip to content

Commit

Permalink
Implement the Happy Eye Balls RFC's
Browse files Browse the repository at this point in the history
By using the happy eye balls algorithm as described in RFC6555 and
RFC8305 it will connect to the quickest responding server with a
preference for IPv6.
  • Loading branch information
WyriHaximus committed May 18, 2019
1 parent 2cf8dfa commit 3117a07
Show file tree
Hide file tree
Showing 4 changed files with 698 additions and 0 deletions.
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ handle multiple concurrent connections without blocking.
* [Connector](#connector)
* [Advanced client usage](#advanced-client-usage)
* [TcpConnector](#tcpconnector)
* [HappyEyeBallsConnector](#happyeyeballsconnector)
* [DnsConnector](#dnsconnector)
* [SecureConnector](#secureconnector)
* [TimeoutConnector](#timeoutconnector)
Expand Down Expand Up @@ -1154,6 +1155,60 @@ be used to set up the TLS peer name.
This is used by the `SecureConnector` and `DnsConnector` to verify the peer
name and can also be used if you want a custom TLS peer name.

#### HappyEyeBallsConnector

The `HappyEyeBallsConnector` class implements the
[`ConnectorInterface`](#connectorinterface) and allows you to create plaintext
TCP/IP connections to any hostname-port-combination. Internally it implements the
happy eyeballs algorythm from [`RFC6555`](https://tools.ietf.org/html/rfc6555) and
[`RFC8305`](https://tools.ietf.org/html/rfc8305) to support IPv6 and IPv4 hostnames.

It does so by decorating a given `TcpConnector` instance so that it first
looks up the given domain name via DNS (if applicable) and then establishes the
underlying TCP/IP connection to the resolved target IP address.

Make sure to set up your DNS resolver and underlying TCP connector like this:

```php
$dnsResolverFactory = new React\Dns\Resolver\Factory();
$dns = $dnsResolverFactory->createCached('8.8.8.8', $loop);

$dnsConnector = new React\Socket\HappyEyeBallsConnector($loop, $tcpConnector, $dns);

$dnsConnector->connect('www.google.com:80')->then(function (React\Socket\ConnectionInterface $connection) {
$connection->write('...');
$connection->end();
});

$loop->run();
```

See also the [examples](examples).

Pending connection attempts can be cancelled by cancelling its pending promise like so:

```php
$promise = $dnsConnector->connect('www.google.com:80');

$promise->cancel();
```

Calling `cancel()` on a pending promise will cancel the underlying DNS lookups
and/or the underlying TCP/IP connection(s) and reject the resulting promise.


> Advanced usage: Internally, the `HappyEyeBallsConnector` relies on a `Resolver` to
look up the IP addresses for the given hostname.
It will then replace the hostname in the destination URI with this IP's and
append a `hostname` query parameter and pass this updated URI to the underlying
connector.
The Happy Eye Balls algorythm describes looking the IPv6 and IPv4 address for
the given hostname so this connector sends out two DNS lookups for the A and
AAAA records. It then uses all IP addresses (both v6 and v4) and tries to
connect to all of them with a 50ms interval in between. Alterating between IPv6
and IPv4 addresses. When a connection is established all the other DNS lookups
and connection attempts are cancelled.

#### DnsConnector

The `DnsConnector` class implements the
Expand Down
306 changes: 306 additions & 0 deletions src/HappyEyeBallsConnectionBuilder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
<?php

namespace React\Socket;

use React\Dns\Model\Message;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise;
use React\Promise\CancellablePromiseInterface;
use InvalidArgumentException;
use RuntimeException;

/**
* @internal
*/
final class HappyEyeBallsConnectionBuilder
{
public $loop;
public $connector;
public $resolver;
public $uri;
public $host;
public $resolved = array();
public $resolverPromises = array();
public $connectionPromises = array();
public $connectQueue = array();
public $timer;
public $parts;
public $ipsCount = 0;
public $failureCount = 0;
public $resolve;
public $reject;

public static function create(LoopInterface $loop, ConnectorInterface $connector, Resolver $resolver, $uri)
{
$connectionBuilder = new self($loop, $connector, $resolver, $uri);

return $connectionBuilder->connect();
}

private function __construct(LoopInterface $loop, ConnectorInterface $connector, Resolver $resolver, $uri)
{
$this->loop = $loop;
$this->connector = $connector;
$this->resolver = $resolver;
$this->uri = $uri;

if (\strpos($uri, '://') === false) {
$this->parts = \parse_url('tcp://' . $uri);
unset($this->parts['scheme']);
} else {
$this->parts = \parse_url($uri);
}

if (!$this->parts || !isset($this->parts['host'])) {
throw new \InvalidArgumentException('Given URI "' . $uri . '" is invalid');
}

$this->host = \trim($this->parts['host'], '[]');
}

/**
* @internal
*/
public function connect()
{
// skip DNS lookup / URI manipulation if this URI already contains an IP
if (false !== \filter_var($this->host, \FILTER_VALIDATE_IP)) {
return $this->connector->connect($this->uri);
}

$that = $this;
return new Promise\Promise(function ($resolve, $reject) use ($that) {
$that->resolve = $resolve;
$that->reject = $reject;
foreach (array(Message::TYPE_AAAA, Message::TYPE_A) as $type) {
$that->resolved[$type] = false;
$that->resolverPromises[$type] = $that->resolver->resolveAll($that->host, $type)->then(function (array $ips) use ($that, $type) {
unset($that->resolverPromises[$type]);
$that->resolved[$type] = true;

$that->pushIpsOntoQueue($ips);

$that->check();
}, function () use ($type, $reject, $that) {
unset($that->resolverPromises[$type]);
$that->resolved[$type] = true;

if ($that->hasBeenResolved() === false) {
return;
}

if ($that->ipsCount === 0) {
$that->resolved = null;
$that->resolverPromises = null;
$reject(new \RuntimeException('Connection to ' . $that->uri . ' failed during DNS lookup: DNS error'));
}
});
}
}, function ($_, $reject) use ($that) {
$that->cleanUp();

$reject(new \RuntimeException('Connection to ' . $that->uri . ' cancelled during DNS lookup'));

$_ = $reject = null;
});
}

/**
* @internal
*/
public function check()
{
$hasBeenResolved = $this->hasBeenResolved();

if ($hasBeenResolved === true && \count($this->connectQueue) === 0 && $this->timer instanceof TimerInterface) {
$this->loop->cancelTimer($this->timer);
$this->timer = null;
return;
}

if (\count($this->connectQueue) === 0 && $this->timer instanceof TimerInterface) {
$this->loop->cancelTimer($this->timer);
$this->timer = null;
return;
}

$ip = \array_shift($this->connectQueue);

if (\count($this->connectQueue) === 0 && $this->timer instanceof TimerInterface) {
$this->loop->cancelTimer($this->timer);
$this->timer = null;
}

$that = $this;
$resolve = $that->resolve;
$reject = $that->reject;
$that->connectionPromises[$ip] = $this->attemptConnection($ip)->then(function ($connection) use ($that, $ip, $resolve) {
unset($that->connectionPromises[$ip]);

$that->cleanUp();

$resolve($connection);
}, function () use ($that, $ip, $resolve, $reject) {
unset($that->connectionPromises[$ip]);

$that->failureCount++;

if ($that->hasBeenResolved() === false) {
return;
}

if ($that->ipsCount === $that->failureCount) {
$that->cleanUp();

$reject(new \RuntimeException('All attempts to connect to "' . $that->host . '" have failed'));
}
});

if (\count($this->connectQueue) > 0 && $this->timer === null) {
$this->timer = $this->loop->addPeriodicTimer(0.05, array($this, 'check'));
}
}

/**
* @internal
*/
public function attemptConnection($ip)
{
$resolved = null;
$promise = null;
$that = $this;

return new Promise\Promise(
function ($resolve, $reject) use (&$promise, &$resolved, $that, $ip) {
$resolved = $ip;
$uri = '';

// prepend original scheme if known
if (isset($that->parts['scheme'])) {
$uri .= $that->parts['scheme'] . '://';
}

if (\strpos($ip, ':') !== false) {
// enclose IPv6 addresses in square brackets before appending port
$uri .= '[' . $ip . ']';
} else {
$uri .= $ip;
}

// append original port if known
if (isset($that->parts['port'])) {
$uri .= ':' . $that->parts['port'];
}

// append orignal path if known
if (isset($that->parts['path'])) {
$uri .= $that->parts['path'];
}

// append original query if known
if (isset($that->parts['query'])) {
$uri .= '?' . $that->parts['query'];
}

// append original hostname as query if resolved via DNS and if
// destination URI does not contain "hostname" query param already
$args = array();
\parse_str(isset($that->parts['query']) ? $that->parts['query'] : '', $args);
if ($that->host !== $ip && !isset($args['hostname'])) {
$uri .= (isset($that->parts['query']) ? '&' : '?') . 'hostname=' . \rawurlencode($that->host);
}

// append original fragment if known
if (isset($that->parts['fragment'])) {
$uri .= '#' . $that->parts['fragment'];
}

$promise = $that->connector->connect($uri);
$promise->then($resolve, $reject);
},
function ($_, $reject) use (&$promise, &$resolved, $that) {
// cancellation should reject connection attempt
// (try to) cancel pending connection attempt
if ($resolved === null) {
$reject(new \RuntimeException('Connection to ' . $that->uri . ' cancelled during connection attempt'));
}

if ($promise instanceof CancellablePromiseInterface) {
// overwrite callback arguments for PHP7+ only, so they do not show
// up in the Exception trace and do not cause a possible cyclic reference.
$_ = $reject = null;

$promise->cancel();
$promise = null;
}
}
);
}

/**
* @internal
*/
public function cleanUp()
{
/** @var CancellablePromiseInterface $promise */
foreach ($this->connectionPromises as $index => $connectionPromise) {
if ($connectionPromise instanceof CancellablePromiseInterface) {
$connectionPromise->cancel();
}
unset($this->connectionPromises[$index]);
$connectionPromise = null;
}
$this->connectionPromises = null;

/** @var CancellablePromiseInterface $promise */
foreach ($this->resolverPromises as $index => $resolverPromise) {
if ($resolverPromise instanceof CancellablePromiseInterface) {
$resolverPromise->cancel();
}
unset($this->resolverPromises[$index]);
$resolverPromise = null;
}
$this->resolverPromises = null;

if ($this->timer instanceof TimerInterface) {
$this->loop->cancelTimer($this->timer);
$this->timer = null;
}
}

/**
* @internal
*/
public function hasBeenResolved()
{
foreach ($this->resolved as $typeHasBeenResolved) {
if ($typeHasBeenResolved === false) {
return false;

break;
}
}

return true;
}

/**
* @internal
*/
public function pushIpsOntoQueue(array $ips)
{
$this->ipsCount += \count($ips);
$connectQueueStash = $this->connectQueue;
$this->connectQueue = array();
while (\count($connectQueueStash) > 0 || \count($ips) > 0) {
if (\count($ips) > 0) {
$this->connectQueue[] = \array_shift($ips);
}
if (\count($connectQueueStash) > 0) {
$this->connectQueue[] = \array_shift($connectQueueStash);
}
}
}
}
Loading

0 comments on commit 3117a07

Please sign in to comment.