Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support protocol packages larger than 16 MiB #47 #197

Open
wants to merge 2 commits into
base: 0.7.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
coverage: xdebug
ini-file: development
- run: composer install
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 --max-allowed-packet=20M
- run: bash tests/wait-for-mysql.sh
- run: vendor/bin/phpunit --coverage-text
if: ${{ matrix.php >= 7.3 }}
Expand All @@ -49,6 +49,6 @@ jobs:
uses: docker://hhvm/hhvm:3.30-lts-latest
with:
args: hhvm composer.phar install
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 --max-allowed-packet=20M
- run: bash tests/wait-for-mysql.sh
- run: docker run -i --rm --workdir=/data -v "$(pwd):/data" --net=host hhvm/hhvm:3.30-lts-latest hhvm vendor/bin/phpunit
88 changes: 73 additions & 15 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class Parser
* @var Executor
*/
protected $executor;
/**
* Current packet for split packet paring
*/
protected $packet = null;

public function __construct(DuplexStreamInterface $stream, Executor $executor)
{
Expand Down Expand Up @@ -165,22 +169,48 @@ public function handleData($data)
return;
}

$packet = $this->buffer->readBuffer($this->pctSize);
if ($this->packet !== null) {
/**
* We are in packet splitting
* Append data
*/
$packet = null;
$this->packet->append($this->buffer->read($this->pctSize));
if ($this->pctSize < 0xffffff) {
/**
* We're done
*/
$packet = $this->packet;
$this->packet = null;
}
} else {
$packet = $this->buffer->readBuffer($this->pctSize);
}
/**
* Remember last packet size as split packets may have ended with 0 length packet.
*/
$lastPctSize = $this->pctSize;
$this->state = self::STATE_STANDBY;
$this->pctSize = self::PACKET_SIZE_HEADER;
if ($this->packet === null && $packet->length() === 0xffffff && $lastPctSize > 0) {
/**
* Start reading split packets
*/
$this->packet = $packet;
} elseif ($packet !== null) {
try {
$this->parsePacket($packet);
} catch (\UnderflowException $e) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e));
$this->stream->close();
return;
}

try {
$this->parsePacket($packet);
} catch (\UnderflowException $e) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e));
$this->stream->close();
return;
}

