Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal refactoring to reuse single request parser for all requests #346

Merged
merged 2 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 59 additions & 35 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
use React\Socket\ConnectionInterface;
use RingCentral\Psr7 as g7;
use Exception;

Expand All @@ -20,50 +21,73 @@
*/
class RequestHeaderParser extends EventEmitter
{
private $buffer = '';
private $maxSize = 8192;

private $localSocketUri;
private $remoteSocketUri;

public function __construct($localSocketUri = null, $remoteSocketUri = null)
public function handle(ConnectionInterface $conn)
{
$this->localSocketUri = $localSocketUri;
$this->remoteSocketUri = $remoteSocketUri;
}
$buffer = '';
$maxSize = $this->maxSize;
$that = $this;
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
// append chunk of data to buffer and look for end of request headers
$buffer .= $data;
$endOfHeader = \strpos($buffer, "\r\n\r\n");

// reject request if buffer size is exceeded
if ($endOfHeader > $maxSize || ($endOfHeader === false && isset($buffer[$maxSize]))) {
$conn->removeListener('data', $fn);
$fn = null;

$that->emit('error', array(
new \OverflowException("Maximum header size of {$maxSize} exceeded.", 431),
$conn
));
return;
}

public function feed($data)
{
$this->buffer .= $data;
$endOfHeader = \strpos($this->buffer, "\r\n\r\n");
// ignore incomplete requests
if ($endOfHeader === false) {
return;
}

if ($endOfHeader > $this->maxSize || ($endOfHeader === false && isset($this->buffer[$this->maxSize]))) {
$this->emit('error', array(new \OverflowException("Maximum header size of {$this->maxSize} exceeded.", 431), $this));
$this->removeAllListeners();
return;
}
// request headers received => try to parse request
$conn->removeListener('data', $fn);
$fn = null;

if (false !== $endOfHeader) {
try {
$request = $this->parseRequest((string)\substr($this->buffer, 0, $endOfHeader));
$request = $that->parseRequest(
(string)\substr($buffer, 0, $endOfHeader),
$conn->getRemoteAddress(),
$conn->getLocalAddress()
);
} catch (Exception $exception) {
$this->emit('error', array($exception));
$this->removeAllListeners();
$buffer = '';
$that->emit('error', array(
$exception,
$conn
));
return;
}

$bodyBuffer = isset($this->buffer[$endOfHeader + 4]) ? \substr($this->buffer, $endOfHeader + 4) : '';
$this->emit('headers', array($request, $bodyBuffer));
$this->removeAllListeners();
}
$bodyBuffer = isset($buffer[$endOfHeader + 4]) ? \substr($buffer, $endOfHeader + 4) : '';
$buffer = '';
$that->emit('headers', array($request, $bodyBuffer, $conn));
});

$conn->on('close', function () use (&$buffer, &$fn) {
$fn = $buffer = null;
});
}

