Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2115] fix(jdbc-catalog): Can't create a table in database with the same name prefix. #2116

Merged
merged 13 commits into from
Apr 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -124,32 +124,44 @@ public List<String> listTables(String databaseName) throws NoSuchSchemaException
}
}

/**
* Get table information from the result set and attach it to the table builder, If the table is
* not found, it will throw a NoSuchTableException.
*
* @param tablesResult The result set of the table
* @return The builder of the table to be returned
*/
protected JdbcTable.Builder getTableBuilder(
ResultSet tablesResult, String databaseName, String tableName) throws SQLException {
boolean found = false;
JdbcTable.Builder builder = null;
while (tablesResult.next() && !found) {
if (Objects.equals(tablesResult.getString("TABLE_NAME"), tableName)) {
builder = getBasicJdbcTableInfo(tablesResult);
found = true;
}
}

if (!found) {
throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName);
}

return builder;
}

@Override
public JdbcTable load(String databaseName, String tableName) throws NoSuchTableException {
// We should handle case sensitivity and wild card issue in some catalog tables, take a MySQL
// table for example.
// We should handle case sensitivity and wild card issue in some catalog tables, take MySQL
// tables, for example.
// 1. MySQL will get table 'a_b' and 'A_B' when we query 'a_b' in a case-insensitive charset
// like utf8mb4.
// 2. MySQL treats 'a_b' as a wildcard, matching any table name that begins with 'a', followed
// by any character, and ending with 'b'.
try (Connection connection = getConnection(databaseName)) {
// 1.Get table information
ResultSet table = getTable(connection, databaseName, tableName);
// The result of tables may be more than one due to the reason above, so we need to check the
// result
JdbcTable.Builder jdbcTableBuilder = JdbcTable.builder();
boolean found = false;
// Handle case-sensitive issues.
while (table.next() && !found) {
if (Objects.equals(table.getString("TABLE_NAME"), tableName)) {
jdbcTableBuilder = getBasicJdbcTableInfo(table);
found = true;
}
}

if (!found) {
throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName);
}
// 1. Get table information, The result of tables may be more than one due to the reason
// above, so we need to check the result.
ResultSet tables = getTable(connection, databaseName, tableName);
JdbcTable.Builder jdbcTableBuilder = getTableBuilder(tables, databaseName, tableName);

// 2.Get column information
List<JdbcColumn> jdbcColumns = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import com.datastrato.gravitino.exceptions.NoSuchColumnException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
Expand All @@ -32,6 +33,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -70,6 +72,27 @@ public void initialize(
"The `jdbc-database` configuration item is mandatory in PostgreSQL.");
}

protected JdbcTable.Builder getTableBuilder(
ResultSet tablesResult, String databaseName, String tableName) throws SQLException {
boolean found = false;
JdbcTable.Builder builder = null;
while (tablesResult.next() && !found) {
String tableNameInResult = tablesResult.getString("TABLE_NAME");
String tableSchemaInResultLowerCase = tablesResult.getString("TABLE_SCHEM");
if (Objects.equals(tableNameInResult, tableName)
&& Objects.equals(tableSchemaInResultLowerCase, databaseName)) {
builder = getBasicJdbcTableInfo(tablesResult);
found = true;
}
}

if (!found) {
throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName);
}

return builder;
}

@Override
protected String generateCreateTableSql(
String tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,74 @@ void testPGListTable() {
}
}

@Test
void testCreateSameTableInDifferentSchema() {
String schemaPrefix = GravitinoITUtils.genRandomName("postgresql_it_schema");
String schemaName1 = schemaPrefix + "1a";
String schemaName2 = schemaPrefix + "_a";
String schemaName3 = schemaPrefix + "__";

String[] dbs = {schemaName1, schemaName2, schemaName3};
for (int i = 0; i < dbs.length; i++) {
catalog
.asSchemas()
.createSchema(
NameIdentifier.of(metalakeName, catalogName, dbs[i]), dbs[i], Maps.newHashMap());
}

String tableName1 = "table1";
String tableName2 = "table2";
String tableName3 = "table3";
Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, false, null);
Column col2 = Column.of("col_2", Types.IntegerType.get(), "yes", false, false, null);
Column col3 = Column.of("col_3", Types.DateType.get(), "comment", false, false, null);
Column col4 = Column.of("col_4", Types.VarCharType.of(255), "code", false, false, null);
Column col5 = Column.of("col_5", Types.VarCharType.of(255), "config", false, false, null);
Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};

String[] tables = {tableName1, tableName2, tableName3};

for (int i = 0; i < dbs.length; i++) {
for (int j = 0; j < tables.length; j++) {
catalog
.asTableCatalog()
.createTable(
NameIdentifier.of(metalakeName, catalogName, dbs[i], tables[j]),
newColumns,
dbs[i] + "." + tables[j],
Maps.newHashMap(),
Transforms.EMPTY_TRANSFORM,
Distributions.NONE,
new SortOrder[0],
new Index[0]);
}
}

// list table in schema
for (int i = 0; i < dbs.length; i++) {
NameIdentifier[] tableNames =
catalog.asTableCatalog().listTables(Namespace.of(metalakeName, catalogName, dbs[i]));
Assertions.assertEquals(3, tableNames.length);
String[] realNames =
Arrays.stream(tableNames).map(NameIdentifier::name).toArray(String[]::new);
Assertions.assertArrayEquals(tables, realNames);

final int idx = i;
for (String n : realNames) {
Table t =
Assertions.assertDoesNotThrow(
() ->
catalog
.asTableCatalog()
.loadTable(NameIdentifier.of(metalakeName, catalogName, dbs[idx], n)));
Assertions.assertEquals(n, t.name());

// Test the table1 is the `1a`.`table1` not `_a`.`table1` or `__`.`table1`
Assertions.assertEquals(dbs[idx] + "." + n, t.comment());
}
}
}

@Test
void testPostgreSQLSchemaNameCaseSensitive() {
Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, false, null);
Expand Down
Loading