diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3666cd47..0d890c63 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,6 +9,7 @@ jobs: name: PHPUnit (PHP ${{ matrix.php }}) runs-on: ubuntu-22.04 strategy: + fail-fast: false matrix: php: - 8.3 diff --git a/README.md b/README.md index d0550642..21b36017 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ multiple concurrent HTTP requests without blocking. * [Uri](#uri) * [ResponseException](#responseexception) * [React\Http\Middleware](#reacthttpmiddleware) + * [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware) * [StreamingRequestMiddleware](#streamingrequestmiddleware) * [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware) * [RequestBodyBufferMiddleware](#requestbodybuffermiddleware) @@ -2692,6 +2693,25 @@ access its underlying response object. ### React\Http\Middleware +#### InactiveConnectionTimeoutMiddleware + +The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the +`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly +open. The default is `60` seconds of inactivity and should only be changed if you know what you are doing. + +The following example configures the `HttpServer` to close any inactive connections after one and a half second: + +```php +$http = new React\Http\HttpServer( + new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5), + $handler +); +``` +> Internally, this class is used as a "value object" to override the default timeout of one minute. + As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + This timeout is only in effect if we expect data from the client, not when we are writing data to + the client. + #### StreamingRequestMiddleware The `React\Http\Middleware\StreamingRequestMiddleware` can be used to diff --git a/src/HttpServer.php b/src/HttpServer.php index f2334733..2dc8218c 100644 --- a/src/HttpServer.php +++ b/src/HttpServer.php @@ -8,6 +8,7 @@ use React\Http\Io\IniUtil; use React\Http\Io\MiddlewareRunner; use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\LimitConcurrentRequestsMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Http\Middleware\RequestBodyBufferMiddleware; @@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop) } $streaming = false; + $idleConnectionTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT; foreach ((array) $requestHandlers as $handler) { if ($handler instanceof StreamingRequestMiddleware) { $streaming = true; - break; + } + if ($handler instanceof InactiveConnectionTimeoutMiddleware) { + $idleConnectionTimeout = $handler->getTimeout(); } } @@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop) * doing anything with the request. */ $middleware = \array_filter($middleware, function ($handler) { - return !($handler instanceof StreamingRequestMiddleware); + return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware); }); - $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware)); + $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectionTimeout); $that = $this; $this->streamingServer->on('error', function ($error) use ($that) { diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 143edaa8..e9d0914e 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter /** @var Clock */ private $clock; + /** @var LoopInterface */ + private $loop; + + /** @var int */ + private $idleConnectionTimeout; + /** * Creates an HTTP server that invokes the given callback for each incoming HTTP request * @@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter * connections in order to then parse incoming data as HTTP. * See also [listen()](#listen) for more details. * - * @param LoopInterface $loop * @param callable $requestHandler + * @param int $idleConnectionTimeout * @see self::listen() */ - public function __construct(LoopInterface $loop, $requestHandler) + public function __construct(LoopInterface $loop, $requestHandler, $idleConnectionTimeout) { if (!\is_callable($requestHandler)) { throw new \InvalidArgumentException('Invalid request handler given'); } + $this->loop = $loop; $this->callback = $requestHandler; $this->clock = new Clock($loop); $this->parser = new RequestHeaderParser($this->clock); + $this->idleConnectionTimeout = $idleConnectionTimeout; $that = $this; $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) { @@ -134,7 +142,35 @@ public function __construct(LoopInterface $loop, $requestHandler) */ public function listen(ServerInterface $socket) { - $socket->on('connection', array($this->parser, 'handle')); + $socket->on('connection', array($this, 'handleConnection')); + } + + /** @internal */ + public function handleConnection(ConnectionInterface $connection) + { + $idleConnectionTimeout = $this->idleConnectionTimeout; + $loop = $this->loop; + $idleConnectionTimeoutHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler) { + $connection->removeListener('close', $closeEventHandler); + $connection->removeListener('data', $dataEventHandler); + + $connection->close(); + }; + $timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + $closeEventHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler, $loop, &$timer) { + $connection->removeListener('close', $closeEventHandler); + $connection->removeListener('data', $dataEventHandler); + + $loop->cancelTimer($timer); + }; + $dataEventHandler = function () use ($loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler, &$timer) { + $loop->cancelTimer($timer); + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + }; + $connection->on('close', $closeEventHandler); + $connection->on('data', $dataEventHandler); + + $this->parseRequest($connection); } /** @internal */ @@ -372,7 +408,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // either wait for next request over persistent connection or end connection if ($persist) { - $this->parser->handle($connection); + $this->parseRequest($connection); } else { $connection->end(); } @@ -393,13 +429,67 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // write streaming body and then wait for next request over persistent connection if ($persist) { $body->pipe($connection, array('end' => false)); - $parser = $this->parser; - $body->on('end', function () use ($connection, $parser, $body) { + $that = $this; + $body->on('end', function () use ($connection, $body, &$that) { $connection->removeListener('close', array($body, 'close')); - $parser->handle($connection); + $that->parseRequest($connection); }); } else { $body->pipe($connection); } } + + /** + * @internal + */ + public function parseRequest(ConnectionInterface $connection) + { + $idleConnectionTimeout = $this->idleConnectionTimeout; + $loop = $this->loop; + $parser = $this->parser; + $idleConnectionTimeoutHandler = function () use ($connection, $parser, &$removeTimerHandler) { + $parser->removeListener('headers', $removeTimerHandler); + $parser->removeListener('error', $removeTimerHandler); + + $parser->emit('error', array( + new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT), + $connection + )); + }; + $timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + $removeTimerHandler = function ($requestOrError, $conn) use ($loop, &$timer, $parser, $connection, &$removeTimerHandler, $idleConnectionTimeout, $idleConnectionTimeoutHandler) { + if ($conn !== $connection) { + return; + } + + $loop->cancelTimer($timer); + $parser->removeListener('headers', $removeTimerHandler); + $parser->removeListener('error', $removeTimerHandler); + + if (!($requestOrError instanceof ServerRequestInterface)) { + return; + } + + $requestBody = $requestOrError->getBody(); + if (!($requestBody instanceof HttpBodyStream)) { + return; + } + + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + $requestBody->on('data', function () use (&$timer, $loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler) { + $loop->cancelTimer($timer); + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + }); + $requestBody->on('end', function () use (&$timer, $loop) { + $loop->cancelTimer($timer); + }); + $requestBody->on('close', function () use (&$timer, $loop) { + $loop->cancelTimer($timer); + }); + }; + $this->parser->on('headers', $removeTimerHandler); + $this->parser->on('error', $removeTimerHandler); + + $this->parser->handle($connection); + } } diff --git a/src/Middleware/InactiveConnectionTimeoutMiddleware.php b/src/Middleware/InactiveConnectionTimeoutMiddleware.php new file mode 100644 index 00000000..0ca4c759 --- /dev/null +++ b/src/Middleware/InactiveConnectionTimeoutMiddleware.php @@ -0,0 +1,62 @@ + Internally, this class is used as a "value object" to override the default timeout of one minute. + * As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + */ +final class InactiveConnectionTimeoutMiddleware +{ + /** + * @internal + */ + const DEFAULT_TIMEOUT = 60; + + /** + * @var float + */ + private $timeout; + + /** + * @param float $timeout + */ + public function __construct($timeout = self::DEFAULT_TIMEOUT) + { + $this->timeout = $timeout; + } + + public function __invoke(ServerRequestInterface $request, $next) + { + return $next($request); + } + + /** + * @return float + * @internal + */ + public function getTimeout() + { + return $this->timeout; + } +} diff --git a/tests/HttpServerTest.php b/tests/HttpServerTest.php index 72d48468..518af923 100644 --- a/tests/HttpServerTest.php +++ b/tests/HttpServerTest.php @@ -6,6 +6,8 @@ use React\EventLoop\Loop; use React\Http\HttpServer; use React\Http\Io\IniUtil; +use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Promise; use React\Promise\Deferred; @@ -60,6 +62,10 @@ public function testConstructWithoutLoopAssignsLoopAutomatically() $ref->setAccessible(true); $clock = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($streamingServer, 'parser'); + $ref->setAccessible(true); + $parser = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($clock, 'loop'); $ref->setAccessible(true); $loop = $ref->getValue($clock); @@ -257,6 +263,18 @@ function (ServerRequestInterface $request) use (&$streaming) { $this->assertEquals(true, $streaming); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); + + $http->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + Loop::run(); + } + public function testForwardErrors() { $exception = new \Exception(); @@ -439,7 +457,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency() public function testConstructFiltersOutConfigurationMiddlewareBefore() { - $http = new HttpServer(new StreamingRequestMiddleware(), function () { }); + $http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { }); $ref = new \ReflectionProperty($http, 'streamingServer'); $ref->setAccessible(true); diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index b4e3f2f8..6fdb9a6a 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -61,7 +61,7 @@ private function mockConnection(array $additionalMethods = null) public function testRequestEventWillNotBeEmittedForIncompleteHeaders() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -73,7 +73,7 @@ public function testRequestEventWillNotBeEmittedForIncompleteHeaders() public function testRequestEventIsEmitted() { - $server = new StreamingServer(Loop::get(), $this->expectCallableOnce()); + $server = $this->createStreamingServer($this->expectCallableOnce()); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -88,7 +88,7 @@ public function testRequestEventIsEmitted() public function testRequestEventIsEmittedForArrayCallable() { $this->called = null; - $server = new StreamingServer(Loop::get(), array($this, 'helperCallableOnce')); + $server = $this->createStreamingServer(array($this, 'helperCallableOnce')); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -108,7 +108,7 @@ public function testRequestEvent() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; }); @@ -141,7 +141,7 @@ public function testRequestEventWithSingleRequestHandlerArray() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; }); @@ -173,7 +173,7 @@ public function testRequestEventWithSingleRequestHandlerArray() public function testRequestGetWithHostAndCustomPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -195,7 +195,7 @@ public function testRequestGetWithHostAndCustomPort() public function testRequestGetWithHostAndHttpsPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -217,7 +217,7 @@ public function testRequestGetWithHostAndHttpsPort() public function testRequestGetWithHostAndDefaultPortWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -239,7 +239,7 @@ public function testRequestGetWithHostAndDefaultPortWillBeIgnored() public function testRequestGetHttp10WithoutHostWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -261,7 +261,7 @@ public function testRequestGetHttp10WithoutHostWillBeIgnored() public function testRequestGetHttp11WithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), 'var_dump'); + $server = $this->createStreamingServer('var_dump'); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -274,7 +274,7 @@ public function testRequestGetHttp11WithoutHostWillReject() public function testRequestOptionsAsterisk() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -294,7 +294,7 @@ public function testRequestOptionsAsterisk() public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -307,7 +307,7 @@ public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() public function testRequestConnectAuthorityForm() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -329,7 +329,7 @@ public function testRequestConnectAuthorityForm() public function testRequestConnectWithoutHostWillBePassesAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -351,7 +351,7 @@ public function testRequestConnectWithoutHostWillBePassesAsIs() public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -373,7 +373,7 @@ public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -394,7 +394,7 @@ public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() public function testRequestConnectOriginFormRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -406,7 +406,7 @@ public function testRequestConnectOriginFormRequestTargetWillReject() public function testRequestNonConnectWithAuthorityRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -420,7 +420,7 @@ public function testRequestWithoutHostEventUsesSocketAddress() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -446,7 +446,7 @@ public function testRequestAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -468,7 +468,7 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -488,7 +488,7 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() public function testRequestAbsoluteWithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -502,7 +502,7 @@ public function testRequestOptionsAsteriskEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -524,7 +524,7 @@ public function testRequestOptionsAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -544,7 +544,7 @@ public function testRequestOptionsAbsoluteEvent() public function testRequestPauseWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->pause(); }); @@ -564,7 +564,7 @@ public function testRequestPauseWillBeForwardedToConnection() public function testRequestResumeWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->resume(); }); @@ -584,7 +584,7 @@ public function testRequestResumeWillBeForwardedToConnection() public function testRequestCloseWillNotCloseConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); }); @@ -599,7 +599,7 @@ public function testRequestCloseWillNotCloseConnection() public function testRequestPauseAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->pause(); }); @@ -616,7 +616,7 @@ public function testRequestPauseAfterCloseWillNotBeForwarded() public function testRequestResumeAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->resume(); }); @@ -635,7 +635,7 @@ public function testRequestEventWithoutBodyWillNotEmitData() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($never) { $request->getBody()->on('data', $never); }); @@ -650,7 +650,7 @@ public function testRequestEventWithSecondDataEventWillEmitBodyData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); }); @@ -670,7 +670,7 @@ public function testRequestEventWithPartialBodyWillEmitData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); }); @@ -691,7 +691,7 @@ public function testRequestEventWithPartialBodyWillEmitData() public function testResponseContainsServerHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -721,7 +721,7 @@ public function testResponsePendingPromiseWillNotSendAnything() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($never) { return new Promise(function () { }, $never); }); @@ -751,7 +751,7 @@ public function testResponsePendingPromiseWillBeCancelledIfConnectionCloses() { $once = $this->expectCallableOnce(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { return new Promise(function () { }, $once); }); @@ -783,7 +783,7 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyChunkedEncod $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -812,13 +812,15 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamEndingWillSendEmptyBodyChunkedEncoded() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -849,6 +851,8 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10() @@ -856,7 +860,7 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10( $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -892,7 +896,7 @@ public function testResponseStreamWillBeClosedIfConnectionIsAlreadyClosed() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -947,7 +951,7 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -965,7 +969,7 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() public function testResponseUpgradeInResponseCanBeUsedToAdvertisePossibleUpgrade() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -997,11 +1001,13 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -1032,11 +1038,13 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 101, array( @@ -1076,7 +1084,7 @@ public function testResponseUpgradeSwitchingProtocolWithStreamWillPipeDataToConn { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 101, array( @@ -1117,7 +1125,7 @@ public function testResponseConnectMethodStreamWillPipeDataToConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -1155,7 +1163,7 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -1174,7 +1182,7 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() public function testResponseContainsSameRequestProtocolVersionAndChunkedBodyForHttp11() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1203,11 +1211,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsSameRequestProtocolVersionAndRawBodyForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1241,7 +1251,7 @@ function ($data) use (&$buffer) { public function testResponseContainsNoResponseBodyForHeadRequest() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1270,6 +1280,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingResponse() @@ -1277,7 +1289,7 @@ public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingRes $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array('Content-Length' => '3'), @@ -1305,11 +1317,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 204, array(), @@ -1338,6 +1352,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatusResponseWithStreamingBody() @@ -1345,7 +1361,7 @@ public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContent $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 204, array('Content-Length' => '3'), @@ -1373,11 +1389,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), @@ -1405,11 +1423,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 0\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array('Content-Length' => 3), @@ -1437,11 +1457,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForHeadRequests() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Content-Length' => 3), @@ -1469,11 +1491,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), @@ -1502,6 +1526,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStreamingBody() @@ -1509,7 +1535,7 @@ public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStream $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 304, array('Content-Length' => '3'), @@ -1537,12 +1563,14 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testRequestInvalidHttpProtocolVersionWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1576,7 +1604,7 @@ function ($data) use (&$buffer) { public function testRequestOverflowWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1610,7 +1638,7 @@ function ($data) use (&$buffer) { public function testRequestInvalidWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1647,7 +1675,7 @@ public function testRequestContentLengthBodyDataWillEmitDataEventOnRequestStream $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1675,7 +1703,7 @@ public function testRequestChunkedTransferEncodingRequestWillEmitDecodedDataEven $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1706,7 +1734,7 @@ public function testRequestChunkedTransferEncodingWithAdditionalDataWontBeEmitte $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1735,7 +1763,7 @@ public function testRequestChunkedTransferEncodingEmpty() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1763,7 +1791,7 @@ public function testRequestChunkedTransferEncodingHeaderCanBeUpperCase() $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1793,7 +1821,7 @@ public function testRequestChunkedTransferEncodingCanBeMixedUpperAndLowerCase() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1820,7 +1848,7 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1851,7 +1879,7 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1883,7 +1911,7 @@ public function testRequestZeroContentLengthWillEmitEndEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1909,7 +1937,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1936,7 +1964,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1962,7 +1990,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); return \React\Promise\resolve(new Response()); }); @@ -1987,7 +2015,7 @@ public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2009,7 +2037,7 @@ public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2032,7 +2060,7 @@ public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWi public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2054,7 +2082,7 @@ public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithContentLengthWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2081,7 +2109,7 @@ public function testRequestWithoutBodyWillEmitEndOnRequestStream() $endEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ $request->getBody()->on('data', $dataEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('end', $endEvent); @@ -2105,7 +2133,7 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -2124,7 +2152,7 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() public function testResponseWithBodyStreamWillUseChunkedTransferEncodingByDefault() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -2158,7 +2186,7 @@ function ($data) use (&$buffer) { public function testResponseWithBodyStringWillOverwriteExplicitContentLengthAndTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2199,7 +2227,7 @@ public function testResponseContainsResponseBodyWithTransferEncodingChunkedForBo $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), @@ -2228,6 +2256,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("Transfer-Encoding: chunked", $buffer); $this->assertNotContainsString("Content-Length:", $buffer); $this->assertContainsString("body", $buffer); + + Loop::run(); } public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForLegacyHttp10() @@ -2236,7 +2266,7 @@ public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForL $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), @@ -2270,7 +2300,7 @@ function ($data) use (&$buffer) { public function testResponseWithCustomTransferEncodingWillBeIgnoredAndUseChunkedTransferEncodingInstead() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array( @@ -2307,7 +2337,7 @@ function ($data) use (&$buffer) { public function testResponseWithoutExplicitDateHeaderWillAddCurrentDateFromClock() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2345,7 +2375,7 @@ function ($data) use (&$buffer) { public function testResponseWithCustomDateHeaderOverwritesDefault() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array("Date" => "Tue, 15 Nov 1994 08:12:31 GMT") @@ -2378,7 +2408,7 @@ function ($data) use (&$buffer) { public function testResponseWithEmptyDateHeaderRemovesDateHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Date' => '') @@ -2411,7 +2441,7 @@ function ($data) use (&$buffer) { public function testResponseCanContainMultipleCookieHeaders() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2449,7 +2479,7 @@ function ($data) use (&$buffer) { public function testReponseWithExpectContinueRequestContainsContinueWithLaterResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2481,7 +2511,7 @@ function ($data) use (&$buffer) { public function testResponseWithExpectContinueRequestWontSendContinueForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2512,14 +2542,14 @@ function ($data) use (&$buffer) { public function testInvalidCallbackFunctionLeadsToException() { $this->setExpectedException('InvalidArgumentException'); - new StreamingServer(Loop::get(), 'invalid'); + $this->createStreamingServer('invalid'); } public function testResponseBodyStreamWillStreamDataWithChunkedTransferEncoding() { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array(), @@ -2558,7 +2588,7 @@ public function testResponseBodyStreamWithContentLengthWillStreamTillLengthWitho { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array('Content-Length' => 5), @@ -2596,7 +2626,7 @@ function ($data) use (&$buffer) { public function testResponseWithResponsePromise() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve(new Response()); }); @@ -2624,7 +2654,7 @@ function ($data) use (&$buffer) { public function testResponseReturnInvalidTypeWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return "invalid"; }); @@ -2658,7 +2688,7 @@ function ($data) use (&$buffer) { public function testResponseResolveWrongTypeInPromiseWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve("invalid"); }); @@ -2686,7 +2716,7 @@ function ($data) use (&$buffer) { public function testResponseRejectedPromiseWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject(new \Exception()); }); @@ -2717,7 +2747,7 @@ function ($data) use (&$buffer) { public function testResponseExceptionInCallbackWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { throw new \Exception('Bad call'); }); @@ -2748,7 +2778,7 @@ function ($data) use (&$buffer) { public function testResponseWithContentLengthHeaderForStringBodyOverwritesTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Transfer-Encoding' => 'chunked'), @@ -2784,7 +2814,7 @@ function ($data) use (&$buffer) { public function testResponseWillBeHandled() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2812,7 +2842,7 @@ function ($data) use (&$buffer) { public function testResponseExceptionThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { throw new \Exception('hello'); }); @@ -2850,7 +2880,7 @@ function ($data) use (&$buffer) { */ public function testResponseThrowableThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { throw new \Error('hello'); }); @@ -2893,7 +2923,7 @@ function ($data) use (&$buffer) { public function testResponseRejectOfNonExceptionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject('Invalid type'); }); @@ -2980,7 +3010,7 @@ public static function provideInvalidResponse() */ public function testInvalidResponseObjectWillResultInErrorMessage(ResponseInterface $response) { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($response) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($response) { return $response; }); @@ -3015,7 +3045,7 @@ function ($data) use (&$buffer) { public function testRequestServerRequestParams() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3049,7 +3079,7 @@ public function testRequestServerRequestParams() public function testRequestQueryParametersWillBeAddedToRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3069,7 +3099,7 @@ public function testRequestQueryParametersWillBeAddedToRequest() public function testRequestCookieWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3090,7 +3120,7 @@ public function testRequestCookieWillBeAddedToServerRequest() public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3111,7 +3141,7 @@ public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3131,7 +3161,7 @@ public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3150,7 +3180,7 @@ public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() public function testNewConnectionWillInvokeParserOnce() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3167,7 +3197,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array(), '', '1.0'); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = $this->createStreamingServer($this->expectCallableOnceWith($request)); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3190,7 +3220,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'close')); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = $this->createStreamingServer($this->expectCallableOnceWith($request)); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3213,7 +3243,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(200, array('Connection' => 'close')); }); @@ -3238,7 +3268,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(); }); @@ -3263,7 +3293,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'keep-alive'), '', '1.0'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(); }); @@ -3289,7 +3319,7 @@ public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandler $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = $this->createStreamingServer(function () use ($body) { return new Response(200, array(), $body); }); @@ -3315,7 +3345,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = $this->createStreamingServer(function () use ($body) { return new Response(200, array(), $body); }); @@ -3335,9 +3365,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle // pretend parser just finished parsing $server->handleRequest($this->connection, $request); - $this->assertCount(2, $this->connection->listeners('close')); + $this->assertCount(3, $this->connection->listeners('close')); $body->end(); - $this->assertCount(1, $this->connection->listeners('close')); + $this->assertCount(2, $this->connection->listeners('close')); } public function testCompletingARequestWillRemoveConnectionOnCloseListener() @@ -3346,7 +3376,7 @@ public function testCompletingARequestWillRemoveConnectionOnCloseListener() $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return \React\Promise\resolve(new Response()); }); @@ -3359,6 +3389,85 @@ public function testCompletingARequestWillRemoveConnectionOnCloseListener() $server->handleRequest($connection, $request); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $server = $this->createStreamingServer($this->expectCallableNever(), 0.1); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + Loop::run(); + } + + public function testIdleConnectionWillBeClosedAfterConfiguredTimeoutWhileParsingHeadersReturnsA408() + { + $buffer = ''; + + $this->connection + ->expects($this->any()) + ->method('write') + ->will( + $this->returnCallback( + function ($data) use (&$buffer) { + $buffer .= $data; + } + ) + ); + $this->connection->expects($this->once())->method('close'); + + $server = $this->createStreamingServer($this->expectCallableNever(), 0.1); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + $this->connection->emit('data', array('GET / HTTP/1.1')); + + Loop::run(); + + $this->assertStringStartsWith("HTTP/1.1 408 Request Timeout\r\n", $buffer); + $this->assertStringEndsWith("\r\n\r\nError 408: Request Timeout", $buffer); + } + + public function testIdleConnectionWillBeClosedAfterConfiguredTimeoutAfterParsingHeadersButBeforeFullBodyHasBeenSentReturnsA408() + { + $buffer = ''; + + $this->connection + ->expects($this->any()) + ->method('write') + ->will( + $this->returnCallback( + function ($data) use (&$buffer) { + $buffer .= $data; + } + ) + ); + $this->connection->expects($this->once())->method('close'); + + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { + $body = $request->getBody(); + + return new Promise(function () {}); + }, 0.1); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $data = ''; + $data .= "POST / HTTP/1.1\r\n"; + $data .= "Host: localhost\r\n"; + $data .= "Content-Length: 100\r\n"; + $data .= "\r\n"; + $data .= "incomplete"; + $this->connection->emit('data', array($data)); + + Loop::run(); + + $this->assertStringStartsWith("HTTP/1.1 408 Request Timeout\r\n", $buffer); + $this->assertStringEndsWith("\r\n\r\nError 408: Request Timeout", $buffer); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; @@ -3368,4 +3477,9 @@ private function createGetRequest() return $data; } + + private function createStreamingServer($requestHandler, $requestTimeout = 1) + { + return new StreamingServer(Loop::get(), $requestHandler, $requestTimeout); + } }