if ($packet->length() !== 0) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)'));
$this->stream->close();
return;
if ($packet->length() !== 0) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)'));
$this->stream->close();
return;
}
}
}
}
Expand Down Expand Up @@ -262,7 +292,7 @@ private function parsePacket(Buffer $packet)
$this->debug(sprintf("AffectedRows: %d, InsertId: %d, WarningCount:%d", $this->affectedRows, $this->insertId, $this->warningCount));
$this->onSuccess();
$this->nextRequest();
} elseif ($fieldCount === 0xFE) {
} elseif ($fieldCount === 0xFE && $packet->length() < 0xfffffe) {
// EOF Packet
$packet->skip(4); // warn, status
if ($this->rsState === self::RS_STATE_ROW) {
Expand Down Expand Up @@ -388,7 +418,35 @@ public function onClose()

public function sendPacket($packet)
{
return $this->stream->write($this->buffer->buildInt3(\strlen($packet)) . $this->buffer->buildInt1($this->seq++) . $packet);
/**
* If packet is longer than 0xffffff (16M), we should split and send many packets
*
*/
$packet_len = \strlen($packet);
$this->debug('sendPacket: len: ' . $packet_len);

if ($packet_len >= 0xffffff) {
$this->debug('Packet split: packet_len: ' . $packet_len);
$ret = null;
while ($packet_len > 0) {
$part = \substr($packet, 0, 0xffffff);
$part_len = \strlen($part);
$ret = $this->stream->write($this->buffer->buildInt3($part_len) . $this->buffer->buildInt1($this->seq++) . $part);
$packet = \substr($packet, $part_len);
$packet_len = \strlen($packet);
// If last part was exactly 0xffffff in size, we need to send an empty packet to signal end
// of packet splitting.
if (\strlen($packet) === 0 && $part_len === 0xffffff) {
$ret = $this->stream->write($this->buffer->buildInt3(0) . $this->buffer->buildInt1($this->seq++));
}
}
$this->debug('Packet sent');
return $ret;
}
/**
* Packet is below 16M
*/
return $this->stream->write($this->buffer->buildInt3($packet_len) . $this->buffer->buildInt1($this->seq++) . $packet);
}

protected function nextRequest($isHandshake = false)
Expand Down
112 changes: 112 additions & 0 deletions tests/ResultQueryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,118 @@ public function testSelectAfterDelay()
Loop::run();
}

protected function checkMaxAllowedPacket($connection, $min = 0x1100000)
{
return $connection->query('SHOW VARIABLES LIKE \'max_allowed_packet\'')->then(
function ($res) use ($min, $connection) {
$current = $res->resultRows[0]['Value'];
if ($current < $min) {
$this->markTestSkipped('max_allowed_packet too low: current: ' . $current . ' min: ' . $min);
}
return true;
}
);
}

public function testSelectStaticTextSplitPacketsExactlyBelow16MiB()
{
$uri = $this->getConnectionString();
$connection = new MysqlClient($uri);

$this->checkMaxAllowedPacket($connection, 0x1000000)->then(
function () use ($connection) {
/**
* This should be exactly below 16MiB packet
*
* x03 + "select ''" = len(10)
*/
$text = str_repeat('A', 0xffffff - 11);
$connection->query('select \'' . $text . '\'')->then(
function (MysqlResult $command) use ($text) {
$this->assertCount(1, $command->resultRows);
$this->assertCount(1, $command->resultRows[0]);
$this->assertSame($text, reset($command->resultRows[0]));
}
);
$connection->quit();
},
function (\Exception $error) {
$this->markTestSkipped($error->getMessage());
}
);
Loop::run();
}

public function testSelectStaticTextSplitPacketsExactly16MiB()
{
$uri = $this->getConnectionString();
$connection = new MysqlClient($uri);

$this->checkMaxAllowedPacket($connection)->then(
function () use ($connection) {
/**
* This should be exactly at 16MiB packet
*
* x03 + "select ''" = len(10)
*/
$text = str_repeat('A', 0xffffff - 10);
$connection->query('select \'' . $text . '\'')->then(
function (MysqlResult $command) use ($text) {
$this->assertCount(1, $command->resultRows);
$this->assertCount(1, $command->resultRows[0]);
$this->assertSame($text, reset($command->resultRows[0]));
}
);
$connection->quit();
},
function (\Exception $error) {
if (method_exists($this, 'assertStringContainsString')) {
// PHPUnit 9+
$this->assertStringContainsString('max_allowed_packet too low: current:', $error->getMessage());
} else {
// legacy PHPUnit < 9
$this->assertContains('max_allowed_packet too low: current:', $error->getMessage());
}
}
);
Loop::run();
}

public function testSelectStaticTextSplitPacketsAbove16MiB()
{
$uri = $this->getConnectionString();
$connection = new MysqlClient($uri);

$this->checkMaxAllowedPacket($connection)->then(
function () use ($connection) {
/**
* This should be exactly at 16MiB + 10 packet
*
* x03 + "select ''" = len(10)
*/
$text = str_repeat('A', 0xffffff);
$connection->query('select \'' . $text . '\'')->then(
function (MysqlResult $command) use ($text) {
$this->assertCount(1, $command->resultRows);
$this->assertCount(1, $command->resultRows[0]);
$this->assertSame($text, reset($command->resultRows[0]));
}
);
$connection->quit();
},
function (\Exception $error) {
if (method_exists($this, 'assertStringContainsString')) {
// PHPUnit 9+
$this->assertStringContainsString('max_allowed_packet too low: current:', $error->getMessage());
} else {
// legacy PHPUnit < 9
$this->assertContains('max_allowed_packet too low: current:', $error->getMessage());
}
}
);
Loop::run();
}

public function testQueryStreamStaticEmptyEmitsSingleRow()
{
$connection = $this->createConnection(Loop::get());
Expand Down