diff --git a/README.md b/README.md index df5cba5..7849e94 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,11 @@ Alternatively, you can also refer to them with their fully-qualified name: ### buffer() -The `buffer(ReadableStreamInterface $stream)` function can be used to create -a `Promise` which resolves with the stream data buffer. +The `buffer(ReadableStreamInterface $stream, int $maxLength = null)` function can be used to create +a `Promise` which resolves with the stream data buffer. With an optional maximum length argument +which defaults to no limit. In case the maximum length is reached before the end the promise will +be rejected with a `\OverflowException`. + ```php $stream = accessSomeJsonStream(); @@ -56,6 +59,18 @@ The promise will reject if the stream emits an error. The promise will reject if it is canceled. +```php +$stream = accessSomeToLargeStream(); + +Stream\buffer($stream, 1024)->then(function ($contents) { + var_dump(json_decode($contents)); +}, function ($error) { + // Reaching here when the stream buffer goes above the max size, + // in this example that is 1024 bytes, + // or when the stream emits an error. +}); +``` + ### first() The `first(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')` diff --git a/src/functions.php b/src/functions.php index b792afa..3f78a0e 100644 --- a/src/functions.php +++ b/src/functions.php @@ -12,22 +12,29 @@ * Creates a `Promise` which resolves with the stream data buffer * * @param ReadableStreamInterface $stream + * @param int|null $maxLength Maximum number of bytes to buffer or null for unlimited. * @return Promise\CancellablePromiseInterface Promise */ -function buffer(ReadableStreamInterface $stream) +function buffer(ReadableStreamInterface $stream, $maxLength = null) { // stream already ended => resolve with empty buffer if (!$stream->isReadable()) { return Promise\resolve(''); } + $deferred = new Promise\Deferred(); $buffer = ''; - $bufferer = function ($data) use (&$buffer) { + $bufferer = function ($data) use (&$buffer, $deferred, $maxLength) { $buffer .= $data; + if ($maxLength !== null && isset($buffer[$maxLength])) { + $deferred->reject(new \OverflowException('Buffer exceeded maximum length')); + } }; $stream->on('data', $bufferer); - $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) { + $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, $deferred, &$buffer) { + $deferred->promise()->then($resolve, $reject); + $stream->on('error', function ($error) use ($reject) { $reject(new \RuntimeException('An error occured on the underlying stream while buffering', 0, $error)); }); diff --git a/tests/BufferTest.php b/tests/BufferTest.php index 6cc608e..3f46a76 100644 --- a/tests/BufferTest.php +++ b/tests/BufferTest.php @@ -1,5 +1,7 @@ expectPromiseReject($promise); } + + public function testMaximumSize() + { + $loop = Factory::create(); + $stream = new ThroughStream(); + + $loop->addTimer(0.1, function () use ($stream) { + $stream->write('12345678910111213141516'); + }); + + $promise = Stream\buffer($stream, 16); + + $this->setExpectedException('\OverflowException', 'Buffer exceeded maximum length'); + Block\await($promise, $loop, 10); + } + + public function testUnderMaximumSize() + { + $loop = Factory::create(); + $stream = new ThroughStream(); + + $loop->addTimer(0.1, function () use ($stream) { + $stream->write('1234567891011'); + $stream->end(); + }); + + $promise = Stream\buffer($stream, 16); + + $result = Block\await($promise, $loop, 10); + $this->assertSame('1234567891011', $result); + } }