Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaojiebao committed Feb 6, 2024
1 parent bd3f04a commit bc7d1e2
Show file tree
Hide file tree
Showing 12 changed files with 423 additions and 163 deletions.
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ dependencies {
testImplementation(libs.mockito.core)

testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.h2db)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.storage.relational.RelationalBackend;
import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessionFactoryHelper;
import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessions;
import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO;
import com.datastrato.gravitino.storage.relation.mysql.utils.POConverters;
import com.datastrato.gravitino.storage.relational.mysql.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relational.mysql.orm.SqlSessionFactoryHelper;
import com.datastrato.gravitino.storage.relational.mysql.orm.SqlSessions;
import com.datastrato.gravitino.storage.relational.mysql.po.MetalakePO;
import com.datastrato.gravitino.storage.relational.mysql.utils.POConverters;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
Expand All @@ -45,54 +46,61 @@ public void initialize(Config config) {
public <E extends Entity & HasIdentifier> List<E> list(
Namespace namespace, Entity.EntityType entityType) {
try (SqlSession session = SqlSessions.getSqlSession()) {
switch (entityType) {
case METALAKE:
List<MetalakePO> metalakePOS =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.listMetalakePOs();
return (List<E>)
metalakePOS.stream()
.map(
metalakePO -> {
try {
return POConverters.fromMetalakePO(metalakePO);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for list operation", entityType));
try {
switch (entityType) {
case METALAKE:
List<MetalakePO> metalakePOS =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.listMetalakePOs();
return metalakePOS != null
? metalakePOS.stream()
.map(
metalakePO -> {
try {
return (E) POConverters.fromMetalakePO(metalakePO);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList())
: new ArrayList<>();
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for list operation", entityType));
}
} catch (Throwable t) {
SqlSessions.closeSqlSession();
throw new RuntimeException(t);
}
} finally {
SqlSessions.closeSqlSession();
}
}

@Override
public boolean exists(NameIdentifier ident, Entity.EntityType entityType) {
try (SqlSession session = SqlSessions.getSqlSession()) {
switch (entityType) {
case METALAKE:
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
return metalakePO != null;
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for exists operation", entityType));
try {
switch (entityType) {
case METALAKE:
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
return metalakePO != null;
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for exists operation", entityType));
}
} catch (Throwable t) {
SqlSessions.closeSqlSession();
throw new RuntimeException(t);
}
} finally {
SqlSessions.closeSqlSession();
}
}

Expand All @@ -102,18 +110,25 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
try (SqlSession session = SqlSessions.getSqlSession()) {
try {
if (e instanceof BaseMetalake) {
MetalakePO metalakePO = POConverters.toMetalakePO((BaseMetalake) e);
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(e.nameIdentifier().name());
if (!overwritten && metalakePO != null) {
throw new EntityAlreadyExistsException(
String.format("Metalake entity: %s already exists", e.nameIdentifier().name()));
}

if (overwritten) {
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.insertMetalakeMetaWithUpdate(metalakePO);
.insertMetalakeMetaWithUpdate(POConverters.toMetalakePO((BaseMetalake) e));
} else {
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.insertMetalakeMeta(metalakePO);
.insertMetalakeMeta(POConverters.toMetalakePO((BaseMetalake) e));
}
SqlSessions.commitAndCloseSqlSession();
} else {
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for put operation", e.getClass()));
String.format("Unsupported entity type: %s for insert operation", e.getClass()));
}
} catch (Throwable t) {
SqlSessions.rollbackAndCloseSqlSession();
Expand All @@ -138,7 +153,7 @@ public <E extends Entity & HasIdentifier> E update(
Preconditions.checkArgument(
Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()),
String.format(
"The updated metalake entity id: %s is not same with the metalake entity id before: %s",
"The updated metalake entity id: %s should same with the metalake entity id before: %s",
newMetalakeEntity.id(), oldMetalakeEntity.id()));
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.updateMetalakeMeta(POConverters.toMetalakePO(newMetalakeEntity), oldMetalakePO);
Expand All @@ -164,22 +179,28 @@ public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType)
throws NoSuchEntityException, IOException {
try (SqlSession session = SqlSessions.getSqlSession()) {
switch (entityType) {
case METALAKE:
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
return (E) POConverters.fromMetalakePO(metalakePO);
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for get operation", entityType));
try {
switch (entityType) {
case METALAKE:
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
if (metalakePO == null) {
return null;
}
return (E) POConverters.fromMetalakePO(metalakePO);
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for get operation", entityType));
}
} catch (Throwable t) {
SqlSessions.closeSqlSession();
throw new RuntimeException(t);
}
} finally {
SqlSessions.closeSqlSession();
}
}

Expand All @@ -197,7 +218,7 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.deleteMetalakeMetaById(metalakeId);
if (cascade) {
// TODO We will cascade delete the metadata of sub-resources under metalake
// TODO We will cascade delete the metadata of sub-resources under the metalake
}
SqlSessions.commitAndCloseSqlSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relation.mysql.mapper;
package com.datastrato.gravitino.storage.relational.mysql.mapper;

import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO;
import com.datastrato.gravitino.storage.relational.mysql.po.MetalakePO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
Expand Down Expand Up @@ -78,7 +78,7 @@ public interface MetalakeMetaMapper {
+ " audit_info = #{newMetalakeMeta.auditInfo},"
+ " schema_version = #{newMetalakeMeta.schemaVersion}"
+ " WHERE id = #{oldMetalakeMeta.id}"
+ " and metalake_name = #{oldMetalakeMeta.metalakeComment}"
+ " and metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " and metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " and properties = #{oldMetalakeMeta.properties}"
+ " and audit_info = #{oldMetalakeMeta.auditInfo}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relation.mysql.orm;
package com.datastrato.gravitino.storage.relational.mysql.orm;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relational.mysql.mapper.MetalakeMetaMapper;
import com.google.common.base.Preconditions;
import java.time.Duration;
import org.apache.commons.dbcp2.BasicDataSource;
Expand All @@ -19,6 +19,11 @@
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;

/**
* SqlSessionFactoryHelper maintains the MyBatis's {@link SqlSessionFactory} object, which is used
* to create the {@link org.apache.ibatis.session.SqlSession} object. It is a singleton class and
* should be initialized only once.
*/
public class SqlSessionFactoryHelper {
private static volatile SqlSessionFactory sqlSessionFactory;
private static final SqlSessionFactoryHelper INSTANCE = new SqlSessionFactoryHelper();
Expand All @@ -27,6 +32,13 @@ public static SqlSessionFactoryHelper getInstance() {
return INSTANCE;
}

private SqlSessionFactoryHelper() {}

/**
* Initialize the SqlSessionFactory object.
*
* @param config Config object to get the MySQL connection details from the config.
*/
@SuppressWarnings("deprecation")
public void init(Config config) {
BasicDataSource dataSource = new BasicDataSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relation.mysql.orm;
package com.datastrato.gravitino.storage.relational.mysql.orm;

import java.io.Closeable;
import java.io.IOException;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.TransactionIsolationLevel;

public final class SqlSessions implements Closeable {
/**
* SqlSessions is a utility class to maintain the MyBatis's {@link SqlSession} object. It is a
* thread local class and should be used to get the {@link SqlSession} object. It also provides the
* methods to commit, rollback and close the {@link SqlSession} object.
*/
public final class SqlSessions {
private static final ThreadLocal<SqlSession> sessions = new ThreadLocal<>();

private SqlSessions() {}

/**
* Get the SqlSession object. If the SqlSession object is not present in the thread local, then
* create a new SqlSession object and set it in the thread local.
*
* @return SqlSession object from the thread local storage.
*/
public static SqlSession getSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession == null) {
Expand All @@ -28,6 +37,10 @@ public static SqlSession getSqlSession() {
return sqlSession;
}

/**
* Commit the SqlSession object and close it. It also removes the SqlSession object from the
* thread local storage.
*/
public static void commitAndCloseSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession != null) {
Expand All @@ -37,6 +50,10 @@ public static void commitAndCloseSqlSession() {
}
}

/**
* Rollback the SqlSession object and close it. It also removes the SqlSession object from the
* thread local storage.
*/
public static void rollbackAndCloseSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession != null) {
Expand All @@ -46,6 +63,7 @@ public static void rollbackAndCloseSqlSession() {
}
}

/** Close the SqlSession object and remove it from the thread local storage. */
public static void closeSqlSession() {
SqlSession sqlSession = sessions.get();
if (sqlSession != null) {
Expand All @@ -54,12 +72,14 @@ public static void closeSqlSession() {
}
}

/**
* Get the Mapper object from the SqlSession object.
*
* @param className the class name of the Mapper object.
* @return the Mapper object.
* @param <T> the type of the Mapper object.
*/
public static <T> T getMapper(Class className) {
return (T) getSqlSession().getMapper(className);
}

@Override
public void close() throws IOException {
sessions.remove();
}
}
Loading

0 comments on commit bc7d1e2

Please sign in to comment.