/**
* @param string $headers buffer string containing request headers only
* @throws \InvalidArgumentException
* @param ?string $remoteSocketUri
* @param ?string $localSocketUri
* @return ServerRequestInterface
* @throws \InvalidArgumentException
* @internal
*/
private function parseRequest($headers)
public function parseRequest($headers, $remoteSocketUri, $localSocketUri)
{
// additional, stricter safe-guard for request line
// because request parser doesn't properly cope with invalid ones
Expand Down Expand Up @@ -103,22 +127,22 @@ private function parseRequest($headers)

// apply REMOTE_ADDR and REMOTE_PORT if source address is known
// address should always be known, unless this is over Unix domain sockets (UDS)
if ($this->remoteSocketUri !== null) {
$remoteAddress = \parse_url($this->remoteSocketUri);
if ($remoteSocketUri !== null) {
$remoteAddress = \parse_url($remoteSocketUri);
$serverParams['REMOTE_ADDR'] = $remoteAddress['host'];
$serverParams['REMOTE_PORT'] = $remoteAddress['port'];
}

// apply SERVER_ADDR and SERVER_PORT if server address is known
// address should always be known, even for Unix domain sockets (UDS)
// but skip UDS as it doesn't have a concept of host/port.s
if ($this->localSocketUri !== null) {
$localAddress = \parse_url($this->localSocketUri);
if ($localSocketUri !== null) {
$localAddress = \parse_url($localSocketUri);
if (isset($localAddress['host'], $localAddress['port'])) {
$serverParams['SERVER_ADDR'] = $localAddress['host'];
$serverParams['SERVER_PORT'] = $localAddress['port'];
}
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'https') {
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'tls') {
WyriHaximus marked this conversation as resolved.
Show resolved Hide resolved
$serverParams['HTTPS'] = 'on';
}
}
Expand Down Expand Up @@ -177,7 +201,7 @@ private function parseRequest($headers)

// set URI components from socket address if not already filled via Host header
if ($request->getUri()->getHost() === '') {
$parts = \parse_url($this->localSocketUri);
$parts = \parse_url($localSocketUri);
if (!isset($parts['host'], $parts['port'])) {
$parts = array('host' => '127.0.0.1', 'port' => 80);
}
Expand All @@ -198,8 +222,8 @@ private function parseRequest($headers)
}

// Update request URI to "https" scheme if the connection is encrypted
$parts = \parse_url($this->localSocketUri);
if (isset($parts['scheme']) && $parts['scheme'] === 'https') {
$parts = \parse_url($localSocketUri);
if (isset($parts['scheme']) && $parts['scheme'] === 'tls') {
// The request URI may omit default ports here, so try to parse port
// from Host header field (if possible)
$port = $request->getUri()->getPort();
Expand Down
64 changes: 23 additions & 41 deletions src/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
final class StreamingServer extends EventEmitter
{
private $callback;
private $parser;

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
Expand All @@ -108,6 +109,27 @@ public function __construct($requestHandler)
}

$this->callback = $requestHandler;
$this->parser = new RequestHeaderParser();

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer, ConnectionInterface $conn) use ($that) {
$that->handleRequest($conn, $request);

if ($bodyBuffer !== '') {
$conn->emit('data', array($bodyBuffer));
}
});

$this->parser->on('error', function(\Exception $e, ConnectionInterface $conn) use ($that) {
$that->emit('error', array($e));

// parsing failed => assume dummy request and send appropriate error
$that->writeError(
$conn,
$e->getCode() !== 0 ? $e->getCode() : 400,
new ServerRequest('GET', '/')
);
});
}

/**
Expand Down Expand Up @@ -154,47 +176,7 @@ public function __construct($requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this, 'handleConnection'));
}

/** @internal */
public function handleConnection(ConnectionInterface $conn)
{
$uriLocal = $conn->getLocalAddress();
if ($uriLocal !== null) {
// local URI known, so translate transport scheme to application scheme
$uriLocal = \strtr($uriLocal, array('tcp://' => 'http://', 'tls://' => 'https://'));
}

$uriRemote = $conn->getRemoteAddress();

$that = $this;
$parser = new RequestHeaderParser($uriLocal, $uriRemote);

$listener = array($parser, 'feed');
$parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer) use ($conn, $listener, $that) {
// parsing request completed => stop feeding parser
$conn->removeListener('data', $listener);

$that->handleRequest($conn, $request);

if ($bodyBuffer !== '') {
$conn->emit('data', array($bodyBuffer));
}
});

$conn->on('data', $listener);
$parser->on('error', function(\Exception $e) use ($conn, $listener, $that) {
$conn->removeListener('data', $listener);
$that->emit('error', array($e));

// parsing failed => assume dummy request and send appropriate error
$that->writeError(
$conn,
$e->getCode() !== 0 ? $e->getCode() : 400,
new ServerRequest('GET', '/')
);
});
$socket->on('connection', array($this->parser, 'handle'));
}

/** @internal */
Expand Down
Loading