Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward compatibility with Stream v1.0 and v0.7 #184

Merged
merged 2 commits into from
May 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,11 @@ The `Response` class in this project supports to add an instance which implement
for the response body.
So you are able stream data directly into the response body.
Note that other implementations of the `PSR-7 ResponseInterface` likely
only support string.
only support strings.

```php
$server = new Server($socket, function (ServerRequestInterface $request) use ($loop) {
$stream = new ReadableStream();
$stream = new ThroughStream();

$timer = $loop->addPeriodicTimer(0.5, function () use ($stream) {
$stream->emit('data', array(microtime(true) . PHP_EOL));
Expand Down Expand Up @@ -513,7 +513,7 @@ pass this header yourself.
If you know the length of your stream body, you MAY specify it like this instead:

```php
$stream = new ReadableStream()
$stream = new ThroughStream()
$server = new Server($socket, function (ServerRequestInterface $request) use ($loop, $stream) {
return new Response(
200,
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"php": ">=5.3.0",
"ringcentral/psr7": "^1.2",
"react/socket": "^1.0 || ^0.8 || ^0.7 || ^0.6 || ^0.5",
"react/stream": "^0.6 || ^0.5 || ^0.4.4",
"react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.6",
"react/promise": "^2.0 || ^1.1",
"evenement/evenement": "^2.0 || ^1.0"
},
Expand All @@ -18,6 +18,7 @@
},
"require-dev": {
"phpunit/phpunit": "^4.8.10||^5.0",
"react/promise-stream": "^0.1.1",
"react/socket": "^1.0 || ^0.8 || ^0.7",
"clue/block-react": "^1.1"
}
Expand Down
4 changes: 2 additions & 2 deletions examples/05-stream-response.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
use React\Socket\Server;
use React\Http\Response;
use Psr\Http\Message\ServerRequestInterface;
use React\Stream\ReadableStream;
use React\Stream\ThroughStream;

require __DIR__ . '/../vendor/autoload.php';

$loop = Factory::create();
$socket = new Server(isset($argv[1]) ? $argv[1] : '0.0.0.0:0', $loop);

$server = new \React\Http\Server($socket, function (ServerRequestInterface $request) use ($loop) {
$stream = new ReadableStream();
$stream = new ThroughStream();

$timer = $loop->addPeriodicTimer(0.5, function () use ($stream) {
$stream->emit('data', array(microtime(true) . PHP_EOL));
Expand Down
31 changes: 28 additions & 3 deletions examples/99-benchmark-download.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@
use React\Socket\Server;
use React\Http\Response;
use Psr\Http\Message\ServerRequestInterface;
use React\Stream\ReadableStream;
use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;

require __DIR__ . '/../vendor/autoload.php';

$loop = Factory::create();
$socket = new Server(isset($argv[1]) ? $argv[1] : '0.0.0.0:0', $loop);

/** A readable stream that can emit a lot of data */
class ChunkRepeater extends ReadableStream
class ChunkRepeater extends EventEmitter implements ReadableStreamInterface
{
private $chunk;
private $count;
private $position = 0;
private $paused = true;
private $closed = false;

public function __construct($chunk, $count)
{
Expand All @@ -38,7 +41,7 @@ public function pause()

public function resume()
{
if (!$this->paused) {
if (!$this->paused || $this->closed) {
return;
}

Expand All @@ -56,6 +59,28 @@ public function resume()
}
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
return;
}

public function isReadable()
{
return !$this->closed;
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;
$this->count = 0;
$this->paused = true;
$this->emit('close');
}

public function getSize()
{
return strlen($this->chunk) * $this->count;
Expand Down
6 changes: 3 additions & 3 deletions tests/ChunkedDecoderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

namespace React\Tests\Http;

use React\Stream\ReadableStream;
use React\Stream\ThroughStream;
use React\Http\ChunkedDecoder;

class ChunkedDecoderTest extends TestCase
{
public function setUp()
{
$this->input = new ReadableStream();
$this->input = new ThroughStream();
$this->parser = new ChunkedDecoder($this->input);
}

Expand Down Expand Up @@ -386,7 +386,7 @@ public function testHandleClose()

public function testOutputStreamCanCloseInputStream()
{
$input = new ReadableStream();
$input = new ThroughStream();
$input->on('close', $this->expectCallableOnce());

$stream = new ChunkedDecoder($input);
Expand Down
4 changes: 2 additions & 2 deletions tests/ChunkedEncoderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace React\Tests\Http;

use React\Stream\ReadableStream;
use React\Stream\ThroughStream;
use React\Http\ChunkedEncoder;

class ChunkedEncoderTest extends TestCase
Expand All @@ -12,7 +12,7 @@ class ChunkedEncoderTest extends TestCase

public function setUp()
{
$this->input = new ReadableStream();
$this->input = new ThroughStream();
$this->chunkedStream = new ChunkedEncoder($this->input);
}

Expand Down
14 changes: 7 additions & 7 deletions tests/CloseProtectionStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace React\Tests\Http;

use React\Http\CloseProtectionStream;
use React\Stream\ReadableStream;
use React\Stream\ThroughStream;

class CloseProtectionStreamTest extends TestCase
{
Expand All @@ -19,7 +19,7 @@ public function testClosePausesTheInputStreamInsteadOfClosing()

public function testErrorWontCloseStream()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('error', $this->expectCallableOnce());
Expand All @@ -44,7 +44,7 @@ public function testResumeStreamWillResumeInputStream()

public function testInputStreamIsNotReadableAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('close', $this->expectCallableOnce());
Expand All @@ -57,7 +57,7 @@ public function testInputStreamIsNotReadableAfterClose()

public function testPipeStream()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
Expand All @@ -69,7 +69,7 @@ public function testPipeStream()

public function testStopEmittingDataAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
Expand All @@ -86,7 +86,7 @@ public function testStopEmittingDataAfterClose()

public function testErrorIsNeverCalledAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
Expand All @@ -103,7 +103,7 @@ public function testErrorIsNeverCalledAfterClose()

public function testEndWontBeEmittedAfterClose()
{
$input = new ReadableStream();
$input = new ThroughStream();

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
Expand Down
28 changes: 16 additions & 12 deletions tests/FunctionalServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
use Psr\Http\Message\RequestInterface;
use React\Socket\Connector;
use React\Socket\ConnectionInterface;
use React\Stream\BufferedSink;
use Clue\React\Block;
use React\Http\Response;
use React\Socket\SecureServer;
use React\Stream\ReadableStreamInterface;
use React\EventLoop\LoopInterface;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Stream;

class FunctionalServerTest extends TestCase
{
Expand All @@ -28,7 +32,7 @@ public function testPlainHttpOnRandomPort()
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\nHost: " . noScheme($conn->getRemoteAddress()) . "\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand All @@ -52,7 +56,7 @@ public function testPlainHttpOnRandomPortWithoutHostHeaderUsesSocketUri()
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand All @@ -76,7 +80,7 @@ public function testPlainHttpOnRandomPortWithOtherHostHeaderTakesPrecedence()
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\nHost: localhost:1000\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -109,7 +113,7 @@ public function testSecureHttpsOnRandomPort()
$result = $connector->connect('tls://' . noScheme($socket->getAddress()))->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\nHost: " . noScheme($conn->getRemoteAddress()) . "\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -142,7 +146,7 @@ public function testSecureHttpsOnRandomPortWithoutHostHeaderUsesSocketUri()
$result = $connector->connect('tls://' . noScheme($socket->getAddress()))->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -170,7 +174,7 @@ public function testPlainHttpOnStandardPortReturnsUriWithNoPort()
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -198,7 +202,7 @@ public function testPlainHttpOnStandardPortWithoutHostHeaderReturnsUriWithNoPort
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -235,7 +239,7 @@ public function testSecureHttpsOnStandardPortReturnsUriWithNoPort()
$result = $connector->connect('tls://' . noScheme($socket->getAddress()))->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -272,7 +276,7 @@ public function testSecureHttpsOnStandardPortWithoutHostHeaderUsesSocketUri()
$result = $connector->connect('tls://' . noScheme($socket->getAddress()))->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -300,7 +304,7 @@ public function testPlainHttpOnHttpsStandardPortReturnsUriWithPort()
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\nHost: " . noScheme($conn->getRemoteAddress()) . "\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down Expand Up @@ -337,7 +341,7 @@ public function testSecureHttpsOnHttpStandardPortReturnsUriWithPort()
$result = $connector->connect('tls://' . noScheme($socket->getAddress()))->then(function (ConnectionInterface $conn) {
$conn->write("GET / HTTP/1.0\r\nHost: " . noScheme($conn->getRemoteAddress()) . "\r\n\r\n");

return BufferedSink::createPromise($conn);
return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);
Expand Down
4 changes: 2 additions & 2 deletions tests/HttpBodyStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace React\Tests\Http;

use React\Http\HttpBodyStream;
use React\Stream\ReadableStream;
use React\Stream\ThroughStream;

class HttpBodyStreamTest extends TestCase
{
Expand All @@ -12,7 +12,7 @@ class HttpBodyStreamTest extends TestCase

public function setUp()
{
$this->input = new ReadableStream();
$this->input = new ThroughStream();
$this->bodyStream = new HttpBodyStream($this->input, null);
}

Expand Down
6 changes: 3 additions & 3 deletions tests/LengthLimitedStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace React\Tests\Http;

use React\Http\LengthLimitedStream;
use React\Stream\ReadableStream;
use React\Stream\ThroughStream;

class LengthLimitedStreamTest extends TestCase
{
Expand All @@ -12,7 +12,7 @@ class LengthLimitedStreamTest extends TestCase

public function setUp()
{
$this->input = new ReadableStream();
$this->input = new ThroughStream();
}

public function testSimpleChunk()
Expand Down Expand Up @@ -95,7 +95,7 @@ public function testHandleClose()

public function testOutputStreamCanCloseInputStream()
{
$input = new ReadableStream();
$input = new ThroughStream();
$input->on('close', $this->expectCallableOnce());

$stream = new LengthLimitedStream($input, 0);
Expand Down
Loading