Skip to content

Commit

Permalink
Merge pull request #484 from clue-labs/connection-close
Browse files Browse the repository at this point in the history
Consistently close HTTP client connections when response stream closes
  • Loading branch information
SimonFrings authored Jan 19, 2023
2 parents ce16c02 + 28b598a commit b34bbed
Show file tree
Hide file tree
Showing 5 changed files with 487 additions and 178 deletions.
98 changes: 61 additions & 37 deletions src/Io/ClientRequestStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use Evenement\EventEmitter;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Http\Message\Response;
use React\Promise;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
Expand All @@ -14,7 +16,7 @@
* @event response
* @event drain
* @event error
* @event end
* @event close
* @internal
*/
class ClientRequestStream extends EventEmitter implements WritableStreamInterface
Expand All @@ -31,9 +33,11 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac
private $request;

/** @var ?ConnectionInterface */
private $stream;
private $connection;

/** @var string */
private $buffer = '';

private $buffer;
private $responseFactory;
private $state = self::STATE_INIT;
private $ended = false;
Expand All @@ -56,22 +60,22 @@ private function writeHead()
$this->state = self::STATE_WRITING_HEAD;

$request = $this->request;
$streamRef = &$this->stream;
$connectionRef = &$this->connection;
$stateRef = &$this->state;
$pendingWrites = &$this->pendingWrites;
$that = $this;

$promise = $this->connect();
$promise->then(
function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &$pendingWrites, $that) {
$streamRef = $stream;
assert($streamRef instanceof ConnectionInterface);
function (ConnectionInterface $connection) use ($request, &$connectionRef, &$stateRef, &$pendingWrites, $that) {
$connectionRef = $connection;
assert($connectionRef instanceof ConnectionInterface);

$stream->on('drain', array($that, 'handleDrain'));
$stream->on('data', array($that, 'handleData'));
$stream->on('end', array($that, 'handleEnd'));
$stream->on('error', array($that, 'handleError'));
$stream->on('close', array($that, 'handleClose'));
$connection->on('drain', array($that, 'handleDrain'));
$connection->on('data', array($that, 'handleData'));
$connection->on('end', array($that, 'handleEnd'));
$connection->on('error', array($that, 'handleError'));
$connection->on('close', array($that, 'close'));

assert($request instanceof RequestInterface);
$headers = "{$request->getMethod()} {$request->getRequestTarget()} HTTP/{$request->getProtocolVersion()}\r\n";
Expand All @@ -81,7 +85,7 @@ function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &
}
}

$more = $stream->write($headers . "\r\n" . $pendingWrites);
$more = $connection->write($headers . "\r\n" . $pendingWrites);

assert($stateRef === ClientRequestStream::STATE_WRITING_HEAD);
$stateRef = ClientRequestStream::STATE_HEAD_WRITTEN;
Expand Down Expand Up @@ -111,7 +115,7 @@ public function write($data)

// write directly to connection stream if already available
if (self::STATE_HEAD_WRITTEN <= $this->state) {
return $this->stream->write($data);
return $this->connection->write($data);
}

// otherwise buffer and try to establish connection
Expand Down Expand Up @@ -155,26 +159,50 @@ public function handleData($data)
$response = gPsr\parse_response($this->buffer);
$bodyChunk = (string) $response->getBody();
} catch (\InvalidArgumentException $exception) {
$this->emit('error', array($exception));
}

$this->buffer = null;

$this->stream->removeListener('drain', array($this, 'handleDrain'));
$this->stream->removeListener('data', array($this, 'handleData'));
$this->stream->removeListener('end', array($this, 'handleEnd'));
$this->stream->removeListener('error', array($this, 'handleError'));
$this->stream->removeListener('close', array($this, 'handleClose'));

if (!isset($response)) {
$this->closeError($exception);
return;
}

$this->stream->on('close', array($this, 'handleClose'));
// response headers successfully received => remove listeners for connection events
$connection = $this->connection;
assert($connection instanceof ConnectionInterface);
$connection->removeListener('drain', array($this, 'handleDrain'));
$connection->removeListener('data', array($this, 'handleData'));
$connection->removeListener('end', array($this, 'handleEnd'));
$connection->removeListener('error', array($this, 'handleError'));
$connection->removeListener('close', array($this, 'close'));
$this->connection = null;
$this->buffer = '';

// take control over connection handling and close connection once response body closes
$that = $this;
$input = $body = new CloseProtectionStream($connection);
$input->on('close', function () use ($connection, $that) {
$connection->close();
$that->close();
});

// determine length of response body
$length = null;
$code = $response->getStatusCode();
if ($this->request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code == Response::STATUS_NO_CONTENT || $code == Response::STATUS_NOT_MODIFIED) {
$length = 0;
} elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') {
$body = new ChunkedDecoder($body);
} elseif ($response->hasHeader('Content-Length')) {
$length = (int) $response->getHeaderLine('Content-Length');
}
$response = $response->withBody($body = new ReadableBodyStream($body, $length));

$this->emit('response', array($response, $this->stream));
// emit response with streaming response body (see `Sender`)
$this->emit('response', array($response, $body));

$this->stream->emit('data', array($bodyChunk));
// re-emit HTTP response body to trigger body parsing if parts of it are buffered
if ($bodyChunk !== '') {
$input->handleData($bodyChunk);
} elseif ($length === 0) {
$input->handleEnd();
}
}
}

Expand All @@ -196,12 +224,6 @@ public function handleError(\Exception $error)
));
}

/** @internal */
public function handleClose()
{
$this->close();
}

/** @internal */
public function closeError(\Exception $error)
{
Expand All @@ -220,9 +242,11 @@ public function close()

$this->state = self::STATE_END;
$this->pendingWrites = '';
$this->buffer = '';

if ($this->stream) {
$this->stream->close();
if ($this->connection instanceof ConnectionInterface) {
$this->connection->close();
$this->connection = null;
}

$this->emit('close');
Expand Down
19 changes: 5 additions & 14 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use Psr\Http\Message\ResponseInterface;
use React\EventLoop\LoopInterface;
use React\Http\Client\Client as HttpClient;
use React\Http\Message\Response;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Socket\ConnectorInterface;
Expand Down Expand Up @@ -95,8 +94,10 @@ public function send(RequestInterface $request)
}

// automatically add `Connection: close` request header for HTTP/1.1 requests to avoid connection reuse
if ($request->getProtocolVersion() === '1.1' && !$request->hasHeader('Connection')) {
if ($request->getProtocolVersion() === '1.1') {
$request = $request->withHeader('Connection', 'close');
} else {
$request = $request->withoutHeader('Connection');
}

// automatically add `Authorization: Basic …` request header if URL includes `user:pass@host`
Expand All @@ -116,18 +117,8 @@ public function send(RequestInterface $request)
$deferred->reject($error);
});

$requestStream->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($deferred, $request) {
$length = null;
$code = $response->getStatusCode();
if ($request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code == Response::STATUS_NO_CONTENT || $code == Response::STATUS_NOT_MODIFIED) {
$length = 0;
} elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') {
$body = new ChunkedDecoder($body);
} elseif ($response->hasHeader('Content-Length')) {
$length = (int) $response->getHeaderLine('Content-Length');
}

$deferred->resolve($response->withBody(new ReadableBodyStream($body, $length)));
$requestStream->on('response', function (ResponseInterface $response) use ($deferred, $request) {
$deferred->resolve($response);
});

if ($body instanceof ReadableStreamInterface) {
Expand Down
Loading

0 comments on commit b34bbed

Please sign in to comment.