diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fea7490f2a1..1f67b50ee40 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -97,7 +97,7 @@ When adding new code or fixing a bug be sure to add unit tests to provide covera Spotless checks code formatting. If your code isn't correctly formatted, the build fails. To correctly format your code please use Spotless. ```bash -./grawdlew spotlessApply +./gradlew spotlessApply ``` All files must have a license header and the build fails if any files are missing license headers. If you are adding third-party code be sure to understand how to add the third-party license to Gravitino LICENSE and NOTICE files. diff --git a/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java b/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java index af971586ee2..ce471880a9e 100644 --- a/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java +++ b/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java @@ -49,17 +49,17 @@ public class PostgreSqlTableOperations extends JdbcTableOperations { + " pg_namespace AS n ON n.oid = c.relnamespace\n" + "WHERE \n" + " a.attnum > 0 \n" - + " AND c.relname = ?"; + + " AND c.relname = ? AND n.nspname = ?"; private static final String SHOW_COLUMN_INFO_SQL = - "select * FROM information_schema.columns WHERE table_name = ? order by ordinal_position"; + "select * FROM information_schema.columns WHERE table_name = ? AND table_schema = ? order by ordinal_position"; private static final String SHOW_TABLE_COMMENT_SQL = "SELECT tb.table_name, d.description\n" + "FROM information_schema.tables tb\n" + " JOIN pg_class c ON c.relname = tb.table_name\n" + " LEFT JOIN pg_description d ON d.objoid = c.oid AND d.objsubid = '0'\n" - + "WHERE tb.table_name = ?;"; + + "WHERE tb.table_name = ? AND table_schema = ?;"; private String database; @@ -80,15 +80,18 @@ public void initialize( public JdbcTable load(String schema, String tableName) throws NoSuchTableException { try (Connection connection = getConnection(schema)) { // The first step is to obtain the comment information of the column. - Map columnCommentMap = selectColumnComment(tableName, connection); + Map columnCommentMap = selectColumnComment(schema, tableName, connection); // The second step is to obtain the column information of the table. List jdbcColumns = selectColumnInfoAndExecute( - tableName, connection, (builder, s) -> builder.withComment(columnCommentMap.get(s))); + schema, + tableName, + connection, + (builder, s) -> builder.withComment(columnCommentMap.get(s))); // The third step is to obtain the comment information of the table. - String comment = selectTableComment(tableName, connection); + String comment = selectTableComment(schema, tableName, connection); return new JdbcTable.Builder() .withName(tableName) .withColumns(jdbcColumns.toArray(new JdbcColumn[0])) @@ -102,6 +105,7 @@ public JdbcTable load(String schema, String tableName) throws NoSuchTableExcepti } private List selectColumnInfoAndExecute( + String schemaName, String tableName, Connection connection, BiConsumer builderConsumer) @@ -109,6 +113,7 @@ private List selectColumnInfoAndExecute( List jdbcColumns = new ArrayList<>(); try (PreparedStatement preparedStatement = connection.prepareStatement(SHOW_COLUMN_INFO_SQL)) { preparedStatement.setString(1, tableName); + preparedStatement.setString(2, schemaName); try (ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { ColDataType colDataType = new ColDataType(); @@ -153,10 +158,12 @@ private static List getArgList(ResultSet resultSet) throws SQLException return result; } - private String selectTableComment(String tableName, Connection connection) throws SQLException { + private String selectTableComment(String schema, String tableName, Connection connection) + throws SQLException { try (PreparedStatement preparedStatement = connection.prepareStatement(SHOW_TABLE_COMMENT_SQL)) { preparedStatement.setString(1, tableName); + preparedStatement.setString(2, schema); try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { return resultSet.getString("description"); @@ -170,13 +177,14 @@ private String selectTableComment(String tableName, Connection connection) throw * @return Returns the column names and comments of the table * @throws SQLException */ - private Map selectColumnComment(String tableName, Connection connection) - throws SQLException { + private Map selectColumnComment( + String schema, String tableName, Connection connection) throws SQLException { Map columnCommentMap = new HashMap<>(); try (PreparedStatement preparedStatement = connection.prepareStatement(SHOW_COLUMN_COMMENT_SQL)) { preparedStatement.setString(1, tableName); + preparedStatement.setString(2, schema); try (ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { String comment = resultSet.getString("comment"); diff --git a/common/src/main/java/com/datastrato/gravitino/dto/MetalakeDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/MetalakeDTO.java index 3aabfd60c32..ef4cd90612b 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/MetalakeDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/MetalakeDTO.java @@ -7,14 +7,13 @@ import com.datastrato.gravitino.Audit; import com.datastrato.gravitino.Metalake; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; import java.util.Map; import javax.annotation.Nullable; -import lombok.EqualsAndHashCode; import lombok.ToString; /** Represents a Metalake Data Transfer Object (DTO) that implements the Metalake interface. */ -@EqualsAndHashCode @ToString public class MetalakeDTO implements Metalake { @@ -132,4 +131,40 @@ public MetalakeDTO build() { return new MetalakeDTO(name, comment, properties, audit); } } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MetalakeDTO that = (MetalakeDTO) o; + return Objects.equal(name, that.name) + && Objects.equal(comment, that.comment) + && propertyEqual(properties, that.properties) + && Objects.equal(audit, that.audit); + } + + private boolean propertyEqual(Map p1, Map p2) { + if (p1 == null && p2 == null) { + return true; + } + + if (p1 != null && p1.isEmpty() && p2 == null) { + return true; + } + + if (p2 != null && p2.isEmpty() && p1 == null) { + return true; + } + + return java.util.Objects.equals(p1, p2); + } + + @Override + public int hashCode() { + return Objects.hashCode(name, comment, audit); + } } diff --git a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java index 38186c452de..ed9ec3ea1a3 100644 --- a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java +++ b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java @@ -704,9 +704,13 @@ public TableChange.ColumnPosition deserialize(JsonParser p, DeserializationConte node != null && !node.isNull(), "Cannot parse column position from invalid JSON: %s", node); - if (node.isTextual() && node.asText().equals(POSITION_FIRST)) { + if (node.isTextual() + && (node.asText().equals(POSITION_FIRST) + || node.asText().equals(POSITION_FIRST.toUpperCase()))) { return TableChange.ColumnPosition.first(); - } else if (node.isTextual() && node.asText().equals(POSITION_DEFAULT)) { + } else if (node.isTextual() + && (node.asText().equalsIgnoreCase(POSITION_DEFAULT) + || node.asText().equalsIgnoreCase(POSITION_DEFAULT.toUpperCase()))) { return TableChange.ColumnPosition.defaultPos(); } else if (node.isObject()) { String afterColumn = getString(POSITION_AFTER, node); diff --git a/common/src/test/java/com/datastrato/gravitino/dto/requests/TestTableUpdatesRequest.java b/common/src/test/java/com/datastrato/gravitino/dto/requests/TestTableUpdatesRequest.java index 78a24cabc66..6800012437f 100644 --- a/common/src/test/java/com/datastrato/gravitino/dto/requests/TestTableUpdatesRequest.java +++ b/common/src/test/java/com/datastrato/gravitino/dto/requests/TestTableUpdatesRequest.java @@ -151,6 +151,13 @@ public void testAddTableColumnRequest() throws JsonProcessingException { + "}"; Assertions.assertEquals( JsonUtils.objectMapper().readTree(expected), JsonUtils.objectMapper().readTree(jsonString)); + Assertions.assertTrue( + JsonUtils.objectMapper() + .readValue( + jsonString.replace("first", "FIRST"), + TableUpdateRequest.AddTableColumnRequest.class) + .getPosition() + instanceof TableChange.First); // test default position addTableColumnRequest = @@ -170,5 +177,12 @@ public void testAddTableColumnRequest() throws JsonProcessingException { + "}"; Assertions.assertEquals( JsonUtils.objectMapper().readTree(expected), JsonUtils.objectMapper().readTree(jsonString)); + Assertions.assertTrue( + JsonUtils.objectMapper() + .readValue( + jsonString.replace("default", "DEFAULT"), + TableUpdateRequest.AddTableColumnRequest.class) + .getPosition() + instanceof TableChange.Default); } } diff --git a/core/src/main/java/com/datastrato/gravitino/StringIdentifier.java b/core/src/main/java/com/datastrato/gravitino/StringIdentifier.java index 9f40c7dab87..942ce27ab53 100644 --- a/core/src/main/java/com/datastrato/gravitino/StringIdentifier.java +++ b/core/src/main/java/com/datastrato/gravitino/StringIdentifier.java @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -108,6 +109,27 @@ public static Map addToProperties( .build(); } + /** + * Remove StringIdentifier from properties. + * + * @param properties the properties to remove the string identifier from. + * @return the properties with the string identifier removed. + */ + public static Map removeIdFromProperties(Map properties) { + if (properties == null) { + return null; + } + + if (!properties.containsKey(ID_KEY)) { + return properties; + } + + Map copy = Maps.newHashMap(properties); + copy.remove(ID_KEY); + + return ImmutableMap.builder().putAll(copy).build(); + } + public static StringIdentifier fromProperties(Map properties) { if (properties == null) { return null; diff --git a/core/src/main/java/com/datastrato/gravitino/meta/BaseMetalake.java b/core/src/main/java/com/datastrato/gravitino/meta/BaseMetalake.java index 69cdef18d6b..f6de4bf2a54 100644 --- a/core/src/main/java/com/datastrato/gravitino/meta/BaseMetalake.java +++ b/core/src/main/java/com/datastrato/gravitino/meta/BaseMetalake.java @@ -10,6 +10,7 @@ import com.datastrato.gravitino.Field; import com.datastrato.gravitino.HasIdentifier; import com.datastrato.gravitino.Metalake; +import com.datastrato.gravitino.StringIdentifier; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -126,7 +127,7 @@ public EntityType type() { */ @Override public Map properties() { - return properties; + return StringIdentifier.removeIdFromProperties(properties); } /** Builder class for creating instances of {@link BaseMetalake}. */ diff --git a/core/src/main/java/com/datastrato/gravitino/meta/MetalakeManager.java b/core/src/main/java/com/datastrato/gravitino/meta/MetalakeManager.java index f7214d58ed7..5ca502f367a 100644 --- a/core/src/main/java/com/datastrato/gravitino/meta/MetalakeManager.java +++ b/core/src/main/java/com/datastrato/gravitino/meta/MetalakeManager.java @@ -52,9 +52,8 @@ public MetalakeManager(EntityStore store, IdGenerator idGenerator) { @Override public BaseMetalake[] listMetalakes() { try { - return store - .list(Namespace.empty(), BaseMetalake.class, EntityType.METALAKE) - .toArray(new BaseMetalake[0]); + return store.list(Namespace.empty(), BaseMetalake.class, EntityType.METALAKE).stream() + .toArray(BaseMetalake[]::new); } catch (IOException ioe) { LOG.error("Listing Metalakes failed due to storage issues.", ioe); throw new RuntimeException(ioe); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java index 15d3bd25b1f..b481e635411 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java @@ -44,7 +44,7 @@ public final class KvGarbageCollector implements Closeable { new ScheduledThreadPoolExecutor( 2, r -> { - Thread t = new Thread(r, "KvEntityStore-Garbage-Collector-%d"); + Thread t = new Thread(r, "KvEntityStore-Garbage-Collector"); t.setDaemon(true); return t; }, diff --git a/core/src/test/java/com/datastrato/gravitino/meta/TestMetalakeManager.java b/core/src/test/java/com/datastrato/gravitino/meta/TestMetalakeManager.java index 7caf69d602e..eede4fb2c69 100644 --- a/core/src/test/java/com/datastrato/gravitino/meta/TestMetalakeManager.java +++ b/core/src/test/java/com/datastrato/gravitino/meta/TestMetalakeManager.java @@ -173,8 +173,6 @@ private void testProperties(Map expectedProps, Map tableNames = TABLE_OPERATIONS.listTables(TEST_DB_NAME); Assertions.assertFalse(tableNames.contains(table_1)); + Assertions.assertThrows( + NoSuchTableException.class, () -> TABLE_OPERATIONS.load(TEST_DB_NAME, table_1)); + + Assertions.assertThrows( + NoSuchTableException.class, () -> TABLE_OPERATIONS.load("other_schema", table_1)); + Assertions.assertThrows( + NoSuchTableException.class, () -> postgreSqlTableOperations.load("other_schema", table_1)); + String table_2 = "table_multiple_2"; TABLE_OPERATIONS.create( TEST_DB_NAME, @@ -369,10 +377,11 @@ public void testCreateMultipleTable() throws SQLException { Assertions.assertFalse(tableNames.contains(table_2)); Assertions.assertThrows( - NoSuchTableException.class, - () -> { - postgreSqlTableOperations.load(TEST_DB_NAME, table_2); - }); + NoSuchTableException.class, () -> postgreSqlTableOperations.load(TEST_DB_NAME, table_2)); + Assertions.assertThrows( + NoSuchTableException.class, () -> postgreSqlTableOperations.load("other_schema", table_2)); + Assertions.assertThrows( + NoSuchTableException.class, () -> TABLE_OPERATIONS.load("other_schema", table_2)); postgreSqlTableOperations.purge(TEST_DB_NAME, table_1); Assertions.assertThrows( diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTHiveCatalogIT.java index 70a4267aaf6..dba80ebaa66 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTHiveCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTHiveCatalogIT.java @@ -4,7 +4,15 @@ */ package com.datastrato.gravitino.integration.test.catalog.lakehouse.iceberg; +import com.datastrato.gravitino.aux.AuxiliaryServiceManager; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergRESTService; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.container.HiveContainer; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; @@ -15,7 +23,47 @@ @Tag("gravitino-docker-it") @TestInstance(Lifecycle.PER_CLASS) public class IcebergRESTHiveCatalogIT extends IcebergRESTServiceIT { + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + public IcebergRESTHiveCatalogIT() { catalogType = IcebergCatalogBackend.HIVE; } + + @Override + void initEnv() { + containerSuite.startHiveContainer(); + } + + @Override + Map getCatalogConfig() { + Map customConfigs = new HashMap<>(); + customConfigs.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_BACKEND.getKey(), + IcebergCatalogBackend.HIVE.toString().toLowerCase()); + + customConfigs.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_URI.getKey(), + String.format( + "thrift://%s:%d", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HIVE_METASTORE_PORT)); + + customConfigs.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_WAREHOUSE.getKey(), + GravitinoITUtils.genRandomName( + String.format( + "hdfs://%s:%d/user/hive/warehouse-hive", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT))); + return customConfigs; + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTJdbcCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTJdbcCatalogIT.java index b70b9250b5a..36088027d5a 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTJdbcCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTJdbcCatalogIT.java @@ -5,7 +5,15 @@ package com.datastrato.gravitino.integration.test.catalog.lakehouse.iceberg; +import com.datastrato.gravitino.aux.AuxiliaryServiceManager; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergRESTService; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.container.HiveContainer; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; @@ -13,7 +21,65 @@ @Tag("gravitino-docker-it") @TestInstance(Lifecycle.PER_CLASS) public class IcebergRESTJdbcCatalogIT extends IcebergRESTServiceIT { + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + public IcebergRESTJdbcCatalogIT() { catalogType = IcebergCatalogBackend.JDBC; } + + @Override + void initEnv() { + containerSuite.startHiveContainer(); + } + + public Map getCatalogConfig() { + Map configMap = new HashMap<>(); + + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_BACKEND.getKey(), + IcebergCatalogBackend.JDBC.toString().toLowerCase()); + + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_URI.getKey(), + "jdbc:sqlite::memory:"); + + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.JDBC_USER.getKey(), + "iceberg"); + + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.JDBC_PASSWORD.getKey(), + "iceberg"); + + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.JDBC_INIT_TABLES.getKey(), + "true"); + + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_WAREHOUSE.getKey(), + GravitinoITUtils.genRandomName( + String.format( + "hdfs://%s:%d/user/hive/warehouse-jdbc-sqlite", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT))); + return configMap; + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceBaseIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceBaseIT.java index efa5b5e0abe..7fb56fa8276 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceBaseIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceBaseIT.java @@ -10,16 +10,12 @@ import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergRESTService; -import com.datastrato.gravitino.integration.test.container.ContainerSuite; -import com.datastrato.gravitino.integration.test.container.HiveContainer; import com.datastrato.gravitino.integration.test.util.AbstractIT; -import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; import com.datastrato.gravitino.server.web.JettyServerConfig; import com.datastrato.gravitino.utils.MapUtils; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,8 +27,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.TestInstance.Lifecycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,31 +34,33 @@ *

