Skip to content

Commit

Permalink
Fix parsing ERR after result set
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Aug 20, 2022
1 parent 7b4428d commit 7ce30f4
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 4 deletions.
5 changes: 4 additions & 1 deletion src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public function parse($data)

$len = $this->buffer->length();
if ($len < $this->pctSize) {
$this->debug('Buffer not enouth, return');
$this->debug('Waiting for complete packet with ' . $len . '/' . $this->pctSize . ' bytes');

return;
}
Expand Down Expand Up @@ -277,6 +277,9 @@ private function onResultRow($row)

private function onError(Exception $error)
{
$this->rsState = self::RS_STATE_HEADER;
$this->resultFields = [];

// reject current command with error if we're currently executing any commands
// ignore unsolicited server error in case we're not executing any commands (connection will be dropped)
if ($this->currCommand !== null) {
Expand Down
74 changes: 71 additions & 3 deletions tests/Io/ParserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
namespace React\Tests\MySQL\Io;

use React\MySQL\Commands\QueryCommand;
use React\MySQL\Exception;
use React\MySQL\Io\Executor;
use React\MySQL\Io\Parser;
use React\Stream\CompositeStream;
use React\Stream\ThroughStream;
use React\Tests\MySQL\BaseTestCase;
use React\MySQL\Exception;

class ParserTest extends BaseTestCase
{
Expand Down Expand Up @@ -56,7 +57,7 @@ public function testUnexpectedErrorWithoutCurrentCommandWillBeIgnored()
$stream->write("\x17\0\0\0" . "\xFF" . "\x10\x04" . "Too many connections");
}

public function testSendingErrorFrameDuringHandshakeShouldEmitErrorOnFollowingCommand()
public function testReceivingErrorFrameDuringHandshakeShouldEmitErrorOnFollowingCommand()
{
$stream = new ThroughStream();

Expand All @@ -81,7 +82,74 @@ public function testSendingErrorFrameDuringHandshakeShouldEmitErrorOnFollowingCo
$this->assertEquals('Too many connections', $error->getMessage());
}

public function testSendingIncompleteErrorFrameDuringHandshakeShouldNotEmitError()
public function testReceivingErrorFrameForQueryShouldEmitError()
{
$stream = new ThroughStream();

$command = new QueryCommand();
$command->on('error', $this->expectCallableOnce());

$error = null;
$command->on('error', function ($e) use (&$error) {
$error = $e;
});

$executor = new Executor();
$executor->enqueue($command);

$parser = new Parser($stream, $executor);
$parser->start();

$stream->on('close', $this->expectCallableNever());

$stream->write("\x33\0\0\0" . "\x0a" . "mysql\0" . str_repeat("\0", 44));
$stream->write("\x1E\0\0\1" . "\xFF" . "\x46\x04" . "#abcde" . "Unknown thread id: 42");

$this->assertTrue($error instanceof Exception);
$this->assertEquals(1094, $error->getCode());
$this->assertEquals('Unknown thread id: 42', $error->getMessage());
}

public function testReceivingErrorFrameForQueryAfterResultSetHeadersShouldEmitError()
{
$stream = new ThroughStream();

$command = new QueryCommand();
$command->on('error', $this->expectCallableOnce());

$error = null;
$command->on('error', function ($e) use (&$error) {
$error = $e;
});

$executor = new Executor();
$executor->enqueue($command);

$parser = new Parser(new CompositeStream($stream, new ThroughStream()), $executor);
$parser->start();

$stream->on('close', $this->expectCallableNever());

$stream->write("\x33\0\0\0" . "\x0a" . "mysql\0" . str_repeat("\0", 44));
$stream->write("\x01\0\0\1" . "\x01");
$stream->write("\x1E\0\0\2" . "\x03" . "def" . "\0" . "\0" . "\0" . "\x09" . "sleep(10)" . "\0" . "\xC0" . "\x3F\0" . "\1\0\0\0" . "\3" . "\x81\0". "\0" . "\0\0");
$stream->write("\x05\0\0\3" . "\xFE" . "\0\0\2\0");
$stream->write("\x28\0\0\4" . "\xFF" . "\x25\x05" . "#abcde" . "Query execution was interrupted");

$this->assertTrue($error instanceof Exception);
$this->assertEquals(1317, $error->getCode());
$this->assertEquals('Query execution was interrupted', $error->getMessage());

$ref = new \ReflectionProperty($parser, 'rsState');
$ref->setAccessible(true);
$this->assertEquals(0, $ref->getValue($parser));

$ref = new \ReflectionProperty($parser, 'resultFields');
$ref->setAccessible(true);
$this->assertEquals([], $ref->getValue($parser));
}

public function testReceivingIncompleteErrorFrameDuringHandshakeShouldNotEmitError()
{
$stream = new ThroughStream();

Expand Down

0 comments on commit 7ce30f4

Please sign in to comment.