Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistently close HTTP client connections when response stream closes #484

Merged
merged 3 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 61 additions & 37 deletions src/Io/ClientRequestStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use Evenement\EventEmitter;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Http\Message\Response;
use React\Promise;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
Expand All @@ -14,7 +16,7 @@
* @event response
* @event drain
* @event error
* @event end
* @event close
* @internal
*/
class ClientRequestStream extends EventEmitter implements WritableStreamInterface
Expand All @@ -31,9 +33,11 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac
private $request;

/** @var ?ConnectionInterface */
private $stream;
private $connection;

/** @var string */
private $buffer = '';

private $buffer;
private $responseFactory;
private $state = self::STATE_INIT;
private $ended = false;
Expand All @@ -56,22 +60,22 @@ private function writeHead()
$this->state = self::STATE_WRITING_HEAD;

$request = $this->request;
$streamRef = &$this->stream;
$connectionRef = &$this->connection;
$stateRef = &$this->state;
$pendingWrites = &$this->pendingWrites;
$that = $this;

$promise = $this->connect();
$promise->then(
function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &$pendingWrites, $that) {
$streamRef = $stream;
assert($streamRef instanceof ConnectionInterface);
function (ConnectionInterface $connection) use ($request, &$connectionRef, &$stateRef, &$pendingWrites, $that) {
$connectionRef = $connection;
assert($connectionRef instanceof ConnectionInterface);

$stream->on('drain', array($that, 'handleDrain'));
$stream->on('data', array($that, 'handleData'));
$stream->on('end', array($that, 'handleEnd'));
$stream->on('error', array($that, 'handleError'));
$stream->on('close', array($that, 'handleClose'));
$connection->on('drain', array($that, 'handleDrain'));
$connection->on('data', array($that, 'handleData'));
$connection->on('end', array($that, 'handleEnd'));
$connection->on('error', array($that, 'handleError'));
$connection->on('close', array($that, 'close'));

assert($request instanceof RequestInterface);
$headers = "{$request->getMethod()} {$request->getRequestTarget()} HTTP/{$request->getProtocolVersion()}\r\n";
Expand All @@ -81,7 +85,7 @@ function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &
}
}

$more = $stream->write($headers . "\r\n" . $pendingWrites);
$more = $connection->write($headers . "\r\n" . $pendingWrites);

assert($stateRef === ClientRequestStream::STATE_WRITING_HEAD);
$stateRef = ClientRequestStream::STATE_HEAD_WRITTEN;
Expand Down Expand Up @@ -111,7 +115,7 @@ public function write($data)

// write directly to connection stream if already available
if (self::STATE_HEAD_WRITTEN <= $this->state) {
return $this->stream->write($data);
return $this->connection->write($data);
}

// otherwise buffer and try to establish connection
Expand Down Expand Up @@ -155,26 +159,50 @@ public function handleData($data)
$response = gPsr\parse_response($this->buffer);
$bodyChunk = (string) $response->getBody();
} catch (\InvalidArgumentException $exception) {
$this->emit('error', array($exception));
}

$this->buffer = null;

$this->stream->removeListener('drain', array($this, 'handleDrain'));
$this->stream->removeListener('data', array($this, 'handleData'));
$this->stream->removeListener('end', array($this, 'handleEnd'));
$this->stream->removeListener('error', array($this, 'handleError'));
$this->stream->removeListener('close', array($this, 'handleClose'));

if (!isset($response)) {
$this->closeError($exception);
return;
}

$this->stream->on('close', array($this, 'handleClose'));
// response headers successfully received => remove listeners for connection events
$connection = $this->connection;
assert($connection instanceof ConnectionInterface);
$connection->removeListener('drain', array($this, 'handleDrain'));
$connection->removeListener('data', array($this, 'handleData'));
$connection->removeListener('end', array($this, 'handleEnd'));
$connection->removeListener('error', array($this, 'handleError'));
$connection->removeListener('close', array($this, 'close'));
$this->connection = null;
$this->buffer = '';

// take control over connection handling and close connection once response body closes
$that = $this;
$input = $body = new CloseProtectionStream($connection);
$input->on('close', function () use ($connection, $that) {
$connection->close();
$that->close();
});

// determine length of response body
$length = null;
$code = $response->getStatusCode();
if ($this->request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code == Response::STATUS_NO_CONTENT || $code == Response::STATUS_NOT_MODIFIED) {
$length = 0;
} elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') {
$body = new ChunkedDecoder($body);
} elseif ($response->hasHeader('Content-Length')) {
$length = (int) $response->getHeaderLine('Content-Length');
}
$response = $response->withBody($body = new ReadableBodyStream($body, $length));

$this->emit('response', array($response, $this->stream));
// emit response with streaming response body (see `Sender`)
$this->emit('response', array($response, $body));

$this->stream->emit('data', array($bodyChunk));
// re-emit HTTP response body to trigger body parsing if parts of it are buffered
if ($bodyChunk !== '') {
$input->handleData($bodyChunk);
} elseif ($length === 0) {
$input->handleEnd();
}
}
}

Expand All @@ -196,12 +224,6 @@ public function handleError(\Exception $error)
));
}

/** @internal */
public function handleClose()
{
$this->close();
}

/** @internal */
public function closeError(\Exception $error)
{
Expand All @@ -220,9 +242,11 @@ public function close()

$this->state = self::STATE_END;
$this->pendingWrites = '';
$this->buffer = '';

if ($this->stream) {
$this->stream->close();
if ($this->connection instanceof ConnectionInterface) {
$this->connection->close();
$this->connection = null;
}

$this->emit('close');
Expand Down
19 changes: 5 additions & 14 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use Psr\Http\Message\ResponseInterface;
use React\EventLoop\LoopInterface;
use React\Http\Client\Client as HttpClient;
use React\Http\Message\Response;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Socket\ConnectorInterface;
Expand Down Expand Up @@ -95,8 +94,10 @@ public function send(RequestInterface $request)
}

// automatically add `Connection: close` request header for HTTP/1.1 requests to avoid connection reuse
if ($request->getProtocolVersion() === '1.1' && !$request->hasHeader('Connection')) {
if ($request->getProtocolVersion() === '1.1') {
$request = $request->withHeader('Connection', 'close');
} else {
$request = $request->withoutHeader('Connection');
}

// automatically add `Authorization: Basic …` request header if URL includes `user:pass@host`
Expand All @@ -116,18 +117,8 @@ public function send(RequestInterface $request)
$deferred->reject($error);
});

$requestStream->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($deferred, $request) {
$length = null;
$code = $response->getStatusCode();
if ($request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code == Response::STATUS_NO_CONTENT || $code == Response::STATUS_NOT_MODIFIED) {
$length = 0;
} elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') {
$body = new ChunkedDecoder($body);
} elseif ($response->hasHeader('Content-Length')) {
$length = (int) $response->getHeaderLine('Content-Length');
}

$deferred->resolve($response->withBody(new ReadableBodyStream($body, $length)));
$requestStream->on('response', function (ResponseInterface $response) use ($deferred, $request) {
$deferred->resolve($response);
});

if ($body instanceof ReadableStreamInterface) {
Expand Down
Loading