Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming body parser: Multipart #200

Closed
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"react/socket": "^1.0 || ^0.8 || ^0.7 || ^0.6 || ^0.5",
"react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.6",
"react/promise": "^2.1 || ^1.2.1",
"evenement/evenement": "^2.0 || ^1.0"
"evenement/evenement": "^2.0 || ^1.0",
"react/promise-stream": "^0.1"
},
"autoload": {
"psr-4": {
Expand All @@ -18,7 +19,6 @@
},
"require-dev": {
"phpunit/phpunit": "^4.8.10||^5.0",
"react/promise-stream": "^0.1.1",
"react/socket": "^1.0 || ^0.8 || ^0.7",
"clue/block-react": "^1.1"
}
Expand Down
319 changes: 319 additions & 0 deletions src/StreamingBodyParser/MultipartParser.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
<?php

namespace React\Http\StreamingBodyParser;

use Evenement\EventEmitter;
use Psr\Http\Message\RequestInterface;
use React\Http\HttpBodyStream;
use React\Http\UploadedFile;
use React\Promise\CancellablePromiseInterface;
use React\Promise\Deferred;
use React\Stream\ThroughStream;

final class MultipartParser extends EventEmitter
{
/**
* @var string
*/
protected $buffer = '';

/**
* @var string
*/
protected $ending = '';

/**
* @var int
*/
protected $endingSize = 0;

/**
* @var string
*/
protected $boundary;

/**
* @var RequestInterface
*/
protected $request;

/**
* @var HttpBodyStream
*/
protected $body;

/**
* @var CancellablePromiseInterface
*/
protected $promise;

/**
* @var callable
*/
protected $onDataCallable;

public function __construct(RequestInterface $request)
{
$this->onDataCallable = array($this, 'onData');
$deferred = new Deferred(function () {
$this->body->removeListener('data', $this->onDataCallable);
$this->body->close();
});
$this->promise = $deferred->promise();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the promise property really needed? Looks like it's only used in MultipartParser::cancel(). I think this is unecessary and

$this->body->removeListener('data', $this->onDataCallable);
$this->body->close();

can be just called directly there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, fixed it 👍

$this->request = $request;
$this->body = $this->request->getBody();

$dataMethod = $this->determineOnDataMethod();
$this->setOnDataListener(array($this, $dataMethod));
}

protected function determineOnDataMethod()
{
if (!$this->request->hasHeader('content-type')) {
return 'findBoundary';
}

$contentType = $this->request->getHeaderLine('content-type');
preg_match('/boundary="?(.*)"?$/', $contentType, $matches);
if (isset($matches[1])) {
$this->setBoundary($matches[1]);
return 'onData';
}

return 'findBoundary';
}

protected function setBoundary($boundary)
{
$this->boundary = $boundary;
$this->ending = $this->boundary . "--\r\n";
$this->endingSize = strlen($this->ending);
}

public function findBoundary($data)
{
$this->buffer .= $data;

if (substr($this->buffer, 0, 3) === '---' && strpos($this->buffer, "\r\n") !== false) {
$boundary = substr($this->buffer, 2, strpos($this->buffer, "\r\n"));
$boundary = substr($boundary, 0, -2);
$this->setBoundary($boundary);
$this->setOnDataListener(array($this, 'onData'));
}
}

public function onData($data)
{
$this->buffer .= $data;
$ending = strpos($this->buffer, $this->ending) == strlen($this->buffer) - $this->endingSize;

if (
strrpos($this->buffer, $this->boundary) < strrpos($this->buffer, "\r\n\r\n") || $ending
) {
$this->parseBuffer();
}

if ($ending) {
$this->emit('end');
}
}

protected function parseBuffer()
{
$chunks = preg_split('/-+' . $this->boundary . '/', $this->buffer);
$this->buffer = array_pop($chunks);
foreach ($chunks as $chunk) {
$chunk = $this->stripTrainingEOL($chunk);
$this->parseChunk($chunk);
}

$split = explode("\r\n\r\n", $this->buffer, 2);
if (count($split) <= 1) {
return;
}

$chunks = preg_split('/-+' . $this->boundary . '/', $split[0], -1, PREG_SPLIT_NO_EMPTY);
$headers = $this->parseHeaders(trim($chunks[0]));
if (isset($headers['content-disposition']) && $this->headerStartsWith($headers['content-disposition'], 'filename')) {
$this->parseFile($headers, $split[1]);
$this->buffer = '';
}
}

protected function parseChunk($chunk)
{
if ($chunk === '') {
return;
}

list ($header, $body) = explode("\r\n\r\n", $chunk, 2);
$headers = $this->parseHeaders($header);

if (!isset($headers['content-disposition'])) {
return;
}

if ($this->headerStartsWith($headers['content-disposition'], 'filename')) {
$this->parseFile($headers, $body, false);
return;
}

if ($this->headerStartsWith($headers['content-disposition'], 'name')) {
$this->parsePost($headers, $body);
return;
}
}

protected function parseFile($headers, $body, $streaming = true)
{
if (
!$this->headerContains($headers['content-disposition'], 'name=') ||
!$this->headerContains($headers['content-disposition'], 'filename=')
) {
return;
}

$stream = new ThroughStream();
$this->emit('file', array(
$this->getFieldFromHeader($headers['content-disposition'], 'name'),
new UploadedFile(
new HttpBodyStream($stream, null),
0,
UPLOAD_ERR_OK,
$this->getFieldFromHeader($headers['content-disposition'], 'filename'),
$headers['content-type'][0]
),
$headers,
));

if (!$streaming) {
$stream->end($body);
return;
}

$this->setOnDataListener($this->chunkStreamFunc($stream));
$stream->write($body);
}

protected function chunkStreamFunc(ThroughStream $stream)
{
$that = $this;
$boundary = $this->boundary;
$buffer = '';
$func = function($data) use (&$buffer, $stream, $that, $boundary) {
$buffer .= $data;
if (strpos($buffer, $boundary) !== false) {
$chunks = preg_split('/-+' . $boundary . '/', $buffer);
$chunk = array_shift($chunks);
$chunk = $that->stripTrainingEOL($chunk);
$stream->end($chunk);

$that->setOnDataListener(array($that, 'onData'));

if (count($chunks) == 1) {
array_unshift($chunks, '');
}

$that->onData(implode('-' . $boundary, $chunks));
return;
}

if (strlen($buffer) >= strlen($boundary) * 3) {
$stream->write($buffer);
$buffer = '';
}
};
return $func;
}

protected function parsePost($headers, $body)
{
foreach ($headers['content-disposition'] as $part) {
if (strpos($part, 'name') === 0) {
preg_match('/name="?(.*)"$/', $part, $matches);
$this->emit('post', array(
$matches[1],
$body,
$headers,
));
}
}
}

protected function parseHeaders($header)
{
$headers = array();

foreach (explode("\r\n", trim($header)) as $line) {
list($key, $values) = explode(':', $line, 2);
$key = trim($key);
$key = strtolower($key);
$values = explode(';', $values);
$values = array_map('trim', $values);
$headers[$key] = $values;
}

return $headers;
}

protected function headerStartsWith(array $header, $needle)
{
foreach ($header as $part) {
if (strpos($part, $needle) === 0) {
return true;
}
}

return false;
}

protected function headerContains(array $header, $needle)
{
foreach ($header as $part) {
if (strpos($part, $needle) !== false) {
return true;
}
}

return false;
}

protected function getFieldFromHeader(array $header, $field)
{
foreach ($header as $part) {
if (strpos($part, $field) === 0) {
preg_match('/' . $field . '="?(.*)"$/', $part, $matches);
return $matches[1];
}
}

return '';
}

/**
* @internal
*/
public function setOnDataListener($callable)
{
$this->body->removeListener('data', $this->onDataCallable);
$this->onDataCallable = $callable;
$this->body->on('data', $this->onDataCallable);
}

public function cancel()
{
$this->promise->cancel();
}

/**
* @internal
*/
public function stripTrainingEOL($chunk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: stripTrainingEOL -> stripTrailingEOL

{
if (substr($chunk, -2) === "\r\n") {
return substr($chunk, 0, -2);
}

return $chunk;
}
}
Loading