diff --git a/README.md b/README.md index 50797fe6..357e1b3e 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ multiple concurrent HTTP requests without blocking. * [ServerRequest](#serverrequest) * [ResponseException](#responseexception) * [React\Http\Middleware](#reacthttpmiddleware) + * [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware) * [StreamingRequestMiddleware](#streamingrequestmiddleware) * [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware) * [RequestBodyBufferMiddleware](#requestbodybuffermiddleware) @@ -2498,6 +2499,22 @@ access its underlying response object. ### React\Http\Middleware +#### InactiveConnectionTimeoutMiddleware + +The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the +`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly open. + +The following example configures the `HttpServer` to close any inactive connections after one and a half second: + +```php +$http = new React\Http\HttpServer( + new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5), + $handler +); +``` +> Internally, this class is used as a "value object" to override the default timeout of one minute. + As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + #### StreamingRequestMiddleware The `React\Http\Middleware\StreamingRequestMiddleware` can be used to diff --git a/src/HttpServer.php b/src/HttpServer.php index f2334733..004a1e06 100644 --- a/src/HttpServer.php +++ b/src/HttpServer.php @@ -8,6 +8,7 @@ use React\Http\Io\IniUtil; use React\Http\Io\MiddlewareRunner; use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\LimitConcurrentRequestsMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Http\Middleware\RequestBodyBufferMiddleware; @@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop) } $streaming = false; + $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT; foreach ((array) $requestHandlers as $handler) { if ($handler instanceof StreamingRequestMiddleware) { $streaming = true; - break; + } + if ($handler instanceof InactiveConnectionTimeoutMiddleware) { + $idleConnectTimeout = $handler->getTimeout(); } } @@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop) * doing anything with the request. */ $middleware = \array_filter($middleware, function ($handler) { - return !($handler instanceof StreamingRequestMiddleware); + return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware); }); - $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware)); + $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectTimeout); $that = $this; $this->streamingServer->on('error', function ($error) use ($that) { diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index dd4c0584..b36676b4 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -8,6 +8,7 @@ use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Promise; use React\Promise\CancellablePromiseInterface; use React\Promise\PromiseInterface; @@ -85,6 +86,7 @@ final class StreamingServer extends EventEmitter private $callback; private $parser; private $loop; + private $idleConnectionTimeout; /** * Creates an HTTP server that invokes the given callback for each incoming HTTP request @@ -96,15 +98,17 @@ final class StreamingServer extends EventEmitter * * @param LoopInterface $loop * @param callable $requestHandler + * @param float $idleConnectTimeout * @see self::listen() */ - public function __construct(LoopInterface $loop, $requestHandler) + public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT) { if (!\is_callable($requestHandler)) { throw new \InvalidArgumentException('Invalid request handler given'); } $this->loop = $loop; + $this->idleConnectionTimeout = $idleConnectTimeout; $this->callback = $requestHandler; $this->parser = new RequestHeaderParser(); @@ -134,7 +138,27 @@ public function __construct(LoopInterface $loop, $requestHandler) */ public function listen(ServerInterface $socket) { - $socket->on('connection', array($this->parser, 'handle')); + $socket->on('connection', array($this, 'handle')); + } + + /** @internal */ + public function handle(ConnectionInterface $conn) + { + $timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) { + $conn->close(); + }); + $loop = $this->loop; + $conn->once('data', function () use ($loop, $timer) { + $loop->cancelTimer($timer); + }); + $conn->on('end', function () use ($loop, $timer) { + $loop->cancelTimer($timer); + }); + $conn->on('close', function () use ($loop, $timer) { + $loop->cancelTimer($timer); + }); + + $this->parser->handle($conn); } /** @internal */ @@ -350,7 +374,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // either wait for next request over persistent connection or end connection if ($persist) { - $this->parser->handle($connection); + $this->handle($connection); } else { $connection->end(); } @@ -371,10 +395,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // write streaming body and then wait for next request over persistent connection if ($persist) { $body->pipe($connection, array('end' => false)); - $parser = $this->parser; - $body->on('end', function () use ($connection, $parser, $body) { + $that = $this; + $body->on('end', function () use ($connection, $that, $body) { $connection->removeListener('close', array($body, 'close')); - $parser->handle($connection); + $that->handle($connection); }); } else { $body->pipe($connection); diff --git a/src/Middleware/InactiveConnectionTimeoutMiddleware.php b/src/Middleware/InactiveConnectionTimeoutMiddleware.php new file mode 100644 index 00000000..7ba0200a --- /dev/null +++ b/src/Middleware/InactiveConnectionTimeoutMiddleware.php @@ -0,0 +1,58 @@ + Internally, this class is used as a "value object" to override the default timeout of one minute. + * As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + */ +final class InactiveConnectionTimeoutMiddleware +{ + const DEFAULT_TIMEOUT = 60; + + /** + * @var float + */ + private $timeout; + + /** + * @param float $timeout + */ + public function __construct($timeout = self::DEFAULT_TIMEOUT) + { + $this->timeout = $timeout; + } + + public function __invoke(ServerRequestInterface $request, $next) + { + return $next($request); + } + + /** + * @return float + */ + public function getTimeout() + { + return $this->timeout; + } +} diff --git a/tests/HttpServerTest.php b/tests/HttpServerTest.php index 9200aa66..d6115e2b 100644 --- a/tests/HttpServerTest.php +++ b/tests/HttpServerTest.php @@ -7,6 +7,8 @@ use React\EventLoop\Factory; use React\Http\HttpServer; use React\Http\Io\IniUtil; +use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Promise; use React\Promise\Deferred; @@ -254,6 +256,19 @@ function (ServerRequestInterface $request) use (&$streaming) { $this->assertEquals(true, $streaming); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $loop = Factory::create(); + $http = new HttpServer($loop, new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); + + $http->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $loop->run(); + } + public function testForwardErrors() { $exception = new \Exception(); @@ -434,7 +449,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency() public function testConstructFiltersOutConfigurationMiddlewareBefore() { - $http = new HttpServer(new StreamingRequestMiddleware(), function () { }); + $http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { }); $ref = new \ReflectionProperty($http, 'streamingServer'); $ref->setAccessible(true); diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index ccafe338..92ac8261 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -58,6 +58,19 @@ public function testRequestEventWillNotBeEmittedForIncompleteHeaders() $this->connection->emit('data', array($data)); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $loop = Factory::create(); + $server = new StreamingServer($loop, $this->expectCallableNever(), 0.1); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $loop->run(); + } + public function testRequestEventIsEmitted() { $server = new StreamingServer(Factory::create(), $this->expectCallableOnce()); @@ -3196,9 +3209,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle // pretend parser just finished parsing $server->handleRequest($this->connection, $request); - $this->assertCount(2, $this->connection->listeners('close')); + $this->assertCount(3, $this->connection->listeners('close')); $body->end(); - $this->assertCount(1, $this->connection->listeners('close')); + $this->assertCount(3, $this->connection->listeners('close')); } private function createGetRequest()