Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify ThroughStream by using data callback instead of inheritance #89

Merged
merged 3 commits into from
Apr 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ descriptor based implementation with an in-memory write buffer.
* [ReadableResourceStream](#readableresourcestream)
* [WritableResourceStream](#writableresourcestream)
* [DuplexResourceStream](#duplexresourcestream)
* [ThroughStream](#throughstream)
* [Usage](#usage)
* [Install](#install)
* [Tests](#tests)
Expand Down Expand Up @@ -924,6 +925,70 @@ $buffer->softLimit = 8192;

See also [`write()`](#write) for more details.

### ThroughStream

The `ThroughStream` implements the
[`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
you write to it through to its readable end.

```php
$through = new ThroughStream();
$through->on('data', $this->expectCallableOnceWith('hello'));

$through->write('hello');
```

Similarly, the [`end()` method](#end) will end the stream and emit an
[`end` event](#end-event) and then [`close()`](#close-1) the stream.
The [`close()` method](#close-1) will close the stream and emit a
[`close` event](#close-event).
Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:

```php
$through = new ThroughStream();
$source->pipe($through)->pipe($dest);
```

Optionally, its constructor accepts any callable function which will then be
used to *filter* any data written to it. This function receives a single data
argument as passed to the writable side and must return the data as it will be
passed to its readable end:

```php
$through = new ThroughStream('strtoupper');
$source->pipe($through)->pipe($dest);
```

Note that this class makes no assumptions about any data types. This can be
used to convert data, for example for transforming any structured data into
a newline-delimited JSON (NDJSON) stream like this:

```php
$through = new ThroughStream(function ($data) {
return json_encode($data) . PHP_EOL;
});
$through->on('data', $this->expectCallableOnceWith("[2, true]\n"));

$through->write(array(2, true));
```

The callback function is allowed to throw an `Exception`. In this case,
the stream will emit an `error` event and then [`close()`](#close-1) the stream.

```php
$through = new ThroughStream(function ($data) {
if (!is_string($data)) {
throw new \UnexpectedValueException('Only strings allowed');
}
return $data;
});
$through->on('error', $this->expectCallableOnce()));
$through->on('close', $this->expectCallableOnce()));
$through->on('data', $this->expectCallableNever()));

$through->write(2);
```

## Usage
```php
$loop = React\EventLoop\Factory::create();
Expand Down
99 changes: 95 additions & 4 deletions src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,92 @@
namespace React\Stream;

use Evenement\EventEmitter;

use InvalidArgumentException;

/**
* The `ThroughStream` implements the
* [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
* you write to it through to its readable end.
*
* ```php
* $through = new ThroughStream();
* $through->on('data', $this->expectCallableOnceWith('hello'));
*
* $through->write('hello');
* ```
*
* Similarly, the [`end()` method](#end) will end the stream and emit an
* [`end` event](#end-event) and then [`close()`](#close-1) the stream.
* The [`close()` method](#close-1) will close the stream and emit a
* [`close` event](#close-event).
* Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:
*
* ```php
* $through = new ThroughStream();
* $source->pipe($through)->pipe($dest);
* ```
*
* Optionally, its constructor accepts any callable function which will then be
* used to *filter* any data written to it. This function receives a single data
* argument as passed to the writable side and must return the data as it will be
* passed to its readable end:
*
* ```php
* $through = new ThroughStream('strtoupper');
* $source->pipe($through)->pipe($dest);
* ```
*
* Note that this class makes no assumptions about any data types. This can be
* used to convert data, for example for transforming any structured data into
* a newline-delimited JSON (NDJSON) stream like this:
*
* ```php
* $through = new ThroughStream(function ($data) {
* return json_encode($data) . PHP_EOL;
* });
* $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
*
* $through->write(array(2, true));
* ```
*
* The callback function is allowed to throw an `Exception`. In this case,
* the stream will emit an `error` event and then [`close()`](#close-1) the stream.
*
* ```php
* $through = new ThroughStream(function ($data) {
* if (!is_string($data)) {
* throw new \UnexpectedValueException('Only strings allowed');
* }
* return $data;
* });
* $through->on('error', $this->expectCallableOnce()));
* $through->on('close', $this->expectCallableOnce()));
* $through->on('data', $this->expectCallableNever()));
*
* $through->write(2);
* ```
*
* @see WritableStreamInterface::write()
* @see WritableStreamInterface::end()
* @see DuplexStreamInterface::close()
* @see WritableStreamInterface::pipe()
*/
class ThroughStream extends EventEmitter implements DuplexStreamInterface
{
private $readable = true;
private $writable = true;
private $closed = false;
private $paused = false;
private $drain = false;
private $callback;

public function filter($data)
public function __construct($callback = null)
{
return $data;
if ($callback !== null && !is_callable($callback)) {
throw new InvalidArgumentException('Invalid transformation callback given');
}

$this->callback = $callback;
}

public function pause()
Expand Down Expand Up @@ -52,7 +126,18 @@ public function write($data)
return false;
}

$this->emit('data', array($this->filter($data)));
if ($this->callback !== null) {
try {
$data = call_user_func($this->callback, $data);
} catch (\Exception $e) {
$this->emit('error', array($e));
$this->close();

return false;
}
}

$this->emit('data', array($data));

if ($this->paused) {
$this->drain = true;
Expand All @@ -70,6 +155,11 @@ public function end($data = null)

if (null !== $data) {
$this->write($data);

// return if write() already caused the stream to close
if (!$this->writable) {
return;
}
}

$this->readable = false;
Expand All @@ -92,6 +182,7 @@ public function close()
$this->closed = true;
$this->paused = true;
$this->drain = false;
$this->callback = null;

$this->emit('close');
}
Expand Down
59 changes: 59 additions & 0 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
*/
class ThroughStreamTest extends TestCase
{
/**
* @test
* @expectedException InvalidArgumentException
*/
public function itShouldRejectInvalidCallback()
{
new ThroughStream(123);
}

/** @test */
public function itShouldReturnTrueForAnyDataWrittenToIt()
{
Expand All @@ -27,6 +36,56 @@ public function itShouldEmitAnyDataWrittenToIt()
$through->write('foo');
}

/** @test */
public function itShouldEmitAnyDataWrittenToItPassedThruFunction()
{
$through = new ThroughStream('strtoupper');
$through->on('data', $this->expectCallableOnceWith('FOO'));
$through->write('foo');
}

/** @test */
public function itShouldEmitAnyDataWrittenToItPassedThruCallback()
{
$through = new ThroughStream('strtoupper');
$through->on('data', $this->expectCallableOnceWith('FOO'));
$through->write('foo');
}

/** @test */
public function itShouldEmitErrorAndCloseIfCallbackThrowsException()
{
$through = new ThroughStream(function () {
throw new \RuntimeException();
});
$through->on('error', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->on('data', $this->expectCallableNever());
$through->on('end', $this->expectCallableNever());

$through->write('foo');

$this->assertFalse($through->isReadable());
$this->assertFalse($through->isWritable());
}

/** @test */
public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd()
{
$through = new ThroughStream(function () {
throw new \RuntimeException();
});
$through->on('error', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->on('data', $this->expectCallableNever());
$through->on('end', $this->expectCallableNever());

$through->end('foo');

$this->assertFalse($through->isReadable());
$this->assertFalse($through->isWritable());
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
{
Expand Down