From dc3a438c7da4f2dae020346ea7ff154f9ed80fbe Mon Sep 17 00:00:00 2001 From: Clearvive Date: Tue, 30 Jan 2024 20:06:09 +0800 Subject: [PATCH] [#1736] feat(postgresql): Support PostgreSQL index. --- .../jdbc/operation/JdbcTableOperations.java | 46 ++++++- .../mysql/operation/MysqlTableOperations.java | 41 ------ .../operation/PostgreSqlTableOperations.java | 47 +++++++ .../jdbc/postgresql/CatalogPostgreSqlIT.java | 122 +++++++++++++++++- .../TestPostgreSqlTableOperations.java | 83 ++++++++++++ 5 files changed, 296 insertions(+), 43 deletions(-) diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java index 697c28004f0..cfb8fecf9e8 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java @@ -16,7 +16,10 @@ import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; @@ -25,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import javax.sql.DataSource; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -239,7 +243,47 @@ protected void correctJdbcTableFields( protected List getIndexes(String databaseName, String tableName, DatabaseMetaData metaData) throws SQLException { - return Collections.emptyList(); + List indexes = new ArrayList<>(); + + // Get primary key information + SetMultimap primaryKeyGroupByName = HashMultimap.create(); + ResultSet primaryKeys = getPrimaryKeys(databaseName, tableName, metaData); + while (primaryKeys.next()) { + String columnName = primaryKeys.getString("COLUMN_NAME"); + primaryKeyGroupByName.put(primaryKeys.getString("PK_NAME"), columnName); + } + for (String key : primaryKeyGroupByName.keySet()) { + indexes.add(Indexes.primary(key, convertIndexFieldNames(primaryKeyGroupByName.get(key)))); + } + + // Get unique key information + SetMultimap indexGroupByName = HashMultimap.create(); + ResultSet indexInfo = getIndexInfo(databaseName, tableName, metaData); + while (indexInfo.next()) { + String indexName = indexInfo.getString("INDEX_NAME"); + if (!indexInfo.getBoolean("NON_UNIQUE") && !primaryKeyGroupByName.containsKey(indexName)) { + String columnName = indexInfo.getString("COLUMN_NAME"); + indexGroupByName.put(indexName, columnName); + } + } + for (String key : indexGroupByName.keySet()) { + indexes.add(Indexes.unique(key, convertIndexFieldNames(indexGroupByName.get(key)))); + } + return indexes; + } + + protected ResultSet getIndexInfo(String databaseName, String tableName, DatabaseMetaData metaData) + throws SQLException { + return metaData.getIndexInfo(databaseName, null, tableName, false, false); + } + + protected ResultSet getPrimaryKeys( + String databaseName, String tableName, DatabaseMetaData metaData) throws SQLException { + return metaData.getPrimaryKeys(databaseName, null, tableName); + } + + protected String[][] convertIndexFieldNames(Set fieldNames) { + return fieldNames.stream().map(colName -> new String[] {colName}).toArray(String[][]::new); } protected abstract String generateCreateTableSql( diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java index 88d77a0d0fe..8c436ae27ec 100644 --- a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java @@ -20,10 +20,7 @@ import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.indexes.Indexes; import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.SetMultimap; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -35,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -210,43 +206,6 @@ public static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) { } } - protected List getIndexes(String databaseName, String tableName, DatabaseMetaData metaData) - throws SQLException { - List indexes = new ArrayList<>(); - - // Get primary key information - SetMultimap primaryKeyGroupByName = HashMultimap.create(); - ResultSet primaryKeys = metaData.getPrimaryKeys(databaseName, null, tableName); - while (primaryKeys.next()) { - String columnName = primaryKeys.getString("COLUMN_NAME"); - primaryKeyGroupByName.put(primaryKeys.getString("PK_NAME"), columnName); - } - for (String key : primaryKeyGroupByName.keySet()) { - indexes.add(Indexes.primary(key, convertIndexFieldNames(primaryKeyGroupByName.get(key)))); - } - - // Get unique key information - SetMultimap indexGroupByName = HashMultimap.create(); - ResultSet indexInfo = metaData.getIndexInfo(databaseName, null, tableName, false, false); - while (indexInfo.next()) { - String indexName = indexInfo.getString("INDEX_NAME"); - if (!indexInfo.getBoolean("NON_UNIQUE") - && !StringUtils.equalsIgnoreCase(Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME, indexName)) { - String columnName = indexInfo.getString("COLUMN_NAME"); - indexGroupByName.put(indexName, columnName); - } - } - for (String key : indexGroupByName.keySet()) { - indexes.add(Indexes.unique(key, convertIndexFieldNames(indexGroupByName.get(key)))); - } - - return indexes; - } - - private String[][] convertIndexFieldNames(Set fieldNames) { - return fieldNames.stream().map(colName -> new String[] {colName}).toArray(String[][]::new); - } - @Override protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException { return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT")); diff --git a/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java b/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java index 3e9183589b7..e3243431b4b 100644 --- a/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java +++ b/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.sql.DataSource; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ArrayUtils; @@ -77,6 +78,7 @@ protected String generateCreateTableSql( sqlBuilder.append(",\n"); } } + appendIndexesSql(indexes, sqlBuilder); sqlBuilder.append("\n)"); // Add table properties if any if (MapUtils.isNotEmpty(properties)) { @@ -115,6 +117,39 @@ protected String generateCreateTableSql( return result; } + public static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) { + for (Index index : indexes) { + String fieldStr = + Arrays.stream(index.fieldNames()) + .map( + colNames -> { + if (colNames.length > 1) { + throw new IllegalArgumentException( + "Index does not support complex fields in PostgreSQL"); + } + return PG_QUOTE + colNames[0] + PG_QUOTE; + }) + .collect(Collectors.joining(", ")); + sqlBuilder.append(",\n"); + switch (index.type()) { + case PRIMARY_KEY: + if (StringUtils.isNotEmpty(index.name())) { + sqlBuilder.append("CONSTRAINT ").append(PG_QUOTE).append(index.name()).append(PG_QUOTE); + } + sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")"); + break; + case UNIQUE_KEY: + if (StringUtils.isNotEmpty(index.name())) { + sqlBuilder.append("CONSTRAINT ").append(PG_QUOTE).append(index.name()).append(PG_QUOTE); + } + sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")"); + break; + default: + throw new IllegalArgumentException("PostgreSQL doesn't support index : " + index.type()); + } + } + } + private StringBuilder appendColumnDefinition(JdbcColumn column, StringBuilder sqlBuilder) { // Add data type sqlBuilder @@ -377,6 +412,18 @@ private String updateColumnCommentFieldDefinition( return "COMMENT ON COLUMN " + tableName + "." + col + " IS '" + newComment + "';"; } + @Override + protected ResultSet getIndexInfo(String schemaName, String tableName, DatabaseMetaData metaData) + throws SQLException { + return metaData.getIndexInfo(database, schemaName, tableName, false, false); + } + + @Override + protected ResultSet getPrimaryKeys(String schemaName, String tableName, DatabaseMetaData metaData) + throws SQLException { + return metaData.getPrimaryKeys(database, schemaName, tableName); + } + @Override protected Connection getConnection(String schema) throws SQLException { Connection connection = dataSource.getConnection(); diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/CatalogPostgreSqlIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/CatalogPostgreSqlIT.java index 95603d0969d..ba4af7096ff 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/CatalogPostgreSqlIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/CatalogPostgreSqlIT.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.integration.test.catalog.jdbc.postgresql; +import static org.junit.jupiter.api.Assertions.assertThrows; + import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; @@ -29,6 +31,8 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; import com.datastrato.gravitino.rel.types.Types; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -43,7 +47,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.datanucleus.util.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -552,4 +556,120 @@ void testCreateAndLoadSchema() { Assertions.assertEquals("anonymous", schema.auditInfo().creator()); Assertions.assertTrue(StringUtils.isEmpty(schema.comment())); } + + @Test + void testCreateIndexTable() { + Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, false, null); + Column col2 = Column.of("col_2", Types.VarCharType.of(100), "yes", false, false, null); + Column col3 = Column.of("col_3", Types.DateType.get(), "comment", false, false, null); + Column col4 = Column.of("col_4", Types.VarCharType.of(255), "code", false, false, null); + Column col5 = Column.of("col_5", Types.VarCharType.of(255), "config", false, false, null); + Column[] newColumns = new Column[] {col1, col2, col3, col4, col5}; + + Index[] indexes = + new Index[] { + Indexes.primary("k1_pk", new String[][] {{"col_1"}, {"col_2"}}), + Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}}), + Indexes.unique("u2_key", new String[][] {{"col_3"}, {"col_4"}}), + Indexes.unique("u3_key", new String[][] {{"col_5"}, {"col_4"}}), + Indexes.unique("u4_key", new String[][] {{"col_2"}, {"col_3"}, {"col_4"}}), + Indexes.unique("u5_key", new String[][] {{"col_2"}, {"col_3"}, {"col_5"}}), + Indexes.unique("u6_key", new String[][] {{"col_1"}, {"col_2"}, {"col_3"}, {"col_4"}}), + }; + + NameIdentifier tableIdentifier = + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); + + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + newColumns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0], + indexes); + assertionsTableInfo( + tableName, table_comment, Arrays.asList(newColumns), properties, indexes, createdTable); + Table table = tableCatalog.loadTable(tableIdentifier); + assertionsTableInfo( + tableName, table_comment, Arrays.asList(newColumns), properties, indexes, table); + + IllegalArgumentException illegalArgumentException = + assertThrows( + IllegalArgumentException.class, + () -> { + tableCatalog.createTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, "test_failed"), + newColumns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0], + new Index[] {Indexes.createMysqlPrimaryKey(new String[][] {{"col_1", "col_2"}})}); + }); + Assertions.assertTrue( + StringUtils.contains( + illegalArgumentException.getMessage(), + "Index does not support complex fields in PostgreSQL")); + + illegalArgumentException = + assertThrows( + IllegalArgumentException.class, + () -> { + tableCatalog.createTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, "test_failed"), + newColumns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0], + new Index[] {Indexes.unique("u1_key", new String[][] {{"col_2", "col_3"}})}); + }); + Assertions.assertTrue( + StringUtils.contains( + illegalArgumentException.getMessage(), + "Index does not support complex fields in PostgreSQL")); + + table = + tableCatalog.createTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, "test_null_key"), + newColumns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0], + new Index[] { + Indexes.of( + Index.IndexType.UNIQUE_KEY, + null, + new String[][] {{"col_1"}, {"col_3"}, {"col_4"}}), + Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][] {{"col_4"}}), + }); + Assertions.assertEquals(2, table.index().length); + Assertions.assertNotNull(table.index()[0].name()); + Assertions.assertNotNull(table.index()[1].name()); + + table = + tableCatalog.createTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, "many_index"), + newColumns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0], + new Index[] { + Indexes.unique("u4_key_2", new String[][] {{"col_2"}, {"col_3"}, {"col_4"}}), + Indexes.unique("u5_key_3", new String[][] {{"col_2"}, {"col_3"}, {"col_4"}}), + }); + Assertions.assertEquals(1, table.index().length); + Assertions.assertEquals("u4_key_2", table.index()[0].name()); + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/TestPostgreSqlTableOperations.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/TestPostgreSqlTableOperations.java index 035fad28a4d..f96b3f126a2 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/TestPostgreSqlTableOperations.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/jdbc/postgresql/TestPostgreSqlTableOperations.java @@ -12,9 +12,11 @@ import com.datastrato.gravitino.catalog.postgresql.converter.PostgreSqlTypeConverter; import com.datastrato.gravitino.catalog.postgresql.operation.PostgreSqlSchemaOperations; import com.datastrato.gravitino.catalog.postgresql.operation.PostgreSqlTableOperations; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; import com.datastrato.gravitino.exceptions.NoSuchTableException; import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.indexes.Indexes; import com.datastrato.gravitino.rel.types.Type; import com.datastrato.gravitino.rel.types.Types; @@ -479,4 +481,85 @@ public void testCreateAutoIncrementTable() { Assertions.assertTrue( StringUtils.contains(illegalArgumentException.getMessage(), "Unsupported auto-increment")); } + + @Test + public void testCreateIndexTable() { + String tableName = GravitinoITUtils.genRandomName("index_table_"); + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + columns.add( + new JdbcColumn.Builder() + .withName("col_1") + .withType(Types.LongType.get()) + .withComment("increment key") + .withNullable(false) + .withAutoIncrement(true) + .build()); + columns.add( + new JdbcColumn.Builder() + .withName("col_2") + .withType(INT) + .withNullable(false) + .withComment("id-1") + .build()); + columns.add( + new JdbcColumn.Builder() + .withName("col_3") + .withType(VARCHAR) + .withNullable(false) + .withComment("name") + .build()); + columns.add( + new JdbcColumn.Builder() + .withName("col_4") + .withType(VARCHAR) + .withNullable(false) + .withComment("city") + .build()); + Map properties = new HashMap<>(); + + Index[] indexes = + new Index[] { + Indexes.primary("test_pk", new String[][] {{"col_1"}, {"col_2"}}), + Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}}), + Indexes.unique("u2_key", new String[][] {{"col_3"}, {"col_4"}}) + }; + + // Test create table index success. + TABLE_OPERATIONS.create( + TEST_DB_NAME, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + properties, + null, + indexes); + + JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + TABLE_OPERATIONS.drop(TEST_DB_NAME, tableName); + + // Test create table index failed. + GravitinoRuntimeException gravitinoRuntimeException = + Assertions.assertThrows( + GravitinoRuntimeException.class, + () -> + TABLE_OPERATIONS.create( + TEST_DB_NAME, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + properties, + null, + new Index[] { + Indexes.primary("no_exist_pk", new String[][] {{"no_exist_1"}}), + Indexes.unique( + "no_exist_key", new String[][] {{"no_exist_2"}, {"no_exist_3"}}) + })); + Assertions.assertTrue( + StringUtils.contains( + gravitinoRuntimeException.getMessage(), + "column \"no_exist_1\" named in key does not exist")); + } }