Skip to content

Commit

Permalink
Merge pull request #405 from clue-labs/keepalive
Browse files Browse the repository at this point in the history
Support persistent connections (Connection: keep-alive)
  • Loading branch information
WyriHaximus authored Apr 10, 2021
2 parents 5944a08 + 84fbe78 commit c919cb7
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 24 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1698,11 +1698,12 @@ so you don't have to. For instance, if the client sends the request using the
HTTP/1.1 protocol version, the response message will also use the same protocol
version, no matter what version is returned from the request handler function.

Note that persistent connections (`Connection: keep-alive`) are currently
not supported.
As such, HTTP/1.1 response messages will automatically include a
`Connection: close` header, irrespective of what header values are
passed explicitly.
The server supports persistent connections. An appropriate `Connection: keep-alive`
or `Connection: close` response header will be added automatically, respecting the
matching request header value and HTTP default header values. The server is
responsible for handling the `Connection` response header, so you SHOULD NOT pass
this response header yourself, unless you explicitly want to override the user's
choice with a `Connection: close` response header.

### Middleware

Expand Down
13 changes: 10 additions & 3 deletions examples/99-server-benchmark-download.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
<?php

// A simple HTTP web server that can be used to benchmark requests per second and download speed
//
// $ php examples/99-server-benchmark-download.php 8080
//
// This example runs the web server on a single CPU core in order to measure the
// per core performance.
//
// $ curl http://localhost:8080/10g.bin > /dev/null
// $ wget http://localhost:8080/10g.bin -O /dev/null
// $ ab -n10 -c10 http://localhost:8080/1g.bin
// $ docker run -it --rm --net=host jordi/ab -n100000 -c10 http://localhost:8080/
// $ docker run -it --rm --net=host jordi/ab -n10 -c10 http://localhost:8080/1g.bin
// $ ab -n10 -c10 -k http://localhost:8080/1g.bin
// $ docker run -it --rm --net=host jordi/ab -n100000 -c10 -k http://localhost:8080/
// $ docker run -it --rm --net=host jordi/ab -n10 -c10 -k http://localhost:8080/1g.bin
// $ docker run -it --rm --net=host skandyla/wrk -t8 -c10 -d20 http://localhost:8080/

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
Expand Down
4 changes: 0 additions & 4 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ public function handle(ConnectionInterface $conn)
$stream->close();
}
});

$conn->on('close', function () use (&$buffer, &$fn) {
$fn = $buffer = null;
});
}

