diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java index ca3684981a0..e9d9128003a 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessions.java @@ -50,9 +50,12 @@ public static SqlSession getSqlSession() { public static void commitAndCloseSqlSession() { SqlSession sqlSession = sessions.get(); if (sqlSession != null) { - sqlSession.commit(); - sqlSession.close(); - sessions.remove(); + try { + sqlSession.commit(); + sqlSession.close(); + } finally { + sessions.remove(); + } } } @@ -63,9 +66,12 @@ public static void commitAndCloseSqlSession() { public static void rollbackAndCloseSqlSession() { SqlSession sqlSession = sessions.get(); if (sqlSession != null) { - sqlSession.rollback(); - sqlSession.close(); - sessions.remove(); + try { + sqlSession.rollback(); + sqlSession.close(); + } finally { + sessions.remove(); + } } } @@ -73,8 +79,11 @@ public static void rollbackAndCloseSqlSession() { public static void closeSqlSession() { SqlSession sqlSession = sessions.get(); if (sqlSession != null) { - sqlSession.close(); - sessions.remove(); + try { + sqlSession.close(); + } finally { + sessions.remove(); + } } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java index 9510408a932..679f143f1f5 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java @@ -62,6 +62,13 @@ public static MetalakePO initializeMetalakePOVersion(MetalakePO metalakePO) { .build(); } + /** + * Update MetalakePO version + * + * @param oldMetalakePO the old MetalakePO object + * @param newMetalakePO the new MetalakePO object + * @return MetalakePO object with updated version + */ public static MetalakePO updateMetalakePOVersion( MetalakePO oldMetalakePO, MetalakePO newMetalakePO) { Long lastVersion = oldMetalakePO.getLastVersion(); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java index ef4db551a2f..2d9009acfa8 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/SessionUtils.java @@ -29,7 +29,7 @@ private SessionUtils() {} public static void doWithCommit(Class mapperClazz, Consumer consumer) { try (SqlSession session = SqlSessions.getSqlSession()) { try { - T mapper = session.getMapper(mapperClazz); + T mapper = SqlSessions.getMapper(mapperClazz); consumer.accept(mapper); SqlSessions.commitAndCloseSqlSession(); } catch (Throwable t) { @@ -52,7 +52,7 @@ public static void doWithCommit(Class mapperClazz, Consumer consumer) public static R doWithCommitAndFetchResult(Class mapperClazz, Function func) { try (SqlSession session = SqlSessions.getSqlSession()) { try { - T mapper = session.getMapper(mapperClazz); + T mapper = SqlSessions.getMapper(mapperClazz); R result = func.apply(mapper); SqlSessions.commitAndCloseSqlSession(); return result; @@ -93,7 +93,7 @@ public static void doWithoutCommit(Class mapperClazz, Consumer consume public static R getWithoutCommit(Class mapperClazz, Function func) { try (SqlSession session = SqlSessions.getSqlSession()) { try { - T mapper = session.getMapper(mapperClazz); + T mapper = SqlSessions.getMapper(mapperClazz); return func.apply(mapper); } catch (Throwable t) { throw new RuntimeException(t); diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java index 1723f75a1c0..736bbd91595 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java @@ -16,7 +16,10 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; @@ -43,6 +46,52 @@ public void testFromMetalakePO() throws JsonProcessingException { assertEquals(expectedMetalake.getVersion(), convertedMetalake.getVersion()); } + @Test + public void testFromMetalakePOs() throws JsonProcessingException { + MetalakePO metalakePO1 = createMetalakePO(1L, "test", "this is test"); + MetalakePO metalakePO2 = createMetalakePO(2L, "test2", "this is test2"); + List metalakePOs = new ArrayList<>(Arrays.asList(metalakePO1, metalakePO2)); + List convertedMetalakes = POConverters.fromMetalakePOs(metalakePOs); + + BaseMetalake expectedMetalake1 = createMetalake(1L, "test", "this is test"); + BaseMetalake expectedMetalake2 = createMetalake(2L, "test2", "this is test2"); + List expectedMetalakes = + new ArrayList<>(Arrays.asList(expectedMetalake1, expectedMetalake2)); + + // Assert + int index = 0; + for (BaseMetalake metalake : convertedMetalakes) { + assertEquals(expectedMetalakes.get(index).id(), metalake.id()); + assertEquals(expectedMetalakes.get(index).name(), metalake.name()); + assertEquals(expectedMetalakes.get(index).comment(), metalake.comment()); + assertEquals( + expectedMetalakes.get(index).properties().get("key"), metalake.properties().get("key")); + assertEquals( + expectedMetalakes.get(index).auditInfo().creator(), metalake.auditInfo().creator()); + assertEquals(expectedMetalakes.get(index).getVersion(), metalake.getVersion()); + index++; + } + } + + @Test + public void testInitMetalakePOVersion() throws JsonProcessingException { + MetalakePO metalakePO = createMetalakePO(1L, "test", "this is test"); + MetalakePO initPO = POConverters.initializeMetalakePOVersion(metalakePO); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + } + + @Test + public void testUpdateMetalakePOVersion() throws JsonProcessingException { + MetalakePO metalakePO = createMetalakePO(1L, "test", "this is test"); + MetalakePO initPO = POConverters.initializeMetalakePOVersion(metalakePO); + MetalakePO updatePO = POConverters.updateMetalakePOVersion(initPO, initPO); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + } + @Test public void testToMetalakePO() throws JsonProcessingException { BaseMetalake metalake = createMetalake(1L, "test", "this is test");