Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Expose DuplexStreamInterface for connections that requires Upgrades #382

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/Browser.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use React\Http\Io\Sender;
use React\Http\Io\Transaction;
use React\Promise\PromiseInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
use React\Stream\ReadableStreamInterface;
use InvalidArgumentException;
Expand Down Expand Up @@ -707,7 +708,7 @@ public function withResponseBuffer($maximumSize)
* @see self::withFollowRedirects()
* @see self::withRejectErrorResponse()
*/
private function withOptions(array $options)
public function withOptions(array $options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I like exposing the withOptions() method again. It has been deprecated with clue/reactphp-buzz#172 not too long ago.

Do we really need this method? It's my understanding we could just rely on the Connection: upgrade and/or Upgrade: … request header(s) being present?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clue I didn't know the method was deprecated.

But my thought behind it was to let the consumer using Browser to be able to choose to watch out for the upgrade event or not. As by default, Browser will not. This means that, if an upgrade did happen, I need to configure browser to capture it like $browser->withOptions(['upgrade' => true]) (default is false in Transaction.php).

This, however, won't be necessary if the default mode should be to capture the upgrade. That was the reason I made the method public (also seeing it was public in clue/reactphp-buzz)

{
$browser = clone $this;
$browser->transaction = $this->transaction->withOptions($options);
Expand All @@ -720,7 +721,7 @@ private function withOptions(array $options)
* @param string $url
* @param array $headers
* @param string|ReadableStreamInterface $body
* @return PromiseInterface<ResponseInterface,Exception>
* @return PromiseInterface<ResponseInterface,Exception, ConnectionInterface>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this is supposed to mean? The promise either fulfills with a ResponseInterface or rejects with an Exception.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it in the case, where an upgrade did happen, the promise will resolve with a ConnectionInterface rather than an Exception or a ResponseInterface. Still not sure if this is the best way to express that.

*/
private function requestMayBeStreaming($method, $url, array $headers = array(), $body = '')
{
Expand Down
27 changes: 24 additions & 3 deletions src/Client/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe
});
}

protected function responseIsAnUpgradeResponse($response)
{
return
$response->hasHeader('Connection') &&
(in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) &&
(int) $response->getStatusCode() === 101;
}

public function write($data)
{
if (!$this->isWritable()) {
Expand Down Expand Up @@ -138,7 +146,16 @@ public function handleData($data)
// buffer until double CRLF (or double LF for compatibility with legacy servers)
if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) {
try {
list($response, $bodyChunk) = $this->parseResponse($this->buffer);
$psrResponse = gPsr\parse_response($this->buffer);

if ($this->responseIsAnUpgradeResponse($psrResponse)) {
$this->stream->removeListener('data', array($this, 'handleData'));

$this->emit('upgrade', array($this->stream, $psrResponse, $this));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I like exposing the React\Http\Client\Request which is currently marked as @internal only. There's ongoing effort to remove all classes from this namespace as there's some duplication with the Io namespace.

Do we really need to expose the response and request objects?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the right Request object to expose, but it's kind of a good idea to expose a request object because there could be cases where you want to verify some things about the request after upgrade and then close the connection if that verification fails. That's my thought about it. But sure, I might be wrong here.

I actually read a piece of Node docs to see how upgrade is being handled by the consumer side. I followed that in adding the upgrade event also. Here is a sample piece of how node handles the upgrade.

Sure, If there's a better you think this could be or should be done, I'll be willing to make it work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the whole upgrade event. How does this interfere with the existing request/response semantics?

I like how this is exposed on the server side via https://github.com/reactphp/http#streaming-outgoing-response, perhaps we can find a similar solution here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually followed a Node piece I read here as I also mentioned above.

I will check out the sample you sent, I kind of like it, knowing that a ThroughStream also implements DuplexStreamInterface.

return;
}

list($response, $bodyChunk) = $this->parseResponse($psrResponse);
} catch (\InvalidArgumentException $exception) {
$this->emit('error', array($exception));
}
Expand Down Expand Up @@ -222,9 +239,8 @@ public function close()
$this->removeAllListeners();
}

protected function parseResponse($data)
protected function parseResponse($psrResponse)
{
$psrResponse = gPsr\parse_response($data);
$headers = array_map(function($val) {
if (1 === count($val)) {
$val = $val[0];
Expand Down Expand Up @@ -292,4 +308,9 @@ public function getResponseFactory()

return $factory;
}

public function getRequestData()
{
return $this->requestData;
}
}
8 changes: 8 additions & 0 deletions src/Client/RequestData.php
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,12 @@ private function getAuthHeaders()

return array();
}

/**
* @return array
*/
public function getHeaders()
{
return $this->headers;
}
}
56 changes: 56 additions & 0 deletions src/Client/UpgradedResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

namespace React\Http\Client;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Stream\DuplexStreamInterface;

class UpgradedResponse
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds a new class as part of our public API in the internal React\Http\Client namespace.

{
/**
* @var DuplexStreamInterface
*/
private $connection;

/**
* @var ResponseInterface
*/
private $response;

/**
* @var RequestInterface
*/
private $request;

public function __construct(DuplexStreamInterface $connection, ResponseInterface $response, RequestInterface $request)
{
$this->connection = $connection;
$this->response = $response;
$this->request = $request;
}

/**
* @return DuplexStreamInterface
*/
public function getConnection()
{
return $this->connection;
}

/**
* @return ResponseInterface
*/
public function getResponse()
{
return $this->response;
}

/**
* @return RequestInterface
*/
public function getRequest()
{
return $this->request;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing EOL

67 changes: 44 additions & 23 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
namespace React\Http\Io;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\EventLoop\LoopInterface;
use React\Http\Client\Client as HttpClient;
use React\Http\Client\Response as ResponseStream;
use React\Http\Message\Response;
use React\Http\Client\UpgradedResponse;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Socket\ConnectorInterface;
use React\Stream\DuplexStreamInterface;
use React\Stream\ReadableStreamInterface;

/**
Expand Down Expand Up @@ -70,33 +73,14 @@ public function __construct(HttpClient $http)
*
* @internal
* @param RequestInterface $request
* @return PromiseInterface Promise<ResponseInterface, Exception>
* @param bool $upgrade Configures the sender to listen for upgrade
* @return PromiseInterface Promise<ResponseInterface, ConnectionInterface, Exception>
*/
public function send(RequestInterface $request)
public function send(RequestInterface $request, $upgrade = false)
{
$body = $request->getBody();
$size = $body->getSize();

if ($size !== null && $size !== 0) {
// automatically assign a "Content-Length" request header if the body size is known and non-empty
$request = $request->withHeader('Content-Length', (string)$size);
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
$request = $request->withHeader('Content-Length', '0');
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
$request = $request->withHeader('Transfer-Encoding', 'chunked');
} else {
// do not use chunked encoding if size is known or if this is an empty request body
$size = 0;
}

$headers = array();
foreach ($request->getHeaders() as $name => $values) {
$headers[$name] = implode(', ', $values);
}

$requestStream = $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion());
list($size, $requestStream) = $this->createRequestStream($request);

$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
// close request stream if request is cancelled
Expand Down Expand Up @@ -127,6 +111,16 @@ public function send(RequestInterface $request)
));
});

