diff --git a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java index 93d23f21bfc..a94333d6a2e 100644 --- a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java +++ b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java @@ -39,6 +39,8 @@ import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.types.Type; import com.datastrato.gravitino.rel.types.Types; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; @@ -232,6 +234,7 @@ public static ObjectMapper objectMapper() { .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true) .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS) .build() + .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) .registerModule(new JavaTimeModule()); } } diff --git a/core/build.gradle.kts b/core/build.gradle.kts index c92350a32cd..63c5f1385ec 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -20,8 +20,10 @@ dependencies { implementation(libs.bundles.log4j) implementation(libs.commons.lang3) implementation(libs.commons.io) + implementation(libs.commons.dbcp2) implementation(libs.caffeine) implementation(libs.rocksdbjni) + implementation(libs.mybatis) implementation(libs.bundles.metrics) implementation(libs.bundles.prometheus) diff --git a/core/src/main/java/com/datastrato/gravitino/Configs.java b/core/src/main/java/com/datastrato/gravitino/Configs.java index 850efb82682..e6e9b77f3f7 100644 --- a/core/src/main/java/com/datastrato/gravitino/Configs.java +++ b/core/src/main/java/com/datastrato/gravitino/Configs.java @@ -20,6 +20,11 @@ public interface Configs { String DEFAULT_ENTITY_RELATION_STORE = "MySQLBackend"; String ENTITY_RELATION_STORE_KEY = "gravitino.entity.store.rel"; + String MYSQL_ENTITY_STORE_URL_KEY = "gravitino.entity.store.mysql.url"; + String MYSQL_ENTITY_STORE_DRIVER_NAME_KEY = "gravitino.entity.store.mysql.driverName"; + String MYSQL_ENTITY_STORE_USERNAME_KEY = "gravitino.entity.store.mysql.username"; + String MYSQL_ENTITY_STORE_PASSWORD_KEY = "gravitino.entity.store.mysql.password"; + String ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY = "gravitino.entity.store.kv.rocksdbPath"; Long DEFAULT_KV_DELETE_AFTER_TIME = 604800000L; // 7 days diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java index 355fc2d2a81..1ff8dca7f39 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java @@ -10,11 +10,23 @@ import com.datastrato.gravitino.HasIdentifier; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.BaseMetalake; import com.datastrato.gravitino.storage.relation.RelationBackend; +import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper; +import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessionFactoryHelper; +import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessions; +import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO; +import com.datastrato.gravitino.storage.relation.mysql.utils.POConverters; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.ibatis.session.SqlSession; /** * {@link MySQLBackend} is a MySQL implementation of RelationBackend interface. If we want to use @@ -26,46 +38,185 @@ public class MySQLBackend implements RelationBackend { /** Initialize the MySQL backend instance. */ @Override public void initialize(Config config) { - throw new UnsupportedOperationException("Unsupported operation now."); + SqlSessionFactoryHelper.getInstance().init(config); } @Override public List list( - Namespace namespace, Entity.EntityType entityType) throws NoSuchEntityException { - throw new UnsupportedOperationException("Unsupported operation now."); + Namespace namespace, Entity.EntityType entityType) { + try (SqlSession session = SqlSessions.getSqlSession()) { + switch (entityType) { + case METALAKE: + List metalakePOS = + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .listMetalakePOs(); + return (List) + metalakePOS.stream() + .map( + metalakePO -> { + try { + return POConverters.fromMetalakePO(metalakePO); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + case CATALOG: + case SCHEMA: + case TABLE: + case FILESET: + default: + throw new IllegalArgumentException( + String.format("Unsupported entity type: %s for list operation", entityType)); + } + } finally { + SqlSessions.closeSqlSession(); + } } @Override public boolean exists(NameIdentifier ident, Entity.EntityType entityType) { - throw new UnsupportedOperationException("Unsupported operation now."); + try (SqlSession session = SqlSessions.getSqlSession()) { + switch (entityType) { + case METALAKE: + MetalakePO metalakePO = + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .selectMetalakeMetaByName(ident.name()); + return metalakePO != null; + case CATALOG: + case SCHEMA: + case TABLE: + case FILESET: + default: + throw new IllegalArgumentException( + String.format("Unsupported entity type: %s for exists operation", entityType)); + } + } finally { + SqlSessions.closeSqlSession(); + } } @Override public void insert(E e, boolean overwritten) throws EntityAlreadyExistsException { - throw new UnsupportedOperationException("Unsupported operation now."); + try (SqlSession session = SqlSessions.getSqlSession()) { + try { + if (e instanceof BaseMetalake) { + MetalakePO metalakePO = POConverters.toMetalakePO((BaseMetalake) e); + if (overwritten) { + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .insertMetalakeMetaWithUpdate(metalakePO); + } else { + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .insertMetalakeMeta(metalakePO); + } + SqlSessions.commitAndCloseSqlSession(); + } else { + throw new IllegalArgumentException( + String.format("Unsupported entity type: %s for put operation", e.getClass())); + } + } catch (Throwable t) { + SqlSessions.rollbackAndCloseSqlSession(); + throw new RuntimeException(t); + } + } } @Override public E update( NameIdentifier ident, Entity.EntityType entityType, Function updater) - throws NoSuchEntityException { - throw new UnsupportedOperationException("Unsupported operation now."); + throws NoSuchEntityException, AlreadyExistsException { + try (SqlSession session = SqlSessions.getSqlSession()) { + try { + switch (entityType) { + case METALAKE: + BaseMetalake oldMetalakeEntity = + POConverters.fromMetalakePO( + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .selectMetalakeMetaByName(ident.name())); + BaseMetalake newMetalakeEntity = (BaseMetalake) updater.apply((E) oldMetalakeEntity); + Preconditions.checkArgument( + Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()), + String.format( + "The updated metalake entity id: %s is not same with the metalake entity id before: %s", + newMetalakeEntity.id(), oldMetalakeEntity.id())); + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .updateMetalakeMeta(POConverters.toMetalakePO(newMetalakeEntity)); + SqlSessions.commitAndCloseSqlSession(); + return (E) newMetalakeEntity; + case CATALOG: + case SCHEMA: + case TABLE: + case FILESET: + default: + throw new IllegalArgumentException( + String.format("Unsupported entity type: %s for update operation", entityType)); + } + } catch (Throwable t) { + SqlSessions.rollbackAndCloseSqlSession(); + throw new RuntimeException(t); + } + } } @Override public E get( - NameIdentifier ident, Entity.EntityType entityType) throws IOException { - throw new UnsupportedOperationException("Unsupported operation now."); + NameIdentifier ident, Entity.EntityType entityType) + throws NoSuchEntityException, IOException { + try (SqlSession session = SqlSessions.getSqlSession()) { + switch (entityType) { + case METALAKE: + MetalakePO metalakePO = + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .selectMetalakeMetaByName(ident.name()); + return (E) POConverters.fromMetalakePO(metalakePO); + case CATALOG: + case SCHEMA: + case TABLE: + case FILESET: + default: + throw new IllegalArgumentException( + String.format("Unsupported entity type: %s for get operation", entityType)); + } + } finally { + SqlSessions.closeSqlSession(); + } } @Override public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade) { - throw new UnsupportedOperationException("Unsupported operation now."); + try (SqlSession session = SqlSessions.getSqlSession()) { + try { + switch (entityType) { + case METALAKE: + Long metalakeId = + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .selectMetalakeIdMetaByName(ident.name()); + if (metalakeId != null) { + // delete metalake + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .deleteMetalakeMetaById(metalakeId); + if (cascade) { + // TODO We will cascade delete the metadata of sub-resources under metalake + } + SqlSessions.commitAndCloseSqlSession(); + } + return true; + case CATALOG: + case SCHEMA: + case TABLE: + case FILESET: + default: + throw new IllegalArgumentException( + String.format("Unsupported entity type: %s for delete operation", entityType)); + } + } catch (Throwable t) { + SqlSessions.rollbackAndCloseSqlSession(); + throw new RuntimeException(t); + } + } } @Override - public void close() throws IOException { - throw new UnsupportedOperationException("Unsupported operation now."); - } + public void close() throws IOException {} } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/MetalakeMetaMapper.java new file mode 100644 index 00000000000..2ab836d6578 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/MetalakeMetaMapper.java @@ -0,0 +1,85 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation.mysql.mapper; + +import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO; +import java.util.List; +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +public interface MetalakeMetaMapper { + String TABLE_NAME = "metalake_meta"; + + @Select( + "SELECT id, metalake_name as metalakeName, metalake_comment as metalakeComment," + + " properties, audit_info as auditInfo, schema_version as schemaVersion" + + " FROM " + + TABLE_NAME) + List listMetalakePOs(); + + @Select( + "SELECT id, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion" + + " FROM " + + TABLE_NAME + + " WHERE metalake_name = #{metalakeName}") + MetalakePO selectMetalakeMetaByName(@Param("metalakeName") String name); + + @Select("SELECT id FROM " + TABLE_NAME + " WHERE metalake_name = #{metalakeName}") + Long selectMetalakeIdMetaByName(@Param("metalakeName") String name); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(id, metalake_name, metalake_comment, properties, audit_info, schema_version)" + + " VALUES(" + + " #{metalakeMeta.id}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}" + + " )") + void insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(id, metalake_name, metalake_comment, properties, audit_info, schema_version)" + + " VALUES(" + + " #{metalakeMeta.id}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " metalake_name = #{metalakeMeta.metalakeName}," + + " metalake_comment = #{metalakeMeta.metalakeComment}," + + " properties = #{metalakeMeta.properties}," + + " audit_info = #{metalakeMeta.auditInfo}," + + " schema_version = #{metalakeMeta.schemaVersion}") + void insertMetalakeMetaWithUpdate(@Param("metalakeMeta") MetalakePO metalakePO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET metalake_name = #{metalakeMeta.metalakeName}," + + " metalake_comment = #{metalakeMeta.metalakeComment}," + + " properties = #{metalakeMeta.properties}," + + " audit_info = #{metalakeMeta.auditInfo}," + + " schema_version = #{metalakeMeta.schemaVersion}" + + " WHERE id = #{metalakeMeta.id}") + void updateMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO); + + @Delete("DELETE FROM " + TABLE_NAME + " WHERE id = #{id}") + Integer deleteMetalakeMetaById(@Param("id") Long id); +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessionFactoryHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessionFactoryHelper.java new file mode 100644 index 00000000000..9985bc5ab4d --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessionFactoryHelper.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation.mysql.orm; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper; +import com.google.common.base.Preconditions; +import java.time.Duration; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.pool2.impl.BaseObjectPoolConfig; +import org.apache.ibatis.mapping.Environment; +import org.apache.ibatis.session.Configuration; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.SqlSessionFactoryBuilder; +import org.apache.ibatis.transaction.TransactionFactory; +import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; + +public class SqlSessionFactoryHelper { + private static volatile SqlSessionFactory sqlSessionFactory; + private static final SqlSessionFactoryHelper INSTANCE = new SqlSessionFactoryHelper(); + + public static SqlSessionFactoryHelper getInstance() { + return INSTANCE; + } + + @SuppressWarnings("deprecation") + public void init(Config config) { + BasicDataSource dataSource = new BasicDataSource(); + dataSource.setUrl(config.getRawString(Configs.MYSQL_ENTITY_STORE_URL_KEY)); + dataSource.setDriverClassName(config.getRawString(Configs.MYSQL_ENTITY_STORE_DRIVER_NAME_KEY)); + dataSource.setUsername(config.getRawString(Configs.MYSQL_ENTITY_STORE_USERNAME_KEY, "")); + dataSource.setPassword(config.getRawString(Configs.MYSQL_ENTITY_STORE_PASSWORD_KEY, "")); + // close the auto commit, so that need manual commit + dataSource.setDefaultAutoCommit(false); + dataSource.setMaxWaitMillis(1000L); + dataSource.setMaxTotal(20); + dataSource.setMaxIdle(5); + dataSource.setMinIdle(0); + dataSource.setLogAbandoned(true); + dataSource.setRemoveAbandonedOnBorrow(true); + dataSource.setRemoveAbandonedTimeout(60); + dataSource.setTimeBetweenEvictionRunsMillis(Duration.ofMillis(10 * 60 * 1000L).toMillis()); + dataSource.setTestOnBorrow(BaseObjectPoolConfig.DEFAULT_TEST_ON_BORROW); + dataSource.setTestWhileIdle(BaseObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE); + dataSource.setMinEvictableIdleTimeMillis(1000); + dataSource.setNumTestsPerEvictionRun(BaseObjectPoolConfig.DEFAULT_NUM_TESTS_PER_EVICTION_RUN); + dataSource.setTestOnReturn(BaseObjectPoolConfig.DEFAULT_TEST_ON_RETURN); + dataSource.setSoftMinEvictableIdleTimeMillis( + BaseObjectPoolConfig.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME.toMillis()); + dataSource.setLifo(BaseObjectPoolConfig.DEFAULT_LIFO); + TransactionFactory transactionFactory = new JdbcTransactionFactory(); + Environment environment = new Environment("development", transactionFactory, dataSource); + Configuration configuration = new Configuration(environment); + configuration.addMapper(MetalakeMetaMapper.class); + if (sqlSessionFactory == null) { + synchronized (SqlSessionFactoryHelper.class) { + if (sqlSessionFactory == null) { + sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration); + } + } + } + } + + public SqlSessionFactory getSqlSessionFactory() { + Preconditions.checkState(sqlSessionFactory != null, "SqlSessionFactory is not initialized."); + return sqlSessionFactory; + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessions.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessions.java new file mode 100644 index 00000000000..13db581d41a --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessions.java @@ -0,0 +1,65 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation.mysql.orm; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.TransactionIsolationLevel; + +public final class SqlSessions implements Closeable { + private static final ThreadLocal sessions = new ThreadLocal<>(); + + private SqlSessions() {} + + public static SqlSession getSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession == null) { + sqlSession = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .openSession(TransactionIsolationLevel.READ_COMMITTED); + sessions.set(sqlSession); + return sqlSession; + } + return sqlSession; + } + + public static void commitAndCloseSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession != null) { + sqlSession.commit(); + sqlSession.close(); + sessions.remove(); + } + } + + public static void rollbackAndCloseSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession != null) { + sqlSession.rollback(); + sqlSession.close(); + sessions.remove(); + } + } + + public static void closeSqlSession() { + SqlSession sqlSession = sessions.get(); + if (sqlSession != null) { + sqlSession.close(); + sessions.remove(); + } + } + + public static T getMapper(Class className) { + return (T) getSqlSession().getMapper(className); + } + + @Override + public void close() throws IOException { + sessions.remove(); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/po/MetalakePO.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/po/MetalakePO.java new file mode 100644 index 00000000000..11328b8083d --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/po/MetalakePO.java @@ -0,0 +1,141 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation.mysql.po; + +import com.datastrato.gravitino.json.JsonUtils; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.SchemaVersion; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Objects; +import java.util.Map; + +public class MetalakePO { + private Long id; + private String metalakeName; + private String metalakeComment; + private String properties; + private String auditInfo; + private String schemaVersion; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getMetalakeName() { + return metalakeName; + } + + public void setMetalakeName(String metalakeName) { + this.metalakeName = metalakeName; + } + + public String getMetalakeComment() { + return metalakeComment; + } + + public void setMetalakeComment(String metalakeComment) { + this.metalakeComment = metalakeComment; + } + + public String getProperties() { + return properties; + } + + public void setProperties(String properties) { + this.properties = properties; + } + + public String getAuditInfo() { + return auditInfo; + } + + public void setAuditInfo(String auditInfo) { + this.auditInfo = auditInfo; + } + + public String getSchemaVersion() { + return schemaVersion; + } + + public void setSchemaVersion(String schemaVersion) { + this.schemaVersion = schemaVersion; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MetalakePO)) { + return false; + } + MetalakePO that = (MetalakePO) o; + return Objects.equal(getId(), that.getId()) + && Objects.equal(getMetalakeName(), that.getMetalakeName()) + && Objects.equal(getMetalakeComment(), that.getMetalakeComment()) + && Objects.equal(getProperties(), that.getProperties()) + && Objects.equal(getAuditInfo(), that.getAuditInfo()) + && Objects.equal(getSchemaVersion(), that.getSchemaVersion()); + } + + @Override + public int hashCode() { + return Objects.hashCode( + getId(), + getMetalakeName(), + getMetalakeComment(), + getProperties(), + getAuditInfo(), + getSchemaVersion()); + } + + public static class Builder { + private final MetalakePO metalakePO; + + public Builder() { + metalakePO = new MetalakePO(); + } + + public MetalakePO.Builder withId(Long id) { + metalakePO.id = id; + return this; + } + + public MetalakePO.Builder withMetalakeName(String name) { + metalakePO.metalakeName = name; + return this; + } + + public MetalakePO.Builder withMetalakeComment(String comment) { + metalakePO.metalakeComment = comment; + return this; + } + + public MetalakePO.Builder withProperties(Map properties) + throws JsonProcessingException { + metalakePO.properties = JsonUtils.objectMapper().writeValueAsString(properties); + return this; + } + + public MetalakePO.Builder withAuditInfo(AuditInfo auditInfo) throws JsonProcessingException { + metalakePO.auditInfo = JsonUtils.objectMapper().writeValueAsString(auditInfo); + return this; + } + + public MetalakePO.Builder withVersion(SchemaVersion version) throws JsonProcessingException { + metalakePO.schemaVersion = JsonUtils.objectMapper().writeValueAsString(version); + return this; + } + + public MetalakePO build() { + return metalakePO; + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/utils/POConverters.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/utils/POConverters.java new file mode 100644 index 00000000000..0c366b847fb --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/utils/POConverters.java @@ -0,0 +1,42 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation.mysql.utils; + +import com.datastrato.gravitino.json.JsonUtils; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.Map; + +public class POConverters { + private POConverters() {} + + public static MetalakePO toMetalakePO(BaseMetalake baseMetalake) throws JsonProcessingException { + return new MetalakePO.Builder() + .withId(baseMetalake.id()) + .withMetalakeName(baseMetalake.name()) + .withMetalakeComment(baseMetalake.comment()) + .withProperties(baseMetalake.properties()) + .withAuditInfo((AuditInfo) baseMetalake.auditInfo()) + .withVersion(baseMetalake.getVersion()) + .build(); + } + + public static BaseMetalake fromMetalakePO(MetalakePO metalakePO) throws JsonProcessingException { + return new BaseMetalake.Builder() + .withId(metalakePO.getId()) + .withName(metalakePO.getMetalakeName()) + .withComment(metalakePO.getMetalakeComment()) + .withProperties(JsonUtils.objectMapper().readValue(metalakePO.getProperties(), Map.class)) + .withAuditInfo( + JsonUtils.objectMapper().readValue(metalakePO.getAuditInfo(), AuditInfo.class)) + .withVersion( + JsonUtils.objectMapper().readValue(metalakePO.getSchemaVersion(), SchemaVersion.class)) + .build(); + } +} diff --git a/core/src/main/resources/mysql_init/mysql_backend_init.sql b/core/src/main/resources/mysql_init/mysql_backend_init.sql new file mode 100644 index 00000000000..1e9c0760410 --- /dev/null +++ b/core/src/main/resources/mysql_init/mysql_backend_init.sql @@ -0,0 +1,13 @@ +CREATE DATABASE IF NOT EXISTS `gravitino_meta` DEFAULT CHARACTER SET utf8mb4; +USE `gravitino_meta`; +CREATE TABLE IF NOT EXISTS `metalake_meta` +( + `id` bigint(20) unsigned NOT NULL COMMENT 'metalake id', + `metalake_name` varchar(128) NOT NULL COMMENT 'metalake name', + `metalake_comment` varchar(256) DEFAULT '' COMMENT 'metalake comment', + `properties` mediumtext DEFAULT NULL COMMENT 'metalake properties', + `audit_info` mediumtext NOT NULL COMMENT 'metalake audit info', + `schema_version` text NOT NULL COMMENT 'metalake schema version info', + PRIMARY KEY (`id`), + UNIQUE KEY `uk_mn` (`metalake_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'metalake metadata'; \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index adec13499bd..6715033d05e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -45,6 +45,7 @@ postgresql = "42.6.0" immutables-value = "2.10.0" selenium = "3.141.59" rauschig = "1.2.0" +mybatis = "3.5.6" protobuf-plugin = "0.9.2" spotless-plugin = '6.11.0' @@ -149,6 +150,7 @@ sun-activation = { group = "com.sun.activation", name = "javax.activation", vers selenium = { group = "org.seleniumhq.selenium", name = "selenium-java", version.ref = "selenium" } rauschig = { group = "org.rauschig", name = "jarchivelib", version.ref = "rauschig" } +mybatis = { group = "org.mybatis", name = "mybatis", version.ref = "mybatis"} [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"]