Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaojiebao committed Feb 5, 2024
1 parent 1c7021a commit 1fc8cd0
Show file tree
Hide file tree
Showing 12 changed files with 399 additions and 45 deletions.
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ dependencies {
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.mockito.core)
testImplementation(libs.h2db)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -50,17 +51,18 @@ public <E extends Entity & HasIdentifier> List<E> list(
List<MetalakePO> metalakePOS =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.listMetalakePOs();
return (List<E>)
metalakePOS.stream()
return metalakePOS != null
? metalakePOS.stream()
.map(
metalakePO -> {
try {
return POConverters.fromMetalakePO(metalakePO);
return (E) POConverters.fromMetalakePO(metalakePO);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
.collect(Collectors.toList())
: new ArrayList<>();
case CATALOG:
case SCHEMA:
case TABLE:
Expand Down Expand Up @@ -102,16 +104,24 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
try (SqlSession session = SqlSessions.getSqlSession()) {
try {
if (e instanceof BaseMetalake) {
MetalakePO metalakePO = POConverters.toMetalakePO((BaseMetalake) e);
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(e.nameIdentifier().name());
if (!overwritten && metalakePO != null) {
throw new EntityAlreadyExistsException(
String.format("Metalake entity: %s already exists", e.nameIdentifier().name()));
}

if (overwritten) {
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.insertMetalakeMetaWithUpdate(metalakePO);
.insertMetalakeMetaWithUpdate(POConverters.toMetalakePO((BaseMetalake) e));
} else {
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.insertMetalakeMeta(metalakePO);
.insertMetalakeMeta(POConverters.toMetalakePO((BaseMetalake) e));
}
SqlSessions.commitAndCloseSqlSession();
} else {
SqlSessions.closeSqlSession();
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for put operation", e.getClass()));
}
Expand All @@ -138,7 +148,7 @@ public <E extends Entity & HasIdentifier> E update(
Preconditions.checkArgument(
Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()),
String.format(
"The updated metalake entity id: %s is not same with the metalake entity id before: %s",
"The updated metalake entity id: %s should same with the metalake entity id before: %s",
newMetalakeEntity.id(), oldMetalakeEntity.id()));
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.updateMetalakeMeta(POConverters.toMetalakePO(newMetalakeEntity), oldMetalakePO);
Expand Down Expand Up @@ -169,6 +179,9 @@ public <E extends Entity & HasIdentifier> E get(
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
if (metalakePO == null) {
return null;
}
return (E) POConverters.fromMetalakePO(metalakePO);
case CATALOG:
case SCHEMA:
Expand Down Expand Up @@ -197,7 +210,7 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.deleteMetalakeMetaById(metalakeId);
if (cascade) {
// TODO We will cascade delete the metadata of sub-resources under metalake
// TODO We will cascade delete the metadata of sub-resources under the metalake
}
SqlSessions.commitAndCloseSqlSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public interface MetalakeMetaMapper {
+ " audit_info = #{newMetalakeMeta.auditInfo},"
+ " schema_version = #{newMetalakeMeta.schemaVersion}"
+ " WHERE id = #{oldMetalakeMeta.id}"
+ " and metalake_name = #{oldMetalakeMeta.metalakeComment}"
+ " and metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " and metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " and properties = #{oldMetalakeMeta.properties}"
+ " and audit_info = #{oldMetalakeMeta.auditInfo}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;

/**
* SqlSessionFactoryHelper maintains the MyBatis's {@link SqlSessionFactory} object, which is used
* to create the {@link org.apache.ibatis.session.SqlSession} object. It is a singleton class and
* should be initialized only once.
*/
public class SqlSessionFactoryHelper {
private static volatile SqlSessionFactory sqlSessionFactory;
private static final SqlSessionFactoryHelper INSTANCE = new SqlSessionFactoryHelper();
Expand All @@ -27,6 +32,11 @@ public static SqlSessionFactoryHelper getInstance() {
return INSTANCE;
}

/**
* Initialize the SqlSessionFactory object.
*
* @param config Config object to get the MySQL connection details from the config.
*/
@SuppressWarnings("deprecation")
public void init(Config config) {
BasicDataSource dataSource = new BasicDataSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,25 @@

package com.datastrato.gravitino.storage.relation.mysql.orm;

import java.io.Closeable;
import java.io.IOException;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.TransactionIsolationLevel;

public final class SqlSessions implements Closeable {
/**
* SqlSessions is a utility class to maintain the MyBatis's {@link SqlSession} object. It is a
* thread local class and should be used to get the {@link SqlSession} object. It also provides the
* methods to commit, rollback and close the {@link SqlSession} object.
*/
public final class SqlSessions {
private static final ThreadLocal<SqlSession> sessions = new ThreadLocal<>();

private SqlSessions() {}

/**
* Get the SqlSession object. If the SqlSession object is not present in the thread local, then
* create a new SqlSession object and set it in the thread local.
*
* @return SqlSession object from the thread local storage.
*/
public static SqlSession getSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession == null) {
Expand All @@ -28,6 +37,10 @@ public static SqlSession getSqlSession() {
return sqlSession;
}

/**
* Commit the SqlSession object and close it. It also removes the SqlSession object from the
* thread local storage.
*/
public static void commitAndCloseSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession != null) {
Expand All @@ -37,6 +50,10 @@ public static void commitAndCloseSqlSession() {
}
}

/**
* Rollback the SqlSession object and close it. It also removes the SqlSession object from the
* thread local storage.
*/
public static void rollbackAndCloseSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession != null) {
Expand All @@ -46,6 +63,7 @@ public static void rollbackAndCloseSqlSession() {
}
}

/** Close the SqlSession object and remove it from the thread local storage. */
public static void closeSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession != null) {
Expand All @@ -54,12 +72,14 @@ public static void closeSqlSession() {
}
}

/**
* Get the Mapper object from the SqlSession object.
*
* @param className the class name of the Mapper object.
* @return the Mapper object.
* @param <T> the type of the Mapper object.
*/
public static <T> T getMapper(Class className) {
return (T) getSqlSession().getMapper(className);
}

@Override
public void close() throws IOException {
sessions.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@

package com.datastrato.gravitino.storage.relation.mysql.po;

import com.datastrato.gravitino.json.JsonUtils;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.SchemaVersion;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Objects;
import java.util.Map;

public class MetalakePO {
private Long id;
Expand Down Expand Up @@ -118,19 +114,18 @@ public MetalakePO.Builder withMetalakeComment(String comment) {
return this;
}

public MetalakePO.Builder withProperties(Map<String, String> properties)
throws JsonProcessingException {
metalakePO.properties = JsonUtils.objectMapper().writeValueAsString(properties);
public MetalakePO.Builder withProperties(String properties) throws JsonProcessingException {
metalakePO.properties = properties;
return this;
}

public MetalakePO.Builder withAuditInfo(AuditInfo auditInfo) throws JsonProcessingException {
metalakePO.auditInfo = JsonUtils.objectMapper().writeValueAsString(auditInfo);
public MetalakePO.Builder withAuditInfo(String auditInfo) throws JsonProcessingException {
metalakePO.auditInfo = auditInfo;
return this;
}

public MetalakePO.Builder withVersion(SchemaVersion version) throws JsonProcessingException {
metalakePO.schemaVersion = JsonUtils.objectMapper().writeValueAsString(version);
public MetalakePO.Builder withVersion(String version) throws JsonProcessingException {
metalakePO.schemaVersion = version;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,35 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Map;

/** POConverters is a utility class to convert PO to Base and vice versa. */
public class POConverters {
private POConverters() {}

/**
* Convert {@link BaseMetalake} to {@link MetalakePO}
*
* @param baseMetalake BaseMetalake object
* @return MetalakePO object from BaseMetalake object
* @throws JsonProcessingException
*/
public static MetalakePO toMetalakePO(BaseMetalake baseMetalake) throws JsonProcessingException {
return new MetalakePO.Builder()
.withId(baseMetalake.id())
.withMetalakeName(baseMetalake.name())
.withMetalakeComment(baseMetalake.comment())
.withProperties(baseMetalake.properties())
.withAuditInfo((AuditInfo) baseMetalake.auditInfo())
.withVersion(baseMetalake.getVersion())
.withProperties(JsonUtils.objectMapper().writeValueAsString(baseMetalake.properties()))
.withAuditInfo(JsonUtils.objectMapper().writeValueAsString(baseMetalake.auditInfo()))
.withVersion(JsonUtils.objectMapper().writeValueAsString(baseMetalake.getVersion()))
.build();
}

/**
* Convert {@link MetalakePO} to {@link BaseMetalake}
*
* @param metalakePO MetalakePO object
* @return BaseMetalake object from MetalakePO object
* @throws JsonProcessingException
*/
public static BaseMetalake fromMetalakePO(MetalakePO metalakePO) throws JsonProcessingException {
return new BaseMetalake.Builder()
.withId(metalakePO.getId())
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/resources/mysql/mysql_init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS `metalake_meta`
(
`id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
`metalake_name` VARCHAR(128) NOT NULL COMMENT 'metalake name',
`metalake_comment` VARCHAR(256) DEFAULT '' COMMENT 'metalake comment',
`properties` MEDIUMTEXT DEFAULT NULL COMMENT 'metalake properties',
`audit_info` MEDIUMTEXT NOT NULL COMMENT 'metalake audit info',
`schema_version` TEXT NOT NULL COMMENT 'metalake schema version info',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_mn` (`metalake_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'metalake metadata';
13 changes: 0 additions & 13 deletions core/src/main/resources/mysql_init/mysql_backend_init.sql

This file was deleted.

Loading

0 comments on commit 1fc8cd0

Please sign in to comment.