diff --git a/README.md b/README.md index b3d0bce..fa36171 100644 --- a/README.md +++ b/README.md @@ -37,10 +37,71 @@ This component depends on `événement`, which is an implementation of the * `pause()`: Remove the data source file descriptor from the event loop. This allows you to throttle incoming data. * `resume()`: Re-attach the data source after a `pause()`. -* `pipe(WritableStreamInterface $dest, array $options = [])`: Pipe this - readable stream into a writable stream. Automatically sends all incoming - data to the destination. Automatically throttles based on what the - destination can handle. +* `pipe(WritableStreamInterface $dest, array $options = [])`: +Pipe all the data from this readable source into the given writable destination. + +Automatically sends all incoming data to the destination. +Automatically throttles the source based on what the destination can handle. + +```php +$source->pipe($dest); +``` + +Similarly, you can also pipe an instance implementing `DuplexStreamInterface` +into itself in order to write back all the data that is received. +This may be a useful feature for a TCP/IP echo service: + +```php +$connection->pipe($connection); +``` + +This method returns the destination stream as-is, which can be used to +set up chains of piped streams: + +```php +$source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest); +``` + +By default, this will call `end()` on the destination stream once the +source stream emits an `end` event. This can be disabled like this: + +```php +$source->pipe($dest, array('end' => false)); +``` + +Note that this only applies to the `end` event. +If an `error` or explicit `close` event happens on the source stream, +you'll have to manually close the destination stream: + +```php +$source->pipe($dest); +$source->on('close', function () use ($dest) { + $dest->end('BYE!'); +}); +``` + +If the source stream is not readable (closed state), then this is a NO-OP. + +```php +$source->close(); +$source->pipe($dest); // NO-OP +``` + +If the destinantion stream is not writable (closed state), then this will simply +throttle (pause) the source stream: + +```php +$dest->close(); +$source->pipe($dest); // calls $source->pause() +``` + +Similarly, if the destination stream is closed while the pipe is still +active, it will also throttle (pause) the source stream: + +```php +$source->pipe($dest); +$dest->close(); // calls $source->pause() +``` ## Writable Streams diff --git a/src/CompositeStream.php b/src/CompositeStream.php index 1446900..6ce863e 100644 --- a/src/CompositeStream.php +++ b/src/CompositeStream.php @@ -55,9 +55,7 @@ public function resume() public function pipe(WritableStreamInterface $dest, array $options = array()) { - Util::pipe($this, $dest, $options); - - return $dest; + return Util::pipe($this, $dest, $options); } public function isWritable() diff --git a/src/ReadableStream.php b/src/ReadableStream.php index bdf6c33..d0a4e58 100644 --- a/src/ReadableStream.php +++ b/src/ReadableStream.php @@ -23,9 +23,7 @@ public function resume() public function pipe(WritableStreamInterface $dest, array $options = array()) { - Util::pipe($this, $dest, $options); - - return $dest; + return Util::pipe($this, $dest, $options); } public function close() diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index 24e74c2..e4a5d14 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -15,6 +15,78 @@ interface ReadableStreamInterface extends EventEmitterInterface public function isReadable(); public function pause(); public function resume(); + + /** + * Pipes all the data from this readable source into the given writable destination. + * + * Automatically sends all incoming data to the destination. + * Automatically throttles the source based on what the destination can handle. + * + * ```php + * $source->pipe($dest); + * ``` + * + * Similarly, you can also pipe an instance implementing `DuplexStreamInterface` + * into itself in order to write back all the data that is received. + * This may be a useful feature for a TCP/IP echo service: + * + * ```php + * $connection->pipe($connection); + * ``` + * + * This method returns the destination stream as-is, which can be used to + * set up chains of piped streams: + * + * ```php + * $source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest); + * ``` + * + * By default, this will call `end()` on the destination stream once the + * source stream emits an `end` event. This can be disabled like this: + * + * ```php + * $source->pipe($dest, array('end' => false)); + * ``` + * + * Note that this only applies to the `end` event. + * If an `error` or explicit `close` event happens on the source stream, + * you'll have to manually close the destination stream: + * + * ```php + * $source->pipe($dest); + * $source->on('close', function () use ($dest) { + * $dest->end('BYE!'); + * }); + * ``` + * + * If the source stream is not readable (closed state), then this is a NO-OP. + * + * ```php + * $source->close(); + * $source->pipe($dest); // NO-OP + * ``` + * + * If the destinantion stream is not writable (closed state), then this will simply + * throttle (pause) the source stream: + * + * ```php + * $dest->close(); + * $source->pipe($dest); // calls $source->pause() + * ``` + * + * Similarly, if the destination stream is closed while the pipe is still + * active, it will also throttle (pause) the source stream: + * + * ```php + * $source->pipe($dest); + * $dest->close(); // calls $source->pause() + * ``` + * + * @param WritableStreamInterface $dest + * @param array $options + * @return WritableStreamInterface $dest stream as-is + */ public function pipe(WritableStreamInterface $dest, array $options = array()); + public function close(); } diff --git a/src/Stream.php b/src/Stream.php index 34386ae..56ed511 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -140,9 +140,7 @@ public function end($data = null) public function pipe(WritableStreamInterface $dest, array $options = array()) { - Util::pipe($this, $dest, $options); - - return $dest; + return Util::pipe($this, $dest, $options); } public function handleData($stream) diff --git a/src/Util.php b/src/Util.php index c2445a6..d59728e 100644 --- a/src/Util.php +++ b/src/Util.php @@ -2,36 +2,66 @@ namespace React\Stream; -// TODO: move to a trait - class Util { + /** + * Pipes all the data from the given $source into the $dest + * + * @param ReadableStreamInterface $source + * @param WritableStreamInterface $dest + * @param array $options + * @return WritableStreamInterface $dest stream as-is + * @see ReadableStreamInterface::pipe() for more details + */ public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array()) { - // TODO: use stream_copy_to_stream - // it is 4x faster than this - // but can lose data under load with no way to recover it + // source not readable => NO-OP + if (!$source->isReadable()) { + return $dest; + } + + // destination not writable => just pause() source + if (!$dest->isWritable()) { + $source->pause(); + + return $dest; + } $dest->emit('pipe', array($source)); - $source->on('data', function ($data) use ($source, $dest) { + // forward all source data events as $dest->write() + $source->on('data', $dataer = function ($data) use ($source, $dest) { $feedMore = $dest->write($data); if (false === $feedMore) { $source->pause(); } }); + $dest->on('close', function () use ($source, $dataer) { + $source->removeListener('data', $dataer); + $source->pause(); + }); - $dest->on('drain', function () use ($source) { + // forward destination drain as $source->resume() + $dest->on('drain', $drainer = function () use ($source) { $source->resume(); }); + $source->on('close', function () use ($dest, $drainer) { + $dest->removeListener('drain', $drainer); + }); + // forward end event from source as $dest->end() $end = isset($options['end']) ? $options['end'] : true; - if ($end && $source !== $dest) { - $source->on('end', function () use ($dest) { + if ($end) { + $source->on('end', $ender = function () use ($dest) { $dest->end(); }); + $dest->on('close', function () use ($source, $ender) { + $source->removeListener('end', $ender); + }); } + + return $dest; } public static function forwardEvents($source, $target, array $events) diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index 2c1eacd..8c63b7d 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -112,6 +112,7 @@ public function itShouldHandlePipingCorrectly() { $readable = $this->getMock('React\Stream\ReadableStreamInterface'); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable->expects($this->any())->method('isWritable')->willReturn(True); $writable ->expects($this->once()) ->method('write') @@ -154,9 +155,11 @@ public function itShouldForwardPipeCallsToReadableStream() { $readable = new ReadableStream(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable->expects($this->any())->method('isWritable')->willReturn(True); $composite = new CompositeStream($readable, $writable); $output = $this->getMock('React\Stream\WritableStreamInterface'); + $output->expects($this->any())->method('isWritable')->willReturn(True); $output ->expects($this->once()) ->method('write') diff --git a/tests/ReadableStreamTest.php b/tests/ReadableStreamTest.php index 12a77b7..c2b0f65 100644 --- a/tests/ReadableStreamTest.php +++ b/tests/ReadableStreamTest.php @@ -27,6 +27,15 @@ public function resumeShouldDoNothing() $readable->resume(); } + /** @test */ + public function pipeShouldReturnDestination() + { + $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $readable = new ReadableStream(); + + $this->assertSame($dest, $readable->pipe($dest)); + } + /** @test */ public function closeShouldClose() { diff --git a/tests/StreamTest.php b/tests/StreamTest.php index 801e064..44a3e5a 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -198,6 +198,17 @@ public function testEndedStreamsShouldNotWrite() unlink($file); } + public function testPipeShouldReturnDestination() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $conn = new Stream($stream, $loop); + $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + + $this->assertSame($dest, $conn->pipe($dest)); + } + public function testBufferEventsShouldBubbleUp() { $stream = fopen('php://temp', 'r+'); diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index d6a6ca3..d68c7f4 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -135,6 +135,7 @@ public function doubleCloseShouldCloseOnce() public function pipeShouldPipeCorrectly() { $output = $this->getMock('React\Stream\WritableStreamInterface'); + $output->expects($this->any())->method('isWritable')->willReturn(True); $output ->expects($this->once()) ->method('write') diff --git a/tests/UtilTest.php b/tests/UtilTest.php index 598ad54..382bafd 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -5,38 +5,94 @@ use React\Stream\Buffer; use React\Stream\ReadableStream; use React\Stream\Util; +use React\Stream\WritableStream; +use React\Stream\CompositeStream; /** * @covers React\Stream\Util */ class UtilTest extends TestCase { - public function testPipeShouldEmitEvents() + public function testPipeReturnsDestinationStream() + { + $readable = $this->getMock('React\Stream\ReadableStreamInterface'); + + $writable = $this->getMock('React\Stream\WritableStreamInterface'); + + $ret = Util::pipe($readable, $writable); + + $this->assertSame($writable, $ret); + } + + public function testPipeNonReadableSourceShouldDoNothing() { $readable = $this->getMock('React\Stream\ReadableStreamInterface'); $readable - ->expects($this->at(0)) - ->method('on') - ->with('data', $this->isInstanceOf('Closure')); + ->expects($this->any()) + ->method('isReadable') + ->willReturn(false); + + $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->never()) + ->method('isWritable'); + $writable + ->expects($this->never()) + ->method('end'); + + Util::pipe($readable, $writable); + } + + public function testPipeIntoNonWritableDestinationShouldPauseSource() + { + $readable = $this->getMock('React\Stream\ReadableStreamInterface'); $readable - ->expects($this->at(1)) - ->method('on') - ->with('end', $this->isInstanceOf('Closure')); + ->expects($this->any()) + ->method('isReadable') + ->willReturn(true); + $readable + ->expects($this->once()) + ->method('pause'); $writable = $this->getMock('React\Stream\WritableStreamInterface'); $writable - ->expects($this->at(0)) - ->method('emit') - ->with('pipe', array($readable)); + ->expects($this->any()) + ->method('isWritable') + ->willReturn(false); + $writable + ->expects($this->never()) + ->method('end'); Util::pipe($readable, $writable); } + public function testPipeClosingDestPausesSource() + { + $readable = $this->getMock('React\Stream\ReadableStreamInterface'); + $readable + ->expects($this->any()) + ->method('isReadable') + ->willReturn(true); + $readable + ->expects($this->once()) + ->method('pause'); + + $writable = new WritableStream(); + + Util::pipe($readable, $writable); + + $writable->close(); + } + public function testPipeWithEnd() { $readable = new Stub\ReadableStreamStub(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->once()) ->method('end'); @@ -51,6 +107,10 @@ public function testPipeWithoutEnd() $readable = new Stub\ReadableStreamStub(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->never()) ->method('end'); @@ -65,6 +125,10 @@ public function testPipeWithTooSlowWritableShouldPauseReadable() $readable = new Stub\ReadableStreamStub(); $writable = $this->getMock('React\Stream\WritableStreamInterface'); + $writable + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); $writable ->expects($this->once()) ->method('write') @@ -86,17 +150,23 @@ public function testPipeWithTooSlowWritableShouldResumeOnDrain() $writable = $this->getMock('React\Stream\WritableStreamInterface'); $writable - ->expects($this->once()) + ->expects($this->any()) + ->method('isWritable') + ->willReturn(true); + $writable + ->expects($this->any()) ->method('on') - ->with('drain', $this->isInstanceOf('Closure')) ->will($this->returnCallback(function ($name, $callback) use (&$onDrain) { - $onDrain = $callback; + if ($name === 'drain') { + $onDrain = $callback; + } })); $readable->pipe($writable); $readable->pause(); $this->assertTrue($readable->paused); + $this->assertNotNull($onDrain); $onDrain(); $this->assertFalse($readable->paused); } @@ -119,6 +189,65 @@ public function testPipeWithBuffer() $this->assertSame('hello, I am some random data', stream_get_contents($stream)); } + public function testPipeSetsUpListeners() + { + $source = new ReadableStream(); + $dest = new WritableStream(); + + $this->assertCount(0, $source->listeners('data')); + $this->assertCount(0, $source->listeners('end')); + $this->assertCount(0, $dest->listeners('drain')); + + Util::pipe($source, $dest); + + $this->assertCount(1, $source->listeners('data')); + $this->assertCount(1, $source->listeners('end')); + $this->assertCount(1, $dest->listeners('drain')); + } + + public function testPipeClosingSourceRemovesListeners() + { + $source = new ReadableStream(); + $dest = new WritableStream(); + + Util::pipe($source, $dest); + + $source->close(); + + $this->assertCount(0, $source->listeners('data')); + $this->assertCount(0, $source->listeners('end')); + $this->assertCount(0, $dest->listeners('drain')); + } + + public function testPipeClosingDestRemovesListeners() + { + $source = new ReadableStream(); + $dest = new WritableStream(); + + Util::pipe($source, $dest); + + $dest->close(); + + $this->assertCount(0, $source->listeners('data')); + $this->assertCount(0, $source->listeners('end')); + $this->assertCount(0, $dest->listeners('drain')); + } + + public function testPipeDuplexIntoSelfEndsOnEnd() + { + $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $readable->expects($this->any())->method('isReadable')->willReturn(true); + $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); + $writable->expects($this->any())->method('isWritable')->willReturn(true); + $duplex = new CompositeStream($readable, $writable); + + Util::pipe($duplex, $duplex); + + $writable->expects($this->once())->method('end'); + + $duplex->emit('end'); + } + /** @test */ public function forwardEventsShouldSetupForwards() {