From 7ce30f4eb3399546efc9bc73beca86cba0ac0ec6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Thu, 18 Aug 2022 18:56:19 +0200 Subject: [PATCH] Fix parsing ERR after result set --- src/Io/Parser.php | 5 ++- tests/Io/ParserTest.php | 74 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/src/Io/Parser.php b/src/Io/Parser.php index 0085a84..05996d7 100644 --- a/src/Io/Parser.php +++ b/src/Io/Parser.php @@ -127,7 +127,7 @@ public function parse($data) $len = $this->buffer->length(); if ($len < $this->pctSize) { - $this->debug('Buffer not enouth, return'); + $this->debug('Waiting for complete packet with ' . $len . '/' . $this->pctSize . ' bytes'); return; } @@ -277,6 +277,9 @@ private function onResultRow($row) private function onError(Exception $error) { + $this->rsState = self::RS_STATE_HEADER; + $this->resultFields = []; + // reject current command with error if we're currently executing any commands // ignore unsolicited server error in case we're not executing any commands (connection will be dropped) if ($this->currCommand !== null) { diff --git a/tests/Io/ParserTest.php b/tests/Io/ParserTest.php index b96c9e9..118e291 100644 --- a/tests/Io/ParserTest.php +++ b/tests/Io/ParserTest.php @@ -3,11 +3,12 @@ namespace React\Tests\MySQL\Io; use React\MySQL\Commands\QueryCommand; +use React\MySQL\Exception; use React\MySQL\Io\Executor; use React\MySQL\Io\Parser; +use React\Stream\CompositeStream; use React\Stream\ThroughStream; use React\Tests\MySQL\BaseTestCase; -use React\MySQL\Exception; class ParserTest extends BaseTestCase { @@ -56,7 +57,7 @@ public function testUnexpectedErrorWithoutCurrentCommandWillBeIgnored() $stream->write("\x17\0\0\0" . "\xFF" . "\x10\x04" . "Too many connections"); } - public function testSendingErrorFrameDuringHandshakeShouldEmitErrorOnFollowingCommand() + public function testReceivingErrorFrameDuringHandshakeShouldEmitErrorOnFollowingCommand() { $stream = new ThroughStream(); @@ -81,7 +82,74 @@ public function testSendingErrorFrameDuringHandshakeShouldEmitErrorOnFollowingCo $this->assertEquals('Too many connections', $error->getMessage()); } - public function testSendingIncompleteErrorFrameDuringHandshakeShouldNotEmitError() + public function testReceivingErrorFrameForQueryShouldEmitError() + { + $stream = new ThroughStream(); + + $command = new QueryCommand(); + $command->on('error', $this->expectCallableOnce()); + + $error = null; + $command->on('error', function ($e) use (&$error) { + $error = $e; + }); + + $executor = new Executor(); + $executor->enqueue($command); + + $parser = new Parser($stream, $executor); + $parser->start(); + + $stream->on('close', $this->expectCallableNever()); + + $stream->write("\x33\0\0\0" . "\x0a" . "mysql\0" . str_repeat("\0", 44)); + $stream->write("\x1E\0\0\1" . "\xFF" . "\x46\x04" . "#abcde" . "Unknown thread id: 42"); + + $this->assertTrue($error instanceof Exception); + $this->assertEquals(1094, $error->getCode()); + $this->assertEquals('Unknown thread id: 42', $error->getMessage()); + } + + public function testReceivingErrorFrameForQueryAfterResultSetHeadersShouldEmitError() + { + $stream = new ThroughStream(); + + $command = new QueryCommand(); + $command->on('error', $this->expectCallableOnce()); + + $error = null; + $command->on('error', function ($e) use (&$error) { + $error = $e; + }); + + $executor = new Executor(); + $executor->enqueue($command); + + $parser = new Parser(new CompositeStream($stream, new ThroughStream()), $executor); + $parser->start(); + + $stream->on('close', $this->expectCallableNever()); + + $stream->write("\x33\0\0\0" . "\x0a" . "mysql\0" . str_repeat("\0", 44)); + $stream->write("\x01\0\0\1" . "\x01"); + $stream->write("\x1E\0\0\2" . "\x03" . "def" . "\0" . "\0" . "\0" . "\x09" . "sleep(10)" . "\0" . "\xC0" . "\x3F\0" . "\1\0\0\0" . "\3" . "\x81\0". "\0" . "\0\0"); + $stream->write("\x05\0\0\3" . "\xFE" . "\0\0\2\0"); + $stream->write("\x28\0\0\4" . "\xFF" . "\x25\x05" . "#abcde" . "Query execution was interrupted"); + + $this->assertTrue($error instanceof Exception); + $this->assertEquals(1317, $error->getCode()); + $this->assertEquals('Query execution was interrupted', $error->getMessage()); + + $ref = new \ReflectionProperty($parser, 'rsState'); + $ref->setAccessible(true); + $this->assertEquals(0, $ref->getValue($parser)); + + $ref = new \ReflectionProperty($parser, 'resultFields'); + $ref->setAccessible(true); + $this->assertEquals([], $ref->getValue($parser)); + } + + public function testReceivingIncompleteErrorFrameDuringHandshakeShouldNotEmitError() { $stream = new ThroughStream();