Skip to content

Commit

Permalink
Data callback is allowed to throw Exception
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Apr 24, 2017
1 parent e1ddc34 commit 62a939f
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 10 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,23 @@ $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
$through->write(array(2, true));
```

The callback function is allowed to throw an `Exception`. In this case,
the stream will emit an `error` event and then [`close()`](#close-1) the stream.

```php
$through = new ThroughStream(function ($data) {
if (!is_string($data)) {
throw new \UnexpectedValueException('Only strings allowed');
}
return $data;
});
$through->on('error', $this->expectCallableOnce()));
$through->on('close', $this->expectCallableOnce()));
$through->on('data', $this->expectCallableNever()));

$through->write(2);
```

## Usage
```php
$loop = React\EventLoop\Factory::create();
Expand Down
44 changes: 34 additions & 10 deletions src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@
* $through->write(array(2, true));
* ```
*
* The callback function is allowed to throw an `Exception`. In this case,
* the stream will emit an `error` event and then [`close()`](#close-1) the stream.
*
* ```php
* $through = new ThroughStream(function ($data) {
* if (!is_string($data)) {
* throw new \UnexpectedValueException('Only strings allowed');
* }
* return $data;
* });
* $through->on('error', $this->expectCallableOnce()));
* $through->on('close', $this->expectCallableOnce()));
* $through->on('data', $this->expectCallableNever()));
*
* $through->write(2);
* ```
*
* @see WritableStreamInterface::write()
* @see WritableStreamInterface::end()
* @see DuplexStreamInterface::close()
Expand Down Expand Up @@ -109,7 +126,18 @@ public function write($data)
return false;
}

$this->emit('data', array($this->filter($data)));
if ($this->callback !== null) {
try {
$data = call_user_func($this->callback, $data);
} catch (\Exception $e) {
$this->emit('error', array($e));
$this->close();

return false;
}
}

$this->emit('data', array($data));

if ($this->paused) {
$this->drain = true;
Expand All @@ -127,6 +155,11 @@ public function end($data = null)

if (null !== $data) {
$this->write($data);

// return if write() already caused the stream to close
if (!$this->writable) {
return;
}
}

$this->readable = false;
Expand All @@ -153,13 +186,4 @@ public function close()

$this->emit('close');
}

private function filter($data)
{
if ($this->callback !== null) {
$data = call_user_func($this->callback, $data);
}

return $data;
}
}
34 changes: 34 additions & 0 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,40 @@ public function itShouldEmitAnyDataWrittenToItPassedThruCallback()
$through->write('foo');
}

/** @test */
public function itShouldEmitErrorAndCloseIfCallbackThrowsException()
{
$through = new ThroughStream(function () {
throw new \RuntimeException();
});
$through->on('error', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->on('data', $this->expectCallableNever());
$through->on('end', $this->expectCallableNever());

$through->write('foo');

$this->assertFalse($through->isReadable());
$this->assertFalse($through->isWritable());
}

/** @test */
public function itShouldEmitErrorAndCloseIfCallbackThrowsExceptionOnEnd()
{
$through = new ThroughStream(function () {
throw new \RuntimeException();
});
$through->on('error', $this->expectCallableOnce());
$through->on('close', $this->expectCallableOnce());
$through->on('data', $this->expectCallableNever());
$through->on('end', $this->expectCallableNever());

$through->end('foo');

$this->assertFalse($through->isReadable());
$this->assertFalse($through->isWritable());
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
{
Expand Down

0 comments on commit 62a939f

Please sign in to comment.