diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 63c5f1385ec..7b122c413a1 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -36,4 +36,5 @@ dependencies { testImplementation(libs.junit.jupiter.params) testRuntimeOnly(libs.junit.jupiter.engine) testImplementation(libs.mockito.core) + testImplementation(libs.h2db) } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java index 7f121d3b7dc..097b9c3ff82 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.function.Function; @@ -50,17 +51,18 @@ public List list( List metalakePOS = ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) .listMetalakePOs(); - return (List) - metalakePOS.stream() + return metalakePOS != null + ? metalakePOS.stream() .map( metalakePO -> { try { - return POConverters.fromMetalakePO(metalakePO); + return (E) POConverters.fromMetalakePO(metalakePO); } catch (JsonProcessingException e) { throw new RuntimeException(e); } }) - .collect(Collectors.toList()); + .collect(Collectors.toList()) + : new ArrayList<>(); case CATALOG: case SCHEMA: case TABLE: @@ -102,16 +104,24 @@ public void insert(E e, boolean overwritten) try (SqlSession session = SqlSessions.getSqlSession()) { try { if (e instanceof BaseMetalake) { - MetalakePO metalakePO = POConverters.toMetalakePO((BaseMetalake) e); + MetalakePO metalakePO = + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .selectMetalakeMetaByName(e.nameIdentifier().name()); + if (!overwritten && metalakePO != null) { + throw new EntityAlreadyExistsException( + String.format("Metalake entity: %s already exists", e.nameIdentifier().name())); + } + if (overwritten) { ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) - .insertMetalakeMetaWithUpdate(metalakePO); + .insertMetalakeMetaWithUpdate(POConverters.toMetalakePO((BaseMetalake) e)); } else { ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) - .insertMetalakeMeta(metalakePO); + .insertMetalakeMeta(POConverters.toMetalakePO((BaseMetalake) e)); } SqlSessions.commitAndCloseSqlSession(); } else { + SqlSessions.closeSqlSession(); throw new IllegalArgumentException( String.format("Unsupported entity type: %s for put operation", e.getClass())); } @@ -138,7 +148,7 @@ public E update( Preconditions.checkArgument( Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()), String.format( - "The updated metalake entity id: %s is not same with the metalake entity id before: %s", + "The updated metalake entity id: %s should same with the metalake entity id before: %s", newMetalakeEntity.id(), oldMetalakeEntity.id())); ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) .updateMetalakeMeta(POConverters.toMetalakePO(newMetalakeEntity), oldMetalakePO); @@ -169,6 +179,9 @@ public E get( MetalakePO metalakePO = ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) .selectMetalakeMetaByName(ident.name()); + if (metalakePO == null) { + return null; + } return (E) POConverters.fromMetalakePO(metalakePO); case CATALOG: case SCHEMA: @@ -197,7 +210,7 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) .deleteMetalakeMetaById(metalakeId); if (cascade) { - // TODO We will cascade delete the metadata of sub-resources under metalake + // TODO We will cascade delete the metadata of sub-resources under the metalake } SqlSessions.commitAndCloseSqlSession(); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/MetalakeMetaMapper.java index 5a34889481c..5f863f88328 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/MetalakeMetaMapper.java @@ -78,7 +78,7 @@ public interface MetalakeMetaMapper { + " audit_info = #{newMetalakeMeta.auditInfo}," + " schema_version = #{newMetalakeMeta.schemaVersion}" + " WHERE id = #{oldMetalakeMeta.id}" - + " and metalake_name = #{oldMetalakeMeta.metalakeComment}" + + " and metalake_name = #{oldMetalakeMeta.metalakeName}" + " and metalake_comment = #{oldMetalakeMeta.metalakeComment}" + " and properties = #{oldMetalakeMeta.properties}" + " and audit_info = #{oldMetalakeMeta.auditInfo}" diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessions.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessions.java index 13db581d41a..78f7d361baf 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessions.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessions.java @@ -5,12 +5,10 @@ package com.datastrato.gravitino.storage.relation.mysql.orm; -import java.io.Closeable; -import java.io.IOException; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.TransactionIsolationLevel; -public final class SqlSessions implements Closeable { +public final class SqlSessions { private static final ThreadLocal sessions = new ThreadLocal<>(); private SqlSessions() {} @@ -57,9 +55,4 @@ public static void closeSqlSession() { public static T getMapper(Class className) { return (T) getSqlSession().getMapper(className); } - - @Override - public void close() throws IOException { - sessions.remove(); - } } diff --git a/core/src/main/resources/mysql/mysql_init.sql b/core/src/main/resources/mysql/mysql_init.sql new file mode 100644 index 00000000000..54b28a6d7c8 --- /dev/null +++ b/core/src/main/resources/mysql/mysql_init.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS `metalake_meta` +( + `id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `metalake_name` VARCHAR(128) NOT NULL COMMENT 'metalake name', + `metalake_comment` VARCHAR(256) DEFAULT '' COMMENT 'metalake comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'metalake properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'metalake audit info', + `schema_version` TEXT NOT NULL COMMENT 'metalake schema version info', + PRIMARY KEY (`id`), + UNIQUE KEY `uk_mn` (`metalake_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'metalake metadata'; \ No newline at end of file diff --git a/core/src/main/resources/mysql_init/mysql_backend_init.sql b/core/src/main/resources/mysql_init/mysql_backend_init.sql deleted file mode 100644 index 1e9c0760410..00000000000 --- a/core/src/main/resources/mysql_init/mysql_backend_init.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE DATABASE IF NOT EXISTS `gravitino_meta` DEFAULT CHARACTER SET utf8mb4; -USE `gravitino_meta`; -CREATE TABLE IF NOT EXISTS `metalake_meta` -( - `id` bigint(20) unsigned NOT NULL COMMENT 'metalake id', - `metalake_name` varchar(128) NOT NULL COMMENT 'metalake name', - `metalake_comment` varchar(256) DEFAULT '' COMMENT 'metalake comment', - `properties` mediumtext DEFAULT NULL COMMENT 'metalake properties', - `audit_info` mediumtext NOT NULL COMMENT 'metalake audit info', - `schema_version` text NOT NULL COMMENT 'metalake schema version info', - PRIMARY KEY (`id`), - UNIQUE KEY `uk_mn` (`metalake_name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'metalake metadata'; \ No newline at end of file diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relation/TestRelationEntityStore.java b/core/src/test/java/com/datastrato/gravitino/storage/relation/TestRelationEntityStore.java new file mode 100644 index 00000000000..71dd43ac9b9 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/storage/relation/TestRelationEntityStore.java @@ -0,0 +1,290 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation; + +import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_RELATION_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_RELATION_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_STORE; +import static com.datastrato.gravitino.Configs.MYSQL_ENTITY_STORE_DRIVER_NAME_KEY; +import static com.datastrato.gravitino.Configs.MYSQL_ENTITY_STORE_URL_KEY; +import static com.datastrato.gravitino.Configs.MYSQL_ENTITY_STORE_USERNAME_KEY; +import static com.datastrato.gravitino.Configs.RELATION_ENTITY_STORE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityStore; +import com.datastrato.gravitino.EntityStoreFactory; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessionFactoryHelper; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import org.apache.ibatis.session.SqlSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestRelationEntityStore { + private static final String MYSQL_STORE_PATH = + "/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", ""); + private static final String DB_DIR = MYSQL_STORE_PATH + "/testdb"; + private static EntityStore entityStore = null; + + @BeforeAll + public static void setUp() { + File dir = new File(DB_DIR); + if (dir.exists() || !dir.isDirectory()) { + dir.delete(); + } + dir.mkdirs(); + + // Use H2 DATABASE to simulate MySQL + Config config = Mockito.mock(Config.class); + Mockito.when(config.get(ENTITY_STORE)).thenReturn(RELATION_ENTITY_STORE); + Mockito.when(config.get(ENTITY_RELATION_STORE)).thenReturn(DEFAULT_ENTITY_RELATION_STORE); + Mockito.when(config.getRawString(MYSQL_ENTITY_STORE_URL_KEY)) + .thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", DB_DIR)); + Mockito.when(config.getRawString(MYSQL_ENTITY_STORE_USERNAME_KEY)).thenReturn("sa"); + Mockito.when(config.getRawString(MYSQL_ENTITY_STORE_DRIVER_NAME_KEY)) + .thenReturn("org.h2.Driver"); + entityStore = EntityStoreFactory.createEntityStore(config); + entityStore.initialize(config); + + // Read the ddl sql to create table + String scriptPath = "h2/h2-init.sql"; + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); + Connection connection = sqlSession.getConnection(); + Statement statement = connection.createStatement()) { + URL scriptUrl = ClassLoader.getSystemResource(scriptPath); + if (scriptUrl == null) { + throw new IllegalStateException("Cannot find init sql script:" + scriptPath); + } + StringBuilder ddlBuilder = new StringBuilder(); + try (InputStreamReader inputStreamReader = + new InputStreamReader( + Files.newInputStream(Paths.get(scriptUrl.getPath())), StandardCharsets.UTF_8); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { + String line; + while ((line = bufferedReader.readLine()) != null) { + ddlBuilder.append(line).append("\n"); + } + } + statement.execute(ddlBuilder.toString()); + } catch (Exception e) { + throw new IllegalStateException("Create tables failed", e); + } + } + + @AfterEach + public void destroy() { + truncateAllTables(); + } + + @AfterAll + public static void tearDown() throws IOException { + dropAllTables(); + entityStore.close(); + File dir = new File(DB_DIR); + if (dir.exists()) { + dir.delete(); + } + } + + @Test + public void testPutAndGet() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + BaseMetalake insertedMetalake = + entityStore.get(metalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); + assertNotNull(insertedMetalake); + assertTrue(checkMetalakeEquals(metalake, insertedMetalake)); + + // overwrite false + BaseMetalake duplicateMetalake = createMetalake(1L, "test_metalake", "this is test"); + assertThrows(RuntimeException.class, () -> entityStore.put(duplicateMetalake, false)); + + // overwrite true + BaseMetalake overittenMetalake = createMetalake(1L, "test_metalake2", "this is test2"); + entityStore.put(overittenMetalake, true); + BaseMetalake insertedMetalake1 = + entityStore.get( + overittenMetalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); + assertEquals( + 1, + entityStore.list(Namespace.empty(), BaseMetalake.class, Entity.EntityType.METALAKE).size()); + assertEquals("test_metalake2", insertedMetalake1.name()); + assertEquals("this is test2", insertedMetalake1.comment()); + } + + @Test + public void testPutAndList() throws IOException { + BaseMetalake metalake1 = createMetalake(1L, "test_metalake1", "this is test 1"); + BaseMetalake metalake2 = createMetalake(2L, "test_metalake2", "this is test 2"); + entityStore.put(metalake1, false); + entityStore.put(metalake2, false); + List metalakes = + entityStore.list(metalake1.namespace(), BaseMetalake.class, Entity.EntityType.METALAKE); + assertNotNull(metalakes); + assertEquals(2, metalakes.size()); + assertTrue(checkMetalakeEquals(metalake1, metalakes.get(0))); + assertTrue(checkMetalakeEquals(metalake2, metalakes.get(1))); + } + + @Test + public void testPutAndDelete() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + entityStore.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, false); + assertThrows( + NoSuchEntityException.class, + () -> + entityStore.get( + metalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class)); + } + + @Test + public void testPutAndUpdate() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + assertThrows( + RuntimeException.class, + () -> + entityStore.update( + metalake.nameIdentifier(), + BaseMetalake.class, + Entity.EntityType.METALAKE, + m -> { + BaseMetalake.Builder builder = + new BaseMetalake.Builder() + // Change the id, which is not allowed + .withId(2L) + .withName("test_metalake2") + .withComment("this is test 2") + .withProperties(new HashMap<>()) + .withAuditInfo((AuditInfo) m.auditInfo()) + .withVersion(m.getVersion()); + return builder.build(); + })); + + AuditInfo changedAuditInfo = + AuditInfo.builder().withCreator("changed_creator").withCreateTime(Instant.now()).build(); + BaseMetalake updatedMetalake = + entityStore.update( + metalake.nameIdentifier(), + BaseMetalake.class, + Entity.EntityType.METALAKE, + m -> { + BaseMetalake.Builder builder = + new BaseMetalake.Builder() + .withId(m.id()) + .withName("test_metalake2") + .withComment("this is test 2") + .withProperties(new HashMap<>()) + .withAuditInfo(changedAuditInfo) + .withVersion(m.getVersion()); + return builder.build(); + }); + BaseMetalake storedMetalake = + entityStore.get( + updatedMetalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); + assertEquals(metalake.id(), storedMetalake.id()); + assertEquals("test_metalake2", updatedMetalake.name()); + assertEquals("this is test 2", updatedMetalake.comment()); + assertEquals(changedAuditInfo.creator(), updatedMetalake.auditInfo().creator()); + } + + private static BaseMetalake createMetalake(Long id, String name, String comment) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + return new BaseMetalake.Builder() + .withId(id) + .withName(name) + .withComment(comment) + .withProperties(new HashMap<>()) + .withAuditInfo(auditInfo) + .withVersion(SchemaVersion.V_0_1) + .build(); + } + + private static boolean checkMetalakeEquals(BaseMetalake expected, BaseMetalake actual) { + return expected.id().equals(actual.id()) + && expected.name().equals(actual.name()) + && expected.comment().equals(actual.comment()) + && expected.properties().equals(actual.properties()) + && expected.auditInfo().equals(actual.auditInfo()) + && expected.getVersion().equals(actual.getVersion()); + } + + private static void truncateAllTables() { + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { + try (Connection connection = sqlSession.getConnection()) { + try (Statement statement = connection.createStatement()) { + String query = "SHOW TABLES"; + List tableList = new ArrayList<>(); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + tableList.add(rs.getString(1)); + } + } + for (String table : tableList) { + statement.execute("TRUNCATE TABLE " + table); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Clear table failed", e); + } + } + + private static void dropAllTables() { + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { + try (Connection connection = sqlSession.getConnection()) { + try (Statement statement = connection.createStatement()) { + String query = "SHOW TABLES"; + List tableList = new ArrayList<>(); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + tableList.add(rs.getString(1)); + } + } + for (String table : tableList) { + statement.execute("DROP TABLE " + table); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Drop table failed", e); + } + } +} diff --git a/core/src/test/resources/h2/h2-init.sql b/core/src/test/resources/h2/h2-init.sql new file mode 100644 index 00000000000..7f5590ac8e6 --- /dev/null +++ b/core/src/test/resources/h2/h2-init.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS `metalake_meta` ( + `id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `metalake_name` VARCHAR(128) NOT NULL COMMENT 'metalake name', + `metalake_comment` VARCHAR(256) DEFAULT '' COMMENT 'metalake comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'metalake properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'metalake audit info', + `schema_version` TEXT NOT NULL COMMENT 'metalake schema version info', + PRIMARY KEY (id), + CONSTRAINT uk_mn UNIQUE (metalake_name) +) ENGINE = InnoDB; \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6715033d05e..9217165b73d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -46,6 +46,7 @@ immutables-value = "2.10.0" selenium = "3.141.59" rauschig = "1.2.0" mybatis = "3.5.6" +h2db = "1.4.200" protobuf-plugin = "0.9.2" spotless-plugin = '6.11.0' @@ -151,6 +152,7 @@ sun-activation = { group = "com.sun.activation", name = "javax.activation", vers selenium = { group = "org.seleniumhq.selenium", name = "selenium-java", version.ref = "selenium" } rauschig = { group = "org.rauschig", name = "jarchivelib", version.ref = "rauschig" } mybatis = { group = "org.mybatis", name = "mybatis", version.ref = "mybatis"} +h2db = { group = "com.h2database", name = "h2", version.ref = "h2db"} [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"]