Skip to content

Commit

Permalink
Merge pull request #132 from clue-labs/request
Browse files Browse the repository at this point in the history
Request stream will now be handled internally
  • Loading branch information
WyriHaximus authored Feb 21, 2017
2 parents 4468182 + dcc2b95 commit 89bd4bc
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 91 deletions.
69 changes: 36 additions & 33 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;
use React\Stream\Util;
use Psr\Http\Message\RequestInterface;

/**
* The `Request` class is responsible for streaming the incoming request body
Expand All @@ -24,11 +25,8 @@
class Request extends EventEmitter implements ReadableStreamInterface
{
private $readable = true;
private $method;
private $path;
private $query;
private $httpVersion;
private $headers;
private $request;
private $stream;

// metadata, implicitly added externally
public $remoteAddress;
Expand All @@ -42,13 +40,23 @@ class Request extends EventEmitter implements ReadableStreamInterface
*
* @internal
*/
public function __construct($method, $path, $query = array(), $httpVersion = '1.1', $headers = array())
public function __construct(RequestInterface $request, ReadableStreamInterface $stream)
{
$this->method = $method;
$this->path = $path;
$this->query = $query;
$this->httpVersion = $httpVersion;
$this->headers = $headers;
$this->request = $request;
$this->stream = $stream;

$that = $this;
// forward data and end events from body stream to request
$stream->on('data', function ($data) use ($that) {
$that->emit('data', array($data));
});
$stream->on('end', function () use ($that) {
$that->emit('end');
});
$stream->on('error', function ($error) use ($that) {
$that->emit('error', array($error));
});
$stream->on('close', array($this, 'close'));
}

/**
Expand All @@ -58,7 +66,7 @@ public function __construct($method, $path, $query = array(), $httpVersion = '1.
*/
public function getMethod()
{
return $this->method;
return $this->request->getMethod();
}

/**
Expand All @@ -68,7 +76,7 @@ public function getMethod()
*/
public function getPath()
{
return $this->path;
return $this->request->getUri()->getPath();
}

/**
Expand All @@ -78,7 +86,10 @@ public function getPath()
*/
public function getQueryParams()
{
return $this->query;
$params = array();
parse_str($this->request->getUri()->getQuery(), $params);

return $params;
}

/**
Expand All @@ -88,7 +99,7 @@ public function getQueryParams()
*/
public function getProtocolVersion()
{
return $this->httpVersion;
return $this->request->getProtocolVersion();
}

/**
Expand All @@ -102,7 +113,7 @@ public function getProtocolVersion()
*/
public function getHeaders()
{
return $this->headers;
return $this->request->getHeaders();
}

/**
Expand All @@ -113,18 +124,7 @@ public function getHeaders()
*/
public function getHeader($name)
{
$found = array();

$name = strtolower($name);
foreach ($this->headers as $key => $value) {
if (strtolower($key) === $name) {
foreach((array)$value as $one) {
$found[] = $one;
}
}
}

return $found;
return $this->request->getHeader($name);
}

/**
Expand All @@ -135,7 +135,7 @@ public function getHeader($name)
*/
public function getHeaderLine($name)
{
return implode(', ', $this->getHeader($name));
return $this->request->getHeaderLine($name);
}

/**
Expand All @@ -146,7 +146,7 @@ public function getHeaderLine($name)
*/
public function hasHeader($name)
{
return !!$this->getHeader($name);
return $this->request->hasHeader($name);
}

/**
Expand All @@ -164,7 +164,7 @@ public function hasHeader($name)
*/
public function expectsContinue()
{
return $this->httpVersion !== '1.0' && '100-continue' === strtolower($this->getHeaderLine('Expect'));
return $this->getProtocolVersion() !== '1.0' && '100-continue' === strtolower($this->getHeaderLine('Expect'));
}

public function isReadable()
Expand All @@ -178,7 +178,7 @@ public function pause()
return;
}

$this->emit('pause');
$this->stream->pause();
}

public function resume()
Expand All @@ -187,7 +187,7 @@ public function resume()
return;
}

$this->emit('resume');
$this->stream->resume();
}

public function close()
Expand All @@ -196,7 +196,10 @@ public function close()
return;
}

// request closed => stop reading from the stream by pausing it
$this->readable = false;
$this->stream->pause();

