Skip to content

Commit

Permalink
Fail only on first write of 0 in on-writable watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed May 1, 2020
1 parent b867505 commit 0dd0eb8
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 15 deletions.
11 changes: 9 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
sudo: false
os: linux

language: php

Expand All @@ -10,7 +10,14 @@ php:
- 7.4
- nightly

matrix:
jobs:
include:
- name: macOS
os: osx
language: generic
before_install:
- curl -s http://getcomposer.org/installer | php
- mv composer.phar /usr/local/bin/composer
allow_failures:
- php: nightly
fast_finish: true
Expand Down
35 changes: 22 additions & 13 deletions lib/ResourceOutputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
final class ResourceOutputStream implements OutputStream
{
/** @deprecated No longer used. */
const MAX_CONSECUTIVE_EMPTY_WRITES = 3;

const LARGE_CHUNK_SIZE = 128 * 1024;

/** @var resource|null */
Expand Down Expand Up @@ -58,7 +60,14 @@ public function __construct($stream, int $chunkSize = null)
$resource = &$this->resource;

$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$chunkSize, &$writable, &$resource) {
static $emptyWrites = 0;
$firstWrite = true;

// Using error handler to verify that a write of zero bytes was not due an error.
// @see https://github.com/reactphp/stream/pull/150
$error = 0;
\set_error_handler(static function (int $errno) use (&$error) {
$error = $errno;
});

try {
while (!$writes->isEmpty()) {
Expand Down Expand Up @@ -97,29 +106,27 @@ public function __construct($stream, int $chunkSize = null)
throw new StreamException($message);
}

$written = (int) $written; // Cast potential false to 0.

// Broken pipes between processes on macOS/FreeBSD do not detect EOF properly.
if ($written === 0 || $written === false) {
if ($emptyWrites++ > self::MAX_CONSECUTIVE_EMPTY_WRITES) {
$message = "Failed to write to stream after multiple attempts";
if ($error = \error_get_last()) {
$message .= \sprintf("; %s", $error["message"]);
}
throw new StreamException($message);
// fwrite() may write zero bytes on subsequent calls due to the buffer filling again.
if ($written === 0 && $error !== 0 && $firstWrite) {
$message = "Failed to write to stream";
if ($error = \error_get_last()) {
$message .= \sprintf("; %s", $error["message"]);
}

$writes->unshift([$data, $previous, $deferred]);
return;
throw new StreamException($message);
}

$emptyWrites = 0;

if ($length > $written) {
$data = \substr($data, $written);
$writes->unshift([$data, $written + $previous, $deferred]);
return;
}

$deferred->resolve($written + $previous);

$firstWrite = false;
}
} catch (\Throwable $exception) {
$resource = null;
Expand All @@ -137,6 +144,8 @@ public function __construct($stream, int $chunkSize = null)
if ($writes->isEmpty()) {
Loop::disable($watcher);
}

\restore_error_handler();
}
});

Expand Down
32 changes: 32 additions & 0 deletions test/ResourceOutputStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

namespace Amp\ByteStream\Test;

use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use Amp\Delayed;
use Amp\PHPUnit\AsyncTestCase;

class ResourceOutputStreamTest extends AsyncTestCase
Expand Down Expand Up @@ -68,6 +70,36 @@ public function testClosedRemoteSocket()

// The first write still succeeds somehow...
yield $stream->write("foobar");

// A delay seems required for the OS to realize the socket is indeed closed.
yield new Delayed(10);

yield $stream->write("foobar");
}

/**
* @requires PHPUnit >= 7
*
* @see https://github.com/reactphp/stream/pull/150
*/
public function testUploadBiggerBlockSecure()
{
$size = 2 ** 18; // 256kb

$resource = \stream_socket_client('tls://httpbin.org:443');

$output = new ResourceOutputStream($resource);

$body = \str_repeat('.', $size);

yield $output->write("POST /post HTTP/1.0\r\nHost: httpbin.org\r\nContent-Length: $size\r\n\r\n" . $body);

$input = new ResourceInputStream($resource);
$buffer = '';
while (null !== ($chunk = yield $input->read())) {
$buffer .= $chunk;
}

$this->assertStringContainsString($body, $buffer);
}
}

0 comments on commit 0dd0eb8

Please sign in to comment.