Skip to content

Commit

Permalink
Piping duplex stream into itself will properly end()
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Mar 6, 2017
1 parent 12dba41 commit c3110e4
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 1 deletion.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ Automatically throttles the source based on what the destination can handle.
$source->pipe($dest);
```

Similarly, you can also pipe an instance implementing `DuplexStreamInterface`
into itself in order to write back all the data that is received.
This may be a useful feature for a TCP/IP echo service:

```php
$connection->pipe($connection);
```

This method returns the destination stream as-is, which can be used to
set up chains of piped streams:

Expand Down
8 changes: 8 additions & 0 deletions src/ReadableStreamInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ public function resume();
* $source->pipe($dest);
* ```
*
* Similarly, you can also pipe an instance implementing `DuplexStreamInterface`
* into itself in order to write back all the data that is received.
* This may be a useful feature for a TCP/IP echo service:
*
* ```php
* $connection->pipe($connection);
* ```
*
* This method returns the destination stream as-is, which can be used to
* set up chains of piped streams:
*
Expand Down
2 changes: 1 addition & 1 deletion src/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter

// forward end event from source as $dest->end()
$end = isset($options['end']) ? $options['end'] : true;
if ($end && $source !== $dest) {
if ($end) {
$source->on('end', $ender = function () use ($dest) {
$dest->end();
});
Expand Down
16 changes: 16 additions & 0 deletions tests/UtilTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\Stream\ReadableStream;
use React\Stream\Util;
use React\Stream\WritableStream;
use React\Stream\CompositeStream;

/**
* @covers React\Stream\Util
Expand Down Expand Up @@ -232,6 +233,21 @@ public function testPipeClosingDestRemovesListeners()
$this->assertCount(0, $dest->listeners('drain'));
}

public function testPipeDuplexIntoSelfEndsOnEnd()
{
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$readable->expects($this->any())->method('isReadable')->willReturn(true);
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
$writable->expects($this->any())->method('isWritable')->willReturn(true);
$duplex = new CompositeStream($readable, $writable);

Util::pipe($duplex, $duplex);

$writable->expects($this->once())->method('end');

$duplex->emit('end');
}

/** @test */
public function forwardEventsShouldSetupForwards()
{
Expand Down

0 comments on commit c3110e4

Please sign in to comment.