Referred from spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java */ -@TestInstance(Lifecycle.PER_CLASS) -public class IcebergRESTServiceBaseIT extends AbstractIT { +public abstract class IcebergRESTServiceBaseIT extends AbstractIT { public static final Logger LOG = LoggerFactory.getLogger(IcebergRESTServiceBaseIT.class); - private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); private SparkSession sparkSession; protected IcebergCatalogBackend catalogType = IcebergCatalogBackend.MEMORY; @BeforeAll void initIcebergTestEnv() throws Exception { - containerSuite.startHiveContainer(); + // Start Gravitino docker container + initEnv(); + // Inject Iceberg REST service config to gravitino.conf registerIcebergCatalogConfig(); + // Start Gravitino server AbstractIT.startIntegrationTest(); + // Start Spark session initSparkEnv(); - LOG.info("gravitino and spark env started,{}", catalogType); + LOG.info("Gravitino and Spark env started,{}", catalogType); } @AfterAll void stopIcebergTestEnv() throws Exception { stopSparkEnv(); AbstractIT.stopIntegrationTest(); - LOG.info("gravitino and spark env stopped,{}", catalogType); + LOG.info("Gravitino and Spark env stopped,{}", catalogType); } // AbstractIT#startIntegrationTest() is static, so we couldn't inject catalog info - // if startIntegrationTest() is auto invoked by Junit. so here we override + // if startIntegrationTest() is auto invoked by Junit. So here we override // startIntegrationTest() to disable the auto invoke by junit. @BeforeAll public static void startIntegrationTest() {} @@ -76,129 +72,16 @@ boolean catalogTypeNotMemory() { return !catalogType.equals(IcebergCatalogBackend.MEMORY); } - private void registerIcebergCatalogConfig() { - Map icebergConfigs; + abstract void initEnv(); - switch (catalogType) { - case HIVE: - icebergConfigs = getIcebergHiveCatalogConfigs(); - break; - case JDBC: - icebergConfigs = getIcebergJdbcCatalogConfigs(); - break; - case MEMORY: - icebergConfigs = getIcebergMemoryCatalogConfigs(); - break; - default: - throw new RuntimeException("Not support Iceberg catalog type:" + catalogType); - } + abstract Map getCatalogConfig(); + private void registerIcebergCatalogConfig() { + Map icebergConfigs = getCatalogConfig(); AbstractIT.registerCustomConfigs(icebergConfigs); LOG.info("Iceberg REST service config registered," + StringUtils.join(icebergConfigs)); } - private static Map getIcebergMemoryCatalogConfigs() { - Map configMap = new HashMap<>(); - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_BACKEND.getKey(), - IcebergCatalogBackend.MEMORY.toString().toLowerCase()); - - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_WAREHOUSE.getKey(), - "/tmp/"); - return configMap; - } - - private static Map getIcebergJdbcCatalogConfigs() { - Map configMap = new HashMap<>(); - - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_BACKEND.getKey(), - IcebergCatalogBackend.JDBC.toString().toLowerCase()); - - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_URI.getKey(), - "jdbc:sqlite::memory:"); - - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.JDBC_USER.getKey(), - "iceberg"); - - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.JDBC_PASSWORD.getKey(), - "iceberg"); - - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.JDBC_INIT_TABLES.getKey(), - "true"); - - configMap.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_WAREHOUSE.getKey(), - GravitinoITUtils.genRandomName( - String.format( - "hdfs://%s:%d/user/hive/warehouse-jdbc-sqlite", - containerSuite.getHiveContainer().getContainerIpAddress(), - HiveContainer.HDFS_DEFAULTFS_PORT))); - - return configMap; - } - - private static Map getIcebergHiveCatalogConfigs() { - Map customConfigs = new HashMap<>(); - customConfigs.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_BACKEND.getKey(), - IcebergCatalogBackend.HIVE.toString().toLowerCase()); - - customConfigs.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_URI.getKey(), - String.format( - "thrift://%s:%d", - containerSuite.getHiveContainer().getContainerIpAddress(), - HiveContainer.HIVE_METASTORE_PORT)); - - customConfigs.put( - AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX - + IcebergRESTService.SERVICE_NAME - + "." - + IcebergConfig.CATALOG_WAREHOUSE.getKey(), - GravitinoITUtils.genRandomName( - String.format( - "hdfs://%s:%d/user/hive/warehouse-hive", - containerSuite.getHiveContainer().getContainerIpAddress(), - HiveContainer.HDFS_DEFAULTFS_PORT))); - return customConfigs; - } - private static IcebergConfig buildIcebergConfig(Config config) { Map m = config.getConfigsWithPrefix(AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX); diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceIT.java index f57ff8c822e..aa749042fee 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/IcebergRESTServiceIT.java @@ -5,10 +5,14 @@ package com.datastrato.gravitino.integration.test.catalog.lakehouse.iceberg; +import com.datastrato.gravitino.aux.AuxiliaryServiceManager; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergRESTService; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -23,14 +27,12 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.condition.EnabledIf; @TestInstance(Lifecycle.PER_CLASS) -@Tag("gravitino-docker-it") public class IcebergRESTServiceIT extends IcebergRESTServiceBaseIT { private static final String ICEBERG_REST_NS_PREFIX = "iceberg_rest_"; @@ -48,6 +50,28 @@ void cleanup() { purgeAllIcebergTestNamespaces(); } + @Override + void initEnv() {} + + @Override + Map getCatalogConfig() { + Map configMap = new HashMap<>(); + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_BACKEND.getKey(), + IcebergCatalogBackend.MEMORY.toString().toLowerCase()); + + configMap.put( + AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + + IcebergRESTService.SERVICE_NAME + + "." + + IcebergConfig.CATALOG_WAREHOUSE.getKey(), + "/tmp/"); + return configMap; + } + private void purgeTable(String namespace, String table) { sql(String.format("DROP TABLE %s.%s PURGE", namespace, table)); } diff --git a/integration-test/src/test/resources/trino-queries/catalogs/jdbc-postgresql/00003_join_pushdown.sql b/integration-test/src/test/resources/trino-queries/catalogs/jdbc-postgresql/00003_join_pushdown.sql new file mode 100644 index 00000000000..46395154373 --- /dev/null +++ b/integration-test/src/test/resources/trino-queries/catalogs/jdbc-postgresql/00003_join_pushdown.sql @@ -0,0 +1,61 @@ +CREATE SCHEMA "test.jdbc-postgresql".gt_db1; + +use "test.jdbc-postgresql".gt_db1; + +CREATE TABLE "test.jdbc-postgresql".gt_db1.employee_performance ( + employee_id integer, + evaluation_date date, + rating integer +) +COMMENT 'comment'; + +CREATE TABLE "test.jdbc-postgresql".gt_db1.employees ( + employee_id integer, + department_id integer, + job_title varchar(100), + given_name varchar(100), + family_name varchar(100), + birth_date date, + hire_date date +) +COMMENT 'comment'; + +INSERT INTO "test.jdbc-postgresql".gt_db1.employee_performance (employee_id, evaluation_date, rating) VALUES +(1, DATE '2018-02-24', 4), +(1, DATE '2016-12-25', 7), +(1, DATE '2023-04-07', 4), +(3, DATE '2012-11-08', 7), +(3, DATE '2019-09-15', 2), +(3, DATE '2017-06-21', 8), +(3, DATE '2019-07-16', 4), +(3, DATE '2015-10-06', 4), +(3, DATE '2021-01-05', 6), +(3, DATE '2014-10-24', 4); + +INSERT INTO "test.jdbc-postgresql".gt_db1.employees (employee_id, department_id, job_title, given_name, family_name, birth_date, hire_date) VALUES +(1, 1, 'Manager', 'Gregory', 'Smith', DATE '1968-04-15', DATE '2014-06-04'), +(2, 1, 'Sales Assistant', 'Owen', 'Rivers', DATE '1988-08-13', DATE '2021-02-05'), +(3, 1, 'Programmer', 'Avram', 'Lawrence', DATE '1969-11-21', DATE '2010-09-29'), +(4, 1, 'Sales Assistant', 'Burton', 'Everett', DATE '2001-12-07', DATE '2016-06-25'), +(5, 1, 'Sales Assistant', 'Cedric', 'Barlow', DATE '1972-02-02', DATE '2012-08-15'), +(6, 2, 'Sales Assistant', 'Jasper', 'Mack', DATE '2002-03-29', DATE '2020-09-13'), +(7, 1, 'Sales Assistant', 'Felicia', 'Robinson', DATE '1973-08-21', DATE '2023-05-14'), +(8, 3, 'Sales Assistant', 'Mason', 'Steele', DATE '1964-05-19', DATE '2019-02-06'), +(9, 3, 'Programmer', 'Bernard', 'Cameron', DATE '1995-08-27', DATE '2018-07-12'), +(10, 2, 'Programmer', 'Chelsea', 'Wade', DATE '2007-01-29', DATE '2016-04-16'); + +SELECT + given_name, + family_name, + rating +FROM "test.jdbc-postgresql".gt_db1.employee_performance AS p +JOIN "test.jdbc-postgresql".gt_db1.employees AS e + ON p.employee_id = e.employee_id +ORDER BY +rating DESC +LIMIT 10; + +drop table "test.jdbc-postgresql".gt_db1.employee_performance; +drop table "test.jdbc-postgresql".gt_db1.employees; + +drop schema "test.jdbc-postgresql".gt_db1; \ No newline at end of file diff --git a/integration-test/src/test/resources/trino-queries/catalogs/jdbc-postgresql/00003_join_pushdown.txt b/integration-test/src/test/resources/trino-queries/catalogs/jdbc-postgresql/00003_join_pushdown.txt new file mode 100644 index 00000000000..4b75b585b91 --- /dev/null +++ b/integration-test/src/test/resources/trino-queries/catalogs/jdbc-postgresql/00003_join_pushdown.txt @@ -0,0 +1,28 @@ +CREATE SCHEMA + +USE + +CREATE TABLE + +CREATE TABLE + +INSERT: 10 rows + +INSERT: 10 rows + +"Avram","Lawrence","8" +"Avram","Lawrence","7" +"Gregory","Smith","7" +"Avram","Lawrence","6" +"Avram","Lawrence","4" +"Avram","Lawrence","4" +"Gregory","Smith","4" +"Gregory","Smith","4" +"Avram","Lawrence","4" +"Avram","Lawrence","2" + +DROP TABLE + +DROP TABLE + +DROP SCHEMA \ No newline at end of file diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java index 2468ad5b78b..ef7532b2f8f 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java @@ -30,11 +30,12 @@ public ConnectorSplitSource getSplits( GravitinoTransactionHandle gravitinoTransactionHandle = (GravitinoTransactionHandle) transaction; + // TODO(yuhui) add dynamic filter return internalSplitManager.getSplits( gravitinoTransactionHandle.getInternalTransactionHandle(), session, gravitinoTableHandle.getInternalTableHandle(), - dynamicFilter, + DynamicFilter.EMPTY, constraint); } } diff --git a/web/build.gradle.kts b/web/build.gradle.kts index b29f093893d..3dbac5c32f6 100644 --- a/web/build.gradle.kts +++ b/web/build.gradle.kts @@ -52,7 +52,10 @@ tasks { } clean { + delete(".node") delete("build") delete("dist") + delete("node_modules") + delete("yarn-error.log") } }