/**
* We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately
* This is useful for websocket connections that requires an HTTP Upgrade.
*/
if ($upgrade) {
$requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) {
$deferred->resolve(new UpgradedResponse($socket, $response, $request));
});
}

if ($body instanceof ReadableStreamInterface) {
if ($body->isReadable()) {
// length unknown => apply chunked transfer-encoding
Expand Down Expand Up @@ -162,4 +156,31 @@ public function send(RequestInterface $request)

return $deferred->promise();
}

protected function createRequestStream(RequestInterface $request)
{
$body = $request->getBody();
$size = $body->getSize();

if ($size !== null && $size !== 0) {
// automatically assign a "Content-Length" request header if the body size is known and non-empty
$request = $request->withHeader('Content-Length', (string)$size);
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
$request = $request->withHeader('Content-Length', '0');
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
$request = $request->withHeader('Transfer-Encoding', 'chunked');
} else {
// do not use chunked encoding if size is known or if this is an empty request body
$size = 0;
}

$headers = array();
foreach ($request->getHeaders() as $name => $values) {
$headers[$name] = implode(', ', $values);
}

return array($size, $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()));
}
}
9 changes: 7 additions & 2 deletions src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Transaction

private $streaming = false;

// Determines whether to return the connection of an upgrade or not
private $upgrade = false;

private $maximumSize = 16777216; // 16 MiB = 2^24 bytes

public function __construct(Sender $sender, LoopInterface $loop)
Expand Down Expand Up @@ -81,7 +84,7 @@ public function send(RequestInterface $request)

$loop = $this->loop;
$this->next($request, $deferred)->then(
function (ResponseInterface $response) use ($deferred, $loop, &$timeout) {
function ($response) use ($deferred, $loop, &$timeout) {
if (isset($deferred->timeout)) {
$loop->cancelTimer($deferred->timeout);
unset($deferred->timeout);
Expand Down Expand Up @@ -144,7 +147,9 @@ private function next(RequestInterface $request, Deferred $deferred)
$that = $this;
++$deferred->numRequests;

$promise = $this->sender->send($request);
$promise = $this->sender->send($request, $this->upgrade);

if ($this->upgrade) return $promise;

if (!$this->streaming) {
$promise = $promise->then(function ($response) use ($deferred, $that) {
Expand Down