Skip to content

Commit

Permalink
Fix cardinality error in SQL generated by getListTableForeignKeysSQL
Browse files Browse the repository at this point in the history
Issue #950: The cardinality error would happen when the schema wasn't
prefixed in the $table parameter. As the subquery in the WHERE = clause
would return more than one result if there was the same table name in
two different schemas. (This is specific to postgres only.)

This adds:
 - LIMIT 1 to the subquery of the WHERE = (SELECT ...) clause
 - A clearer parameter name $tableWithSchema and doc blocks to convey
   what's expected for this function
 - A deprecation message for when the schema is not included
  • Loading branch information
jwilson-cee committed Nov 7, 2019
1 parent 80a45c6 commit 5d36b30
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 20 deletions.
43 changes: 26 additions & 17 deletions lib/Doctrine/DBAL/Platforms/PostgreSqlPlatform.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Doctrine\DBAL\Types\IntegerType;
use Doctrine\DBAL\Types\Type;
use UnexpectedValueException;
use const E_USER_DEPRECATED;
use function array_diff;
use function array_merge;
use function array_unique;
Expand All @@ -31,6 +32,7 @@
use function sprintf;
use function strpos;
use function strtolower;
use function trigger_error;
use function trim;

/**
Expand Down Expand Up @@ -272,17 +274,18 @@ public function getListViewsSQL(string $database) : string
}

/**
* {@inheritDoc}
* @param string $tableWithSchema The name of the table prefixed with the schema (i.e. 'some_schema.some_table').
*/
public function getListTableForeignKeysSQL(string $table, ?string $database = null) : string
public function getListTableForeignKeysSQL(string $tableWithSchema, ?string $database = null) : string
{
return 'SELECT quote_ident(r.conname) as conname, pg_catalog.pg_get_constraintdef(r.oid, true) as condef
FROM pg_catalog.pg_constraint r
WHERE r.conrelid =
(
SELECT c.oid
FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n
WHERE ' . $this->getTableWhereClause($table) . " AND n.oid = c.relnamespace
WHERE ' . $this->getTableWhereClause($tableWithSchema) . " AND n.oid = c.relnamespace
LIMIT 1
)
AND r.contype = 'f'";
}
Expand Down Expand Up @@ -331,11 +334,9 @@ public function getListTableConstraintsSQL(string $table) : string
}

