Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent pipe() semantics for closed and closing streams #71

Merged
merged 4 commits into from
Mar 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 65 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,71 @@ This component depends on `événement`, which is an implementation of the
* `pause()`: Remove the data source file descriptor from the event loop. This
allows you to throttle incoming data.
* `resume()`: Re-attach the data source after a `pause()`.
* `pipe(WritableStreamInterface $dest, array $options = [])`: Pipe this
readable stream into a writable stream. Automatically sends all incoming
data to the destination. Automatically throttles based on what the
destination can handle.
* `pipe(WritableStreamInterface $dest, array $options = [])`:
Pipe all the data from this readable source into the given writable destination.

Automatically sends all incoming data to the destination.
Automatically throttles the source based on what the destination can handle.

```php
$source->pipe($dest);
```

Similarly, you can also pipe an instance implementing `DuplexStreamInterface`
into itself in order to write back all the data that is received.
This may be a useful feature for a TCP/IP echo service:

```php
$connection->pipe($connection);
```

This method returns the destination stream as-is, which can be used to
set up chains of piped streams:

```php
$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);
```

By default, this will call `end()` on the destination stream once the
source stream emits an `end` event. This can be disabled like this:

```php
$source->pipe($dest, array('end' => false));
```

Note that this only applies to the `end` event.
If an `error` or explicit `close` event happens on the source stream,
you'll have to manually close the destination stream:

```php
$source->pipe($dest);
$source->on('close', function () use ($dest) {
$dest->end('BYE!');
});
```

If the source stream is not readable (closed state), then this is a NO-OP.

```php
$source->close();
$source->pipe($dest); // NO-OP
```

If the destinantion stream is not writable (closed state), then this will simply
throttle (pause) the source stream:

```php
$dest->close();
$source->pipe($dest); // calls $source->pause()
```

Similarly, if the destination stream is closed while the pipe is still
active, it will also throttle (pause) the source stream:

```php
$source->pipe($dest);
$dest->close(); // calls $source->pause()
```

## Writable Streams

Expand Down
4 changes: 1 addition & 3 deletions src/CompositeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ public function resume()

public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);

return $dest;
return Util::pipe($this, $dest, $options);
}

public function isWritable()
Expand Down
4 changes: 1 addition & 3 deletions src/ReadableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ public function resume()

public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);

return $dest;
return Util::pipe($this, $dest, $options);
}

public function close()
Expand Down
72 changes: 72 additions & 0 deletions src/ReadableStreamInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,78 @@ interface ReadableStreamInterface extends EventEmitterInterface
public function isReadable();
public function pause();
public function resume();

/**
* Pipes all the data from this readable source into the given writable destination.
*
* Automatically sends all incoming data to the destination.
* Automatically throttles the source based on what the destination can handle.
*
* ```php
* $source->pipe($dest);
* ```
*
* Similarly, you can also pipe an instance implementing `DuplexStreamInterface`
* into itself in order to write back all the data that is received.
* This may be a useful feature for a TCP/IP echo service:
*
* ```php
* $connection->pipe($connection);
* ```
*
* This method returns the destination stream as-is, which can be used to
* set up chains of piped streams:
*
* ```php
* $source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);
* ```
*
* By default, this will call `end()` on the destination stream once the
* source stream emits an `end` event. This can be disabled like this:
*
* ```php
* $source->pipe($dest, array('end' => false));
* ```
*
* Note that this only applies to the `end` event.
* If an `error` or explicit `close` event happens on the source stream,
* you'll have to manually close the destination stream:
*
* ```php
* $source->pipe($dest);
* $source->on('close', function () use ($dest) {
* $dest->end('BYE!');
* });
* ```
*
* If the source stream is not readable (closed state), then this is a NO-OP.
*
* ```php
* $source->close();
* $source->pipe($dest); // NO-OP
* ```
*
* If the destinantion stream is not writable (closed state), then this will simply
* throttle (pause) the source stream:
*
* ```php
* $dest->close();
* $source->pipe($dest); // calls $source->pause()
* ```
*
* Similarly, if the destination stream is closed while the pipe is still
* active, it will also throttle (pause) the source stream:
*
* ```php
* $source->pipe($dest);
* $dest->close(); // calls $source->pause()
* ```
*
* @param WritableStreamInterface $dest
* @param array $options
* @return WritableStreamInterface $dest stream as-is
*/
public function pipe(WritableStreamInterface $dest, array $options = array());

