diff --git a/src/Io/ClientRequestStream.php b/src/Io/ClientRequestStream.php index 29536e88..bdaa54f1 100644 --- a/src/Io/ClientRequestStream.php +++ b/src/Io/ClientRequestStream.php @@ -4,6 +4,8 @@ use Evenement\EventEmitter; use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; +use React\Http\Message\Response; use React\Promise; use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; @@ -14,7 +16,7 @@ * @event response * @event drain * @event error - * @event end + * @event close * @internal */ class ClientRequestStream extends EventEmitter implements WritableStreamInterface @@ -31,9 +33,11 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac private $request; /** @var ?ConnectionInterface */ - private $stream; + private $connection; + + /** @var string */ + private $buffer = ''; - private $buffer; private $responseFactory; private $state = self::STATE_INIT; private $ended = false; @@ -56,22 +60,22 @@ private function writeHead() $this->state = self::STATE_WRITING_HEAD; $request = $this->request; - $streamRef = &$this->stream; + $connectionRef = &$this->connection; $stateRef = &$this->state; $pendingWrites = &$this->pendingWrites; $that = $this; $promise = $this->connect(); $promise->then( - function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &$pendingWrites, $that) { - $streamRef = $stream; - assert($streamRef instanceof ConnectionInterface); + function (ConnectionInterface $connection) use ($request, &$connectionRef, &$stateRef, &$pendingWrites, $that) { + $connectionRef = $connection; + assert($connectionRef instanceof ConnectionInterface); - $stream->on('drain', array($that, 'handleDrain')); - $stream->on('data', array($that, 'handleData')); - $stream->on('end', array($that, 'handleEnd')); - $stream->on('error', array($that, 'handleError')); - $stream->on('close', array($that, 'handleClose')); + $connection->on('drain', array($that, 'handleDrain')); + $connection->on('data', array($that, 'handleData')); + $connection->on('end', array($that, 'handleEnd')); + $connection->on('error', array($that, 'handleError')); + $connection->on('close', array($that, 'close')); assert($request instanceof RequestInterface); $headers = "{$request->getMethod()} {$request->getRequestTarget()} HTTP/{$request->getProtocolVersion()}\r\n"; @@ -81,7 +85,7 @@ function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, & } } - $more = $stream->write($headers . "\r\n" . $pendingWrites); + $more = $connection->write($headers . "\r\n" . $pendingWrites); assert($stateRef === ClientRequestStream::STATE_WRITING_HEAD); $stateRef = ClientRequestStream::STATE_HEAD_WRITTEN; @@ -111,7 +115,7 @@ public function write($data) // write directly to connection stream if already available if (self::STATE_HEAD_WRITTEN <= $this->state) { - return $this->stream->write($data); + return $this->connection->write($data); } // otherwise buffer and try to establish connection @@ -155,26 +159,50 @@ public function handleData($data) $response = gPsr\parse_response($this->buffer); $bodyChunk = (string) $response->getBody(); } catch (\InvalidArgumentException $exception) { - $this->emit('error', array($exception)); - } - - $this->buffer = null; - - $this->stream->removeListener('drain', array($this, 'handleDrain')); - $this->stream->removeListener('data', array($this, 'handleData')); - $this->stream->removeListener('end', array($this, 'handleEnd')); - $this->stream->removeListener('error', array($this, 'handleError')); - $this->stream->removeListener('close', array($this, 'handleClose')); - - if (!isset($response)) { + $this->closeError($exception); return; } - $this->stream->on('close', array($this, 'handleClose')); + // response headers successfully received => remove listeners for connection events + $connection = $this->connection; + assert($connection instanceof ConnectionInterface); + $connection->removeListener('drain', array($this, 'handleDrain')); + $connection->removeListener('data', array($this, 'handleData')); + $connection->removeListener('end', array($this, 'handleEnd')); + $connection->removeListener('error', array($this, 'handleError')); + $connection->removeListener('close', array($this, 'close')); + $this->connection = null; + $this->buffer = ''; + + // take control over connection handling and close connection once response body closes + $that = $this; + $input = $body = new CloseProtectionStream($connection); + $input->on('close', function () use ($connection, $that) { + $connection->close(); + $that->close(); + }); + + // determine length of response body + $length = null; + $code = $response->getStatusCode(); + if ($this->request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code == Response::STATUS_NO_CONTENT || $code == Response::STATUS_NOT_MODIFIED) { + $length = 0; + } elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') { + $body = new ChunkedDecoder($body); + } elseif ($response->hasHeader('Content-Length')) { + $length = (int) $response->getHeaderLine('Content-Length'); + } + $response = $response->withBody($body = new ReadableBodyStream($body, $length)); - $this->emit('response', array($response, $this->stream)); + // emit response with streaming response body (see `Sender`) + $this->emit('response', array($response, $body)); - $this->stream->emit('data', array($bodyChunk)); + // re-emit HTTP response body to trigger body parsing if parts of it are buffered + if ($bodyChunk !== '') { + $input->handleData($bodyChunk); + } elseif ($length === 0) { + $input->handleEnd(); + } } } @@ -196,12 +224,6 @@ public function handleError(\Exception $error) )); } - /** @internal */ - public function handleClose() - { - $this->close(); - } - /** @internal */ public function closeError(\Exception $error) { @@ -220,9 +242,11 @@ public function close() $this->state = self::STATE_END; $this->pendingWrites = ''; + $this->buffer = ''; - if ($this->stream) { - $this->stream->close(); + if ($this->connection instanceof ConnectionInterface) { + $this->connection->close(); + $this->connection = null; } $this->emit('close'); diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 2e821f5a..acbb6e7d 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -6,7 +6,6 @@ use Psr\Http\Message\ResponseInterface; use React\EventLoop\LoopInterface; use React\Http\Client\Client as HttpClient; -use React\Http\Message\Response; use React\Promise\PromiseInterface; use React\Promise\Deferred; use React\Socket\ConnectorInterface; @@ -95,8 +94,10 @@ public function send(RequestInterface $request) } // automatically add `Connection: close` request header for HTTP/1.1 requests to avoid connection reuse - if ($request->getProtocolVersion() === '1.1' && !$request->hasHeader('Connection')) { + if ($request->getProtocolVersion() === '1.1') { $request = $request->withHeader('Connection', 'close'); + } else { + $request = $request->withoutHeader('Connection'); } // automatically add `Authorization: Basic …` request header if URL includes `user:pass@host` @@ -116,18 +117,8 @@ public function send(RequestInterface $request) $deferred->reject($error); }); - $requestStream->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($deferred, $request) { - $length = null; - $code = $response->getStatusCode(); - if ($request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code == Response::STATUS_NO_CONTENT || $code == Response::STATUS_NOT_MODIFIED) { - $length = 0; - } elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') { - $body = new ChunkedDecoder($body); - } elseif ($response->hasHeader('Content-Length')) { - $length = (int) $response->getHeaderLine('Content-Length'); - } - - $deferred->resolve($response->withBody(new ReadableBodyStream($body, $length))); + $requestStream->on('response', function (ResponseInterface $response) use ($deferred, $request) { + $deferred->resolve($response); }); if ($body instanceof ReadableStreamInterface) { diff --git a/tests/Io/ClientRequestStreamTest.php b/tests/Io/ClientRequestStreamTest.php index 07a4eb73..93220d10 100644 --- a/tests/Io/ClientRequestStreamTest.php +++ b/tests/Io/ClientRequestStreamTest.php @@ -2,27 +2,24 @@ namespace React\Tests\Http\Io; +use Psr\Http\Message\ResponseInterface; use React\Http\Io\ClientRequestStream; use React\Http\Message\Request; use React\Promise\Deferred; use React\Promise\Promise; use React\Stream\DuplexResourceStream; +use React\Stream\ReadableStreamInterface; use React\Tests\Http\TestCase; class ClientRequestStreamTest extends TestCase { private $connector; - private $stream; /** * @before */ public function setUpStream() { - $this->stream = $this->getMockBuilder('React\Socket\ConnectionInterface') - ->disableOriginalConstructor() - ->getMock(); - $this->connector = $this->getMockBuilder('React\Socket\ConnectorInterface') ->getMock(); } @@ -30,29 +27,29 @@ public function setUpStream() /** @test */ public function requestShouldBindToStreamEventsAndUseconnector() { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + $requestData = new Request('GET', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->successfulConnectionMock(); - - $this->stream->expects($this->exactly(6))->method('on')->withConsecutive( + $connection->expects($this->atLeast(5))->method('on')->withConsecutive( array('drain', $this->identicalTo(array($request, 'handleDrain'))), array('data', $this->identicalTo(array($request, 'handleData'))), array('end', $this->identicalTo(array($request, 'handleEnd'))), array('error', $this->identicalTo(array($request, 'handleError'))), - array('close', $this->identicalTo(array($request, 'handleClose'))) + array('close', $this->identicalTo(array($request, 'close'))) ); - $this->stream->expects($this->exactly(5))->method('removeListener')->withConsecutive( + $connection->expects($this->exactly(5))->method('removeListener')->withConsecutive( array('drain', $this->identicalTo(array($request, 'handleDrain'))), array('data', $this->identicalTo(array($request, 'handleData'))), array('end', $this->identicalTo(array($request, 'handleEnd'))), array('error', $this->identicalTo(array($request, 'handleError'))), - array('close', $this->identicalTo(array($request, 'handleClose'))) + array('close', $this->identicalTo(array($request, 'close'))) ); - $request->on('end', $this->expectCallableNever()); - $request->end(); $request->handleData("HTTP/1.0 200 OK\r\n"); @@ -65,26 +62,24 @@ public function requestShouldBindToStreamEventsAndUseconnector() */ public function requestShouldConnectViaTlsIfUrlUsesHttpsScheme() { + $this->connector->expects($this->once())->method('connect')->with('tls://www.example.com:443')->willReturn(new Promise(function () { })); + $requestData = new Request('GET', 'https://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->connector->expects($this->once())->method('connect')->with('tls://www.example.com:443')->willReturn(new Promise(function () { })); - $request->end(); } /** @test */ public function requestShouldEmitErrorIfConnectionFails() { + $this->connector->expects($this->once())->method('connect')->willReturn(\React\Promise\reject(new \RuntimeException())); + $requestData = new Request('GET', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->connector->expects($this->once())->method('connect')->willReturn(\React\Promise\reject(new \RuntimeException())); - $request->on('error', $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $request->on('close', $this->expectCallableOnce()); - $request->on('end', $this->expectCallableNever()); $request->end(); } @@ -92,15 +87,15 @@ public function requestShouldEmitErrorIfConnectionFails() /** @test */ public function requestShouldEmitErrorIfConnectionClosesBeforeResponseIsParsed() { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + $requestData = new Request('GET', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->successfulConnectionMock(); - $request->on('error', $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $request->on('close', $this->expectCallableOnce()); - $request->on('end', $this->expectCallableNever()); $request->end(); $request->handleEnd(); @@ -109,15 +104,15 @@ public function requestShouldEmitErrorIfConnectionClosesBeforeResponseIsParsed() /** @test */ public function requestShouldEmitErrorIfConnectionEmitsError() { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + $requestData = new Request('GET', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->successfulConnectionMock(); - $request->on('error', $this->expectCallableOnceWith($this->isInstanceOf('Exception'))); - $request->on('close', $this->expectCallableOnce()); - $request->on('end', $this->expectCallableNever()); $request->end(); $request->handleError(new \Exception('test')); @@ -126,12 +121,15 @@ public function requestShouldEmitErrorIfConnectionEmitsError() /** @test */ public function requestShouldEmitErrorIfRequestParserThrowsException() { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + $requestData = new Request('GET', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->successfulConnectionMock(); - $request->on('error', $this->expectCallableOnceWith($this->isInstanceOf('InvalidArgumentException'))); + $request->on('close', $this->expectCallableOnce()); $request->end(); $request->handleData("\r\n\r\n"); @@ -142,13 +140,13 @@ public function requestShouldEmitErrorIfRequestParserThrowsException() */ public function requestShouldEmitErrorIfUrlIsInvalid() { + $this->connector->expects($this->never())->method('connect'); + $requestData = new Request('GET', 'ftp://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); $request->on('error', $this->expectCallableOnceWith($this->isInstanceOf('InvalidArgumentException'))); - - $this->connector->expects($this->never()) - ->method('connect'); + $request->on('close', $this->expectCallableOnce()); $request->end(); } @@ -158,13 +156,13 @@ public function requestShouldEmitErrorIfUrlIsInvalid() */ public function requestShouldEmitErrorIfUrlHasNoScheme() { + $this->connector->expects($this->never())->method('connect'); + $requestData = new Request('GET', 'www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); $request->on('error', $this->expectCallableOnceWith($this->isInstanceOf('InvalidArgumentException'))); - - $this->connector->expects($this->never()) - ->method('connect'); + $request->on('close', $this->expectCallableOnce()); $request->end(); } @@ -172,12 +170,13 @@ public function requestShouldEmitErrorIfUrlHasNoScheme() /** @test */ public function getRequestShouldSendAGetRequest() { - $requestData = new Request('GET', 'http://www.example.com', array(), '', '1.0'); - $request = new ClientRequestStream($this->connector, $requestData); + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.0\r\nHost: www.example.com\r\n\r\n"); - $this->successfulConnectionMock(); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); - $this->stream->expects($this->once())->method('write')->with("GET / HTTP/1.0\r\nHost: www.example.com\r\n\r\n"); + $requestData = new Request('GET', 'http://www.example.com', array(), '', '1.0'); + $request = new ClientRequestStream($this->connector, $requestData); $request->end(); } @@ -185,12 +184,13 @@ public function getRequestShouldSendAGetRequest() /** @test */ public function getHttp11RequestShouldSendAGetRequestWithGivenConnectionCloseHeader() { - $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); - $request = new ClientRequestStream($this->connector, $requestData); + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); - $this->successfulConnectionMock(); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); - $this->stream->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); $request->end(); } @@ -198,29 +198,331 @@ public function getHttp11RequestShouldSendAGetRequestWithGivenConnectionCloseHea /** @test */ public function getOptionsAsteriskShouldSendAOptionsRequestAsteriskRequestTarget() { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("OPTIONS * HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + $requestData = new Request('OPTIONS', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); $requestData = $requestData->withRequestTarget('*'); $request = new ClientRequestStream($this->connector, $requestData); - $this->successfulConnectionMock(); + $request->end(); + } + + public function testStreamShouldEmitResponseWithEmptyBodyWhenResponseContainsContentLengthZero() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->once())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableNever()); + $body->on('end', $that->expectCallableOnce()); + $body->on('close', $that->expectCallableOnce()); + }); + $request->on('close', $this->expectCallableOnce()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"); + } + + public function testStreamShouldEmitResponseWithEmptyBodyWhenResponseContainsStatusNoContent() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->once())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableNever()); + $body->on('end', $that->expectCallableOnce()); + $body->on('close', $that->expectCallableOnce()); + }); + $request->on('close', $this->expectCallableOnce()); + + $request->end(); + + $request->handleData("HTTP/1.1 204 No Content\r\n\r\n"); + } + + public function testStreamShouldEmitResponseWithEmptyBodyWhenResponseContainsStatusNotModifiedWithContentLengthGiven() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->once())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableNever()); + $body->on('end', $that->expectCallableOnce()); + $body->on('close', $that->expectCallableOnce()); + }); + $request->on('close', $this->expectCallableOnce()); + + $request->end(); + + $request->handleData("HTTP/1.1 304 Not Modified\r\nContent-Length: 100\r\n\r\n"); + } + + public function testStreamShouldEmitResponseWithEmptyBodyWhenRequestMethodIsHead() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("HEAD / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->once())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('HEAD', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableNever()); + $body->on('end', $that->expectCallableOnce()); + $body->on('close', $that->expectCallableOnce()); + }); + $request->on('close', $this->expectCallableOnce()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\n"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyUntilEndWhenResponseContainsContentLengthAndResponseBody() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->once())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableOnceWith('OK')); + $body->on('end', $that->expectCallableOnce()); + $body->on('close', $that->expectCallableOnce()); + }); + $request->on('close', $this->expectCallableOnce()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyWithoutDataWhenResponseContainsContentLengthWithoutResponseBody() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->never())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableNever()); + $body->on('end', $that->expectCallableNever()); + $body->on('close', $that->expectCallableNever()); + }); + $request->on('close', $this->expectCallableNever()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyWithDataWithoutEndWhenResponseContainsContentLengthWithIncompleteResponseBody() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->never())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableOnce('O')); + $body->on('end', $that->expectCallableNever()); + $body->on('close', $that->expectCallableNever()); + }); + $request->on('close', $this->expectCallableNever()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nO"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyUntilEndWhenResponseContainsTransferEncodingChunkedAndResponseBody() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->once())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableOnceWith('OK')); + $body->on('end', $that->expectCallableOnce()); + $body->on('close', $that->expectCallableOnce()); + }); + $request->on('close', $this->expectCallableOnce()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n2\r\nOK\r\n0\r\n\r\n"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyWithoutDataWhenResponseContainsTransferEncodingChunkedWithoutResponseBody() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->never())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableNever()); + $body->on('end', $that->expectCallableNever()); + $body->on('close', $that->expectCallableNever()); + }); + $request->on('close', $this->expectCallableNever()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyWithDataWithoutEndWhenResponseContainsTransferEncodingChunkedWithIncompleteResponseBody() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->never())->method('close'); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableOnceWith('O')); + $body->on('end', $that->expectCallableNever()); + $body->on('close', $that->expectCallableNever()); + }); + $request->on('close', $this->expectCallableNever()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n2\r\nO"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyWithDataWithoutEndWhenResponseContainsNoContentLengthAndIncompleteResponseBody() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->never())->method('close'); - $this->stream->expects($this->once())->method('write')->with("OPTIONS * HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableOnce('O')); + $body->on('end', $that->expectCallableNever()); + $body->on('close', $that->expectCallableNever()); + }); + $request->on('close', $this->expectCallableNever()); $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\n\r\nO"); + } + + public function testStreamShouldEmitResponseWithStreamingBodyUntilEndWhenResponseContainsNoContentLengthAndResponseBodyTerminatedByConnectionEndEvent() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"); + $connection->expects($this->once())->method('close'); + + $endEvent = null; + $eventName = null; + $connection->expects($this->any())->method('on')->with($this->callback(function ($name) use (&$eventName) { + $eventName = $name; + return true; + }), $this->callback(function ($cb) use (&$endEvent, &$eventName) { + if ($eventName === 'end') { + $endEvent = $cb; + } + return true; + })); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'close'), '', '1.1'); + $request = new ClientRequestStream($this->connector, $requestData); + + $that = $this; + $request->on('response', function (ResponseInterface $response, ReadableStreamInterface $body) use ($that) { + $body->on('data', $that->expectCallableOnce('OK')); + $body->on('end', $that->expectCallableOnce()); + $body->on('close', $that->expectCallableOnce()); + }); + $request->on('close', $this->expectCallableOnce()); + + $request->end(); + + $request->handleData("HTTP/1.1 200 OK\r\n\r\nOK"); + + $this->assertNotNull($endEvent); + call_user_func($endEvent); // $endEvent() (PHP 5.4+) } /** @test */ public function postRequestShouldSendAPostRequest() { - $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); - $request = new ClientRequestStream($this->connector, $requestData); + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('write')->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\n\r\nsome post data$#")); - $this->successfulConnectionMock(); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); - $this->stream - ->expects($this->once()) - ->method('write') - ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\n\r\nsome post data$#")); + $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); + $request = new ClientRequestStream($this->connector, $requestData); $request->end('some post data'); @@ -232,17 +534,18 @@ public function postRequestShouldSendAPostRequest() /** @test */ public function writeWithAPostRequestShouldSendToTheStream() { - $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); - $request = new ClientRequestStream($this->connector, $requestData); - - $this->successfulConnectionMock(); - - $this->stream->expects($this->exactly(3))->method('write')->withConsecutive( + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->exactly(3))->method('write')->withConsecutive( array($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\n\r\nsome$#")), array($this->identicalTo("post")), array($this->identicalTo("data")) ); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); + $request = new ClientRequestStream($this->connector, $requestData); + $request->write("some"); $request->write("post"); $request->end("data"); @@ -255,18 +558,20 @@ public function writeWithAPostRequestShouldSendToTheStream() /** @test */ public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent() { - $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); - $request = new ClientRequestStream($this->connector, $requestData); - - $resolveConnection = $this->successfulAsyncConnectionMock(); - - $this->stream->expects($this->exactly(2))->method('write')->withConsecutive( + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->exactly(2))->method('write')->withConsecutive( array($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\n\r\nsomepost$#")), array($this->identicalTo("data")) )->willReturn( true ); + $deferred = new Deferred(); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn($deferred->promise()); + + $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); + $request = new ClientRequestStream($this->connector, $requestData); + $this->assertFalse($request->write("some")); $this->assertFalse($request->write("post")); @@ -276,7 +581,7 @@ public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent $request->end(); }); - $resolveConnection(); + $deferred->resolve($connection); $request->handleData("HTTP/1.0 200 OK\r\n"); $request->handleData("Content-Type: text/plain\r\n"); @@ -286,23 +591,24 @@ public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent /** @test */ public function writeWithAPostRequestShouldForwardDrainEventIfFirstChunkExceedsBuffer() { - $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); - $request = new ClientRequestStream($this->connector, $requestData); - - $this->stream = $this->getMockBuilder('React\Socket\Connection') + $connection = $this->getMockBuilder('React\Socket\Connection') ->disableOriginalConstructor() ->setMethods(array('write')) ->getMock(); - $resolveConnection = $this->successfulAsyncConnectionMock(); - - $this->stream->expects($this->exactly(2))->method('write')->withConsecutive( + $connection->expects($this->exactly(2))->method('write')->withConsecutive( array($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\n\r\nsomepost$#")), array($this->identicalTo("data")) )->willReturn( false ); + $deferred = new Deferred(); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn($deferred->promise()); + + $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); + $request = new ClientRequestStream($this->connector, $requestData); + $this->assertFalse($request->write("some")); $this->assertFalse($request->write("post")); @@ -312,8 +618,8 @@ public function writeWithAPostRequestShouldForwardDrainEventIfFirstChunkExceedsB $request->end(); }); - $resolveConnection(); - $this->stream->emit('drain'); + $deferred->resolve($connection); + $connection->emit('drain'); $request->handleData("HTTP/1.0 200 OK\r\n"); $request->handleData("Content-Type: text/plain\r\n"); @@ -323,17 +629,18 @@ public function writeWithAPostRequestShouldForwardDrainEventIfFirstChunkExceedsB /** @test */ public function pipeShouldPipeDataIntoTheRequestBody() { - $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); - $request = new ClientRequestStream($this->connector, $requestData); - - $this->successfulConnectionMock(); - - $this->stream->expects($this->exactly(3))->method('write')->withConsecutive( + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->exactly(3))->method('write')->withConsecutive( array($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\n\r\nsome$#")), array($this->identicalTo("post")), array($this->identicalTo("data")) ); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + + $requestData = new Request('POST', 'http://www.example.com', array(), '', '1.0'); + $request = new ClientRequestStream($this->connector, $requestData); + $loop = $this ->getMockBuilder('React\EventLoop\LoopInterface') ->getMock(); @@ -356,14 +663,14 @@ public function pipeShouldPipeDataIntoTheRequestBody() */ public function writeShouldStartConnecting() { - $requestData = new Request('POST', 'http://www.example.com'); - $request = new ClientRequestStream($this->connector, $requestData); - $this->connector->expects($this->once()) ->method('connect') ->with('www.example.com:80') ->willReturn(new Promise(function () { })); + $requestData = new Request('POST', 'http://www.example.com'); + $request = new ClientRequestStream($this->connector, $requestData); + $request->write('test'); } @@ -372,14 +679,11 @@ public function writeShouldStartConnecting() */ public function endShouldStartConnectingAndChangeStreamIntoNonWritableMode() { + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(new Promise(function () { })); + $requestData = new Request('POST', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->connector->expects($this->once()) - ->method('connect') - ->with('www.example.com:80') - ->willReturn(new Promise(function () { })); - $request->end(); $this->assertFalse($request->isWritable()); @@ -416,12 +720,11 @@ public function writeAfterCloseReturnsFalse() */ public function endAfterCloseIsNoOp() { + $this->connector->expects($this->never())->method('connect'); + $requestData = new Request('POST', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->connector->expects($this->never()) - ->method('connect'); - $request->close(); $request->end(); } @@ -431,17 +734,13 @@ public function endAfterCloseIsNoOp() */ public function closeShouldCancelPendingConnectionAttempt() { - $requestData = new Request('POST', 'http://www.example.com'); - $request = new ClientRequestStream($this->connector, $requestData); - $promise = new Promise(function () {}, function () { throw new \RuntimeException(); }); + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn($promise); - $this->connector->expects($this->once()) - ->method('connect') - ->with('www.example.com:80') - ->willReturn($promise); + $requestData = new Request('POST', 'http://www.example.com'); + $request = new ClientRequestStream($this->connector, $requestData); $request->end(); @@ -465,35 +764,16 @@ public function requestShouldRemoveAllListenerAfterClosed() $this->assertCount(0, $request->listeners('close')); } - private function successfulConnectionMock() - { - call_user_func($this->successfulAsyncConnectionMock()); - } - - private function successfulAsyncConnectionMock() - { - $deferred = new Deferred(); - - $this->connector - ->expects($this->once()) - ->method('connect') - ->with('www.example.com:80') - ->will($this->returnValue($deferred->promise())); - - $stream = $this->stream; - return function () use ($deferred, $stream) { - $deferred->resolve($stream); - }; - } - /** @test */ public function multivalueHeader() { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + + $this->connector->expects($this->once())->method('connect')->with('www.example.com:80')->willReturn(\React\Promise\resolve($connection)); + $requestData = new Request('GET', 'http://www.example.com'); $request = new ClientRequestStream($this->connector, $requestData); - $this->successfulConnectionMock(); - $response = null; $request->on('response', $this->expectCallableOnce()); $request->on('response', function ($value) use (&$response) { diff --git a/tests/Io/SenderTest.php b/tests/Io/SenderTest.php index c2357a1a..4ef06442 100644 --- a/tests/Io/SenderTest.php +++ b/tests/Io/SenderTest.php @@ -302,6 +302,20 @@ public function getHttp10RequestShouldSendAGetRequestWithoutConnectionHeaderByDe $sender->send($request); } + /** @test */ + public function getHttp10RequestShouldSendAGetRequestWithoutConnectionHeaderEvenWhenConnectionKeepAliveHeaderIsSpecified() + { + $client = $this->getMockBuilder('React\Http\Client\Client')->disableOriginalConstructor()->getMock(); + $client->expects($this->once())->method('request')->with($this->callback(function (RequestInterface $request) { + return !$request->hasHeader('Connection'); + }))->willReturn($this->getMockBuilder('React\Http\Io\ClientRequestStream')->disableOriginalConstructor()->getMock()); + + $sender = new Sender($client); + + $request = new Request('GET', 'http://www.example.com', array('Connection' => 'keep-alive'), '', '1.0'); + $sender->send($request); + } + /** @test */ public function getHttp11RequestShouldSendAGetRequestWithConnectionCloseHeaderByDefault() { @@ -317,16 +331,16 @@ public function getHttp11RequestShouldSendAGetRequestWithConnectionCloseHeaderBy } /** @test */ - public function getHttp11RequestShouldSendAGetRequestWithGivenConnectionUpgradeHeader() + public function getHttp11RequestShouldSendAGetRequestWithConnectionCloseHeaderEvenWhenConnectionKeepAliveHeaderIsSpecified() { $client = $this->getMockBuilder('React\Http\Client\Client')->disableOriginalConstructor()->getMock(); $client->expects($this->once())->method('request')->with($this->callback(function (RequestInterface $request) { - return $request->getHeaderLine('Connection') === 'upgrade'; + return $request->getHeaderLine('Connection') === 'close'; }))->willReturn($this->getMockBuilder('React\Http\Io\ClientRequestStream')->disableOriginalConstructor()->getMock()); $sender = new Sender($client); - $request = new Request('GET', 'http://www.example.com', array('Connection' => 'upgrade'), '', '1.1'); + $request = new Request('GET', 'http://www.example.com', array('Connection' => 'keep-alive'), '', '1.1'); $sender->send($request); } diff --git a/tests/TestCase.php b/tests/TestCase.php index 1938ed89..72b7be8d 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -6,7 +6,7 @@ class TestCase extends BaseTestCase { - protected function expectCallableOnce() + public function expectCallableOnce() // protected (PHP 5.4+) { $mock = $this->createCallableMock(); $mock @@ -16,7 +16,7 @@ protected function expectCallableOnce() return $mock; } - protected function expectCallableOnceWith($value) + public function expectCallableOnceWith($value) // protected (PHP 5.4+) { $mock = $this->createCallableMock(); $mock @@ -27,7 +27,7 @@ protected function expectCallableOnceWith($value) return $mock; } - protected function expectCallableNever() + public function expectCallableNever() // protected (PHP 5.4+) { $mock = $this->createCallableMock(); $mock