diff --git a/README.md b/README.md index 8d48825..fa36171 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,14 @@ Automatically throttles the source based on what the destination can handle. $source->pipe($dest); ``` +Similarly, you can also pipe an instance implementing `DuplexStreamInterface` +into itself in order to write back all the data that is received. +This may be a useful feature for a TCP/IP echo service: + +```php +$connection->pipe($connection); +``` + This method returns the destination stream as-is, which can be used to set up chains of piped streams: diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index cab83c2..e4a5d14 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -26,6 +26,14 @@ public function resume(); * $source->pipe($dest); * ``` * + * Similarly, you can also pipe an instance implementing `DuplexStreamInterface` + * into itself in order to write back all the data that is received. + * This may be a useful feature for a TCP/IP echo service: + * + * ```php + * $connection->pipe($connection); + * ``` + * * This method returns the destination stream as-is, which can be used to * set up chains of piped streams: * diff --git a/src/Util.php b/src/Util.php index dca4464..d59728e 100644 --- a/src/Util.php +++ b/src/Util.php @@ -52,7 +52,7 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter // forward end event from source as $dest->end() $end = isset($options['end']) ? $options['end'] : true; - if ($end && $source !== $dest) { + if ($end) { $source->on('end', $ender = function () use ($dest) { $dest->end(); }); diff --git a/tests/UtilTest.php b/tests/UtilTest.php index 4de6edf..382bafd 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -6,6 +6,7 @@ use React\Stream\ReadableStream; use React\Stream\Util; use React\Stream\WritableStream; +use React\Stream\CompositeStream; /** * @covers React\Stream\Util @@ -232,6 +233,21 @@ public function testPipeClosingDestRemovesListeners() $this->assertCount(0, $dest->listeners('drain')); } + public function testPipeDuplexIntoSelfEndsOnEnd() + { + $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable->expects($this->any())->method('isReadable')->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable->expects($this->any())->method('isWritable')->willReturn(true); + $duplex = new CompositeStream($readable, $writable); + + Util::pipe($duplex, $duplex); + + $writable->expects($this->once())->method('end'); + + $duplex->emit('end'); + } + /** @test */ public function forwardEventsShouldSetupForwards() {