public function close();
}
4 changes: 1 addition & 3 deletions src/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public function end($data = null)

public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);

return $dest;
return Util::pipe($this, $dest, $options);
}

public function handleData($stream)
Expand Down
48 changes: 39 additions & 9 deletions src/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,66 @@

namespace React\Stream;

// TODO: move to a trait

class Util
{
/**
* Pipes all the data from the given $source into the $dest
*
* @param ReadableStreamInterface $source
* @param WritableStreamInterface $dest
* @param array $options
* @return WritableStreamInterface $dest stream as-is
* @see ReadableStreamInterface::pipe() for more details
*/
public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
{
// TODO: use stream_copy_to_stream
// it is 4x faster than this
// but can lose data under load with no way to recover it
// source not readable => NO-OP
if (!$source->isReadable()) {
return $dest;
}

// destination not writable => just pause() source
if (!$dest->isWritable()) {
$source->pause();

return $dest;
}

$dest->emit('pipe', array($source));

$source->on('data', function ($data) use ($source, $dest) {
// forward all source data events as $dest->write()
$source->on('data', $dataer = function ($data) use ($source, $dest) {
$feedMore = $dest->write($data);

if (false === $feedMore) {
$source->pause();
}
});
$dest->on('close', function () use ($source, $dataer) {
$source->removeListener('data', $dataer);
$source->pause();
});

$dest->on('drain', function () use ($source) {
// forward destination drain as $source->resume()
$dest->on('drain', $drainer = function () use ($source) {
$source->resume();
});
$source->on('close', function () use ($dest, $drainer) {
$dest->removeListener('drain', $drainer);
});

// forward end event from source as $dest->end()
$end = isset($options['end']) ? $options['end'] : true;
if ($end && $source !== $dest) {
$source->on('end', function () use ($dest) {
if ($end) {
$source->on('end', $ender = function () use ($dest) {
$dest->end();
});
$dest->on('close', function () use ($source, $ender) {
$source->removeListener('end', $ender);
});
}

return $dest;
}

public static function forwardEvents($source, $target, array $events)
Expand Down
3 changes: 3 additions & 0 deletions tests/CompositeStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public function itShouldHandlePipingCorrectly()
{
$readable = $this->getMock('React\Stream\ReadableStreamInterface');
$writable = $this->getMock('React\Stream\WritableStreamInterface');
$writable->expects($this->any())->method('isWritable')->willReturn(True);
$writable
->expects($this->once())
->method('write')
Expand Down Expand Up @@ -154,9 +155,11 @@ public function itShouldForwardPipeCallsToReadableStream()
{
$readable = new ReadableStream();
$writable = $this->getMock('React\Stream\WritableStreamInterface');
$writable->expects($this->any())->method('isWritable')->willReturn(True);
$composite = new CompositeStream($readable, $writable);

$output = $this->getMock('React\Stream\WritableStreamInterface');
$output->expects($this->any())->method('isWritable')->willReturn(True);
$output
->expects($this->once())
->method('write')
Expand Down
9 changes: 9 additions & 0 deletions tests/ReadableStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ public function resumeShouldDoNothing()
$readable->resume();
}

/** @test */
public function pipeShouldReturnDestination()
{
$dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
$readable = new ReadableStream();

$this->assertSame($dest, $readable->pipe($dest));
}

/** @test */
public function closeShouldClose()
{
Expand Down
11 changes: 11 additions & 0 deletions tests/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ public function testEndedStreamsShouldNotWrite()
unlink($file);
}

public function testPipeShouldReturnDestination()
{
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$conn = new Stream($stream, $loop);
$dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();

$this->assertSame($dest, $conn->pipe($dest));
}

public function testBufferEventsShouldBubbleUp()
{
$stream = fopen('php://temp', 'r+');
Expand Down
1 change: 1 addition & 0 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public function doubleCloseShouldCloseOnce()
public function pipeShouldPipeCorrectly()
{
$output = $this->getMock('React\Stream\WritableStreamInterface');
$output->expects($this->any())->method('isWritable')->willReturn(True);
$output
->expects($this->once())
->method('write')
Expand Down
Loading