Skip to content

Commit

Permalink
Forward compatibility with Stream v1.0 and v0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed May 13, 2017
1 parent ce9fb0b commit 5a75c3b
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 56 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,11 @@ The `Response` class in this project supports to add an instance which implement
for the response body.
So you are able stream data directly into the response body.
Note that other implementations of the `PSR-7 ResponseInterface` likely
only support string.
only support strings.

```php
$server = new Server($socket, function (ServerRequestInterface $request) use ($loop) {
$stream = new ReadableStream();
$stream = new ThroughStream();

$timer = $loop->addPeriodicTimer(0.5, function () use ($stream) {
$stream->emit('data', array(microtime(true) . PHP_EOL));
Expand Down Expand Up @@ -513,7 +513,7 @@ pass this header yourself.
If you know the length of your stream body, you MAY specify it like this instead:

```php
$stream = new ReadableStream()
$stream = new ThroughStream()
$server = new Server($socket, function (ServerRequestInterface $request) use ($loop, $stream) {
return new Response(
200,
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"php": ">=5.3.0",
"ringcentral/psr7": "^1.2",
"react/socket": "^1.0 || ^0.8 || ^0.7 || ^0.6 || ^0.5",
"react/stream": "^0.6 || ^0.5 || ^0.4.4",
"react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.4",
"react/promise": "^2.0 || ^1.1",
"evenement/evenement": "^2.0 || ^1.0"
},
Expand Down
4 changes: 2 additions & 2 deletions examples/05-stream-response.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
use React\Socket\Server;
use React\Http\Response;
use Psr\Http\Message\ServerRequestInterface;
use React\Stream\ReadableStream;
use React\Stream\ThroughStream;

require __DIR__ . '/../vendor/autoload.php';

$loop = Factory::create();
$socket = new Server(isset($argv[1]) ? $argv[1] : '0.0.0.0:0', $loop);

$server = new \React\Http\Server($socket, function (ServerRequestInterface $request) use ($loop) {
$stream = new ReadableStream();
$stream = new ThroughStream();

$timer = $loop->addPeriodicTimer(0.5, function () use ($stream) {
$stream->emit('data', array(microtime(true) . PHP_EOL));
Expand Down
31 changes: 28 additions & 3 deletions examples/99-benchmark-download.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@
use React\Socket\Server;
use React\Http\Response;
use Psr\Http\Message\ServerRequestInterface;
use React\Stream\ReadableStream;
use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;

require __DIR__ . '/../vendor/autoload.php';

$loop = Factory::create();
$socket = new Server(isset($argv[1]) ? $argv[1] : '0.0.0.0:0', $loop);

/** A readable stream that can emit a lot of data */
class ChunkRepeater extends ReadableStream
class ChunkRepeater extends EventEmitter implements ReadableStreamInterface
{
private $chunk;
private $count;
private $position = 0;
private $paused = true;
private $closed = false;

public function __construct($chunk, $count)
{
Expand All @@ -38,7 +41,7 @@ public function pause()

public function resume()
{
if (!$this->paused) {
if (!$this->paused || $this->closed) {
return;
}

Expand All @@ -56,6 +59,28 @@ public function resume()
}
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
return;
}

public function isReadable()
{
return !$this->closed;
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;
$this->count = 0;
$this->paused = true;
$this->emit('close');
}

public function getSize()
{
return strlen($this->chunk) * $this->count;
Expand Down
6 changes: 3 additions & 3 deletions tests/ChunkedDecoderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

namespace React\Tests\Http;

use React\Stream\ReadableStream;
use React\Stream\ThroughStream;
use React\Http\ChunkedDecoder;

class ChunkedDecoderTest extends TestCase
{
public function setUp()
{
$this->input = new ReadableStream();
$this->input = new ThroughStream();
$this->parser = new ChunkedDecoder($this->input);
}

Expand Down Expand Up @@ -386,7 +386,7 @@ public function testHandleClose()

public function testOutputStreamCanCloseInputStream()
{
$input = new ReadableStream();
$input = new ThroughStream();
$input->on('close', $this->expectCallableOnce());

$stream = new ChunkedDecoder($input);
Expand Down
4 changes: 2 additions & 2 deletions tests/ChunkedEncoderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace React\Tests\Http;

use React\Stream\ReadableStream;
use React\Stream\ThroughStream;
use React\Http\ChunkedEncoder;

class ChunkedEncoderTest extends TestCase
Expand All @@ -12,7 +12,7 @@ class ChunkedEncoderTest extends TestCase

public function setUp()
{
$this->input = new ReadableStream();
$this->input = new ThroughStream();
$this->chunkedStream = new ChunkedEncoder($this->input);
}

Expand Down
14 changes: 7 additions & 7 deletions tests/CloseProtectionStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace React\Tests\Http;

use React\Http\CloseProtectionStream;
use React\Stream\ReadableStream;
use React\Stream\ThroughStream;

class CloseProtectionStreamTest extends TestCase
{
Expand All @@ -19,7 +19,7 @@ public function testClosePausesTheInputStreamInsteadOfClosing()

public function testErrorWontCloseStream()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('error', $this->expectCallableOnce());
Expand All @@ -44,7 +44,7 @@ public function testResumeStreamWillResumeInputStream()

public function testInputStreamIsNotReadableAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('close', $this->expectCallableOnce());
Expand All @@ -57,7 +57,7 @@ public function testInputStreamIsNotReadableAfterClose()

public function testPipeStream()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
Expand All @@ -69,7 +69,7 @@ public function testPipeStream()

public function testStopEmittingDataAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
Expand All @@ -86,7 +86,7 @@ public function testStopEmittingDataAfterClose()

public function testErrorIsNeverCalledAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
Expand All @@ -103,7 +103,7 @@ public function testErrorIsNeverCalledAfterClose()

public function testEndWontBeEmittedAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
Expand Down
Loading

0 comments on commit 5a75c3b

Please sign in to comment.