/**
* {@inheritDoc}
*
* @link http://ezcomponents.org/docs/api/trunk/DatabaseSchema/ezcDbSchemaPgsqlReader.html
* @param string $tableWithSchema The name of the table prefixed with the schema (i.e. 'some_schema.some_table').
*/
public function getListTableIndexesSQL(string $table, ?string $currentDatabase = null) : string
public function getListTableIndexesSQL(string $tableWithSchema, ?string $currentDatabase = null) : string
{
return 'SELECT quote_ident(relname) as relname, pg_index.indisunique, pg_index.indisprimary,
pg_index.indkey, pg_index.indrelid,
Expand All @@ -344,25 +345,33 @@ public function getListTableIndexesSQL(string $table, ?string $currentDatabase =
WHERE oid IN (
SELECT indexrelid
FROM pg_index si, pg_class sc, pg_namespace sn
WHERE ' . $this->getTableWhereClause($table, 'sc', 'sn') . ' AND sc.oid=si.indrelid AND sc.relnamespace = sn.oid
WHERE ' . $this->getTableWhereClause($tableWithSchema, 'sc', 'sn') . ' AND sc.oid=si.indrelid AND sc.relnamespace = sn.oid
) AND pg_index.indexrelid = oid';
}

private function getTableWhereClause(string $table, string $classAlias = 'c', string $namespaceAlias = 'n') : string
/**
* @param string $tableWithSchema The name of the table prefixed with the schema (i.e. 'some_schema.some_table').
*/
private function getTableWhereClause(string $tableWithSchema, string $classAlias = 'c', string $namespaceAlias = 'n') : string
{
$whereClause = $namespaceAlias . ".nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') AND ";
if (strpos($table, '.') !== false) {
[$schema, $table] = explode('.', $table);
if (strpos($tableWithSchema, '.') !== false) {
[$schema, $table] = explode('.', $tableWithSchema);
$schema = $this->quoteStringLiteral($schema);
} else {
$table = $tableWithSchema;
$schema = "ANY(string_to_array((select replace(replace(setting,'\"\$user\"',user),' ','') from pg_catalog.pg_settings where name = 'search_path'),','))";
@trigger_error(sprintf(
'Providing a table name without a schema prefix, i.e. \'some_schema.some_table\', (%s given) is deprecated and will cause an error in Doctrine 3.0',
$tableWithSchema
), E_USER_DEPRECATED);
}

$table = new Identifier($table);
$table = $this->quoteStringLiteral($table->getName());

return $whereClause . sprintf(
'%s.relname = %s AND %s.nspname = %s',
return sprintf(
"%s.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') AND %s.relname = %s AND %s.nspname = %s",
$namespaceAlias,
$classAlias,
$table,
$namespaceAlias,
Expand All @@ -371,9 +380,9 @@ private function getTableWhereClause(string $table, string $classAlias = 'c', st
}

/**
* {@inheritDoc}
* @param string $tableWithSchema The name of the table prefixed with the schema (i.e. 'some_schema.some_table').
*/
public function getListTableColumnsSQL(string $table, ?string $database = null) : string
public function getListTableColumnsSQL(string $tableWithSchema, ?string $database = null) : string
{
return "SELECT
a.attnum,
Expand All @@ -400,7 +409,7 @@ public function getListTableColumnsSQL(string $table, ?string $database = null)
FROM pg_description WHERE pg_description.objoid = c.oid AND a.attnum = pg_description.objsubid
) AS comment
FROM pg_attribute a, pg_class c, pg_type t, pg_namespace n
WHERE " . $this->getTableWhereClause($table, 'c', 'n') . '
WHERE " . $this->getTableWhereClause($tableWithSchema, 'c', 'n') . '
AND a.attnum > 0
AND a.attrelid = c.oid
AND a.atttypid = t.oid
Expand Down
12 changes: 12 additions & 0 deletions lib/Doctrine/DBAL/Schema/PostgreSqlSchemaManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ public function determineExistingSchemaSearchPaths() : void
});
}

/**
* Lists the foreign keys for the given table in a given schema.
*
* @param string $tableWithSchema The name of the table prefixed with the schema (i.e. 'some_schema.some_table').
*
* @return array<int|string, ForeignKeyConstraint>
*/
public function listTableForeignKeys(string $tableWithSchema, ?string $database = null) : array
{
return parent::listTableForeignKeys($tableWithSchema, $database);
}

/**
* {@inheritdoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ public function testQuotesTableNameInListTableForeignKeysSQL() : void
{
self::assertStringContainsStringIgnoringCase(
"'Foo''Bar\\'",
$this->platform->getListTableForeignKeysSQL("Foo'Bar\\")
$this->platform->getListTableForeignKeysSQL("public.Foo'Bar\\")
);
}

Expand Down Expand Up @@ -994,7 +994,7 @@ public function testQuotesTableNameInListTableIndexesSQL() : void
{
self::assertStringContainsStringIgnoringCase(
"'Foo''Bar\\'",
$this->platform->getListTableIndexesSQL("Foo'Bar\\")
$this->platform->getListTableIndexesSQL("public.Foo'Bar\\")
);
}

Expand All @@ -1016,7 +1016,7 @@ public function testQuotesTableNameInListTableColumnsSQL() : void
{
self::assertStringContainsStringIgnoringCase(
"'Foo''Bar\\'",
$this->platform->getListTableColumnsSQL("Foo'Bar\\")
$this->platform->getListTableColumnsSQL("public.Foo'Bar\\")
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

declare(strict_types=1);

namespace Doctrine\Tests\DBAL\Schema;

use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Platforms\PostgreSqlPlatform;
use Doctrine\Tests\DbalFunctionalTestCase;
use function extension_loaded;
use function in_array;

class PostgreSqlListTableFunctionsTest extends DbalFunctionalTestCase
{
protected function setUp() : void
{
if (! extension_loaded('pdo_pgsql')) {
$this->markTestSkipped('pdo_pgsql is not loaded.');

return;
}

parent::setUp();

if (! in_array('pdo_pgsql', DriverManager::getAvailableDrivers())) {
$this->markTestSkipped('PostgreSQL driver not available');

return;
}

if (! $this->connection->getDatabasePlatform() instanceof PostgreSqlPlatform) {
$this->markTestSkipped('PostgreSQL Only test.');

return;
}

$this->connection->executeQuery('CREATE SCHEMA test_schema1;');
$this->connection->executeQuery('CREATE SCHEMA test_schema2;');
$this->connection->executeQuery('set search_path = "test_schema1", "test_schema2";');
$this->connection->executeQuery('CREATE TABLE test_schema1.test_foreign1 ( id integer PRIMARY KEY );');
$this->connection->executeQuery('CREATE TABLE test_schema1.test_table ( test_column1 varchar(5), test_foreign1_id integer constraint fk_test_foreign1 references test_schema1.test_foreign1 (id) );');
$this->connection->executeQuery('CREATE INDEX idx_test_column1 ON test_schema1.test_table (test_column1);');
$this->connection->executeQuery('CREATE TABLE test_schema2.test_foreign2 ( id integer PRIMARY KEY );');
$this->connection->executeQuery('CREATE TABLE test_schema2.test_table ( test_column2 varchar(5), test_foreign2_id integer constraint fk_test_foreign2 references test_schema2.test_foreign2 (id) );');
$this->connection->executeQuery('CREATE INDEX idx_test_column2 ON test_schema2.test_table (test_column2);');
}

public function testListTableFunctions() : void
{
$foreignKeys = $this->connection->getSchemaManager()->listTableForeignKeys('test_table');
$this->assertNotEmpty($foreignKeys);
$foreignKeys = $this->connection->getSchemaManager()->listTableForeignKeys('test_schema1.test_table');
$columns = isset($foreignKeys[0]) ? $foreignKeys[0]->getColumns() : [];
$this->assertContains('test_foreign1_id', $columns);
$foreignKeys = $this->connection->getSchemaManager()->listTableForeignKeys('test_schema2.test_table');
$columns = isset($foreignKeys[0]) ? $foreignKeys[0]->getColumns() : [];
$this->assertContains('test_foreign2_id', $columns);

$columns = $this->connection->getSchemaManager()->listTableColumns('test_table');
$this->assertNotEmpty($columns);
$columns = $this->connection->getSchemaManager()->listTableColumns('test_schema1.test_table');
$this->assertArrayHasKey('test_column1', $columns);
$columns = $this->connection->getSchemaManager()->listTableColumns('test_schema2.test_table');
$this->assertArrayHasKey('test_column2', $columns);

$indexes = $this->connection->getSchemaManager()->listTableIndexes('test_table');
$this->assertNotEmpty($indexes);
$indexes = $this->connection->getSchemaManager()->listTableIndexes('test_schema1.test_table');
$this->assertArrayHasKey('idx_test_column1', $indexes);
$indexes = $this->connection->getSchemaManager()->listTableIndexes('test_schema2.test_table');
$this->assertArrayHasKey('idx_test_column2', $indexes);
}
}

0 comments on commit 5d36b30

Please sign in to comment.