Skip to content

Commit

Permalink
Implement chunked request handling
Browse files Browse the repository at this point in the history
  • Loading branch information
andig committed Oct 13, 2017
1 parent 6f7b324 commit b724a55
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 52 deletions.
10 changes: 8 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.6",
"react/promise": "^2.3 || ^1.2.1",
"evenement/evenement": "^3.0 || ^2.0 || ^1.0",
"react/promise-stream": "^0.1.1"
"react/promise-stream": "dev-buffer-max-size"
},
"autoload": {
"psr-4": {
Expand All @@ -21,5 +21,11 @@
"phpunit/phpunit": "^4.8.10||^5.0",
"react/socket": "^1.0 || ^0.8 || ^0.7",
"clue/block-react": "^1.1"
}
},
"repositories": [
{
"type": "vcs",
"url": "https://github.com/WyriHaximus-labs/promise-stream"
}
]
}
2 changes: 1 addition & 1 deletion src/HttpBodyStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class HttpBodyStream extends EventEmitter implements StreamInterface, ReadableSt
* @param ReadableStreamInterface $input - Stream data from $stream as a body of a PSR-7 object4
* @param int|null $size - size of the data body
*/
public function __construct(ReadableStreamInterface $input, $size)
public function __construct(ReadableStreamInterface $input, $size = null)
{
$this->input = $input;
$this->size = $size;
Expand Down
19 changes: 8 additions & 11 deletions src/Middleware/RequestBodyBufferMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use React\Promise\Stream;
use React\Stream\ReadableStreamInterface;
use RingCentral\Psr7\BufferStream;
use OverflowException;

final class RequestBodyBufferMiddleware
{
Expand All @@ -29,27 +30,23 @@ public function __construct($sizeLimit = null)

public function __invoke(ServerRequestInterface $request, $stack)
{
$size = $request->getBody()->getSize();

if ($size === null) {
return new Response(411, array('Content-Type' => 'text/plain'), 'No Content-Length header given');
}

if ($size > $this->sizeLimit) {
return new Response(413, array('Content-Type' => 'text/plain'), 'Request body exceeds allowed limit');
}

$body = $request->getBody();
if (!$body instanceof ReadableStreamInterface) {
return $stack($request);
}

return Stream\buffer($body)->then(function ($buffer) use ($request, $stack) {
return Stream\buffer($body, $this->sizeLimit)->then(function ($buffer) use ($request, $stack) {
$stream = new BufferStream(strlen($buffer));
$stream->write($buffer);
$request = $request->withBody($stream);

return $stack($request);
}, function($error) {
if ($error instanceof OverflowException) {
return new Response(413, array('Content-Type' => 'text/plain'), 'Request body exceeds allowed limit');
}

return $error;
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use Evenement\EventEmitter;
use Exception;
use RingCentral\Psr7 as g7;
use RingCentral\Psr7;

/**
* @event headers
Expand Down Expand Up @@ -92,7 +92,7 @@ private function parseRequest($headers)
}

// parse request headers into obj implementing RequestInterface
$request = g7\parse_request($headers);
$request = Psr7\parse_request($headers);

// create new obj implementing ServerRequestInterface by preserving all
// previous properties and restoring original request-target
Expand Down
2 changes: 1 addition & 1 deletion src/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function __construct(
$version = '1.1',
$reason = null
) {
if ($body instanceof ReadableStreamInterface) {
if ($body instanceof ReadableStreamInterface &! $body instanceof HttpBodyStream) {
$body = new HttpBodyStream($body, null);
}

Expand Down
16 changes: 7 additions & 9 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
namespace React\Http;

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Message\ResponseInterface;
use RingCentral\Psr7;
use React\Socket\ServerInterface;
use React\Socket\ConnectionInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Promise\Promise;
use RingCentral\Psr7 as Psr7Implementation;
use Psr\Http\Message\ServerRequestInterface;
use React\Promise\CancellablePromiseInterface;
use React\Stream\WritableStreamInterface;

Expand Down Expand Up @@ -168,7 +167,7 @@ public function handleConnection(ConnectionInterface $conn)
$parser = new RequestHeaderParser($uriLocal, $uriRemote);

$listener = array($parser, 'feed');
$parser->on('headers', function (RequestInterface $request, $bodyBuffer) use ($conn, $listener, $that) {
$parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer) use ($conn, $listener, $that) {
// parsing request completed => stop feeding parser
$conn->removeListener('data', $listener);

Expand Down Expand Up @@ -305,7 +304,6 @@ public function writeError(ConnectionInterface $conn, $code, ServerRequestInterf
$this->handleResponse($conn, $request, $response);
}


/** @internal */
public function handleResponse(ConnectionInterface $connection, ServerRequestInterface $request, ResponseInterface $response)
{
Expand Down Expand Up @@ -353,7 +351,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// response to HEAD and 1xx, 204 and 304 responses MUST NOT include a body
// exclude status 101 (Switching Protocols) here for Upgrade request handling below
if ($request->getMethod() === 'HEAD' || $code === 100 || ($code > 101 && $code < 200) || $code === 204 || $code === 304) {
$response = $response->withBody(Psr7Implementation\stream_for(''));
$response = $response->withBody(Psr7\stream_for(''));
}

// 101 (Switching Protocols) response uses Connection: upgrade header
Expand Down Expand Up @@ -388,7 +386,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
private function handleResponseBody(ResponseInterface $response, ConnectionInterface $connection)
{
if (!$response->getBody() instanceof HttpBodyStream) {
$connection->write(Psr7Implementation\str($response));
$connection->write(Psr7\str($response));
return $connection->end();
}

Expand All @@ -399,7 +397,7 @@ private function handleResponseBody(ResponseInterface $response, ConnectionInter
return $stream->close();
}

$connection->write(Psr7Implementation\str($response));
$connection->write(Psr7\str($response));

if ($stream->isReadable()) {
if ($response->getHeaderLine('Transfer-Encoding') === 'chunked') {
Expand Down
48 changes: 22 additions & 26 deletions tests/Middleware/RequestBodyBufferMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Middleware\RequestBodyBufferMiddleware;
use React\Http\ServerRequest;
use React\Http\HttpBodyStream;
use React\Stream\ThroughStream;
use React\EventLoop\Factory;
use React\Tests\Http\TestCase;
use RingCentral\Psr7\BufferStream;
use React\Stream\ThroughStream;
use React\Http\HttpBodyStream;
use Clue\React\Block;

final class RequestBodyBufferMiddlewareTest extends TestCase
{
Expand Down Expand Up @@ -63,45 +65,39 @@ function (ServerRequestInterface $request) use (&$exposedRequest) {
$this->assertSame($body, $exposedRequest->getBody()->getContents());
}

public function testUnknownSizeReturnsError411()
public function testExcessiveSizeReturnsError413()
{
$body = $this->getMockBuilder('Psr\Http\Message\StreamInterface')->getMock();
$body->expects($this->once())->method('getSize')->willReturn(null);
$loop = Factory::create();

$stream = new ThroughStream();

$serverRequest = new ServerRequest(
'GET',
'https://example.com/',
array(),
$body
new HttpBodyStream($stream)
);

$buffer = new RequestBodyBufferMiddleware();
$response = $buffer(
$buffer = new RequestBodyBufferMiddleware(1);
$promise = $buffer(
$serverRequest,
function () {}
function (ServerRequestInterface $request) {
return $request;
}
);

$this->assertSame(411, $response->getStatusCode());
}

public function testExcessiveSizeReturnsError413()
{
$stream = new BufferStream(2);
$stream->write('aa');

$serverRequest = new ServerRequest(
'GET',
'https://example.com/',
array(),
$stream
$exposedResponse = null;
$promise->then(
function($response) use (&$exposedResponse) {
$exposedResponse = $response;
},
$this->expectCallableNever()
);

$buffer = new RequestBodyBufferMiddleware(1);
$response = $buffer(
$serverRequest,
function () {}
);
$this->assertSame(413, $exposedResponse->getStatusCode());

$this->assertSame(413, $response->getStatusCode());
Block\await($promise, $loop);
}
}

0 comments on commit b724a55

Please sign in to comment.