From e95900ee67815dab46e07ea645a7adfbef79ed37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sat, 25 Feb 2017 18:52:20 +0100 Subject: [PATCH 1/4] Documentation and internal simplification for pipe() --- README.md | 38 ++++++++++++++++++++++++++---- src/CompositeStream.php | 4 +--- src/ReadableStream.php | 4 +--- src/ReadableStreamInterface.php | 41 +++++++++++++++++++++++++++++++++ src/Stream.php | 4 +--- src/Util.php | 17 +++++++++----- tests/ReadableStreamTest.php | 9 ++++++++ tests/StreamTest.php | 11 +++++++++ tests/UtilTest.php | 11 +++++++++ 9 files changed, 120 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index b3d0bce..127f1bc 100644 --- a/README.md +++ b/README.md @@ -37,10 +37,40 @@ This component depends on `événement`, which is an implementation of the * `pause()`: Remove the data source file descriptor from the event loop. This allows you to throttle incoming data. * `resume()`: Re-attach the data source after a `pause()`. -* `pipe(WritableStreamInterface $dest, array $options = [])`: Pipe this - readable stream into a writable stream. Automatically sends all incoming - data to the destination. Automatically throttles based on what the - destination can handle. +* `pipe(WritableStreamInterface $dest, array $options = [])`: +Pipe all the data from this readable source into the given writable destination. + +Automatically sends all incoming data to the destination. +Automatically throttles the source based on what the destination can handle. + +```php +$source->pipe($dest); +``` + +This method returns the destination stream as-is, which can be used to +set up chains of piped streams: + +```php +$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest); +``` + +By default, this will call `end()` on the destination stream once the +source stream emits an `end` event. This can be disabled like this: + +```php +$source->pipe($dest, array('end' => false)); +``` + +Note that this only applies to the `end` event. +If an `error` or explicit `close` event happens on the source stream, +you'll have to manually close the destination stream: + +```php +$source->pipe($dest); +$source->on('close', function () use ($dest) { + $dest->end('BYE!'); +}); +``` ## Writable Streams diff --git a/src/CompositeStream.php b/src/CompositeStream.php index 1446900..6ce863e 100644 --- a/src/CompositeStream.php +++ b/src/CompositeStream.php @@ -55,9 +55,7 @@ public function resume() public function pipe(WritableStreamInterface $dest, array $options = array()) { - Util::pipe($this, $dest, $options); - - return $dest; + return Util::pipe($this, $dest, $options); } public function isWritable() diff --git a/src/ReadableStream.php b/src/ReadableStream.php index bdf6c33..d0a4e58 100644 --- a/src/ReadableStream.php +++ b/src/ReadableStream.php @@ -23,9 +23,7 @@ public function resume() public function pipe(WritableStreamInterface $dest, array $options = array()) { - Util::pipe($this, $dest, $options); - - return $dest; + return Util::pipe($this, $dest, $options); } public function close() diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index 24e74c2..741dcb2 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -15,6 +15,47 @@ interface ReadableStreamInterface extends EventEmitterInterface public function isReadable(); public function pause(); public function resume(); + + /** + * Pipes all the data from this readable source into the given writable destination. + * + * Automatically sends all incoming data to the destination. + * Automatically throttles the source based on what the destination can handle. + * + * ```php + * $source->pipe($dest); + * ``` + * + * This method returns the destination stream as-is, which can be used to + * set up chains of piped streams: + * + * ```php + * $source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest); + * ``` + * + * By default, this will call `end()` on the destination stream once the + * source stream emits an `end` event. This can be disabled like this: + * + * ```php + * $source->pipe($dest, array('end' => false)); + * ``` + * + * Note that this only applies to the `end` event. + * If an `error` or explicit `close` event happens on the source stream, + * you'll have to manually close the destination stream: + * + * ```php + * $source->pipe($dest); + * $source->on('close', function () use ($dest) { + * $dest->end('BYE!'); + * }); + * ``` + * + * @param WritableStreamInterface $dest + * @param array $options + * @return WritableStreamInterface $dest stream as-is + */ public function pipe(WritableStreamInterface $dest, array $options = array()); + public function close(); } diff --git a/src/Stream.php b/src/Stream.php index 34386ae..56ed511 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -140,9 +140,7 @@ public function end($data = null) public function pipe(WritableStreamInterface $dest, array $options = array()) { - Util::pipe($this, $dest, $options); - - return $dest; + return Util::pipe($this, $dest, $options); } public function handleData($stream) diff --git a/src/Util.php b/src/Util.php index c2445a6..6301916 100644 --- a/src/Util.php +++ b/src/Util.php @@ -2,16 +2,19 @@ namespace React\Stream; -// TODO: move to a trait - class Util { + /** + * Pipes all the data from the given $source into the $dest + * + * @param ReadableStreamInterface $source + * @param WritableStreamInterface $dest + * @param array $options + * @return WritableStreamInterface $dest stream as-is + * @see ReadableStreamInterface::pipe() for more details + */ public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array()) { - // TODO: use stream_copy_to_stream - // it is 4x faster than this - // but can lose data under load with no way to recover it - $dest->emit('pipe', array($source)); $source->on('data', function ($data) use ($source, $dest) { @@ -32,6 +35,8 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter $dest->end(); }); } + + return $dest; } public static function forwardEvents($source, $target, array $events) diff --git a/tests/ReadableStreamTest.php b/tests/ReadableStreamTest.php index 12a77b7..c2b0f65 100644 --- a/tests/ReadableStreamTest.php +++ b/tests/ReadableStreamTest.php @@ -27,6 +27,15 @@ public function resumeShouldDoNothing() $readable->resume(); } + /** @test */ + public function pipeShouldReturnDestination() + { + $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $readable = new ReadableStream(); + + $this->assertSame($dest, $readable->pipe($dest)); + } + /** @test */ public function closeShouldClose() { diff --git a/tests/StreamTest.php b/tests/StreamTest.php index 801e064..44a3e5a 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -198,6 +198,17 @@ public function testEndedStreamsShouldNotWrite() unlink($file); } + public function testPipeShouldReturnDestination() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $conn = new Stream($stream, $loop); + $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + + $this->assertSame($dest, $conn->pipe($dest)); + } + public function testBufferEventsShouldBubbleUp() { $stream = fopen('php://temp', 'r+'); diff --git a/tests/UtilTest.php b/tests/UtilTest.php index 598ad54..876ca49 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -11,6 +11,17 @@ */ class UtilTest extends TestCase { + public function testPipeReturnsDestinationStream() + { + $readable = $this->getMock('React\Stream\ReadableStreamInterface'); + + $writable = $this->getMock('React\Stream\WritableStreamInterface'); + + $ret = Util::pipe($readable, $writable); + + $this->assertSame($writable, $ret); + } + public function testPipeShouldEmitEvents() { $readable = $this->getMock('React\Stream\ReadableStreamInterface'); From 59a8fcb7314ea8eb3f79983b4299ae14f1a80737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sun, 26 Feb 2017 23:01:48 +0100 Subject: [PATCH 2/4] Avoid forwarding events if either source or destination is closed --- README.md | 15 +++++++++ src/ReadableStreamInterface.php | 15 +++++++++ src/Util.php | 12 +++++++ tests/CompositeStreamTest.php | 3 ++ tests/ThroughStreamTest.php | 1 + tests/UtilTest.php | 57 +++++++++++++++++++++++++++------ 6 files changed, 93 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 127f1bc..7b8a220 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,21 @@ $source->on('close', function () use ($dest) { }); ``` +If the source stream is not readable (closed state), then this is a NO-OP. + +```php +$source->close(); +$source->pipe($dest); // NO-OP +``` + +If the destinantion stream is not writable (closed state), then this will simply +throttle (pause) the source stream: + +```php +$dest->close(); +$source->pipe($dest); // calls $source->pause() +``` + ## Writable Streams ### EventEmitter Events diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index 741dcb2..3b9f629 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -51,6 +51,21 @@ public function resume(); * }); * ``` * + * If the source stream is not readable (closed state), then this is a NO-OP. + * + * ```php + * $source->close(); + * $source->pipe($dest); // NO-OP + * ``` + * + * If the destinantion stream is not writable (closed state), then this will simply + * throttle (pause) the source stream: + * + * ```php + * $dest->close(); + * $source->pipe($dest); // calls $source->pause() + * ``` + * * @param WritableStreamInterface $dest * @param array $options * @return WritableStreamInterface $dest stream as-is diff --git a/src/Util.php b/src/Util.php index 6301916..8b6dee7 100644 --- a/src/Util.php +++ b/src/Util.php @@ -15,6 +15,18 @@ class Util */ public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array()) { + // source not readable => NO-OP + if (!$source->isReadable()) { + return $dest; + } + + // destination not writable => just pause() source + if (!$dest->isWritable()) { + $source->pause(); + + return $dest; + } + $dest->emit('pipe', array($source)); $source->on('data', function ($data) use ($source, $dest) { diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index 2c1eacd..8c63b7d 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -112,6 +112,7 @@ public function itShouldHandlePipingCorrectly() { $readable = $this->getMock('React\Stream\ReadableStreamInterface'); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable->expects($this->any())->method('isWritable')->willReturn(True); $writable ->expects($this->once()) ->method('write') @@ -154,9 +155,11 @@ public function itShouldForwardPipeCallsToReadableStream() { $readable = new ReadableStream(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable->expects($this->any())->method('isWritable')->willReturn(True); $composite = new CompositeStream($readable, $writable); $output = $this->getMock('React\Stream\WritableStreamInterface'); + $output->expects($this->any())->method('isWritable')->willReturn(True); $output ->expects($this->once()) ->method('write') diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index d6a6ca3..d68c7f4 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -135,6 +135,7 @@ public function doubleCloseShouldCloseOnce() public function pipeShouldPipeCorrectly() { $output = $this->getMock('React\Stream\WritableStreamInterface'); + $output->expects($this->any())->method('isWritable')->willReturn(True); $output ->expects($this->once()) ->method('write') diff --git a/tests/UtilTest.php b/tests/UtilTest.php index 876ca49..0f9fe94 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -22,23 +22,44 @@ public function testPipeReturnsDestinationStream() $this->assertSame($writable, $ret); } - public function testPipeShouldEmitEvents() + public function testPipeNonReadableSourceShouldDoNothing() { $readable = $this->getMock('React\Stream\ReadableStreamInterface'); $readable - ->expects($this->at(0)) - ->method('on') - ->with('data', $this->isInstanceOf('Closure')); + ->expects($this->any()) + ->method('isReadable') + ->willReturn(false); + + $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->never()) + ->method('isWritable'); + $writable + ->expects($this->never()) + ->method('end'); + + Util::pipe($readable, $writable); + } + + public function testPipeIntoNonWritableDestinationShouldPauseSource() + { + $readable = $this->getMock('React\Stream\ReadableStreamInterface'); $readable - ->expects($this->at(1)) - ->method('on') - ->with('end', $this->isInstanceOf('Closure')); + ->expects($this->any()) + ->method('isReadable') + ->willReturn(true); + $readable + ->expects($this->once()) + ->method('pause'); $writable = $this->getMock('React\Stream\WritableStreamInterface'); $writable - ->expects($this->at(0)) - ->method('emit') - ->with('pipe', array($readable)); + ->expects($this->any()) + ->method('isWritable') + ->willReturn(false); + $writable + ->expects($this->never()) + ->method('end'); Util::pipe($readable, $writable); } @@ -48,6 +69,10 @@ public function testPipeWithEnd() $readable = new Stub\ReadableStreamStub(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->once()) ->method('end'); @@ -62,6 +87,10 @@ public function testPipeWithoutEnd() $readable = new Stub\ReadableStreamStub(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->never()) ->method('end'); @@ -76,6 +105,10 @@ public function testPipeWithTooSlowWritableShouldPauseReadable() $readable = new Stub\ReadableStreamStub(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->once()) ->method('write') @@ -96,6 +129,10 @@ public function testPipeWithTooSlowWritableShouldResumeOnDrain() $onDrain = null; $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->once()) ->method('on') From 12dba41f7f75e19569e17907cc532cbad1fa14a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sun, 5 Mar 2017 11:58:10 +0100 Subject: [PATCH 3/4] Stop forwarding events if either source or destination is closed --- README.md | 8 ++++ src/ReadableStreamInterface.php | 8 ++++ src/Util.php | 19 +++++++-- tests/UtilTest.php | 71 +++++++++++++++++++++++++++++++-- 4 files changed, 100 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 7b8a220..8d48825 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,14 @@ $dest->close(); $source->pipe($dest); // calls $source->pause() ``` +Similarly, if the destination stream is closed while the pipe is still +active, it will also throttle (pause) the source stream: + +```php +$source->pipe($dest); +$dest->close(); // calls $source->pause() +``` + ## Writable Streams ### EventEmitter Events diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index 3b9f629..cab83c2 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -66,6 +66,14 @@ public function resume(); * $source->pipe($dest); // calls $source->pause() * ``` * + * Similarly, if the destination stream is closed while the pipe is still + * active, it will also throttle (pause) the source stream: + * + * ```php + * $source->pipe($dest); + * $dest->close(); // calls $source->pause() + * ``` + * * @param WritableStreamInterface $dest * @param array $options * @return WritableStreamInterface $dest stream as-is diff --git a/src/Util.php b/src/Util.php index 8b6dee7..dca4464 100644 --- a/src/Util.php +++ b/src/Util.php @@ -29,23 +29,36 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter $dest->emit('pipe', array($source)); - $source->on('data', function ($data) use ($source, $dest) { + // forward all source data events as $dest->write() + $source->on('data', $dataer = function ($data) use ($source, $dest) { $feedMore = $dest->write($data); if (false === $feedMore) { $source->pause(); } }); + $dest->on('close', function () use ($source, $dataer) { + $source->removeListener('data', $dataer); + $source->pause(); + }); - $dest->on('drain', function () use ($source) { + // forward destination drain as $source->resume() + $dest->on('drain', $drainer = function () use ($source) { $source->resume(); }); + $source->on('close', function () use ($dest, $drainer) { + $dest->removeListener('drain', $drainer); + }); + // forward end event from source as $dest->end() $end = isset($options['end']) ? $options['end'] : true; if ($end && $source !== $dest) { - $source->on('end', function () use ($dest) { + $source->on('end', $ender = function () use ($dest) { $dest->end(); }); + $dest->on('close', function () use ($source, $ender) { + $source->removeListener('end', $ender); + }); } return $dest; diff --git a/tests/UtilTest.php b/tests/UtilTest.php index 0f9fe94..4de6edf 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -5,6 +5,7 @@ use React\Stream\Buffer; use React\Stream\ReadableStream; use React\Stream\Util; +use React\Stream\WritableStream; /** * @covers React\Stream\Util @@ -64,6 +65,24 @@ public function testPipeIntoNonWritableDestinationShouldPauseSource() Util::pipe($readable, $writable); } + public function testPipeClosingDestPausesSource() + { + $readable = $this->getMock('React\Stream\ReadableStreamInterface'); + $readable + ->expects($this->any()) + ->method('isReadable') + ->willReturn(true); + $readable + ->expects($this->once()) + ->method('pause'); + + $writable = new WritableStream(); + + Util::pipe($readable, $writable); + + $writable->close(); + } + public function testPipeWithEnd() { $readable = new Stub\ReadableStreamStub(); @@ -134,17 +153,19 @@ public function testPipeWithTooSlowWritableShouldResumeOnDrain() ->method('isWritable') ->willReturn(true); $writable - ->expects($this->once()) + ->expects($this->any()) ->method('on') - ->with('drain', $this->isInstanceOf('Closure')) ->will($this->returnCallback(function ($name, $callback) use (&$onDrain) { - $onDrain = $callback; + if ($name === 'drain') { + $onDrain = $callback; + } })); $readable->pipe($writable); $readable->pause(); $this->assertTrue($readable->paused); + $this->assertNotNull($onDrain); $onDrain(); $this->assertFalse($readable->paused); } @@ -167,6 +188,50 @@ public function testPipeWithBuffer() $this->assertSame('hello, I am some random data', stream_get_contents($stream)); } + public function testPipeSetsUpListeners() + { + $source = new ReadableStream(); + $dest = new WritableStream(); + + $this->assertCount(0, $source->listeners('data')); + $this->assertCount(0, $source->listeners('end')); + $this->assertCount(0, $dest->listeners('drain')); + + Util::pipe($source, $dest); + + $this->assertCount(1, $source->listeners('data')); + $this->assertCount(1, $source->listeners('end')); + $this->assertCount(1, $dest->listeners('drain')); + } + + public function testPipeClosingSourceRemovesListeners() + { + $source = new ReadableStream(); + $dest = new WritableStream(); + + Util::pipe($source, $dest); + + $source->close(); + + $this->assertCount(0, $source->listeners('data')); + $this->assertCount(0, $source->listeners('end')); + $this->assertCount(0, $dest->listeners('drain')); + } + + public function testPipeClosingDestRemovesListeners() + { + $source = new ReadableStream(); + $dest = new WritableStream(); + + Util::pipe($source, $dest); + + $dest->close(); + + $this->assertCount(0, $source->listeners('data')); + $this->assertCount(0, $source->listeners('end')); + $this->assertCount(0, $dest->listeners('drain')); + } + /** @test */ public function forwardEventsShouldSetupForwards() { From c3110e45204ca273335f1bc3b33f62c1736c969f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sun, 5 Mar 2017 13:54:34 +0100 Subject: [PATCH 4/4] Piping duplex stream into itself will properly end() --- README.md | 8 ++++++++ src/ReadableStreamInterface.php | 8 ++++++++ src/Util.php | 2 +- tests/UtilTest.php | 16 ++++++++++++++++ 4 files changed, 33 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8d48825..fa36171 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,14 @@ Automatically throttles the source based on what the destination can handle. $source->pipe($dest); ``` +Similarly, you can also pipe an instance implementing `DuplexStreamInterface` +into itself in order to write back all the data that is received. +This may be a useful feature for a TCP/IP echo service: + +```php +$connection->pipe($connection); +``` + This method returns the destination stream as-is, which can be used to set up chains of piped streams: diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index cab83c2..e4a5d14 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -26,6 +26,14 @@ public function resume(); * $source->pipe($dest); * ``` * + * Similarly, you can also pipe an instance implementing `DuplexStreamInterface` + * into itself in order to write back all the data that is received. + * This may be a useful feature for a TCP/IP echo service: + * + * ```php + * $connection->pipe($connection); + * ``` + * * This method returns the destination stream as-is, which can be used to * set up chains of piped streams: * diff --git a/src/Util.php b/src/Util.php index dca4464..d59728e 100644 --- a/src/Util.php +++ b/src/Util.php @@ -52,7 +52,7 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter // forward end event from source as $dest->end() $end = isset($options['end']) ? $options['end'] : true; - if ($end && $source !== $dest) { + if ($end) { $source->on('end', $ender = function () use ($dest) { $dest->end(); }); diff --git a/tests/UtilTest.php b/tests/UtilTest.php index 4de6edf..382bafd 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -6,6 +6,7 @@ use React\Stream\ReadableStream; use React\Stream\Util; use React\Stream\WritableStream; +use React\Stream\CompositeStream; /** * @covers React\Stream\Util @@ -232,6 +233,21 @@ public function testPipeClosingDestRemovesListeners() $this->assertCount(0, $dest->listeners('drain')); } + public function testPipeDuplexIntoSelfEndsOnEnd() + { + $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable->expects($this->any())->method('isReadable')->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable->expects($this->any())->method('isWritable')->willReturn(true); + $duplex = new CompositeStream($readable, $writable); + + Util::pipe($duplex, $duplex); + + $writable->expects($this->once())->method('end'); + + $duplex->emit('end'); + } + /** @test */ public function forwardEventsShouldSetupForwards() {