Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support persistent connections (Connection: keep-alive) #405

Merged
merged 2 commits into from
Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1698,11 +1698,12 @@ so you don't have to. For instance, if the client sends the request using the
HTTP/1.1 protocol version, the response message will also use the same protocol
version, no matter what version is returned from the request handler function.

Note that persistent connections (`Connection: keep-alive`) are currently
not supported.
As such, HTTP/1.1 response messages will automatically include a
`Connection: close` header, irrespective of what header values are
passed explicitly.
The server supports persistent connections. An appropriate `Connection: keep-alive`
or `Connection: close` response header will be added automatically, respecting the
matching request header value and HTTP default header values. The server is
responsible for handling the `Connection` response header, so you SHOULD NOT pass
this response header yourself, unless you explicitly want to override the user's
choice with a `Connection: close` response header.

### Middleware

Expand Down
13 changes: 10 additions & 3 deletions examples/99-server-benchmark-download.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
<?php

// A simple HTTP web server that can be used to benchmark requests per second and download speed
//
// $ php examples/99-server-benchmark-download.php 8080
//
// This example runs the web server on a single CPU core in order to measure the
// per core performance.
//
// $ curl http://localhost:8080/10g.bin > /dev/null
// $ wget http://localhost:8080/10g.bin -O /dev/null
// $ ab -n10 -c10 http://localhost:8080/1g.bin
// $ docker run -it --rm --net=host jordi/ab -n100000 -c10 http://localhost:8080/
// $ docker run -it --rm --net=host jordi/ab -n10 -c10 http://localhost:8080/1g.bin
// $ ab -n10 -c10 -k http://localhost:8080/1g.bin
// $ docker run -it --rm --net=host jordi/ab -n100000 -c10 -k http://localhost:8080/
// $ docker run -it --rm --net=host jordi/ab -n10 -c10 -k http://localhost:8080/1g.bin
// $ docker run -it --rm --net=host skandyla/wrk -t8 -c10 -d20 http://localhost:8080/

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
Expand Down
4 changes: 0 additions & 4 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ public function handle(ConnectionInterface $conn)
$stream->close();
}
});

$conn->on('close', function () use (&$buffer, &$fn) {
$fn = $buffer = null;
});
}

