From 7aa08f01583c765261d08d1191f0239324f18012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Fri, 6 Apr 2018 17:33:44 +0200 Subject: [PATCH 1/2] Support persistent connections (Connection: keep-alive) --- README.md | 11 +- src/Io/StreamingServer.php | 41 +++++-- tests/FunctionalServerTest.php | 6 +- tests/Io/StreamingServerTest.php | 195 ++++++++++++++++++++++++++++++- 4 files changed, 236 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 09a4ecd2..4625a81e 100644 --- a/README.md +++ b/README.md @@ -1698,11 +1698,12 @@ so you don't have to. For instance, if the client sends the request using the HTTP/1.1 protocol version, the response message will also use the same protocol version, no matter what version is returned from the request handler function. -Note that persistent connections (`Connection: keep-alive`) are currently -not supported. -As such, HTTP/1.1 response messages will automatically include a -`Connection: close` header, irrespective of what header values are -passed explicitly. +The server supports persistent connections. An appropriate `Connection: keep-alive` +or `Connection: close` response header will be added automatically, respecting the +matching request header value and HTTP default header values. The server is +responsible for handling the `Connection` response header, so you SHOULD NOT pass +this response header yourself, unless you explicitly want to override the user's +choice with a `Connection: close` response header. ### Middleware diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 0674d960..076a4ff0 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -210,7 +210,8 @@ public function writeError(ConnectionInterface $conn, $code, ServerRequestInterf $response = new Response( $code, array( - 'Content-Type' => 'text/plain' + 'Content-Type' => 'text/plain', + 'Connection' => 'close' // we do not want to keep the connection open after an error ), 'Error ' . $code ); @@ -273,17 +274,28 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt $chunked = true; } else { // remove any Transfer-Encoding headers unless automatically enabled above + // we do not want to keep connection alive, so pretend we received "Connection: close" request header $response = $response->withoutHeader('Transfer-Encoding'); + $request = $request->withHeader('Connection', 'close'); } // assign "Connection" header automatically + $persist = false; if ($code === 101) { // 101 (Switching Protocols) response uses Connection: upgrade header + // This implies that this stream now uses another protocol and we + // may not persist this connection for additional requests. $response = $response->withHeader('Connection', 'upgrade'); - } elseif ($version === '1.1') { - // HTTP/1.1 assumes persistent connection support by default - // we do not support persistent connections, so let the client know + } elseif (\strtolower($request->getHeaderLine('Connection')) === 'close' || \strtolower($response->getHeaderLine('Connection')) === 'close') { + // obey explicit "Connection: close" request header or response header if present $response = $response->withHeader('Connection', 'close'); + } elseif ($version === '1.1') { + // HTTP/1.1 assumes persistent connection support by default, so we don't need to inform client + $persist = true; + } elseif (strtolower($request->getHeaderLine('Connection')) === 'keep-alive') { + // obey explicit "Connection: keep-alive" request header and inform client + $persist = true; + $response = $response->withHeader('Connection', 'keep-alive'); } else { // remove any Connection headers unless automatically enabled above $response = $response->withoutHeader('Connection'); @@ -328,9 +340,15 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt $body = "0\r\n\r\n"; } - // end connection after writing response headers and body + // write response headers and body $connection->write($headers . "\r\n" . $body); - $connection->end(); + + // either wait for next request over persistent connection or end connection + if ($persist) { + $this->parser->handle($connection); + } else { + $connection->end(); + } return; } @@ -345,6 +363,15 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // in particular this may only fire on a later read/write attempt. $connection->on('close', array($body, 'close')); - $body->pipe($connection); + // write streaming body and then wait for next request over persistent connection + if ($persist) { + $body->pipe($connection, array('end' => false)); + $parser = $this->parser; + $body->on('end', function () use ($connection, $parser) { + $parser->handle($connection); + }); + } else { + $body->pipe($connection); + } } } diff --git a/tests/FunctionalServerTest.php b/tests/FunctionalServerTest.php index bd127ab7..41cf31db 100644 --- a/tests/FunctionalServerTest.php +++ b/tests/FunctionalServerTest.php @@ -662,7 +662,7 @@ public function testConnectWithThroughStreamReturnsDataAsGiven() $server->listen($socket); $result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) { - $conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n"); + $conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n"); $conn->once('data', function () use ($conn) { $conn->write('hello'); @@ -703,7 +703,7 @@ public function testConnectWithThroughStreamReturnedFromPromiseReturnsDataAsGive $server->listen($socket); $result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) { - $conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n"); + $conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n"); $conn->once('data', function () use ($conn) { $conn->write('hello'); @@ -737,7 +737,7 @@ public function testConnectWithClosedThroughStreamReturnsNoData() $server->listen($socket); $result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) { - $conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n"); + $conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n"); $conn->once('data', function () use ($conn) { $conn->write('hello'); diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index d2401a06..40187f06 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -6,6 +6,7 @@ use React\EventLoop\Factory; use React\Http\Io\StreamingServer; use React\Http\Message\Response; +use React\Http\Message\ServerRequest; use React\Promise\Promise; use React\Stream\ThroughStream; use React\Tests\Http\SocketServerStub; @@ -957,7 +958,7 @@ function ($data) use (&$buffer) { $data = "GET / HTTP/1.1\r\n\r\n"; $this->connection->emit('data', array($data)); - $this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\nConnection: close\r\n\r\nfoo", $buffer); + $this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\n\r\nfoo", $buffer); } public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse() @@ -992,7 +993,7 @@ function ($data) use (&$buffer) { $data = "GET / HTTP/1.1\r\nUpgrade: demo\r\n\r\n"; $this->connection->emit('data', array($data)); - $this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\nConnection: close\r\n\r\nfoo", $buffer); + $this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nfoo", $buffer); } public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength() @@ -2813,6 +2814,196 @@ public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() { $this->assertEquals(array('test' => 'abc,def', 'hello' => 'world'), $requestValidation->getCookieParams()); } + public function testNewConnectionWillInvokeParserOnce() + { + $server = new StreamingServer(Factory::create(), $this->expectCallableNever()); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->once())->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + } + + public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneForHttp10() + { + $request = new ServerRequest('GET', 'http://localhost/', array(), '', '1.0'); + + $server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request)); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->once())->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $this->connection->expects($this->once())->method('write'); + $this->connection->expects($this->once())->method('end'); + + // pretend parser just finished parsing + $server->handleRequest($this->connection, $request); + } + + public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneForHttp11ConnectionClose() + { + $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'close')); + + $server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request)); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->once())->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $this->connection->expects($this->once())->method('write'); + $this->connection->expects($this->once())->method('end'); + + // pretend parser just finished parsing + $server->handleRequest($this->connection, $request); + } + + public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneAndRequestHandlerReturnsConnectionClose() + { + $request = new ServerRequest('GET', 'http://localhost/'); + + $server = new StreamingServer(Factory::create(), function () { + return new Response(200, array('Connection' => 'close')); + }); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->once())->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $this->connection->expects($this->once())->method('write'); + $this->connection->expects($this->once())->method('end'); + + // pretend parser just finished parsing + $server->handleRequest($this->connection, $request); + } + + public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenConnectionCanBeKeptAliveForHttp11Default() + { + $request = new ServerRequest('GET', 'http://localhost/'); + + $server = new StreamingServer(Factory::create(), function () { + return new Response(); + }); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->exactly(2))->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $this->connection->expects($this->once())->method('write'); + $this->connection->expects($this->never())->method('end'); + + // pretend parser just finished parsing + $server->handleRequest($this->connection, $request); + } + + public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenConnectionCanBeKeptAliveForHttp10ConnectionKeepAlive() + { + $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'keep-alive'), '', '1.0'); + + $server = new StreamingServer(Factory::create(), function () { + return new Response(); + }); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->exactly(2))->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $this->connection->expects($this->once())->method('write'); + $this->connection->expects($this->never())->method('end'); + + // pretend parser just finished parsing + $server->handleRequest($this->connection, $request); + } + + public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandlerWhenStreamingResponseBodyKeepsStreaming() + { + $request = new ServerRequest('GET', 'http://localhost/'); + + $body = new ThroughStream(); + $server = new StreamingServer(Factory::create(), function () use ($body) { + return new Response(200, array(), $body); + }); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->once())->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $this->connection->expects($this->once())->method('write'); + $this->connection->expects($this->never())->method('end'); + + // pretend parser just finished parsing + $server->handleRequest($this->connection, $request); + } + + public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenStreamingResponseBodyEnds() + { + $request = new ServerRequest('GET', 'http://localhost/'); + + $body = new ThroughStream(); + $server = new StreamingServer(Factory::create(), function () use ($body) { + return new Response(200, array(), $body); + }); + + $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock(); + $parser->expects($this->exactly(2))->method('handle'); + + $ref = new \ReflectionProperty($server, 'parser'); + $ref->setAccessible(true); + $ref->setValue($server, $parser); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + $this->connection->expects($this->exactly(2))->method('write'); + $this->connection->expects($this->never())->method('end'); + + // pretend parser just finished parsing + $server->handleRequest($this->connection, $request); + + $body->end(); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; From 84fbe7882e0b948ae03abd67a57c5e88f2eac055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Tue, 23 Mar 2021 19:54:15 +0100 Subject: [PATCH 2/2] Improve benchmarking instructions and dangling memory references --- examples/99-server-benchmark-download.php | 13 ++++++++++--- src/Io/RequestHeaderParser.php | 4 ---- src/Io/StreamingServer.php | 3 ++- tests/Io/StreamingServerTest.php | 2 ++ 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/examples/99-server-benchmark-download.php b/examples/99-server-benchmark-download.php index 5fdd55c9..1a49df72 100644 --- a/examples/99-server-benchmark-download.php +++ b/examples/99-server-benchmark-download.php @@ -1,11 +1,18 @@ /dev/null // $ wget http://localhost:8080/10g.bin -O /dev/null -// $ ab -n10 -c10 http://localhost:8080/1g.bin -// $ docker run -it --rm --net=host jordi/ab -n100000 -c10 http://localhost:8080/ -// $ docker run -it --rm --net=host jordi/ab -n10 -c10 http://localhost:8080/1g.bin +// $ ab -n10 -c10 -k http://localhost:8080/1g.bin +// $ docker run -it --rm --net=host jordi/ab -n100000 -c10 -k http://localhost:8080/ +// $ docker run -it --rm --net=host jordi/ab -n10 -c10 -k http://localhost:8080/1g.bin +// $ docker run -it --rm --net=host skandyla/wrk -t8 -c10 -d20 http://localhost:8080/ use Evenement\EventEmitter; use Psr\Http\Message\ServerRequestInterface; diff --git a/src/Io/RequestHeaderParser.php b/src/Io/RequestHeaderParser.php index 53f7ff09..5125c77f 100644 --- a/src/Io/RequestHeaderParser.php +++ b/src/Io/RequestHeaderParser.php @@ -106,10 +106,6 @@ public function handle(ConnectionInterface $conn) $stream->close(); } }); - - $conn->on('close', function () use (&$buffer, &$fn) { - $fn = $buffer = null; - }); } /** diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 076a4ff0..e20ddf48 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -367,7 +367,8 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt if ($persist) { $body->pipe($connection, array('end' => false)); $parser = $this->parser; - $body->on('end', function () use ($connection, $parser) { + $body->on('end', function () use ($connection, $parser, $body) { + $connection->removeListener('close', array($body, 'close')); $parser->handle($connection); }); } else { diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index 40187f06..0dde7a0c 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -3001,7 +3001,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle // pretend parser just finished parsing $server->handleRequest($this->connection, $request); + $this->assertCount(2, $this->connection->listeners('close')); $body->end(); + $this->assertCount(1, $this->connection->listeners('close')); } private function createGetRequest()