From 7c338ad789d151486f1b30f6bace1faf5f1febdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Fri, 24 Mar 2017 18:28:29 +0100 Subject: [PATCH 1/3] Use callback instead of inheritance --- src/ThroughStream.php | 20 ++++++++++++++++++-- tests/ThroughStreamTest.php | 25 +++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 5f80f58..d4e8759 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -3,6 +3,7 @@ namespace React\Stream; use Evenement\EventEmitter; +use InvalidArgumentException; class ThroughStream extends EventEmitter implements DuplexStreamInterface { @@ -11,10 +12,15 @@ class ThroughStream extends EventEmitter implements DuplexStreamInterface private $closed = false; private $paused = false; private $drain = false; + private $callback; - public function filter($data) + public function __construct($callback = null) { - return $data; + if ($callback !== null && !is_callable($callback)) { + throw new InvalidArgumentException('Invalid transformation callback given'); + } + + $this->callback = $callback; } public function pause() @@ -92,7 +98,17 @@ public function close() $this->closed = true; $this->paused = true; $this->drain = false; + $this->callback = null; $this->emit('close'); } + + private function filter($data) + { + if ($this->callback !== null) { + $data = call_user_func($this->callback, $data); + } + + return $data; + } } diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index 263eb09..434e5c5 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -10,6 +10,15 @@ */ class ThroughStreamTest extends TestCase { + /** + * @test + * @expectedException InvalidArgumentException + */ + public function itShouldRejectInvalidCallback() + { + new ThroughStream(123); + } + /** @test */ public function itShouldReturnTrueForAnyDataWrittenToIt() { @@ -27,6 +36,22 @@ public function itShouldEmitAnyDataWrittenToIt() $through->write('foo'); } + /** @test */ + public function itShouldEmitAnyDataWrittenToItPassedThruFunction() + { + $through = new ThroughStream('strtoupper'); + $through->on('data', $this->expectCallableOnceWith('FOO')); + $through->write('foo'); + } + + /** @test */ + public function itShouldEmitAnyDataWrittenToItPassedThruCallback() + { + $through = new ThroughStream('strtoupper'); + $through->on('data', $this->expectCallableOnceWith('FOO')); + $through->write('foo'); + } + /** @test */ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused() { From e1ddc34beb0c7c9895859375dfcf2712ae61cc29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Mon, 24 Apr 2017 10:46:37 +0200 Subject: [PATCH 2/3] Documentation for ThroughStream --- README.md | 48 ++++++++++++++++++++++++++++++++++++++++ src/ThroughStream.php | 51 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/README.md b/README.md index 0fa8a1a..6fd1ab3 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ descriptor based implementation with an in-memory write buffer. * [ReadableResourceStream](#readableresourcestream) * [WritableResourceStream](#writableresourcestream) * [DuplexResourceStream](#duplexresourcestream) + * [ThroughStream](#throughstream) * [Usage](#usage) * [Install](#install) * [Tests](#tests) @@ -924,6 +925,53 @@ $buffer->softLimit = 8192; See also [`write()`](#write) for more details. +### ThroughStream + +The `ThroughStream` implements the +[`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data +you write to it through to its readable end. + +```php +$through = new ThroughStream(); +$through->on('data', $this->expectCallableOnceWith('hello')); + +$through->write('hello'); +``` + +Similarly, the [`end()` method](#end) will end the stream and emit an +[`end` event](#end-event) and then [`close()`](#close-1) the stream. +The [`close()` method](#close-1) will close the stream and emit a +[`close` event](#close-event). +Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this: + +```php +$through = new ThroughStream(); +$source->pipe($through)->pipe($dest); +``` + +Optionally, its constructor accepts any callable function which will then be +used to *filter* any data written to it. This function receives a single data +argument as passed to the writable side and must return the data as it will be +passed to its readable end: + +```php +$through = new ThroughStream('strtoupper'); +$source->pipe($through)->pipe($dest); +``` + +Note that this class makes no assumptions about any data types. This can be +used to convert data, for example for transforming any structured data into +a newline-delimited JSON (NDJSON) stream like this: + +```php +$through = new ThroughStream(function ($data) { + return json_encode($data) . PHP_EOL; +}); +$through->on('data', $this->expectCallableOnceWith("[2, true]\n")); + +$through->write(array(2, true)); +``` + ## Usage ```php $loop = React\EventLoop\Factory::create(); diff --git a/src/ThroughStream.php b/src/ThroughStream.php index d4e8759..1db670d 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -5,6 +5,57 @@ use Evenement\EventEmitter; use InvalidArgumentException; +/** + * The `ThroughStream` implements the + * [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data + * you write to it through to its readable end. + * + * ```php + * $through = new ThroughStream(); + * $through->on('data', $this->expectCallableOnceWith('hello')); + * + * $through->write('hello'); + * ``` + * + * Similarly, the [`end()` method](#end) will end the stream and emit an + * [`end` event](#end-event) and then [`close()`](#close-1) the stream. + * The [`close()` method](#close-1) will close the stream and emit a + * [`close` event](#close-event). + * Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this: + * + * ```php + * $through = new ThroughStream(); + * $source->pipe($through)->pipe($dest); + * ``` + * + * Optionally, its constructor accepts any callable function which will then be + * used to *filter* any data written to it. This function receives a single data + * argument as passed to the writable side and must return the data as it will be + * passed to its readable end: + * + * ```php + * $through = new ThroughStream('strtoupper'); + * $source->pipe($through)->pipe($dest); + * ``` + * + * Note that this class makes no assumptions about any data types. This can be + * used to convert data, for example for transforming any structured data into + * a newline-delimited JSON (NDJSON) stream like this: + * + * ```php + * $through = new ThroughStream(function ($data) { + * return json_encode($data) . PHP_EOL; + * }); + * $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); + * + * $through->write(array(2, true)); + * ``` + * + * @see WritableStreamInterface::write() + * @see WritableStreamInterface::end() + * @see DuplexStreamInterface::close() + * @see WritableStreamInterface::pipe() + */ class ThroughStream extends EventEmitter implements DuplexStreamInterface { private $readable = true; From 62a939f8deede87924676325a1794de965b82d78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Mon, 24 Apr 2017 12:19:20 +0200 Subject: [PATCH 3/3] Data callback is allowed to throw Exception --- README.md | 17 ++++++++++++++ src/ThroughStream.php | 44 ++++++++++++++++++++++++++++--------- tests/ThroughStreamTest.php | 34 ++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 6fd1ab3..4fddecb 100644 --- a/README.md +++ b/README.md @@ -972,6 +972,23 @@ $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); $through->write(array(2, true)); ``` +The callback function is allowed to throw an `Exception`. In this case, +the stream will emit an `error` event and then [`close()`](#close-1) the stream. + +```php +$through = new ThroughStream(function ($data) { + if (!is_string($data)) { + throw new \UnexpectedValueException('Only strings allowed'); + } + return $data; +}); +$through->on('error', $this->expectCallableOnce())); +$through->on('close', $this->expectCallableOnce())); +$through->on('data', $this->expectCallableNever())); + +$through->write(2); +``` + ## Usage ```php $loop = React\EventLoop\Factory::create(); diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 1db670d..6f63e2e 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -51,6 +51,23 @@ * $through->write(array(2, true)); * ``` * + * The callback function is allowed to throw an `Exception`. In this case, + * the stream will emit an `error` event and then [`close()`](#close-1) the stream. + * + * ```php + * $through = new ThroughStream(function ($data) { + * if (!is_string($data)) { + * throw new \UnexpectedValueException('Only strings allowed'); + * } + * return $data; + * }); + * $through->on('error', $this->expectCallableOnce())); + * $through->on('close', $this->expectCallableOnce())); + * $through->on('data', $this->expectCallableNever())); + * + * $through->write(2); + * ``` + * * @see WritableStreamInterface::write() * @see WritableStreamInterface::end() * @see DuplexStreamInterface::close() @@ -109,7 +126,18 @@ public function write($data) return false; } - $this->emit('data', array($this->filter($data))); + if ($this->callback !== null) { + try { + $data = call_user_func($this->callback, $data); + } catch (\Exception $e) { + $this->emit('error', array($e)); + $this->close(); + + return false; + } + } + + $this->emit('data', array($data)); if ($this->paused) { $this->drain = true; @@ -127,6 +155,11 @@ public function end($data = null) if (null !== $data) { $this->write($data); + + // return if write() already caused the stream to close + if (!$this->writable) { + return; + } } $this->readable = false; @@ -153,13 +186,4 @@ public function close() $this->emit('close'); } - - private function filter($data) - { - if ($this->callback !== null) { - $data = call_user_func($this->callback, $data); - } - - return $data; - } } diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index 434e5c5..0ffd93d 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -52,6 +52,40 @@ public function itShouldEmitAnyDataWrittenToItPassedThruCallback() $through->write('foo'); } + /** @test */ + public function itShouldEmitErrorAndCloseIfCallbackThrowsException() + { + $through = new ThroughStream(function () { + throw new \RuntimeException(); + }); + $through->on('error', $this->expectCallableOnce()); + $through->on('close', $this->expectCallableOnce()); + $through->on('data', $this->expectCallableNever()); + $through->on('end', $this->expectCallableNever()); + + $through->write('foo'); + + $this->assertFalse($through->isReadable()); + $this->assertFalse($through->isWritable()); + } + + /** @test */ + public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd() + { + $through = new ThroughStream(function () { + throw new \RuntimeException(); + }); + $through->on('error', $this->expectCallableOnce()); + $through->on('close', $this->expectCallableOnce()); + $through->on('data', $this->expectCallableNever()); + $through->on('end', $this->expectCallableNever()); + + $through->end('foo'); + + $this->assertFalse($through->isReadable()); + $this->assertFalse($through->isWritable()); + } + /** @test */ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused() {