/**
Expand Down
42 changes: 35 additions & 7 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public function writeError(ConnectionInterface $conn, $code, ServerRequestInterf
$response = new Response(
$code,
array(
'Content-Type' => 'text/plain'
'Content-Type' => 'text/plain',
'Connection' => 'close' // we do not want to keep the connection open after an error
),
'Error ' . $code
);
Expand Down Expand Up @@ -273,17 +274,28 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
$chunked = true;
} else {
// remove any Transfer-Encoding headers unless automatically enabled above
// we do not want to keep connection alive, so pretend we received "Connection: close" request header
$response = $response->withoutHeader('Transfer-Encoding');
$request = $request->withHeader('Connection', 'close');
}

// assign "Connection" header automatically
$persist = false;
if ($code === 101) {
// 101 (Switching Protocols) response uses Connection: upgrade header
// This implies that this stream now uses another protocol and we
// may not persist this connection for additional requests.
$response = $response->withHeader('Connection', 'upgrade');
} elseif ($version === '1.1') {
// HTTP/1.1 assumes persistent connection support by default
// we do not support persistent connections, so let the client know
} elseif (\strtolower($request->getHeaderLine('Connection')) === 'close' || \strtolower($response->getHeaderLine('Connection')) === 'close') {
// obey explicit "Connection: close" request header or response header if present
$response = $response->withHeader('Connection', 'close');
} elseif ($version === '1.1') {
// HTTP/1.1 assumes persistent connection support by default, so we don't need to inform client
$persist = true;
} elseif (strtolower($request->getHeaderLine('Connection')) === 'keep-alive') {
// obey explicit "Connection: keep-alive" request header and inform client
$persist = true;
$response = $response->withHeader('Connection', 'keep-alive');
} else {
// remove any Connection headers unless automatically enabled above
$response = $response->withoutHeader('Connection');
Expand Down Expand Up @@ -328,9 +340,15 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
$body = "0\r\n\r\n";
}

// end connection after writing response headers and body
// write response headers and body
$connection->write($headers . "\r\n" . $body);
$connection->end();

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->parser->handle($connection);
} else {
$connection->end();
}
return;
}

Expand All @@ -345,6 +363,16 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// in particular this may only fire on a later read/write attempt.
$connection->on('close', array($body, 'close'));

$body->pipe($connection);
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$connection->removeListener('close', array($body, 'close'));
$parser->handle($connection);
});
} else {
$body->pipe($connection);
}
}
}
6 changes: 3 additions & 3 deletions tests/FunctionalServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ public function testConnectWithThroughStreamReturnsDataAsGiven()
$server->listen($socket);

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
Expand Down Expand Up @@ -703,7 +703,7 @@ public function testConnectWithThroughStreamReturnedFromPromiseReturnsDataAsGive
$server->listen($socket);

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
Expand Down Expand Up @@ -737,7 +737,7 @@ public function testConnectWithClosedThroughStreamReturnsNoData()
$server->listen($socket);

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
Expand Down
197 changes: 195 additions & 2 deletions tests/Io/StreamingServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\EventLoop\Factory;
use React\Http\Io\StreamingServer;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Promise\Promise;
use React\Stream\ThroughStream;
use React\Tests\Http\SocketServerStub;
Expand Down Expand Up @@ -957,7 +958,7 @@ function ($data) use (&$buffer) {
$data = "GET / HTTP/1.1\r\n\r\n";
$this->connection->emit('data', array($data));

$this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\nConnection: close\r\n\r\nfoo", $buffer);
$this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\n\r\nfoo", $buffer);
}

public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse()
Expand Down Expand Up @@ -992,7 +993,7 @@ function ($data) use (&$buffer) {
$data = "GET / HTTP/1.1\r\nUpgrade: demo\r\n\r\n";
$this->connection->emit('data', array($data));

$this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\nConnection: close\r\n\r\nfoo", $buffer);
$this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nfoo", $buffer);
}

public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength()
Expand Down Expand Up @@ -2813,6 +2814,198 @@ public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() {
$this->assertEquals(array('test' => 'abc,def', 'hello' => 'world'), $requestValidation->getCookieParams());
}

public function testNewConnectionWillInvokeParserOnce()
{
$server = new StreamingServer(Factory::create(), $this->expectCallableNever());

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
}

public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneForHttp10()
{
$request = new ServerRequest('GET', 'http://localhost/', array(), '', '1.0');

$server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request));

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->once())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneForHttp11ConnectionClose()
{
$request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'close'));

$server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request));

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->once())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneAndRequestHandlerReturnsConnectionClose()
{
$request = new ServerRequest('GET', 'http://localhost/');

$server = new StreamingServer(Factory::create(), function () {
return new Response(200, array('Connection' => 'close'));
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->once())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenConnectionCanBeKeptAliveForHttp11Default()
{
$request = new ServerRequest('GET', 'http://localhost/');

$server = new StreamingServer(Factory::create(), function () {
return new Response();
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->exactly(2))->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenConnectionCanBeKeptAliveForHttp10ConnectionKeepAlive()
{
$request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'keep-alive'), '', '1.0');

$server = new StreamingServer(Factory::create(), function () {
return new Response();
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->exactly(2))->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandlerWhenStreamingResponseBodyKeepsStreaming()
{
$request = new ServerRequest('GET', 'http://localhost/');

$body = new ThroughStream();
$server = new StreamingServer(Factory::create(), function () use ($body) {
return new Response(200, array(), $body);
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenStreamingResponseBodyEnds()
{
$request = new ServerRequest('GET', 'http://localhost/');

$body = new ThroughStream();
$server = new StreamingServer(Factory::create(), function () use ($body) {
return new Response(200, array(), $body);
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->exactly(2))->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->exactly(2))->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);

$this->assertCount(2, $this->connection->listeners('close'));
$body->end();
$this->assertCount(1, $this->connection->listeners('close'));
}

private function createGetRequest()
{
$data = "GET / HTTP/1.1\r\n";
Expand Down

0 comments on commit c919cb7

Please sign in to comment.