$this->emit('close');
$this->removeAllListeners();
}
Expand Down
16 changes: 1 addition & 15 deletions src/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,7 @@ private function parseRequest($data)
{
list($headers, $bodyBuffer) = explode("\r\n\r\n", $data, 2);

$psrRequest = g7\parse_request($headers);

$parsedQuery = array();
$queryString = $psrRequest->getUri()->getQuery();
if ($queryString) {
parse_str($queryString, $parsedQuery);
}

$request = new Request(
$psrRequest->getMethod(),
$psrRequest->getUri()->getPath(),
$parsedQuery,
$psrRequest->getProtocolVersion(),
$psrRequest->getHeaders()
);
$request = g7\parse_request($headers);

return array($request, $bodyBuffer);
}
Expand Down
33 changes: 9 additions & 24 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Evenement\EventEmitter;
use React\Socket\ServerInterface as SocketServerInterface;
use React\Socket\ConnectionInterface;
use Psr\Http\Message\RequestInterface;

/**
* The `Server` class is responsible for handling incoming connections and then
Expand Down Expand Up @@ -87,7 +88,7 @@ public function handleConnection(ConnectionInterface $conn)
$that = $this;
$parser = new RequestHeaderParser();
$listener = array($parser, 'feed');
$parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $listener, $parser, $that) {
$parser->on('headers', function (RequestInterface $request, $bodyBuffer) use ($conn, $listener, $parser, $that) {
// parsing request completed => stop feeding parser
$conn->removeListener('data', $listener);

Expand All @@ -111,7 +112,7 @@ public function handleConnection(ConnectionInterface $conn)
}

/** @internal */
public function handleRequest(ConnectionInterface $conn, Request $request)
public function handleRequest(ConnectionInterface $conn, RequestInterface $request)
{
// only support HTTP/1.1 and HTTP/1.0 requests
if ($request->getProtocolVersion() !== '1.1' && $request->getProtocolVersion() !== '1.0') {
Expand All @@ -138,13 +139,6 @@ public function handleRequest(ConnectionInterface $conn, Request $request)
}

$response = new Response($conn, $request->getProtocolVersion());
$response->on('close', array($request, 'close'));

// attach remote ip to the request as metadata
$request->remoteAddress = trim(
parse_url('tcp://' . $conn->getRemoteAddress(), PHP_URL_HOST),
'[]'
);

$stream = $conn;
if ($request->hasHeader('Transfer-Encoding')) {
Expand All @@ -155,22 +149,13 @@ public function handleRequest(ConnectionInterface $conn, Request $request)
}
}

// forward pause/resume calls to underlying connection
$request->on('pause', array($conn, 'pause'));
$request->on('resume', array($conn, 'resume'));

// request closed => stop reading from the stream by pausing it
// stream closed => close request
$request->on('close', array($stream, 'pause'));
$stream->on('close', array($request, 'close'));
$request = new Request($request, $stream);

// forward data and end events from body stream to request
$stream->on('end', function() use ($request) {
$request->emit('end');
});
$stream->on('data', function ($data) use ($request) {
$request->emit('data', array($data));
});
// attach remote ip to the request as metadata
$request->remoteAddress = trim(
parse_url('tcp://' . $conn->getRemoteAddress(), PHP_URL_HOST),
'[]'
);

$this->emit('request', array($request, $response));
}
Expand Down
10 changes: 4 additions & 6 deletions tests/RequestHeaderParserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ public function testHeadersEventShouldReturnRequestAndBodyBuffer()
$data .= 'RANDOM DATA';
$parser->feed($data);

$this->assertInstanceOf('React\Http\Request', $request);
$this->assertInstanceOf('Psr\Http\Message\RequestInterface', $request);
$this->assertSame('GET', $request->getMethod());
$this->assertSame('/', $request->getPath());
$this->assertSame(array(), $request->getQueryParams());
$this->assertEquals('http://example.com/', $request->getUri());
$this->assertSame('1.1', $request->getProtocolVersion());
$this->assertSame(array('Host' => array('example.com:80'), 'Connection' => array('close')), $request->getHeaders());

Expand Down Expand Up @@ -83,10 +82,9 @@ public function testHeadersEventShouldParsePathAndQueryString()
$data = $this->createAdvancedPostRequest();
$parser->feed($data);

$this->assertInstanceOf('React\Http\Request', $request);
$this->assertInstanceOf('Psr\Http\Message\RequestInterface', $request);
$this->assertSame('POST', $request->getMethod());
$this->assertSame('/foo', $request->getPath());
$this->assertSame(array('bar' => 'baz'), $request->getQueryParams());
$this->assertEquals('http://example.com/foo?bar=baz', $request->getUri());
$this->assertSame('1.1', $request->getProtocolVersion());
$headers = array(
'Host' => array('example.com:80'),
Expand Down
Loading

0 comments on commit 89bd4bc

Please sign in to comment.