Skip to content

Commit

Permalink
Implement the Happy Eye Balls RFC's
Browse files Browse the repository at this point in the history
By using the happy eye balls algorythm as descripted in RFC6555 and
RFC8305 it will connect to the quickest responding server with a
preference for IPv6.
  • Loading branch information
WyriHaximus committed Mar 5, 2019
1 parent 2cf8dfa commit 4662be9
Show file tree
Hide file tree
Showing 2 changed files with 711 additions and 0 deletions.
284 changes: 284 additions & 0 deletions src/HappyEyeBallsConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
<?php

namespace React\Socket;

use React\Dns\Model\Message;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise;
use React\Promise\CancellablePromiseInterface;
use InvalidArgumentException;
use RuntimeException;

final class HappyEyeBallsConnector implements ConnectorInterface
{
private $loop;
private $connector;
private $resolver;

public function __construct(LoopInterface $loop, ConnectorInterface $connector, Resolver $resolver)
{
$this->loop = $loop;
$this->connector = $connector;
$this->resolver = $resolver;
}

public function connect($uri)
{
if (\strpos($uri, '://') === false) {
$parts = \parse_url('tcp://' . $uri);
unset($parts['scheme']);
} else {
$parts = \parse_url($uri);
}

if (!$parts || !isset($parts['host'])) {
return Promise\reject(new \InvalidArgumentException('Given URI "' . $uri . '" is invalid'));
}

$host = \trim($parts['host'], '[]');
$connector = $this->connector;

// skip DNS lookup / URI manipulation if this URI already contains an IP
if (false !== \filter_var($host, \FILTER_VALIDATE_IP)) {
return $connector->connect($uri);
}

$that = $this;
$loop = $this->loop;
$resolver = $this->resolver;
$resolved = array();
$resolverPromises = array();
$connectionPromises = array();
$timer = null;

return new Promise\Promise(function ($resolve, $reject) use (&$timer, &$resolverPromises, &$connectionPromises, &$resolved, $loop, $resolver, $uri, $parts, $host, $that) {
$connectQueue = array();
$ipsCount = 0;
$failureCount = 0;
$check = function () use (&$timer, &$resolved, &$connectQueue, &$failureCount, &$ipsCount, &$connectionPromises, &$resolverPromises, $loop, $uri, $parts, $host, $that, $resolve, $reject, &$check) {
$hasBeenResolved = true;
foreach ($resolved as $typeHasBeenResolved) {
if ($typeHasBeenResolved === false) {
$hasBeenResolved = false;

break;
}
}

if ($hasBeenResolved === true && \count($connectQueue) === 0 && $timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
$timer = null;
return;
}

if (\count($connectQueue) === 0 && $timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
$timer = null;
return;
}

$ip = \array_shift($connectQueue);

if (\count($connectQueue) === 0 && $timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
$timer = null;
}

$connectionPromises[$ip] = $that->attemptConnection($ip, $uri, $parts, $host)->then(function ($connection) use ($resolve, &$connectionPromises, &$resolverPromises, $ip, &$timer, $loop) {
unset($connectionPromises[$ip]);

/** @var CancellablePromiseInterface $promise */
foreach ($connectionPromises as $connectionPromise) {
if (!($connectionPromise instanceof CancellablePromiseInterface)) {
continue;
}

$connectionPromise->cancel();
$connectionPromise = null;
}

/** @var CancellablePromiseInterface $promise */
foreach ($resolverPromises as $resolverPromise) {
if (!($resolverPromise instanceof CancellablePromiseInterface)) {
continue;
}

$resolverPromise->cancel();
$resolverPromise = null;
}

if ($timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
$timer = null;
}

$resolve($connection);
}, function () use (&$failureCount, &$ipsCount, &$resolved, $reject, $loop, &$timer, $host, &$connectionPromises, $ip) {
unset($connectionPromises[$ip]);

$failureCount++;

foreach ($resolved as $typeHasBeenResolved) {
if ($typeHasBeenResolved === false) {
return;
}
}

if ($ipsCount === $failureCount) {
if ($timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
$timer = null;
}
$reject(new \RuntimeException('All attempts to connect to "' . $host . '" have failed'));
}
});

if (\count($connectQueue) > 0 && $timer === null) {
$timer = $loop->addPeriodicTimer(0.05, $check);
}
};

foreach (array(Message::TYPE_AAAA, Message::TYPE_A) as $type) {
$resolved[$type] = false;
$resolverPromises[$type] = $resolver->resolveAll($host, $type)->then(function (array $ips) use (&$connectQueue, &$resolved, $type, $check, &$ipsCount, &$resolverPromises, &$timer) {
unset($resolverPromises[$type]);
$resolved[$type] = true;
$ipsCount += \count($ips);
$connectQueueStash = $connectQueue;
$connectQueue = array();
while (\count($connectQueueStash) > 0 || \count($ips) > 0) {
if (\count($ips) > 0) {
$connectQueue[] = \array_shift($ips);
}
if (\count($connectQueueStash) > 0) {
$connectQueue[] = \array_shift($connectQueueStash);
}
}
$check();
}, function () use (&$resolved, $type, &$resolverPromises, &$ipsCount, $reject, $uri) {
unset($resolverPromises[$type]);
$resolved[$type] = true;

foreach ($resolved as $typeHasBeenResolved) {
if ($typeHasBeenResolved === false) {
return;
}
}

if ($ipsCount === 0) {
unset($resolverPromises, $resolved);
$reject(new \RuntimeException('Connection to ' . $uri . ' failed during DNS lookup: DNS error'));
$reject = null;
}
});
}
}, function ($_, $reject) use (&$resolverPromises, &$connectionPromises, &$timer, &$resolved, $loop, $uri) {
if ($timer instanceof TimerInterface) {
$loop->cancelTimer($timer);
$timer = null;
}
foreach ($resolverPromises as $resolverPromise) {
if ($resolverPromise instanceof CancellablePromiseInterface) {
$resolverPromise->cancel();
$resolverPromise = null;
}
}
$resolverPromises = null;
foreach ($connectionPromises as $connectionPromise) {
if ($connectionPromise instanceof CancellablePromiseInterface) {
$connectionPromise->cancel();
$connectionPromise = null;
}
}
$connectionPromises = null;

foreach ($resolved as $typeHasBeenResolved) {
if ($typeHasBeenResolved === true) {
return;
}
}

$reject(new \RuntimeException('Connection to ' . $uri . ' cancelled during DNS lookup'));

$_ = $reject = null;
});
}

/**
* @internal
*/
public function attemptConnection($ip, $uri, $parts, $host)
{
$connector = $this->connector;
$resolved = null;
$promise = null;

return new Promise\Promise(
function ($resolve, $reject) use (&$promise, &$resolved, $ip, $uri, $connector, $host, $parts) {
$resolved = $ip;
$uri = '';

// prepend original scheme if known
if (isset($parts['scheme'])) {
$uri .= $parts['scheme'] . '://';
}

if (\strpos($ip, ':') !== false) {
// enclose IPv6 addresses in square brackets before appending port
$uri .= '[' . $ip . ']';
} else {
$uri .= $ip;
}

// append original port if known
if (isset($parts['port'])) {
$uri .= ':' . $parts['port'];
}

// append orignal path if known
if (isset($parts['path'])) {
$uri .= $parts['path'];
}

// append original query if known
if (isset($parts['query'])) {
$uri .= '?' . $parts['query'];
}

// append original hostname as query if resolved via DNS and if
// destination URI does not contain "hostname" query param already
$args = array();
\parse_str(isset($parts['query']) ? $parts['query'] : '', $args);
if ($host !== $ip && !isset($args['hostname'])) {
$uri .= (isset($parts['query']) ? '&' : '?') . 'hostname=' . \rawurlencode($host);
}

// append original fragment if known
if (isset($parts['fragment'])) {
$uri .= '#' . $parts['fragment'];
}

$promise = $connector->connect($uri);
$promise->then($resolve, $reject);
},
function ($_, $reject) use (&$promise, &$resolved, $uri) {
// cancellation should reject connection attempt
// (try to) cancel pending connection attempt
if ($resolved === null) {
$reject(new \RuntimeException('Connection to ' . $uri . ' cancelled during connection attempt'));
}

if ($promise instanceof CancellablePromiseInterface) {
// overwrite callback arguments for PHP7+ only, so they do not show
// up in the Exception trace and do not cause a possible cyclic reference.
$_ = $reject = null;

$promise->cancel();
$promise = null;
}
}
);
}
}
Loading

0 comments on commit 4662be9

Please sign in to comment.