Skip to content

Commit

Permalink
Remove indices in messenger table on MySQL to prevent deadlocks while…
Browse files Browse the repository at this point in the history
… removing messages when running multiple consumers

SELECT ... FOR UPDATE row locks also locks indices. Since locking rows and indices is not one atomic operation,
this might cause deadlocks when running multiple workers. Removing indices on queue_name and available_at
resolves this problem.
  • Loading branch information
jeroennoten committed Aug 26, 2021
1 parent c8c91d6 commit 08c8da2
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
54 changes: 54 additions & 0 deletions Tests/Transport/Doctrine/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Result as DriverResult;
Expand All @@ -23,8 +24,11 @@
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaConfig;
use Doctrine\DBAL\Schema\TableDiff;
use Doctrine\DBAL\Statement;
use Doctrine\DBAL\Types\Types;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
Expand Down Expand Up @@ -402,4 +406,54 @@ public function providePlatformSql(): iterable
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
];
}

/**
* @dataProvider setupIndicesProvider
*/
public function testSetupIndices(string $platformClass, array $expectedIndices)
{
$driverConnection = $this->createMock(DBALConnection::class);
$driverConnection->method('getConfiguration')->willReturn(new Configuration());

$schemaManager = $this->createMock(AbstractSchemaManager::class);
$schema = new Schema();
$expectedTable = $schema->createTable('messenger_messages');
$expectedTable->addColumn('id', Types::BIGINT);
$expectedTable->setPrimaryKey(['id']);
// Make sure columns for indices exists so addIndex() will not throw
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
$expectedTable->addColumn($columnName, Types::STRING);
}
foreach ($expectedIndices as $indexColumns) {
$expectedTable->addIndex($indexColumns);
}
$schemaManager->method('createSchema')->willReturn($schema);
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);

$platformMock = $this->createMock($platformClass);
$platformMock
->expects(self::once())
->method('getAlterTableSQL')
->with(self::callback(static function (TableDiff $tableDiff): bool {
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
}))
->willReturn([]);
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);

$connection = new Connection([], $driverConnection);
$connection->setup();
}

public function setupIndicesProvider(): iterable
{
yield 'MySQL' => [
MySQL57Platform::class,
[['delivered_at']],
];

yield 'Other platforms' => [
AbstractPlatform::class,
[['queue_name'], ['available_at'], ['delivered_at']],
];
}
}
9 changes: 6 additions & 3 deletions Transport/Doctrine/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\LockMode;
use Doctrine\DBAL\Platforms\MySqlPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\Comparator;
Expand Down Expand Up @@ -386,7 +387,6 @@ private function getSchema(): Schema
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
->setNotnull(true);
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(true);
Expand All @@ -395,8 +395,11 @@ private function getSchema(): Schema
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(false);
$table->setPrimaryKey(['id']);
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
}
$table->addIndex(['delivered_at']);

return $schema;
Expand Down

0 comments on commit 08c8da2

Please sign in to comment.