/**
Expand Down
42 changes: 35 additions & 7 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public function writeError(ConnectionInterface $conn, $code, ServerRequestInterf
$response = new Response(
$code,
array(
'Content-Type' => 'text/plain'
'Content-Type' => 'text/plain',
'Connection' => 'close' // we do not want to keep the connection open after an error
),
'Error ' . $code
);
Expand Down Expand Up @@ -273,17 +274,28 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
$chunked = true;
} else {
// remove any Transfer-Encoding headers unless automatically enabled above
// we do not want to keep connection alive, so pretend we received "Connection: close" request header
$response = $response->withoutHeader('Transfer-Encoding');
$request = $request->withHeader('Connection', 'close');
}

// assign "Connection" header automatically
$persist = false;
if ($code === 101) {
// 101 (Switching Protocols) response uses Connection: upgrade header
// This implies that this stream now uses another protocol and we
// may not persist this connection for additional requests.
$response = $response->withHeader('Connection', 'upgrade');
} elseif ($version === '1.1') {
// HTTP/1.1 assumes persistent connection support by default
// we do not support persistent connections, so let the client know
} elseif (\strtolower($request->getHeaderLine('Connection')) === 'close' || \strtolower($response->getHeaderLine('Connection')) === 'close') {
// obey explicit "Connection: close" request header or response header if present
$response = $response->withHeader('Connection', 'close');
} elseif ($version === '1.1') {
// HTTP/1.1 assumes persistent connection support by default, so we don't need to inform client
$persist = true;
} elseif (strtolower($request->getHeaderLine('Connection')) === 'keep-alive') {
// obey explicit "Connection: keep-alive" request header and inform client
$persist = true;
$response = $response->withHeader('Connection', 'keep-alive');
} else {
// remove any Connection headers unless automatically enabled above
$response = $response->withoutHeader('Connection');
Expand Down Expand Up @@ -328,9 +340,15 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
$body = "0\r\n\r\n";
}

// end connection after writing response headers and body
// write response headers and body
$connection->write($headers . "\r\n" . $body);
$connection->end();

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->parser->handle($connection);
} else {
$connection->end();
}
return;
}

Expand All @@ -345,6 +363,16 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// in particular this may only fire on a later read/write attempt.
$connection->on('close', array($body, 'close'));

$body->pipe($connection);
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$connection->removeListener('close', array($body, 'close'));
$parser->handle($connection);
});
} else {
$body->pipe($connection);
}
}
}
6 changes: 3 additions & 3 deletions tests/FunctionalServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ public function testConnectWithThroughStreamReturnsDataAsGiven()
$server->listen($socket);

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
Expand Down Expand Up @@ -703,7 +703,7 @@ public function testConnectWithThroughStreamReturnedFromPromiseReturnsDataAsGive
$server->listen($socket);

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
Expand Down Expand Up @@ -737,7 +737,7 @@ public function testConnectWithClosedThroughStreamReturnsNoData()
$server->listen($socket);

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\nConnection: close\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
Expand Down
197 changes: 195 additions & 2 deletions tests/Io/StreamingServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\EventLoop\Factory;
use React\Http\Io\StreamingServer;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Promise\Promise;
use React\Stream\ThroughStream;
use React\Tests\Http\SocketServerStub;
Expand Down Expand Up @@ -957,7 +958,7 @@ function ($data) use (&$buffer) {
$data = "GET / HTTP/1.1\r\n\r\n";
$this->connection->emit('data', array($data));

$this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\nConnection: close\r\n\r\nfoo", $buffer);
$this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\n\r\nfoo", $buffer);
}

public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse()
Expand Down Expand Up @@ -992,7 +993,7 @@ function ($data) use (&$buffer) {
$data = "GET / HTTP/1.1\r\nUpgrade: demo\r\n\r\n";
$this->connection->emit('data', array($data));

$this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\nConnection: close\r\n\r\nfoo", $buffer);
$this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nfoo", $buffer);
}

public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength()
Expand Down Expand Up @@ -2813,6 +2814,198 @@ public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() {
$this->assertEquals(array('test' => 'abc,def', 'hello' => 'world'), $requestValidation->getCookieParams());
}

public function testNewConnectionWillInvokeParserOnce()
{
$server = new StreamingServer(Factory::create(), $this->expectCallableNever());

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
}

public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneForHttp10()
{
$request = new ServerRequest('GET', 'http://localhost/', array(), '', '1.0');

$server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request));

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->once())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneForHttp11ConnectionClose()
{
$request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'close'));

$server = new StreamingServer(Factory::create(), $this->expectCallableOnceWith($request));

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->once())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhenParserIsDoneAndRequestHandlerReturnsConnectionClose()
{
$request = new ServerRequest('GET', 'http://localhost/');

$server = new StreamingServer(Factory::create(), function () {
return new Response(200, array('Connection' => 'close'));
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->once())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenConnectionCanBeKeptAliveForHttp11Default()
{
$request = new ServerRequest('GET', 'http://localhost/');

$server = new StreamingServer(Factory::create(), function () {
return new Response();
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->exactly(2))->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenConnectionCanBeKeptAliveForHttp10ConnectionKeepAlive()
{
$request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'keep-alive'), '', '1.0');

$server = new StreamingServer(Factory::create(), function () {
return new Response();
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->exactly(2))->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandlerWhenStreamingResponseBodyKeepsStreaming()
{
$request = new ServerRequest('GET', 'http://localhost/');

$body = new ThroughStream();
$server = new StreamingServer(Factory::create(), function () use ($body) {
return new Response(200, array(), $body);
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->once())->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);
}

public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandlerWhenStreamingResponseBodyEnds()
{
$request = new ServerRequest('GET', 'http://localhost/');

$body = new ThroughStream();
$server = new StreamingServer(Factory::create(), function () use ($body) {
return new Response(200, array(), $body);
});

$parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->getMock();
$parser->expects($this->exactly(2))->method('handle');

$ref = new \ReflectionProperty($server, 'parser');
$ref->setAccessible(true);
$ref->setValue($server, $parser);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->exactly(2))->method('write');
$this->connection->expects($this->never())->method('end');

// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);

$this->assertCount(2, $this->connection->listeners('close'));
$body->end();
$this->assertCount(1, $this->connection->listeners('close'));
}

private function createGetRequest()
{
$data = "GET / HTTP/1.1\r\n";
Expand Down