diff --git a/README.md b/README.md index 0fa8a1a..4fddecb 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ descriptor based implementation with an in-memory write buffer. * [ReadableResourceStream](#readableresourcestream) * [WritableResourceStream](#writableresourcestream) * [DuplexResourceStream](#duplexresourcestream) + * [ThroughStream](#throughstream) * [Usage](#usage) * [Install](#install) * [Tests](#tests) @@ -924,6 +925,70 @@ $buffer->softLimit = 8192; See also [`write()`](#write) for more details. +### ThroughStream + +The `ThroughStream` implements the +[`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data +you write to it through to its readable end. + +```php +$through = new ThroughStream(); +$through->on('data', $this->expectCallableOnceWith('hello')); + +$through->write('hello'); +``` + +Similarly, the [`end()` method](#end) will end the stream and emit an +[`end` event](#end-event) and then [`close()`](#close-1) the stream. +The [`close()` method](#close-1) will close the stream and emit a +[`close` event](#close-event). +Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this: + +```php +$through = new ThroughStream(); +$source->pipe($through)->pipe($dest); +``` + +Optionally, its constructor accepts any callable function which will then be +used to *filter* any data written to it. This function receives a single data +argument as passed to the writable side and must return the data as it will be +passed to its readable end: + +```php +$through = new ThroughStream('strtoupper'); +$source->pipe($through)->pipe($dest); +``` + +Note that this class makes no assumptions about any data types. This can be +used to convert data, for example for transforming any structured data into +a newline-delimited JSON (NDJSON) stream like this: + +```php +$through = new ThroughStream(function ($data) { + return json_encode($data) . PHP_EOL; +}); +$through->on('data', $this->expectCallableOnceWith("[2, true]\n")); + +$through->write(array(2, true)); +``` + +The callback function is allowed to throw an `Exception`. In this case, +the stream will emit an `error` event and then [`close()`](#close-1) the stream. + +```php +$through = new ThroughStream(function ($data) { + if (!is_string($data)) { + throw new \UnexpectedValueException('Only strings allowed'); + } + return $data; +}); +$through->on('error', $this->expectCallableOnce())); +$through->on('close', $this->expectCallableOnce())); +$through->on('data', $this->expectCallableNever())); + +$through->write(2); +``` + ## Usage ```php $loop = React\EventLoop\Factory::create(); diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 5f80f58..6f63e2e 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -3,7 +3,76 @@ namespace React\Stream; use Evenement\EventEmitter; - +use InvalidArgumentException; + +/** + * The `ThroughStream` implements the + * [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data + * you write to it through to its readable end. + * + * ```php + * $through = new ThroughStream(); + * $through->on('data', $this->expectCallableOnceWith('hello')); + * + * $through->write('hello'); + * ``` + * + * Similarly, the [`end()` method](#end) will end the stream and emit an + * [`end` event](#end-event) and then [`close()`](#close-1) the stream. + * The [`close()` method](#close-1) will close the stream and emit a + * [`close` event](#close-event). + * Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this: + * + * ```php + * $through = new ThroughStream(); + * $source->pipe($through)->pipe($dest); + * ``` + * + * Optionally, its constructor accepts any callable function which will then be + * used to *filter* any data written to it. This function receives a single data + * argument as passed to the writable side and must return the data as it will be + * passed to its readable end: + * + * ```php + * $through = new ThroughStream('strtoupper'); + * $source->pipe($through)->pipe($dest); + * ``` + * + * Note that this class makes no assumptions about any data types. This can be + * used to convert data, for example for transforming any structured data into + * a newline-delimited JSON (NDJSON) stream like this: + * + * ```php + * $through = new ThroughStream(function ($data) { + * return json_encode($data) . PHP_EOL; + * }); + * $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); + * + * $through->write(array(2, true)); + * ``` + * + * The callback function is allowed to throw an `Exception`. In this case, + * the stream will emit an `error` event and then [`close()`](#close-1) the stream. + * + * ```php + * $through = new ThroughStream(function ($data) { + * if (!is_string($data)) { + * throw new \UnexpectedValueException('Only strings allowed'); + * } + * return $data; + * }); + * $through->on('error', $this->expectCallableOnce())); + * $through->on('close', $this->expectCallableOnce())); + * $through->on('data', $this->expectCallableNever())); + * + * $through->write(2); + * ``` + * + * @see WritableStreamInterface::write() + * @see WritableStreamInterface::end() + * @see DuplexStreamInterface::close() + * @see WritableStreamInterface::pipe() + */ class ThroughStream extends EventEmitter implements DuplexStreamInterface { private $readable = true; @@ -11,10 +80,15 @@ class ThroughStream extends EventEmitter implements DuplexStreamInterface private $closed = false; private $paused = false; private $drain = false; + private $callback; - public function filter($data) + public function __construct($callback = null) { - return $data; + if ($callback !== null && !is_callable($callback)) { + throw new InvalidArgumentException('Invalid transformation callback given'); + } + + $this->callback = $callback; } public function pause() @@ -52,7 +126,18 @@ public function write($data) return false; } - $this->emit('data', array($this->filter($data))); + if ($this->callback !== null) { + try { + $data = call_user_func($this->callback, $data); + } catch (\Exception $e) { + $this->emit('error', array($e)); + $this->close(); + + return false; + } + } + + $this->emit('data', array($data)); if ($this->paused) { $this->drain = true; @@ -70,6 +155,11 @@ public function end($data = null) if (null !== $data) { $this->write($data); + + // return if write() already caused the stream to close + if (!$this->writable) { + return; + } } $this->readable = false; @@ -92,6 +182,7 @@ public function close() $this->closed = true; $this->paused = true; $this->drain = false; + $this->callback = null; $this->emit('close'); } diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index 263eb09..0ffd93d 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -10,6 +10,15 @@ */ class ThroughStreamTest extends TestCase { + /** + * @test + * @expectedException InvalidArgumentException + */ + public function itShouldRejectInvalidCallback() + { + new ThroughStream(123); + } + /** @test */ public function itShouldReturnTrueForAnyDataWrittenToIt() { @@ -27,6 +36,56 @@ public function itShouldEmitAnyDataWrittenToIt() $through->write('foo'); } + /** @test */ + public function itShouldEmitAnyDataWrittenToItPassedThruFunction() + { + $through = new ThroughStream('strtoupper'); + $through->on('data', $this->expectCallableOnceWith('FOO')); + $through->write('foo'); + } + + /** @test */ + public function itShouldEmitAnyDataWrittenToItPassedThruCallback() + { + $through = new ThroughStream('strtoupper'); + $through->on('data', $this->expectCallableOnceWith('FOO')); + $through->write('foo'); + } + + /** @test */ + public function itShouldEmitErrorAndCloseIfCallbackThrowsException() + { + $through = new ThroughStream(function () { + throw new \RuntimeException(); + }); + $through->on('error', $this->expectCallableOnce()); + $through->on('close', $this->expectCallableOnce()); + $through->on('data', $this->expectCallableNever()); + $through->on('end', $this->expectCallableNever()); + + $through->write('foo'); + + $this->assertFalse($through->isReadable()); + $this->assertFalse($through->isWritable()); + } + + /** @test */ + public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd() + { + $through = new ThroughStream(function () { + throw new \RuntimeException(); + }); + $through->on('error', $this->expectCallableOnce()); + $through->on('close', $this->expectCallableOnce()); + $through->on('data', $this->expectCallableNever()); + $through->on('end', $this->expectCallableNever()); + + $through->end('foo'); + + $this->assertFalse($through->isReadable()); + $this->assertFalse($through->isWritable()); + } + /** @test */ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused() {