From 08c8da2cecec04ec819186993b3c16293101fc5a Mon Sep 17 00:00:00 2001 From: Jeroen Noten Date: Mon, 2 Aug 2021 14:02:06 +0200 Subject: [PATCH] Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers SELECT ... FOR UPDATE row locks also locks indices. Since locking rows and indices is not one atomic operation, this might cause deadlocks when running multiple workers. Removing indices on queue_name and available_at resolves this problem. --- Tests/Transport/Doctrine/ConnectionTest.php | 54 +++++++++++++++++++++ Transport/Doctrine/Connection.php | 9 ++-- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/Tests/Transport/Doctrine/ConnectionTest.php b/Tests/Transport/Doctrine/ConnectionTest.php index c7575e9b..d8b14472 100644 --- a/Tests/Transport/Doctrine/ConnectionTest.php +++ b/Tests/Transport/Doctrine/ConnectionTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; use Doctrine\DBAL\Abstraction\Result as AbstractionResult; +use Doctrine\DBAL\Configuration; use Doctrine\DBAL\Connection as DBALConnection; use Doctrine\DBAL\DBALException; use Doctrine\DBAL\Driver\Result as DriverResult; @@ -23,8 +24,11 @@ use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\AbstractSchemaManager; +use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\SchemaConfig; +use Doctrine\DBAL\Schema\TableDiff; use Doctrine\DBAL\Statement; +use Doctrine\DBAL\Types\Types; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\TransportException; @@ -402,4 +406,54 @@ public function providePlatformSql(): iterable 'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ', ]; } + + /** + * @dataProvider setupIndicesProvider + */ + public function testSetupIndices(string $platformClass, array $expectedIndices) + { + $driverConnection = $this->createMock(DBALConnection::class); + $driverConnection->method('getConfiguration')->willReturn(new Configuration()); + + $schemaManager = $this->createMock(AbstractSchemaManager::class); + $schema = new Schema(); + $expectedTable = $schema->createTable('messenger_messages'); + $expectedTable->addColumn('id', Types::BIGINT); + $expectedTable->setPrimaryKey(['id']); + // Make sure columns for indices exists so addIndex() will not throw + foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) { + $expectedTable->addColumn($columnName, Types::STRING); + } + foreach ($expectedIndices as $indexColumns) { + $expectedTable->addIndex($indexColumns); + } + $schemaManager->method('createSchema')->willReturn($schema); + $driverConnection->method('getSchemaManager')->willReturn($schemaManager); + + $platformMock = $this->createMock($platformClass); + $platformMock + ->expects(self::once()) + ->method('getAlterTableSQL') + ->with(self::callback(static function (TableDiff $tableDiff): bool { + return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes); + })) + ->willReturn([]); + $driverConnection->method('getDatabasePlatform')->willReturn($platformMock); + + $connection = new Connection([], $driverConnection); + $connection->setup(); + } + + public function setupIndicesProvider(): iterable + { + yield 'MySQL' => [ + MySQL57Platform::class, + [['delivered_at']], + ]; + + yield 'Other platforms' => [ + AbstractPlatform::class, + [['queue_name'], ['available_at'], ['delivered_at']], + ]; + } } diff --git a/Transport/Doctrine/Connection.php b/Transport/Doctrine/Connection.php index 20098e22..e936ca09 100644 --- a/Transport/Doctrine/Connection.php +++ b/Transport/Doctrine/Connection.php @@ -17,6 +17,7 @@ use Doctrine\DBAL\Exception; use Doctrine\DBAL\Exception\TableNotFoundException; use Doctrine\DBAL\LockMode; +use Doctrine\DBAL\Platforms\MySqlPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Comparator; @@ -386,7 +387,6 @@ private function getSchema(): Schema $table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT) ->setNotnull(true); $table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING) - ->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode ->setNotnull(true); $table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE) ->setNotnull(true); @@ -395,8 +395,11 @@ private function getSchema(): Schema $table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE) ->setNotnull(false); $table->setPrimaryKey(['id']); - $table->addIndex(['queue_name']); - $table->addIndex(['available_at']); + // No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers. + if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) { + $table->addIndex(['queue_name']); + $table->addIndex(['available_at']); + } $table->addIndex(['delivered_at']); return $schema;