Skip to content

Commit

Permalink
Merge pull request #3 from WyriHaximus-labs/buffer-max-size
Browse files Browse the repository at this point in the history
Optional maximum length argument for the buffer function
  • Loading branch information
jsor authored Oct 17, 2017
2 parents 8e41bd2 + 39ca554 commit 6a88554
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 5 deletions.
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ Alternatively, you can also refer to them with their fully-qualified name:

### buffer()

The `buffer(ReadableStreamInterface $stream)` function can be used to create
a `Promise` which resolves with the stream data buffer.
The `buffer(ReadableStreamInterface $stream, int $maxLength = null)` function can be used to create
a `Promise` which resolves with the stream data buffer. With an optional maximum length argument
which defaults to no limit. In case the maximum length is reached before the end the promise will
be rejected with a `\OverflowException`.


```php
$stream = accessSomeJsonStream();
Expand All @@ -56,6 +59,18 @@ The promise will reject if the stream emits an error.

The promise will reject if it is canceled.

```php
$stream = accessSomeToLargeStream();

Stream\buffer($stream, 1024)->then(function ($contents) {
var_dump(json_decode($contents));
}, function ($error) {
// Reaching here when the stream buffer goes above the max size,
// in this example that is 1024 bytes,
// or when the stream emits an error.
});
```

### first()

The `first(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')`
Expand Down
13 changes: 10 additions & 3 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,29 @@
* Creates a `Promise` which resolves with the stream data buffer
*
* @param ReadableStreamInterface $stream
* @param int|null $maxLength Maximum number of bytes to buffer or null for unlimited.
* @return Promise\CancellablePromiseInterface Promise<string, Exception>
*/
function buffer(ReadableStreamInterface $stream)
function buffer(ReadableStreamInterface $stream, $maxLength = null)
{
// stream already ended => resolve with empty buffer
if (!$stream->isReadable()) {
return Promise\resolve('');
}

$deferred = new Promise\Deferred();
$buffer = '';
$bufferer = function ($data) use (&$buffer) {
$bufferer = function ($data) use (&$buffer, $deferred, $maxLength) {
$buffer .= $data;
if ($maxLength !== null && isset($buffer[$maxLength])) {
$deferred->reject(new \OverflowException('Buffer exceeded maximum length'));
}
};
$stream->on('data', $bufferer);

$promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) {
$promise = new Promise\Promise(function ($resolve, $reject) use ($stream, $deferred, &$buffer) {
$deferred->promise()->then($resolve, $reject);

$stream->on('error', function ($error) use ($reject) {
$reject(new \RuntimeException('An error occured on the underlying stream while buffering', 0, $error));
});
Expand Down
33 changes: 33 additions & 0 deletions tests/BufferTest.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

use Clue\React\Block;
use React\EventLoop\Factory;
use React\Promise\Stream;
use React\Stream\ThroughStream;

Expand Down Expand Up @@ -77,4 +79,35 @@ public function testCancelPendingStreamWillReject()

$this->expectPromiseReject($promise);
}

public function testMaximumSize()
{
$loop = Factory::create();
$stream = new ThroughStream();

$loop->addTimer(0.1, function () use ($stream) {
$stream->write('12345678910111213141516');
});

$promise = Stream\buffer($stream, 16);

$this->setExpectedException('\OverflowException', 'Buffer exceeded maximum length');
Block\await($promise, $loop, 10);
}

public function testUnderMaximumSize()
{
$loop = Factory::create();
$stream = new ThroughStream();

$loop->addTimer(0.1, function () use ($stream) {
$stream->write('1234567891011');
$stream->end();
});

$promise = Stream\buffer($stream, 16);

$result = Block\await($promise, $loop, 10);
$this->assertSame('1234567891011', $result);
}
}

0 comments on commit 6a88554

Please sign in to comment.