From 74677dfd3fe774a6368372aaa763361c3cbbf801 Mon Sep 17 00:00:00 2001 From: cai can <94670132+caican00@users.noreply.github.com> Date: Mon, 26 Feb 2024 11:56:38 +0800 Subject: [PATCH 1/4] [#1907] improvement(trino-connector): Add missing `@Override` annotation in subclass method (#2338) ### What changes were proposed in this pull request? Add missing `@Override` annotation in subclass method. ### Why are the changes needed? Used to identify overridden parent class methods. Fix: [#1907](https://github.com/datastrato/gravitino/issues/1907) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `./gradlew :trino-connector:test ` --- .../gravitino/trino/connector/GravitinoSplitManager.java | 1 + .../trino/connector/catalog/hive/HiveConnectorAdapter.java | 2 ++ .../connector/catalog/iceberg/IcebergConnectorAdapter.java | 2 ++ .../connector/catalog/jdbc/mysql/MySQLConnectorAdapter.java | 2 ++ .../connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java | 1 + .../catalog/jdbc/postgresql/PostgreSQLConnectorAdapter.java | 2 ++ 6 files changed, 10 insertions(+) diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java index 663dad459c5..2fa6b286c3e 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoSplitManager.java @@ -20,6 +20,7 @@ public GravitinoSplitManager(ConnectorSplitManager internalSplitManager) { this.internalSplitManager = internalSplitManager; } + @Override public ConnectorSplitSource getSplits( ConnectorTransactionHandle transaction, ConnectorSession session, diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java 
b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java index 2fc5ffb4d4d..28cc2762088 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveConnectorAdapter.java @@ -30,6 +30,7 @@ public HiveConnectorAdapter() { this.catalogConverter = new HiveCatalogPropertyConverter(); } + @Override public Map buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception { Map config = new HashMap<>(); @@ -69,6 +70,7 @@ public List> getSchemaProperties() { return propertyMetadata.getSchemaPropertyMetadata(); } + @Override public CatalogConnectorMetadataAdapter getMetadataAdapter() { // TODO yuhui Need to improve schema table and column properties return new HiveMetadataAdapter( diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java index f7e276d028b..4176ce901b1 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/iceberg/IcebergConnectorAdapter.java @@ -28,6 +28,7 @@ public IcebergConnectorAdapter() { this.catalogConverter = new IcebergCatalogPropertyConverter(); } + @Override public Map buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception { Map config = new HashMap<>(); @@ -42,6 +43,7 @@ public Map buildInternalConnectorConfig(GravitinoCatalog catalog return config; } + @Override public CatalogConnectorMetadataAdapter getMetadataAdapter() { // TODO yuhui Need to improve schema table and column properties return new IcebergMetadataAdapter(getSchemaProperties(), getTableProperties(), 
emptyList()); diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLConnectorAdapter.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLConnectorAdapter.java index 06941d90ef4..ee7b7bae7fd 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLConnectorAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLConnectorAdapter.java @@ -30,6 +30,7 @@ public MySQLConnectorAdapter() { this.propertyMetadata = new MySQLPropertyMeta(); } + @Override public Map buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception { Map config = new HashMap<>(); @@ -44,6 +45,7 @@ public Map buildInternalConnectorConfig(GravitinoCatalog catalog return config; } + @Override public CatalogConnectorMetadataAdapter getMetadataAdapter() { // TODO yuhui Need to improve schema table and column properties return new MySQLMetadataAdapter(getSchemaProperties(), getTableProperties(), emptyList()); diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java index 48f9b795eb1..04527dc74fd 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java @@ -44,6 +44,7 @@ public Map toTrinoTableProperties(Map properties } /** Transform trino ConnectorTableMetadata to gravitino table metadata */ + @Override public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) { String tableName = tableMetadata.getTableSchema().getTable().getTableName(); String schemaName = 
tableMetadata.getTableSchema().getTable().getSchemaName(); diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/postgresql/PostgreSQLConnectorAdapter.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/postgresql/PostgreSQLConnectorAdapter.java index 4b9b90e1190..feb12fdb0c5 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/postgresql/PostgreSQLConnectorAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/jdbc/postgresql/PostgreSQLConnectorAdapter.java @@ -24,6 +24,7 @@ public PostgreSQLConnectorAdapter() { this.catalogConverter = new JDBCCatalogPropertyConverter(); } + @Override public Map buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception { Map config = new HashMap<>(); @@ -38,6 +39,7 @@ public Map buildInternalConnectorConfig(GravitinoCatalog catalog return config; } + @Override public CatalogConnectorMetadataAdapter getMetadataAdapter() { // TODO yuhui Need to improve schema table and column properties return new PostgreSQLMetadataAdapter(getSchemaProperties(), getTableProperties(), emptyList()); From 92dd1b714380949301d1f05997a4a6dd4eaac30f Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Mon, 26 Feb 2024 16:58:46 +0800 Subject: [PATCH 2/4] [#2275] fix(test): Check the version and number of gravitino trino connector before starting Trino server (#2278) ### What changes were proposed in this pull request? Modify the build script and start the clean task to remove old version jars. ### Why are the changes needed? In some cases, there may be compatibility issues between different releases version, if we do not do clean task, when switching from an old version to the new version, there is a chance that old jars are not clean and co-exist with new jars, then things go wrong. Fix: #2275 ### Does this PR introduce _any_ user-facing change? N/A. ### How was this patch tested? N/A. 
--- dev/docker/trino/Dockerfile | 12 +++++----- dev/docker/trino/conf/update-trino-conf.sh | 7 ++++++ docs/docker-image-details.md | 3 +++ integration-test/build.gradle.kts | 22 ++++++++++++++++++- integration-test/trino-it/docker-compose.yaml | 2 +- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/dev/docker/trino/Dockerfile b/dev/docker/trino/Dockerfile index 46d56e23c95..16790b0f4d7 100644 --- a/dev/docker/trino/Dockerfile +++ b/dev/docker/trino/Dockerfile @@ -59,11 +59,11 @@ RUN if [ "$IMAGE_NAME" = "datastrato/trino" ] ; then \ fi - -# Use ARGs to update trino conf and start Trino server -ARG GRAVITINO_HOST_IP -ARG GRAVITINO_HOST_PORT -ARG GRAVITINO_METALAKE_NAME -ARG HIVE_HOST_IP +# The following ENV are needed to update configuration file before starting Trino server, you need to +# pass these ENV variables to the container when starting it like `docker run --env GRAVITINO_HOST_IP=xxx --env GRAVITINO_HOST_PORT=xxxx` +#ENV GRAVITINO_HOST_IP +#ENV GRAVITINO_HOST_PORT +#ENV GRAVITINO_METALAKE_NAME +#ENV HADOOP_USER_NAME ENTRYPOINT ["/bin/bash", "-c", "/etc/trino/update-trino-conf.sh && /usr/lib/trino/bin/run-trino"] diff --git a/dev/docker/trino/conf/update-trino-conf.sh b/dev/docker/trino/conf/update-trino-conf.sh index d800883d485..890dc38ced4 100755 --- a/dev/docker/trino/conf/update-trino-conf.sh +++ b/dev/docker/trino/conf/update-trino-conf.sh @@ -13,3 +13,10 @@ sed "s/GRAVITINO_HOST_IP:GRAVITINO_HOST_PORT/${GRAVITINO_HOST_IP}:${GRAVITINO_HO # Update `gravitino.metalake = GRAVITINO_METALAKE_NAME` in the `conf/catalog/gravitino.properties` sed "s/GRAVITINO_METALAKE_NAME/${GRAVITINO_METALAKE_NAME}/g" "${trino_conf_dir}/catalog/gravitino.properties.tmp" > "${trino_conf_dir}/catalog/gravitino.properties" rm "${trino_conf_dir}/catalog/gravitino.properties.tmp" + +# Check the number of Gravitino connector plugins present in the Trino plugin directory +num_of_gravitino_connector=$(ls /usr/lib/trino/plugin/gravitino | grep gravitino-trino-connector-* 
| wc -l) +if [[ "${num_of_gravitino_connector}" -ne 1 ]]; then + echo "Multiple versions of the Gravitino connector plugin found or none present." + exit 1 +fi \ No newline at end of file diff --git a/docs/docker-image-details.md b/docs/docker-image-details.md index 6c3436a5f8a..116d81f6ae4 100644 --- a/docs/docker-image-details.md +++ b/docs/docker-image-details.md @@ -139,6 +139,9 @@ You can use this image to test Trino. Changelog +- gravitino-ci-trino:0.1.5 + - Add check for the version of gravitino-trino-connector + - gravitino-ci-trino:0.1.4 - Change `-Xmx1G` to `-Xmx2G` in the config file `/etc/trino/jvm.config` diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index 952469e33be..e4231462fb9 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -267,6 +267,26 @@ tasks.test { if (skipITs) { exclude("**/integration/test/**") } else { + // Get current project version + val version = project.version.toString() + println("Current project version: $version") + // Check whether this module has already built + val buildDir = project.buildDir + if (!buildDir.exists()) { + dependsOn(":trino-connector:jar") + } else { + // Check the version gravitino related jars in build equal to the current project version + val gravitinoJars = buildDir.resolve("libs").listFiles { _, name -> name.startsWith("gravitino") }?.filter { + val jarVersion = name.substringAfterLast("-").substringBeforeLast(".") + jarVersion != version + } + + if (gravitinoJars != null && gravitinoJars.isNotEmpty()) { + delete(project(":trino-connector").buildDir) + dependsOn(":trino-connector:jar") + } + } + doFirst { printDockerCheckInfo() @@ -297,7 +317,7 @@ tasks.test { // Gravitino CI Docker image environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.8") - environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", "datastrato/gravitino-ci-trino:0.1.3") + environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", 
"datastrato/gravitino-ci-trino:0.1.5") // Change poll image pause time from 30s to 60s environment("TESTCONTAINERS_PULL_PAUSE_TIMEOUT", "60") diff --git a/integration-test/trino-it/docker-compose.yaml b/integration-test/trino-it/docker-compose.yaml index 19fc71ca64a..ed783265b12 100644 --- a/integration-test/trino-it/docker-compose.yaml +++ b/integration-test/trino-it/docker-compose.yaml @@ -64,7 +64,7 @@ services: retries: 5 trino: - image: datastrato/gravitino-ci-trino:0.1.4 + image: datastrato/gravitino-ci-trino:0.1.5 networks: - trino-net container_name: trino-ci-trino From 2a03fd7f60f05cd434dbf9ceb6b2a6b40fbcba89 Mon Sep 17 00:00:00 2001 From: xloya <982052490@qq.com> Date: Mon, 26 Feb 2024 19:59:51 +0800 Subject: [PATCH 3/4] [#2080] feat(core): Add JDBC backend operations for metalake (#1980) ### What changes were proposed in this pull request? The purpose of this PR is to implement JDBC backend operation Metalake metadata which use MySQL as the database. Depend on #1930. Metadata operations of Catalog, Schema, Table and Fileset will be supported in the remaining PRs. ### Why are the changes needed? Fix: #2080 ### How was this patch tested? Add unit tests to test the metalake metadata ops. 
--------- Co-authored-by: xiaojiebao --- LICENSE.bin | 1 + .../datastrato/gravitino/json/JsonUtils.java | 29 ++ conf/gravitino.conf.template | 3 +- core/build.gradle.kts | 3 + .../com/datastrato/gravitino/Configs.java | 40 ++- .../UnsupportedEntityTypeException.java | 37 +++ .../storage/relational/JDBCBackend.java | 112 +++++++ .../storage/relational/RelationalBackend.java | 3 +- .../relational/RelationalEntityStore.java | 9 +- .../relational/mapper/MetalakeMetaMapper.java | 127 ++++++++ .../relational/mysql/MySQLBackend.java | 69 ---- .../storage/relational/po/MetalakePO.java | 147 +++++++++ .../service/MetalakeMetaService.java | 133 ++++++++ .../session/SqlSessionFactoryHelper.java | 110 +++++++ .../relational/session/SqlSessions.java | 99 ++++++ .../relational/utils/POConverters.java | 128 ++++++++ .../relational/utils/SessionUtils.java | 123 +++++++ core/src/main/resources/mysql/mysql_init.sql | 18 + .../relational/TestRelationalEntityStore.java | 308 +++++++++++++++--- .../relational/session/TestSqlSession.java | 132 ++++++++ .../relational/utils/TestPOConverters.java | 144 ++++++++ core/src/test/resources/h2/h2-init.sql | 18 + docs/gravitino-server-config.md | 21 +- gradle/libs.versions.toml | 4 + 24 files changed, 1678 insertions(+), 140 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/UnsupportedEntityTypeException.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java delete mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/mysql/MySQLBackend.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/po/MetalakePO.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java create mode 100644 
core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java create mode 100644 core/src/main/resources/mysql/mysql_init.sql create mode 100644 core/src/test/java/com/datastrato/gravitino/storage/relational/session/TestSqlSession.java create mode 100644 core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java create mode 100644 core/src/test/resources/h2/h2-init.sql diff --git a/LICENSE.bin b/LICENSE.bin index 806a7d64b2f..3456cec1422 100644 --- a/LICENSE.bin +++ b/LICENSE.bin @@ -329,6 +329,7 @@ J2ObjC SQLite JDBC Driver Immutables + MyBatis This product bundles various third-party components also under the Apache Software Foundation License 1.1 diff --git a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java index 8a8f9752bf3..0c2fb6b75a7 100644 --- a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java +++ b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java @@ -39,6 +39,8 @@ import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.types.Type; import com.datastrato.gravitino.rel.types.Types; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; @@ -240,6 +242,33 @@ public static ObjectMapper objectMapper() { return ObjectMapperHolder.INSTANCE; } + /** + * AnyFieldMapperHolder is a static inner class that holds the instance of 
ObjectMapper which can + * access any field of the object. This class utilizes the Initialization-on-demand holder idiom, + * which is a lazy-loaded singleton. This idiom takes advantage of the fact that inner classes are + * not loaded until they are referenced. It's a thread-safe and efficient way to implement a + * singleton as the instance is created when it's needed at the first time. + */ + private static class AnyFieldMapperHolder { + private static final ObjectMapper INSTANCE = + JsonMapper.builder() + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true) + .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS) + .build() + .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) + .registerModule(new JavaTimeModule()); + } + + /** + * Get the shared AnyFieldMapper instance for JSON serialization/deserialization. + * + * @return The ObjectMapper instance. + */ + public static ObjectMapper anyFieldMapper() { + return AnyFieldMapperHolder.INSTANCE; + } + /** * Get a list of strings from a JSON node property. 
* diff --git a/conf/gravitino.conf.template b/conf/gravitino.conf.template index 3cd1192b696..5a6cf006380 100644 --- a/conf/gravitino.conf.template +++ b/conf/gravitino.conf.template @@ -27,8 +27,9 @@ gravitino.server.webserver.requestHeaderSize = 131072 gravitino.server.webserver.responseHeaderSize = 131072 # THE CONFIGURATION FOR Gravitino ENTITY STORE -# The entity store to use +# The entity store to use, kv or relational gravitino.entity.store = kv + # The RocksDB entity store gravitino.entity.store.kv = RocksDBKvBackend # The storage path for RocksDB storage implementation, it supports both absolute and relative path, diff --git a/core/build.gradle.kts b/core/build.gradle.kts index ed3cc93eef1..6f6fb3b7266 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -16,9 +16,11 @@ dependencies { implementation(libs.bundles.metrics) implementation(libs.bundles.prometheus) implementation(libs.caffeine) + implementation(libs.commons.dbcp2) implementation(libs.commons.io) implementation(libs.commons.lang3) implementation(libs.guava) + implementation(libs.mybatis) implementation(libs.protobuf.java.util) { exclude("com.google.guava", "guava") .because("Brings in Guava for Android, which we don't want (and breaks multimaps).") @@ -31,6 +33,7 @@ dependencies { testAnnotationProcessor(libs.lombok) testCompileOnly(libs.lombok) + testImplementation(libs.h2db) testImplementation(libs.junit.jupiter.api) testImplementation(libs.junit.jupiter.params) testImplementation(libs.mockito.core) diff --git a/core/src/main/java/com/datastrato/gravitino/Configs.java b/core/src/main/java/com/datastrato/gravitino/Configs.java index 4cc276c6b11..384f47628bf 100644 --- a/core/src/main/java/com/datastrato/gravitino/Configs.java +++ b/core/src/main/java/com/datastrato/gravitino/Configs.java @@ -19,9 +19,15 @@ public interface Configs { String DEFAULT_ENTITY_KV_STORE = "RocksDBKvBackend"; String ENTITY_KV_STORE_KEY = "gravitino.entity.store.kv"; - String DEFAULT_ENTITY_RELATIONAL_STORE 
= "MySQLBackend"; + String DEFAULT_ENTITY_RELATIONAL_STORE = "JDBCBackend"; String ENTITY_RELATIONAL_STORE_KEY = "gravitino.entity.store.relational"; + String ENTITY_RELATIONAL_JDBC_BACKEND_URL_KEY = "gravitino.entity.store.relational.jdbcUrl"; + String ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER_KEY = "gravitino.entity.store.relational.jdbcDriver"; + String ENTITY_RELATIONAL_JDBC_BACKEND_USER_KEY = "gravitino.entity.store.relational.jdbcUser"; + String ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD_KEY = + "gravitino.entity.store.relational.jdbcPassword"; + String ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY = "gravitino.entity.store.kv.rocksdbPath"; Long DEFAULT_KV_DELETE_AFTER_TIME = 604800000L; // 7 days @@ -59,6 +65,38 @@ public interface Configs { .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) .createWithDefault(DEFAULT_ENTITY_RELATIONAL_STORE); + ConfigEntry ENTITY_RELATIONAL_JDBC_BACKEND_URL = + new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_URL_KEY) + .doc("Connection URL of `JDBCBackend`") + .version(ConfigConstants.VERSION_0_5_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + ConfigEntry ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER = + new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER_KEY) + .doc("Driver Name of `JDBCBackend`") + .version(ConfigConstants.VERSION_0_5_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + ConfigEntry ENTITY_RELATIONAL_JDBC_BACKEND_USER = + new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_USER_KEY) + .doc("Username of `JDBCBackend`") + .version(ConfigConstants.VERSION_0_5_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + ConfigEntry ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD = + new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD_KEY) + .doc("Password of `JDBCBackend`") + .version(ConfigConstants.VERSION_0_5_0) + .stringConf() + 
.checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + ConfigEntry ENTRY_KV_ROCKSDB_BACKEND_PATH = new ConfigBuilder(ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY) .doc( diff --git a/core/src/main/java/com/datastrato/gravitino/UnsupportedEntityTypeException.java b/core/src/main/java/com/datastrato/gravitino/UnsupportedEntityTypeException.java new file mode 100644 index 00000000000..8f0eb31fef6 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/UnsupportedEntityTypeException.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino; + +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import com.google.errorprone.annotations.FormatMethod; +import com.google.errorprone.annotations.FormatString; + +/** An exception thrown when an entity type is not supported for operations. */ +public class UnsupportedEntityTypeException extends GravitinoRuntimeException { + + /** + * Constructs a new exception with the specified detail message. + * + * @param message the detail message. + * @param args the arguments to the message. + */ + @FormatMethod + public UnsupportedEntityTypeException(@FormatString String message, Object... args) { + super(message, args); + } + + /** + * Constructs a new exception with the specified detail message and cause. + * + * @param cause the cause. + * @param message the detail message. + * @param args the arguments to the message. + */ + @FormatMethod + public UnsupportedEntityTypeException( + Throwable cause, @FormatString String message, Object... 
args) { + super(cause, message, args); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java new file mode 100644 index 00000000000..d7825445bcd --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -0,0 +1,112 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.UnsupportedEntityTypeException; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; +import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import java.io.IOException; +import java.util.List; +import java.util.function.Function; + +/** + * {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} interface. You can use + * a database that supports the JDBC protocol as storage. If the specified database has special SQL + * syntax, please implement the SQL statements and methods in MyBatis Mapper separately and switch + * according to the {@link Configs#ENTITY_RELATIONAL_JDBC_BACKEND_URL_KEY} parameter. + */ +public class JDBCBackend implements RelationalBackend { + + /** Initialize the jdbc backend instance. 
*/ + @Override + public void initialize(Config config) { + SqlSessionFactoryHelper.getInstance().init(config); + } + + @Override + public List list( + Namespace namespace, Entity.EntityType entityType) { + switch (entityType) { + case METALAKE: + return (List) MetalakeMetaService.getInstance().listMetalakes(); + default: + throw new UnsupportedEntityTypeException( + "Unsupported entity type: %s for list operation", entityType); + } + } + + @Override + public boolean exists(NameIdentifier ident, Entity.EntityType entityType) { + try { + Entity entity = get(ident, entityType); + return entity != null; + } catch (NoSuchEntityException ne) { + return false; + } + } + + @Override + public void insert(E e, boolean overwritten) + throws EntityAlreadyExistsException { + if (e instanceof BaseMetalake) { + MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten); + } else { + throw new UnsupportedEntityTypeException( + "Unsupported entity type: %s for insert operation", e.getClass()); + } + } + + @Override + public E update( + NameIdentifier ident, Entity.EntityType entityType, Function updater) + throws IOException, NoSuchEntityException, AlreadyExistsException { + switch (entityType) { + case METALAKE: + return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater); + default: + throw new UnsupportedEntityTypeException( + "Unsupported entity type: %s for update operation", entityType); + } + } + + @Override + public E get( + NameIdentifier ident, Entity.EntityType entityType) throws NoSuchEntityException { + switch (entityType) { + case METALAKE: + return (E) MetalakeMetaService.getInstance().getMetalakeByIdent(ident); + default: + throw new UnsupportedEntityTypeException( + "Unsupported entity type: %s for get operation", entityType); + } + } + + @Override + public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade) { + switch (entityType) { + case METALAKE: + return 
MetalakeMetaService.getInstance().deleteMetalake(ident, cascade); + default: + throw new UnsupportedEntityTypeException( + "Unsupported entity type: %s for delete operation", entityType); + } + } + + @Override + public void close() throws IOException { + SqlSessionFactoryHelper.getInstance().close(); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java index 8969db9f912..ea9506ed147 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java @@ -65,10 +65,11 @@ void insert(E e, boolean overwritten) * @param entityType The type of the entity. * @return The entity after updating. * @throws NoSuchEntityException If the entity is not exist. + * @throws IOException If the entity is failed to update. */ E update( NameIdentifier ident, Entity.EntityType entityType, Function updater) - throws NoSuchEntityException; + throws IOException, NoSuchEntityException; /** * Retrieves the entity associated with the identifier and the entity type. 
diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java index f97cf4c52f8..8427e7484af 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java @@ -17,7 +17,6 @@ import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.exceptions.AlreadyExistsException; import com.datastrato.gravitino.exceptions.NoSuchEntityException; -import com.datastrato.gravitino.storage.relational.mysql.MySQLBackend; import com.datastrato.gravitino.utils.Executable; import com.google.common.collect.ImmutableMap; import java.io.IOException; @@ -35,7 +34,7 @@ public class RelationalEntityStore implements EntityStore { private static final Logger LOGGER = LoggerFactory.getLogger(RelationalEntityStore.class); public static final ImmutableMap RELATIONAL_BACKENDS = ImmutableMap.of( - Configs.DEFAULT_ENTITY_RELATIONAL_STORE, MySQLBackend.class.getCanonicalName()); + Configs.DEFAULT_ENTITY_RELATIONAL_STORE, JDBCBackend.class.getCanonicalName()); private RelationalBackend backend; @Override @@ -94,11 +93,7 @@ public E update( public E get( NameIdentifier ident, Entity.EntityType entityType, Class e) throws NoSuchEntityException, IOException { - E entity = backend.get(ident, entityType); - if (entity == null) { - throw new NoSuchEntityException("No such entity:%s", ident.toString()); - } - return entity; + return backend.get(ident, entityType); } @Override diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java new file mode 100644 index 00000000000..b731b718442 --- /dev/null +++ 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -0,0 +1,127 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.mapper; + +import com.datastrato.gravitino.storage.relational.po.MetalakePO; +import java.util.List; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +/** + * A MyBatis Mapper for metalake meta operation SQLs. + * + *

This interface class is a specification defined by MyBatis. It requires this interface class + * to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or + * write SQLs with annotations in this interface Mapper. See: + */ +public interface MetalakeMetaMapper { + String TABLE_NAME = "metalake_meta"; + + @Select( + "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE deleted_at = 0") + List listMetalakePOs(); + + @Select( + "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_name = #{metalakeName} and deleted_at = 0") + MetalakePO selectMetalakeMetaByName(@Param("metalakeName") String name); + + @Select( + "SELECT metalake_id FROM " + + TABLE_NAME + + " WHERE metalake_name = #{metalakeName} and deleted_at = 0") + Long selectMetalakeIdMetaByName(@Param("metalakeName") String name); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," + + " schema_version, current_version, last_version, deleted_at)" + + " VALUES(" + + " #{metalakeMeta.metalakeId}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}," + + " #{metalakeMeta.currentVersion}," + + " #{metalakeMeta.lastVersion}," + + " #{metalakeMeta.deletedAt}" + + " )") + void insertMetalakeMeta(@Param("metalakeMeta") MetalakePO 
metalakePO); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," + + " schema_version, current_version, last_version, deleted_at)" + + " VALUES(" + + " #{metalakeMeta.metalakeId}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}," + + " #{metalakeMeta.currentVersion}," + + " #{metalakeMeta.lastVersion}," + + " #{metalakeMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " metalake_name = #{metalakeMeta.metalakeName}," + + " metalake_comment = #{metalakeMeta.metalakeComment}," + + " properties = #{metalakeMeta.properties}," + + " audit_info = #{metalakeMeta.auditInfo}," + + " schema_version = #{metalakeMeta.schemaVersion}," + + " current_version = #{metalakeMeta.currentVersion}," + + " last_version = #{metalakeMeta.lastVersion}," + + " deleted_at = #{metalakeMeta.deletedAt}") + void insertMetalakeMetaOnDuplicateKeyUpdate(@Param("metalakeMeta") MetalakePO metalakePO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET metalake_name = #{newMetalakeMeta.metalakeName}," + + " metalake_comment = #{newMetalakeMeta.metalakeComment}," + + " properties = #{newMetalakeMeta.properties}," + + " audit_info = #{newMetalakeMeta.auditInfo}," + + " schema_version = #{newMetalakeMeta.schemaVersion}," + + " current_version = #{newMetalakeMeta.currentVersion}," + + " last_version = #{newMetalakeMeta.lastVersion}" + + " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}" + + " and metalake_name = #{oldMetalakeMeta.metalakeName}" + + " and metalake_comment = #{oldMetalakeMeta.metalakeComment}" + + " and properties = #{oldMetalakeMeta.properties}" + + " and audit_info = #{oldMetalakeMeta.auditInfo}" + + " and schema_version = #{oldMetalakeMeta.schemaVersion}" + + " and current_version = #{oldMetalakeMeta.currentVersion}" + + " and last_version = #{oldMetalakeMeta.lastVersion}" + + 
" and deleted_at = 0") + Integer updateMetalakeMeta( + @Param("newMetalakeMeta") MetalakePO newMetalakePO, + @Param("oldMetalakeMeta") MetalakePO oldMetalakePO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP() WHERE metalake_id = #{metalakeId}") + Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId); +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mysql/MySQLBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mysql/MySQLBackend.java deleted file mode 100644 index 06aa430a16b..00000000000 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mysql/MySQLBackend.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ -package com.datastrato.gravitino.storage.relational.mysql; - -import com.datastrato.gravitino.Config; -import com.datastrato.gravitino.Entity; -import com.datastrato.gravitino.EntityAlreadyExistsException; -import com.datastrato.gravitino.HasIdentifier; -import com.datastrato.gravitino.NameIdentifier; -import com.datastrato.gravitino.Namespace; -import com.datastrato.gravitino.exceptions.NoSuchEntityException; -import com.datastrato.gravitino.storage.relational.RelationalBackend; -import java.io.IOException; -import java.util.List; -import java.util.function.Function; - -/** - * {@link MySQLBackend} is a MySQL implementation of RelationalBackend interface. If we want to use - * another relational implementation, We can just implement {@link RelationalBackend} interface and - * use it in the Gravitino. - */ -public class MySQLBackend implements RelationalBackend { - - private static final String UNSUPPORTED_OPERATION_MSG = "Unsupported operation now."; - - /** Initialize the MySQL backend instance. 
/*
 * Copyright 2024 Datastrato Pvt Ltd.
 * This software is licensed under the Apache License version 2.
 */

package com.datastrato.gravitino.storage.relational.po;

import java.util.Objects;

/**
 * Persistent object (PO) mapped to one row of the {@code metalake_meta} table.
 *
 * <p>The {@code properties}, {@code auditInfo} and {@code schemaVersion} fields hold JSON strings
 * (serialized/deserialized by {@code POConverters}). Instances are immutable once built; use
 * {@link Builder} to construct them.
 */
public class MetalakePO {
  private Long metalakeId;
  private String metalakeName;
  private String metalakeComment;
  // JSON-serialized map of metalake properties.
  private String properties;
  // JSON-serialized AuditInfo.
  private String auditInfo;
  // JSON-serialized SchemaVersion.
  private String schemaVersion;
  private Long currentVersion;
  private Long lastVersion;
  // 0 means "not deleted"; otherwise the unix timestamp of the soft delete.
  private Long deletedAt;

  public Long getMetalakeId() {
    return metalakeId;
  }

  public String getMetalakeName() {
    return metalakeName;
  }

  public String getMetalakeComment() {
    return metalakeComment;
  }

  public String getProperties() {
    return properties;
  }

  public String getAuditInfo() {
    return auditInfo;
  }

  public String getSchemaVersion() {
    return schemaVersion;
  }

  public Long getCurrentVersion() {
    return currentVersion;
  }

  public Long getLastVersion() {
    return lastVersion;
  }

  public Long getDeletedAt() {
    return deletedAt;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof MetalakePO)) {
      return false;
    }
    MetalakePO that = (MetalakePO) o;
    // Field-by-field comparison; null-safe via java.util.Objects.
    return Objects.equals(getMetalakeId(), that.getMetalakeId())
        && Objects.equals(getMetalakeName(), that.getMetalakeName())
        && Objects.equals(getMetalakeComment(), that.getMetalakeComment())
        && Objects.equals(getProperties(), that.getProperties())
        && Objects.equals(getAuditInfo(), that.getAuditInfo())
        && Objects.equals(getSchemaVersion(), that.getSchemaVersion())
        && Objects.equals(getCurrentVersion(), that.getCurrentVersion())
        && Objects.equals(getLastVersion(), that.getLastVersion())
        && Objects.equals(getDeletedAt(), that.getDeletedAt());
  }

  @Override
  public int hashCode() {
    return Objects.hash(
        getMetalakeId(),
        getMetalakeName(),
        getMetalakeComment(),
        getProperties(),
        getAuditInfo(),
        getSchemaVersion(),
        getCurrentVersion(),
        getLastVersion(),
        getDeletedAt());
  }

  /** Fluent builder for {@link MetalakePO}; all fields are optional at the builder level. */
  public static class Builder {
    private final MetalakePO metalakePO;

    public Builder() {
      metalakePO = new MetalakePO();
    }

    public MetalakePO.Builder withMetalakeId(Long id) {
      metalakePO.metalakeId = id;
      return this;
    }

    public MetalakePO.Builder withMetalakeName(String name) {
      metalakePO.metalakeName = name;
      return this;
    }

    public MetalakePO.Builder withMetalakeComment(String comment) {
      metalakePO.metalakeComment = comment;
      return this;
    }

    public MetalakePO.Builder withProperties(String properties) {
      metalakePO.properties = properties;
      return this;
    }

    public MetalakePO.Builder withAuditInfo(String auditInfo) {
      metalakePO.auditInfo = auditInfo;
      return this;
    }

    public MetalakePO.Builder withSchemaVersion(String version) {
      metalakePO.schemaVersion = version;
      return this;
    }

    public MetalakePO.Builder withCurrentVersion(Long currentVersion) {
      metalakePO.currentVersion = currentVersion;
      return this;
    }

    public MetalakePO.Builder withLastVersion(Long lastVersion) {
      metalakePO.lastVersion = lastVersion;
      return this;
    }

    public MetalakePO.Builder withDeletedAt(Long deletedAt) {
      metalakePO.deletedAt = deletedAt;
      return this;
    }

    public MetalakePO build() {
      return metalakePO;
    }
  }
}
+ */ + +package com.datastrato.gravitino.storage.relational.service; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; +import com.datastrato.gravitino.storage.relational.po.MetalakePO; +import com.datastrato.gravitino.storage.relational.utils.POConverters; +import com.datastrato.gravitino.storage.relational.utils.SessionUtils; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +public class MetalakeMetaService { + private static final MetalakeMetaService INSTANCE = new MetalakeMetaService(); + + public static MetalakeMetaService getInstance() { + return INSTANCE; + } + + private MetalakeMetaService() {} + + public List listMetalakes() { + List metalakePOS = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, MetalakeMetaMapper::listMetalakePOs); + return POConverters.fromMetalakePOs(metalakePOS); + } + + public BaseMetalake getMetalakeByIdent(NameIdentifier ident) { + MetalakePO metalakePO = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name())); + if (metalakePO == null) { + throw new NoSuchEntityException("No such entity: %s", ident.toString()); + } + return POConverters.fromMetalakePO(metalakePO); + } + + public void insertMetalake(BaseMetalake baseMetalake, boolean overwrite) { + try { + SessionUtils.doWithCommit( + MetalakeMetaMapper.class, + mapper -> { + MetalakePO po = POConverters.initializeMetalakePOWithVersion(baseMetalake); + if (overwrite) { + 
mapper.insertMetalakeMetaOnDuplicateKeyUpdate(po); + } else { + mapper.insertMetalakeMeta(po); + } + }); + } catch (RuntimeException re) { + if (re.getCause() != null + && re.getCause().getCause() != null + && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { + // TODO We should make more fine-grained exception judgments + // Usually throwing `SQLIntegrityConstraintViolationException` means that + // SQL violates the constraints of `primary key` and `unique key`. + // We simply think that the entity already exists at this time. + throw new EntityAlreadyExistsException( + String.format( + "Metalake entity: %s already exists", baseMetalake.nameIdentifier().name())); + } + throw re; + } + } + + public BaseMetalake updateMetalake( + NameIdentifier ident, Function updater) throws IOException { + MetalakePO oldMetalakePO = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name())); + if (oldMetalakePO == null) { + throw new NoSuchEntityException("No such entity: %s", ident.toString()); + } + + BaseMetalake oldMetalakeEntity = POConverters.fromMetalakePO(oldMetalakePO); + BaseMetalake newMetalakeEntity = (BaseMetalake) updater.apply((E) oldMetalakeEntity); + Preconditions.checkArgument( + Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()), + "The updated metalake entity id: %s should be same with the metalake entity id before: %s", + newMetalakeEntity.id(), + oldMetalakeEntity.id()); + MetalakePO newMetalakePO = + POConverters.updateMetalakePOWithVersion(oldMetalakePO, newMetalakeEntity); + + Integer updateResult = + SessionUtils.doWithCommitAndFetchResult( + MetalakeMetaMapper.class, + mapper -> mapper.updateMetalakeMeta(newMetalakePO, oldMetalakePO)); + if (updateResult > 0) { + return newMetalakeEntity; + } else { + throw new IOException("Failed to update the entity: " + ident); + } + } + + public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { + Long 
metalakeId = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(ident.name())); + if (metalakeId != null) { + if (cascade) { + SessionUtils.doMultipleWithCommit( + () -> + SessionUtils.doWithoutCommit( + MetalakeMetaMapper.class, + mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId)), + () -> { + // TODO We will cascade delete the metadata of sub-resources under the metalake + }); + } else { + // TODO Check whether the sub-resources are empty. If the sub-resources are not empty, + // deletion is not allowed. + SessionUtils.doWithCommit( + MetalakeMetaMapper.class, + mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId)); + } + } + return true; + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java new file mode 100644 index 00000000000..aaff7ce6130 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java @@ -0,0 +1,110 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.storage.relational.session; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; +import com.google.common.base.Preconditions; +import java.sql.SQLException; +import java.time.Duration; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.pool2.impl.BaseObjectPoolConfig; +import org.apache.ibatis.mapping.Environment; +import org.apache.ibatis.session.Configuration; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.SqlSessionFactoryBuilder; +import org.apache.ibatis.transaction.TransactionFactory; +import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; + +/** + * SqlSessionFactoryHelper maintains the MyBatis's {@link SqlSessionFactory} object, which is used + * to create the {@link org.apache.ibatis.session.SqlSession} object. It is a singleton class and + * should be initialized only once. + */ +public class SqlSessionFactoryHelper { + private static volatile SqlSessionFactory sqlSessionFactory; + private static final SqlSessionFactoryHelper INSTANCE = new SqlSessionFactoryHelper(); + + public static SqlSessionFactoryHelper getInstance() { + return INSTANCE; + } + + private SqlSessionFactoryHelper() {} + + /** + * Initialize the SqlSessionFactory object. + * + * @param config Config object to get the jdbc connection details from the config. 
+ */ + @SuppressWarnings("deprecation") + public void init(Config config) { + // Initialize the data source + BasicDataSource dataSource = new BasicDataSource(); + dataSource.setUrl(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL)); + dataSource.setDriverClassName(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)); + dataSource.setUsername(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER)); + dataSource.setPassword(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)); + // Close the auto commit, so that we can control the transaction manual commit + dataSource.setDefaultAutoCommit(false); + dataSource.setMaxWaitMillis(1000L); + dataSource.setMaxTotal(20); + dataSource.setMaxIdle(5); + dataSource.setMinIdle(0); + dataSource.setLogAbandoned(true); + dataSource.setRemoveAbandonedOnBorrow(true); + dataSource.setRemoveAbandonedTimeout(60); + dataSource.setTimeBetweenEvictionRunsMillis(Duration.ofMillis(10 * 60 * 1000L).toMillis()); + dataSource.setTestOnBorrow(BaseObjectPoolConfig.DEFAULT_TEST_ON_BORROW); + dataSource.setTestWhileIdle(BaseObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE); + dataSource.setMinEvictableIdleTimeMillis(1000); + dataSource.setNumTestsPerEvictionRun(BaseObjectPoolConfig.DEFAULT_NUM_TESTS_PER_EVICTION_RUN); + dataSource.setTestOnReturn(BaseObjectPoolConfig.DEFAULT_TEST_ON_RETURN); + dataSource.setSoftMinEvictableIdleTimeMillis( + BaseObjectPoolConfig.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME.toMillis()); + dataSource.setLifo(BaseObjectPoolConfig.DEFAULT_LIFO); + + // Create the transaction factory and env + TransactionFactory transactionFactory = new JdbcTransactionFactory(); + Environment environment = new Environment("development", transactionFactory, dataSource); + + // Initialize the configuration + Configuration configuration = new Configuration(environment); + configuration.addMapper(MetalakeMetaMapper.class); + + // Create the SqlSessionFactory object, it is a singleton object + if (sqlSessionFactory == null) { + 
synchronized (SqlSessionFactoryHelper.class) { + if (sqlSessionFactory == null) { + sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration); + } + } + } + } + + public SqlSessionFactory getSqlSessionFactory() { + Preconditions.checkState(sqlSessionFactory != null, "SqlSessionFactory is not initialized."); + return sqlSessionFactory; + } + + public void close() { + if (sqlSessionFactory != null) { + synchronized (SqlSessionFactoryHelper.class) { + if (sqlSessionFactory != null) { + try { + BasicDataSource dataSource = + (BasicDataSource) + sqlSessionFactory.getConfiguration().getEnvironment().getDataSource(); + dataSource.close(); + } catch (SQLException e) { + // silently ignore the error report + } + sqlSessionFactory = null; + } + } + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java new file mode 100644 index 00000000000..e9d9128003a --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.session; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.TransactionIsolationLevel; + +/** + * SqlSessions is a utility class to maintain the MyBatis's {@link SqlSession} object. It is a + * thread local class and should be used to get the {@link SqlSession} object. It also provides the + * methods to commit, rollback and close the {@link SqlSession} object. 
+ */ +public final class SqlSessions { + private static final ThreadLocal sessions = new ThreadLocal<>(); + + private SqlSessions() {} + + @VisibleForTesting + static ThreadLocal getSessions() { + return sessions; + } + + /** + * Get the SqlSession object. If the SqlSession object is not present in the thread local, then + * create a new SqlSession object and set it in the thread local. + * + * @return SqlSession object from the thread local storage. + */ + public static SqlSession getSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession == null) { + sqlSession = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .openSession(TransactionIsolationLevel.READ_COMMITTED); + sessions.set(sqlSession); + return sqlSession; + } + return sqlSession; + } + + /** + * Commit the SqlSession object and close it. It also removes the SqlSession object from the + * thread local storage. + */ + public static void commitAndCloseSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession != null) { + try { + sqlSession.commit(); + sqlSession.close(); + } finally { + sessions.remove(); + } + } + } + + /** + * Rollback the SqlSession object and close it. It also removes the SqlSession object from the + * thread local storage. + */ + public static void rollbackAndCloseSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession != null) { + try { + sqlSession.rollback(); + sqlSession.close(); + } finally { + sessions.remove(); + } + } + } + + /** Close the SqlSession object and remove it from the thread local storage. */ + public static void closeSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession != null) { + try { + sqlSession.close(); + } finally { + sessions.remove(); + } + } + } + + /** + * Get the Mapper object from the SqlSession object. + * + * @param className the class name of the Mapper object. + * @return the Mapper object. 
+ */ + public static T getMapper(Class className) { + return getSqlSession().getMapper(className); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java new file mode 100644 index 00000000000..0c03faa77ba --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java @@ -0,0 +1,128 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.utils; + +import com.datastrato.gravitino.json.JsonUtils; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.storage.relational.po.MetalakePO; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** POConverters is a utility class to convert PO to Base and vice versa. 
*/ +public class POConverters { + + private POConverters() {} + + /** + * Convert {@link BaseMetalake} to {@link MetalakePO} + * + * @param baseMetalake BaseMetalake object + * @return MetalakePO object from BaseMetalake object + */ + @VisibleForTesting + static MetalakePO toMetalakePO(BaseMetalake baseMetalake) { + try { + return new MetalakePO.Builder() + .withMetalakeId(baseMetalake.id()) + .withMetalakeName(baseMetalake.name()) + .withMetalakeComment(baseMetalake.comment()) + .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(baseMetalake.properties())) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(baseMetalake.auditInfo())) + .withSchemaVersion( + JsonUtils.anyFieldMapper().writeValueAsString(baseMetalake.getVersion())) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } + } + + /** + * Initialize MetalakePO + * + * @param baseMetalake BaseMetalake object + * @return MetalakePO object with version initialized + */ + public static MetalakePO initializeMetalakePOWithVersion(BaseMetalake baseMetalake) { + MetalakePO metalakePO = toMetalakePO(baseMetalake); + return new MetalakePO.Builder() + .withMetalakeId(metalakePO.getMetalakeId()) + .withMetalakeName(metalakePO.getMetalakeName()) + .withMetalakeComment(metalakePO.getMetalakeComment()) + .withProperties(metalakePO.getProperties()) + .withAuditInfo(metalakePO.getAuditInfo()) + .withSchemaVersion(metalakePO.getSchemaVersion()) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) + .build(); + } + + /** + * Update MetalakePO version + * + * @param oldMetalakePO the old MetalakePO object + * @param newMetalake the new BaseMetalake object + * @return MetalakePO object with updated version + */ + public static MetalakePO updateMetalakePOWithVersion( + MetalakePO oldMetalakePO, BaseMetalake newMetalake) { + MetalakePO newMetalakePO = toMetalakePO(newMetalake); + Long lastVersion = 
oldMetalakePO.getLastVersion(); + // Will set the version to the last version + 1 when having some fields need be multiple version + Long nextVersion = lastVersion; + return new MetalakePO.Builder() + .withMetalakeId(newMetalakePO.getMetalakeId()) + .withMetalakeName(newMetalakePO.getMetalakeName()) + .withMetalakeComment(newMetalakePO.getMetalakeComment()) + .withProperties(newMetalakePO.getProperties()) + .withAuditInfo(newMetalakePO.getAuditInfo()) + .withSchemaVersion(newMetalakePO.getSchemaVersion()) + .withCurrentVersion(nextVersion) + .withLastVersion(nextVersion) + .withDeletedAt(0L) + .build(); + } + + /** + * Convert {@link MetalakePO} to {@link BaseMetalake} + * + * @param metalakePO MetalakePO object + * @return BaseMetalake object from MetalakePO object + */ + public static BaseMetalake fromMetalakePO(MetalakePO metalakePO) { + try { + return new BaseMetalake.Builder() + .withId(metalakePO.getMetalakeId()) + .withName(metalakePO.getMetalakeName()) + .withComment(metalakePO.getMetalakeComment()) + .withProperties( + JsonUtils.anyFieldMapper().readValue(metalakePO.getProperties(), Map.class)) + .withAuditInfo( + JsonUtils.anyFieldMapper().readValue(metalakePO.getAuditInfo(), AuditInfo.class)) + .withVersion( + JsonUtils.anyFieldMapper() + .readValue(metalakePO.getSchemaVersion(), SchemaVersion.class)) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to deserialize json object:", e); + } + } + + /** + * Convert list of {@link MetalakePO} to list of {@link BaseMetalake} + * + * @param metalakePOS list of MetalakePO objects + * @return list of BaseMetalake objects from list of MetalakePO objects + */ + public static List fromMetalakePOs(List metalakePOS) { + return metalakePOS.stream().map(POConverters::fromMetalakePO).collect(Collectors.toList()); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java 
b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java new file mode 100644 index 00000000000..2d9009acfa8 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java @@ -0,0 +1,123 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.utils; + +import com.datastrato.gravitino.storage.relational.session.SqlSessions; +import java.util.Arrays; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.ibatis.session.SqlSession; + +/** + * This class provides utility methods to perform database operations with MyBatis mappers in the + * SqlSession. + */ +public class SessionUtils { + private SessionUtils() {} + + /** + * This method is used to perform a database operation with a commit. If the operation fails, the + * transaction will roll back. + * + * @param mapperClazz mapper class to be used for the operation + * @param consumer the operation to be performed with the mapper + * @param the type of the mapper + */ + public static void doWithCommit(Class mapperClazz, Consumer consumer) { + try (SqlSession session = SqlSessions.getSqlSession()) { + try { + T mapper = SqlSessions.getMapper(mapperClazz); + consumer.accept(mapper); + SqlSessions.commitAndCloseSqlSession(); + } catch (Throwable t) { + SqlSessions.rollbackAndCloseSqlSession(); + throw new RuntimeException(t); + } + } + } + + /** + * This method is used to perform a database operation with a commit and fetch the result. If the + * operation fails, the transaction will roll back. 
+ * + * @param mapperClazz mapper class to be used for the operation + * @param func the operation to be performed with the mapper + * @return the result of the operation + * @param the type of the mapper + * @param the type of the result + */ + public static R doWithCommitAndFetchResult(Class mapperClazz, Function func) { + try (SqlSession session = SqlSessions.getSqlSession()) { + try { + T mapper = SqlSessions.getMapper(mapperClazz); + R result = func.apply(mapper); + SqlSessions.commitAndCloseSqlSession(); + return result; + } catch (Throwable t) { + SqlSessions.rollbackAndCloseSqlSession(); + throw new RuntimeException(t); + } + } + } + + /** + * This method is used to perform a database operation without a commit. If the operation fails, + * will throw the RuntimeException. + * + * @param mapperClazz mapper class to be used for the operation + * @param consumer the operation to be performed with the mapper + * @param the type of the mapper + */ + public static void doWithoutCommit(Class mapperClazz, Consumer consumer) { + try { + T mapper = SqlSessions.getMapper(mapperClazz); + consumer.accept(mapper); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + /** + * This method is used to perform a database operation without a commit and fetch the result. If + * the operation fails, will throw a RuntimeException. 
+ * + * @param mapperClazz mapper class to be used for the operation + * @param func the operation to be performed with the mapper + * @return the result of the operation + * @param the type of the mapper + * @param the type of the result + */ + public static R getWithoutCommit(Class mapperClazz, Function func) { + try (SqlSession session = SqlSessions.getSqlSession()) { + try { + T mapper = SqlSessions.getMapper(mapperClazz); + return func.apply(mapper); + } catch (Throwable t) { + throw new RuntimeException(t); + } finally { + SqlSessions.closeSqlSession(); + } + } + } + + /** + * This method is used to perform multiple database operations with a commit. If any of the + * operations fail, the transaction will totally roll back. + * + * @param operations the operations to be performed + */ + public static void doMultipleWithCommit(Runnable... operations) { + try (SqlSession session = SqlSessions.getSqlSession()) { + try { + Arrays.stream(operations).forEach(Runnable::run); + SqlSessions.commitAndCloseSqlSession(); + } catch (Throwable t) { + SqlSessions.rollbackAndCloseSqlSession(); + throw new RuntimeException(t); + } + } + } +} diff --git a/core/src/main/resources/mysql/mysql_init.sql b/core/src/main/resources/mysql/mysql_init.sql new file mode 100644 index 00000000000..77f300810b6 --- /dev/null +++ b/core/src/main/resources/mysql/mysql_init.sql @@ -0,0 +1,18 @@ +-- +-- Copyright 2024 Datastrato Pvt Ltd. +-- This software is licensed under the Apache License version 2. 
+-- + +CREATE TABLE IF NOT EXISTS `metalake_meta` ( + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `metalake_name` VARCHAR(128) NOT NULL COMMENT 'metalake name', + `metalake_comment` VARCHAR(256) DEFAULT '' COMMENT 'metalake comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'metalake properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'metalake audit info', + `schema_version` MEDIUMTEXT NOT NULL COMMENT 'metalake schema version info', + `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake current version', + `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake last version', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 NULL COMMENT 'metalake deleted at', + PRIMARY KEY (`metalake_id`), + UNIQUE KEY `uk_mn_del` (`metalake_name`, `deleted_at`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'metalake metadata'; \ No newline at end of file diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java index 039e3bf0f3a..e1703c360ff 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java @@ -6,99 +6,301 @@ package com.datastrato.gravitino.storage.relational; import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE; +import static 
com.datastrato.gravitino.Configs.ENTITY_STORE; +import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.datastrato.gravitino.Config; -import com.datastrato.gravitino.Configs; import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; import com.datastrato.gravitino.EntityStore; import com.datastrato.gravitino.EntityStoreFactory; -import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import java.io.BufferedReader; +import java.io.File; import java.io.IOException; -import java.util.function.Function; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import org.apache.ibatis.session.SqlSession; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestRelationalEntityStore { - private static EntityStore store; + private static final 
Logger Logger = LoggerFactory.getLogger(TestRelationalEntityStore.class); + private static final String MYSQL_STORE_PATH = + "/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", ""); + private static final String DB_DIR = MYSQL_STORE_PATH + "/testdb"; + private static EntityStore entityStore = null; @BeforeAll public static void setUp() { + File dir = new File(DB_DIR); + if (dir.exists() || !dir.isDirectory()) { + dir.delete(); + } + dir.mkdirs(); + + // Use H2 DATABASE to simulate MySQL Config config = Mockito.mock(Config.class); - Mockito.when(config.get(Configs.ENTITY_STORE)).thenReturn(Configs.RELATIONAL_ENTITY_STORE); - Mockito.when(config.get(Configs.ENTITY_RELATIONAL_STORE)) - .thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE); - store = EntityStoreFactory.createEntityStore(config); - store.initialize(config); - Assertions.assertTrue(store instanceof RelationalEntityStore); + Mockito.when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE); + Mockito.when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL)) + .thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", DB_DIR)); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("root"); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("123"); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver"); + entityStore = EntityStoreFactory.createEntityStore(config); + entityStore.initialize(config); + + // Read the ddl sql to create table + String scriptPath = "h2/h2-init.sql"; + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); + Connection connection = sqlSession.getConnection(); + Statement statement = connection.createStatement()) { + URL scriptUrl = ClassLoader.getSystemResource(scriptPath); + if (scriptUrl == null) { + throw new 
IllegalStateException("Cannot find init sql script:" + scriptPath); + } + StringBuilder ddlBuilder = new StringBuilder(); + try (InputStreamReader inputStreamReader = + new InputStreamReader( + Files.newInputStream(Paths.get(scriptUrl.getPath())), StandardCharsets.UTF_8); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { + String line; + while ((line = bufferedReader.readLine()) != null) { + ddlBuilder.append(line).append("\n"); + } + } + statement.execute(ddlBuilder.toString()); + } catch (Exception e) { + throw new IllegalStateException("Create tables failed", e); + } + } + + @AfterEach + public void destroy() { + truncateAllTables(); } @AfterAll - public static void teardown() throws IOException { - store.close(); - store = null; + public static void tearDown() { + dropAllTables(); + try { + entityStore.close(); + } catch (IOException e) { + Logger.error("Close the entity store failed:", e); + } + + File dir = new File(DB_DIR); + if (dir.exists()) { + dir.delete(); + } } @Test - public void testSetSerDe() { - Assertions.assertThrows(UnsupportedOperationException.class, () -> store.setSerDe(null)); + public void testPutAndGet() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + BaseMetalake insertedMetalake = + entityStore.get(metalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); + assertNotNull(insertedMetalake); + assertTrue(checkMetalakeEquals(metalake, insertedMetalake)); + + // overwrite false + BaseMetalake duplicateMetalake = createMetalake(1L, "test_metalake", "this is test"); + assertThrows( + EntityAlreadyExistsException.class, () -> entityStore.put(duplicateMetalake, false)); + + // overwrite true + BaseMetalake overittenMetalake = createMetalake(1L, "test_metalake2", "this is test2"); + entityStore.put(overittenMetalake, true); + BaseMetalake insertedMetalake1 = + entityStore.get( + overittenMetalake.nameIdentifier(), 
Entity.EntityType.METALAKE, BaseMetalake.class); + assertEquals( + 1, + entityStore.list(Namespace.empty(), BaseMetalake.class, Entity.EntityType.METALAKE).size()); + assertEquals("test_metalake2", insertedMetalake1.name()); + assertEquals("this is test2", insertedMetalake1.comment()); } @Test - public void testExecuteInTransaction() { - Assertions.assertThrows( - UnsupportedOperationException.class, () -> store.executeInTransaction(null)); + public void testPutAndList() throws IOException { + BaseMetalake metalake1 = createMetalake(1L, "test_metalake1", "this is test 1"); + BaseMetalake metalake2 = createMetalake(2L, "test_metalake2", "this is test 2"); + List beforePutList = + entityStore.list(metalake1.namespace(), BaseMetalake.class, Entity.EntityType.METALAKE); + assertNotNull(beforePutList); + assertEquals(0, beforePutList.size()); + + entityStore.put(metalake1, false); + entityStore.put(metalake2, false); + List metalakes = + entityStore.list(metalake1.namespace(), BaseMetalake.class, Entity.EntityType.METALAKE); + assertNotNull(metalakes); + assertEquals(2, metalakes.size()); + assertTrue(checkMetalakeEquals(metalake1, metalakes.get(0))); + assertTrue(checkMetalakeEquals(metalake2, metalakes.get(1))); } @Test - public void testExists() throws IOException { - NameIdentifier nameIdentifier = Mockito.mock(NameIdentifier.class); - Assertions.assertThrows( - UnsupportedOperationException.class, - () -> store.exists(nameIdentifier, Entity.EntityType.METALAKE)); + public void testPutAndDelete() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + entityStore.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, false); + assertThrows( + NoSuchEntityException.class, + () -> + entityStore.get( + metalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class)); } @Test - public void testPut() throws IOException { - BaseMetalake metalake = 
Mockito.mock(BaseMetalake.class); - Assertions.assertThrows(UnsupportedOperationException.class, () -> store.put(metalake, false)); + public void testPutAndUpdate() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + assertThrows( + RuntimeException.class, + () -> + entityStore.update( + metalake.nameIdentifier(), + BaseMetalake.class, + Entity.EntityType.METALAKE, + m -> { + BaseMetalake.Builder builder = + new BaseMetalake.Builder() + // Change the id, which is not allowed + .withId(2L) + .withName("test_metalake2") + .withComment("this is test 2") + .withProperties(new HashMap<>()) + .withAuditInfo((AuditInfo) m.auditInfo()) + .withVersion(m.getVersion()); + return builder.build(); + })); + + AuditInfo changedAuditInfo = + AuditInfo.builder().withCreator("changed_creator").withCreateTime(Instant.now()).build(); + BaseMetalake updatedMetalake = + entityStore.update( + metalake.nameIdentifier(), + BaseMetalake.class, + Entity.EntityType.METALAKE, + m -> { + BaseMetalake.Builder builder = + new BaseMetalake.Builder() + .withId(m.id()) + .withName("test_metalake2") + .withComment("this is test 2") + .withProperties(new HashMap<>()) + .withAuditInfo(changedAuditInfo) + .withVersion(m.getVersion()); + return builder.build(); + }); + BaseMetalake storedMetalake = + entityStore.get( + updatedMetalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); + assertEquals(metalake.id(), storedMetalake.id()); + assertEquals("test_metalake2", updatedMetalake.name()); + assertEquals("this is test 2", updatedMetalake.comment()); + assertEquals(changedAuditInfo.creator(), updatedMetalake.auditInfo().creator()); } - @Test - public void testGet() { - NameIdentifier nameIdentifier = Mockito.mock(NameIdentifier.class); - Assertions.assertThrows( - UnsupportedOperationException.class, - () -> store.get(nameIdentifier, Entity.EntityType.METALAKE, BaseMetalake.class)); + private static 
BaseMetalake createMetalake(Long id, String name, String comment) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + return new BaseMetalake.Builder() + .withId(id) + .withName(name) + .withComment(comment) + .withProperties(new HashMap<>()) + .withAuditInfo(auditInfo) + .withVersion(SchemaVersion.V_0_1) + .build(); } - @Test - public void testUpdate() { - NameIdentifier nameIdentifier = Mockito.mock(NameIdentifier.class); - Function function = Mockito.mock(Function.class); - Assertions.assertThrows( - UnsupportedOperationException.class, - () -> - store.update(nameIdentifier, BaseMetalake.class, Entity.EntityType.METALAKE, function)); + private static boolean checkMetalakeEquals(BaseMetalake expected, BaseMetalake actual) { + return expected.id().equals(actual.id()) + && expected.name().equals(actual.name()) + && expected.comment().equals(actual.comment()) + && expected.properties().equals(actual.properties()) + && expected.auditInfo().equals(actual.auditInfo()) + && expected.getVersion().equals(actual.getVersion()); } - @Test - public void testList() { - Namespace namespace = Mockito.mock(Namespace.class); - Assertions.assertThrows( - UnsupportedOperationException.class, - () -> store.list(namespace, BaseMetalake.class, Entity.EntityType.METALAKE)); + private static void truncateAllTables() { + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { + try (Connection connection = sqlSession.getConnection()) { + try (Statement statement = connection.createStatement()) { + String query = "SHOW TABLES"; + List tableList = new ArrayList<>(); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + tableList.add(rs.getString(1)); + } + } + for (String table : tableList) { + statement.execute("TRUNCATE TABLE " + table); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Clear table failed", e); + } } - @Test - 
public void testDelete() { - NameIdentifier nameIdentifier = Mockito.mock(NameIdentifier.class); - Assertions.assertThrows( - UnsupportedOperationException.class, - () -> store.delete(nameIdentifier, Entity.EntityType.METALAKE, false)); + private static void dropAllTables() { + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { + try (Connection connection = sqlSession.getConnection()) { + try (Statement statement = connection.createStatement()) { + String query = "SHOW TABLES"; + List tableList = new ArrayList<>(); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + tableList.add(rs.getString(1)); + } + } + for (String table : tableList) { + statement.execute("DROP TABLE " + table); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Drop table failed", e); + } } } diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/session/TestSqlSession.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/session/TestSqlSession.java new file mode 100644 index 00000000000..29df8d5afd3 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/session/TestSqlSession.java @@ -0,0 +1,132 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
/*
 * Copyright 2024 Datastrato Pvt Ltd.
 * This software is licensed under the Apache License version 2.
 */

package com.datastrato.gravitino.storage.relational.session;

import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER;
import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.datastrato.gravitino.Config;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.UUID;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.ibatis.session.SqlSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/** Tests SqlSessionFactoryHelper / SqlSessions lifecycle against an H2 database. */
public class TestSqlSession {
  // Unique per-run scratch path so repeated/parallel runs do not collide.
  private static final String MYSQL_STORE_PATH =
      "/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", "");
  private static final String DB_DIR = MYSQL_STORE_PATH + "/testdb";

  private static Config config;

  @BeforeAll
  public static void setUp() {
    File dir = new File(DB_DIR);
    if (dir.exists()) {
      // fixed: File.delete() silently fails on non-empty directories; remove recursively
      deleteDirectory(dir);
    }
    dir.mkdirs();

    // H2 in MySQL compatibility mode stands in for the real MySQL backend.
    config = Mockito.mock(Config.class);
    Mockito.when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
    Mockito.when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
    Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL))
        .thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", DB_DIR));
    Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("root");
    Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("123");
    Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver");
  }

  @BeforeEach
  public void init() {
    SqlSessionFactoryHelper.getInstance().init(config);
  }

  @AfterEach
  public void cleanUp() {
    SqlSessionFactoryHelper.getInstance().close();
  }

  @AfterAll
  public static void tearDown() throws IOException {
    // fixed: delete the whole scratch tree recursively; a plain delete() leaked
    // the H2 files under /tmp
    deleteDirectory(new File(MYSQL_STORE_PATH));
    // Defensive: @AfterEach already closed the helper after the last test.
    SqlSessionFactoryHelper.getInstance().close();
  }

  @Test
  public void testGetInstance() {
    SqlSessionFactoryHelper instance = SqlSessionFactoryHelper.getInstance();
    assertNotNull(instance);
  }

  @Test
  public void testInit() throws SQLException {
    SqlSessionFactoryHelper.getInstance().close();
    SqlSessionFactoryHelper.getInstance().init(config);
    assertNotNull(SqlSessionFactoryHelper.getInstance().getSqlSessionFactory());
    BasicDataSource dataSource =
        (BasicDataSource)
            SqlSessionFactoryHelper.getInstance()
                .getSqlSessionFactory()
                .getConfiguration()
                .getEnvironment()
                .getDataSource();
    assertEquals("org.h2.Driver", dataSource.getDriverClassName());
    assertEquals(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL), dataSource.getUrl());
  }

  @Test
  public void testGetSqlSessionFactoryWithoutInit() {
    SqlSessionFactoryHelper.getInstance().close();
    assertThrows(
        IllegalStateException.class,
        () -> SqlSessionFactoryHelper.getInstance().getSqlSessionFactory());
  }

  @Test
  public void testOpenAndCloseSqlSession() {
    SqlSession session = SqlSessions.getSqlSession();
    assertNotNull(session);
    SqlSessions.closeSqlSession();
    // The thread-local slot must be cleared after closing.
    assertNull(SqlSessions.getSessions().get());
  }

  @Test
  public void testOpenAndCommitAndCloseSqlSession() {
    SqlSession session = SqlSessions.getSqlSession();
    assertNotNull(session);
    SqlSessions.commitAndCloseSqlSession();
    assertNull(SqlSessions.getSessions().get());
  }

  @Test
  public void testOpenAndRollbackAndCloseSqlSession() {
    SqlSession session = SqlSessions.getSqlSession();
    assertNotNull(session);
    SqlSessions.rollbackAndCloseSqlSession();
    assertNull(SqlSessions.getSessions().get());
  }

  /** Recursively deletes a file or directory tree; best-effort, failures are ignored. */
  private static void deleteDirectory(File file) {
    File[] children = file.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteDirectory(child);
      }
    }
    file.delete();
  }
}
/*
 * Copyright 2024 Datastrato Pvt Ltd.
 * This software is licensed under the Apache License version 2.
 */

package com.datastrato.gravitino.storage.relational.utils;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.datastrato.gravitino.json.JsonUtils;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.SchemaVersion;
import com.datastrato.gravitino.storage.relational.po.MetalakePO;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;

/** Tests the PO &lt;-&gt; entity conversions in {@link POConverters}. */
public class TestPOConverters {
  // A fixed timestamp keeps audit info deterministic across test runs.
  private static final LocalDateTime FIX_DATE_TIME = LocalDateTime.of(2024, 2, 6, 0, 0, 0);

  private static final Instant FIX_INSTANT = FIX_DATE_TIME.toInstant(ZoneOffset.UTC);

  @Test
  public void testFromMetalakePO() throws JsonProcessingException {
    MetalakePO po = createMetalakePO(1L, "test", "this is test");
    BaseMetalake expected = createMetalake(1L, "test", "this is test");

    BaseMetalake converted = POConverters.fromMetalakePO(po);

    // The converted entity must match the expected one field by field.
    assertEquals(expected.id(), converted.id());
    assertEquals(expected.name(), converted.name());
    assertEquals(expected.comment(), converted.comment());
    assertEquals(expected.properties().get("key"), converted.properties().get("key"));
    assertEquals(expected.auditInfo().creator(), converted.auditInfo().creator());
    assertEquals(expected.getVersion(), converted.getVersion());
  }

  @Test
  public void testFromMetalakePOs() throws JsonProcessingException {
    MetalakePO po1 = createMetalakePO(1L, "test", "this is test");
    MetalakePO po2 = createMetalakePO(2L, "test2", "this is test2");
    List<BaseMetalake> converted =
        POConverters.fromMetalakePOs(new ArrayList<>(Arrays.asList(po1, po2)));

    List<BaseMetalake> expected =
        new ArrayList<>(
            Arrays.asList(
                createMetalake(1L, "test", "this is test"),
                createMetalake(2L, "test2", "this is test2")));

    // Conversion must preserve both order and content.
    for (int i = 0; i < converted.size(); i++) {
      BaseMetalake actualMetalake = converted.get(i);
      BaseMetalake expectedMetalake = expected.get(i);
      assertEquals(expectedMetalake.id(), actualMetalake.id());
      assertEquals(expectedMetalake.name(), actualMetalake.name());
      assertEquals(expectedMetalake.comment(), actualMetalake.comment());
      assertEquals(
          expectedMetalake.properties().get("key"), actualMetalake.properties().get("key"));
      assertEquals(
          expectedMetalake.auditInfo().creator(), actualMetalake.auditInfo().creator());
      assertEquals(expectedMetalake.getVersion(), actualMetalake.getVersion());
    }
  }

  @Test
  public void testInitMetalakePOVersion() throws JsonProcessingException {
    BaseMetalake metalake = createMetalake(1L, "test", "this is test");
    MetalakePO initPO = POConverters.initializeMetalakePOWithVersion(metalake);
    // A freshly initialized PO starts at version 1 and is not soft-deleted.
    assertEquals(1, initPO.getCurrentVersion());
    assertEquals(1, initPO.getLastVersion());
    assertEquals(0, initPO.getDeletedAt());
  }

  @Test
  public void testUpdateMetalakePOVersion() throws JsonProcessingException {
    BaseMetalake original = createMetalake(1L, "test", "this is test");
    BaseMetalake changed = createMetalake(1L, "test", "this is test2");
    MetalakePO initPO = POConverters.initializeMetalakePOWithVersion(original);
    MetalakePO updatePO = POConverters.updateMetalakePOWithVersion(initPO, changed);
    // The source PO is left untouched; the update carries the new comment.
    assertEquals(1, initPO.getCurrentVersion());
    assertEquals(1, initPO.getLastVersion());
    assertEquals(0, initPO.getDeletedAt());
    assertEquals("this is test2", updatePO.getMetalakeComment());
  }

  @Test
  public void testToMetalakePO() throws JsonProcessingException {
    BaseMetalake metalake = createMetalake(1L, "test", "this is test");
    MetalakePO expectedPO = createMetalakePO(1L, "test", "this is test");

    MetalakePO actualPO = POConverters.toMetalakePO(metalake);

    // Every serialized column of the PO must round-trip from the entity.
    assertEquals(expectedPO.getMetalakeId(), actualPO.getMetalakeId());
    assertEquals(expectedPO.getMetalakeName(), actualPO.getMetalakeName());
    assertEquals(expectedPO.getMetalakeComment(), actualPO.getMetalakeComment());
    assertEquals(expectedPO.getProperties(), actualPO.getProperties());
    assertEquals(expectedPO.getAuditInfo(), actualPO.getAuditInfo());
    assertEquals(expectedPO.getSchemaVersion(), actualPO.getSchemaVersion());
  }

  /** Builds an entity-side metalake fixture with a fixed audit timestamp. */
  private static BaseMetalake createMetalake(Long id, String name, String comment) {
    AuditInfo auditInfo =
        AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
    Map<String, String> properties = new HashMap<>();
    properties.put("key", "value");
    return new BaseMetalake.Builder()
        .withId(id)
        .withName(name)
        .withComment(comment)
        .withProperties(properties)
        .withAuditInfo(auditInfo)
        .withVersion(SchemaVersion.V_0_1)
        .build();
  }

  /** Builds the persistence-side fixture; complex fields are stored as JSON strings. */
  private static MetalakePO createMetalakePO(Long id, String name, String comment)
      throws JsonProcessingException {
    AuditInfo auditInfo =
        AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
    Map<String, String> properties = new HashMap<>();
    properties.put("key", "value");
    return new MetalakePO.Builder()
        .withMetalakeId(id)
        .withMetalakeName(name)
        .withMetalakeComment(comment)
        .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(properties))
        .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo))
        .withSchemaVersion(JsonUtils.anyFieldMapper().writeValueAsString(SchemaVersion.V_0_1))
        .build();
  }
}
--
-- Copyright 2024 Datastrato Pvt Ltd.
-- This software is licensed under the Apache License version 2.
--

-- H2 (MySQL compatibility mode) mirror of mysql_init.sql, used only by unit tests.
CREATE TABLE IF NOT EXISTS `metalake_meta` (
    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
    `metalake_name` VARCHAR(128) NOT NULL COMMENT 'metalake name',
    `metalake_comment` VARCHAR(256) DEFAULT '' COMMENT 'metalake comment',
    `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'metalake properties',
    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'metalake audit info',
    `schema_version` MEDIUMTEXT NOT NULL COMMENT 'metalake schema version info',
    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake current version',
    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake last version',
    -- fixed: the original declared both NOT NULL and a trailing NULL attribute,
    -- which is contradictory; the column is NOT NULL with default 0
    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'metalake deleted at',
    PRIMARY KEY (metalake_id),
    CONSTRAINT uk_mn_del UNIQUE (metalake_name, deleted_at)
) ENGINE = InnoDB;
implementation to use. Key-value pair storage is currently supported, and the default value is `kv`. | `kv` | No | 0.1.0 | -| `gravitino.entity.store.kv` | Detailed implementation of KV storage. `RocksDB` storage is currently supported, and the implementation is `RocksDBKvBackend`. | `RocksDBKvBackend` | No | 0.1.0 | -| `gravitino.entity.store.kv.rocksdbPath` | The storage path for RocksDB storage implementation. It supports both absolute and relative path, if the value is a relative path, the final path is `${GRAVITINO_HOME}/${PATH_YOU_HAVA_SET}`, default value is `${GRAVITINO_HOME}/data/rocksdb` | `${GRAVITINO_HOME}/data/rocksdb` | No | 0.1.0 | -| `graivitino.entity.serde` | The serialization/deserialization class used to support entity storage. `proto' is currently supported. | `proto` | No | 0.1.0 | -| `gravitino.entity.store.maxTransactionSkewTimeMs` | The maximum skew time of transactions in milliseconds. | `2000` | No | 0.3.0 | -| `gravitino.entity.store.kv.deleteAfterTimeMs` | The maximum time in milliseconds that deleted and old-version data is kept. Set to at least 10 minutes and no longer than 30 days. | `604800000`(7 days) | No | 0.3.0 | +| Configuration item | Description | Default value | Required | Since version | +|---------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------|----------------------------------|---------------| +| `gravitino.entity.store` | Which storage implementation to use. Key-value pair storage and relational storage are currently supported, the default value is `kv`, and the optional value is `relational`. | `kv` | No | 0.1.0 | +| `gravitino.entity.store.kv` | Detailed implementation of KV storage. 
`RocksDB` storage is currently supported, and the implementation is `RocksDBKvBackend`. | `RocksDBKvBackend` | No | 0.1.0 | +| `gravitino.entity.store.kv.rocksdbPath` | The storage path for RocksDB storage implementation. It supports both absolute and relative path, if the value is a relative path, the final path is `${GRAVITINO_HOME}/${PATH_YOU_HAVA_SET}`, default value is `${GRAVITINO_HOME}/data/rocksdb` | `${GRAVITINO_HOME}/data/rocksdb` | No | 0.1.0 | +| `graivitino.entity.serde` | The serialization/deserialization class used to support entity storage. `proto' is currently supported. | `proto` | No | 0.1.0 | +| `gravitino.entity.store.maxTransactionSkewTimeMs` | The maximum skew time of transactions in milliseconds. | `2000` | No | 0.3.0 | +| `gravitino.entity.store.kv.deleteAfterTimeMs` | The maximum time in milliseconds that deleted and old-version data is kept. Set to at least 10 minutes and no longer than 30 days. | `604800000`(7 days) | No | 0.3.0 | +| `gravitino.entity.store.relational` | Detailed implementation of Relational storage. `MySQL` is currently supported, and the implementation is `JDBCBackend`. | `JDBCBackend` | No | 0.5.0 | +| `gravitino.entity.store.relational.jdbcUrl` | The database url that the `JDBCBackend` needs to connect to. If you use `MySQL`, you should firstly initialize the database tables yourself by executing the file named `mysql_init.sql` in the `src/main/resources/mysql` directory of the `core` module. | (none) | Yes if you use `JdbcBackend` | 0.5.0 | +| `gravitino.entity.store.relational.jdbcDriver` | The jdbc driver name that the `JDBCBackend` needs to use. You should place the driver Jar package in the `${GRAVITINO_HOME}/libs/` directory. | (none) | Yes if you use `JdbcBackend` | 0.5.0 | +| `gravitino.entity.store.relational.jdbcUser` | The username that the `JDBCBackend` needs to use when connecting the database. It is required for `MySQL`. 
| (none) | Yes if you use `JdbcBackend` | 0.5.0 | +| `gravitino.entity.store.relational.jdbcPassword` | The password that the `JDBCBackend` needs to use when connecting the database. It is required for `MySQL`. | (none) | Yes if you use `JdbcBackend` | 0.5.0 | :::caution We strongly recommend that you change the default value of `gravitino.entity.store.kv.rocksdbPath`, as it's under the deployment directory and future version upgrades may remove it. diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b683328281f..24f8bf0dee8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -45,6 +45,8 @@ postgresql = "42.6.0" immutables-value = "2.10.0" selenium = "3.141.59" rauschig = "1.2.0" +mybatis = "3.5.6" +h2db = "1.4.200" protobuf-plugin = "0.9.2" spotless-plugin = '6.11.0' @@ -146,6 +148,8 @@ sun-activation = { group = "com.sun.activation", name = "javax.activation", vers selenium = { group = "org.seleniumhq.selenium", name = "selenium-java", version.ref = "selenium" } rauschig = { group = "org.rauschig", name = "jarchivelib", version.ref = "rauschig" } +mybatis = { group = "org.mybatis", name = "mybatis", version.ref = "mybatis"} +h2db = { group = "com.h2database", name = "h2", version.ref = "h2db"} [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"] From a0b53d6313d69f980c7f603fcbb3a89988e63972 Mon Sep 17 00:00:00 2001 From: Mohit Kambli <31406633+MohitKambli@users.noreply.github.com> Date: Mon, 26 Feb 2024 07:18:01 -0500 Subject: [PATCH 4/4] #1940 : [Improvement](common) MapUtils.java is missing a private constructor (#2345) ### What changes were proposed in this pull request? A private constructor has been added to the MapUtils.java file, which is located at 'common\src\main\java\com\datastrato\gravitino\utils'. ### Why are the changes needed? In order to follow efficient programming, these changes have been introduced. 
Fix: # ([1940](https://github.com/datastrato/gravitino/issues/1940)) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Now, for some reason, I am not able to run the `./gradlew :common:spotlessApply` command. I mean it opens a dialog box like this: ![image](https://github.com/datastrato/gravitino/assets/31406633/972fa4bf-5aed-4b4c-9144-4e8a69f44a2d) I am not sure what changes are required this time. When I try executing the command like `bash +x gradlew :common:spotlessApply`, I am getting the following errors: ![image](https://github.com/datastrato/gravitino/assets/31406633/828e0244-1ba9-402d-857b-eaacbdf3523a) Sincere apologies for all the trouble that's being caused till now. I don't want to cause any trouble but somehow I am ending up doing that. If at all there are any changes required then do let me know. Sincere apologies for any troubles caused. Thanks and Regards, Mohit Kambli --- .../src/main/java/com/datastrato/gravitino/utils/MapUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java b/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java index 82799ad6b49..2a235fe13e3 100644 --- a/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java +++ b/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java @@ -11,6 +11,7 @@ /** Utility class for working with maps. */ public class MapUtils { + private MapUtils() {} /** * Returns a map with all keys that start with the given prefix.