From 9128db0d2f4bc57dbfb410b51db17887e335cba4 Mon Sep 17 00:00:00 2001 From: xloya <982052490@qq.com> Date: Sun, 14 Apr 2024 12:30:27 +0800 Subject: [PATCH] [#2227] improvement(jdbc-backend): Improve the judgment of exception information in JDBC backend (#2862) ### What changes were proposed in this pull request? Determine exceptions more accurately based on SQL Exception error codes. ### Why are the changes needed? Fix: #2227 ### How was this patch tested? Add the unit tests. --------- Co-authored-by: xiaojiebao --- .../storage/relational/JDBCBackend.java | 2 + .../converters/H2ExceptionConverter.java | 31 ++ .../converters/MySQLExceptionConverter.java | 32 ++ .../converters/SQLExceptionConverter.java | 24 + .../SQLExceptionConverterFactory.java | 43 ++ .../service/CatalogMetaService.java | 4 +- .../service/FilesetMetaService.java | 4 +- .../service/MetalakeMetaService.java | 4 +- .../relational/service/SchemaMetaService.java | 4 +- .../relational/service/TableMetaService.java | 4 +- .../relational/service/TopicMetaService.java | 4 +- .../relational/utils/ExceptionUtils.java | 16 +- .../storage/relational/TestJDBCBackend.java | 469 ++++++++++++++++++ 13 files changed, 619 insertions(+), 22 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/converters/H2ExceptionConverter.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/converters/MySQLExceptionConverter.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverter.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java create mode 100644 core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 1a90fa9fcb1..14a0df4fcac 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -21,6 +21,7 @@ import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.meta.TopicEntity; +import com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory; import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; import com.datastrato.gravitino.storage.relational.service.FilesetMetaService; import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; @@ -44,6 +45,7 @@ public class JDBCBackend implements RelationalBackend { @Override public void initialize(Config config) { SqlSessionFactoryHelper.getInstance().init(config); + SQLExceptionConverterFactory.initConverter(config); } @Override diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/H2ExceptionConverter.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/H2ExceptionConverter.java new file mode 100644 index 00000000000..b47f8ce208d --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/H2ExceptionConverter.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.converters; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import java.sql.SQLException; + +/** + * Exception converter to Gravitino exception for H2. The definition of error codes can be found in + * the document: + */ +public class H2ExceptionConverter implements SQLExceptionConverter { + /** It means found a duplicated primary key or unique key entry in H2. */ + private static final int DUPLICATED_ENTRY_ERROR_CODE = 23505; + + @SuppressWarnings("FormatStringAnnotation") + @Override + public GravitinoRuntimeException toGravitinoException( + SQLException se, Entity.EntityType type, String name) { + switch (se.getErrorCode()) { + case DUPLICATED_ENTRY_ERROR_CODE: + return new AlreadyExistsException(se, se.getMessage()); + default: + return new GravitinoRuntimeException(se, se.getMessage()); + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/MySQLExceptionConverter.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/MySQLExceptionConverter.java new file mode 100644 index 00000000000..de8136629cc --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/MySQLExceptionConverter.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.converters; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import java.sql.SQLException; + +/** + * Exception converter to Gravitino exception for MySQL. The definition of error codes can be found + * in the document: + */ +public class MySQLExceptionConverter implements SQLExceptionConverter { + /** It means found a duplicated primary key or unique key entry in MySQL. */ + private static final int DUPLICATED_ENTRY_ERROR_CODE = 1062; + + @SuppressWarnings("FormatStringAnnotation") + @Override + public GravitinoRuntimeException toGravitinoException( + SQLException se, Entity.EntityType type, String name) { + switch (se.getErrorCode()) { + case DUPLICATED_ENTRY_ERROR_CODE: + return new AlreadyExistsException(se, se.getMessage()); + default: + return new GravitinoRuntimeException(se, se.getMessage()); + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverter.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverter.java new file mode 100644 index 00000000000..47fe684cf93 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverter.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.converters; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import java.sql.SQLException; + +/** Interface for converter JDBC SQL exceptions to Gravitino exceptions. */ +public interface SQLExceptionConverter { + /** + * Convert JDBC exception to GravitinoException. + * + * @param sqlException The sql exception to map + * @param type The type of the entity + * @param name The name of the entity + * @return A best attempt at a corresponding jdbc connector exception or generic with the + * SQLException as the cause + */ + GravitinoRuntimeException toGravitinoException( + SQLException sqlException, Entity.EntityType type, String name); +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java new file mode 100644 index 00000000000..3623b3dc3e6 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/converters/SQLExceptionConverterFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.converters; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.google.common.base.Preconditions; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SQLExceptionConverterFactory { + private static final Pattern TYPE_PATTERN = Pattern.compile("jdbc:(\\w+):"); + private static SQLExceptionConverter converter; + + private SQLExceptionConverterFactory() {} + + public static synchronized void initConverter(Config config) { + if (converter == null) { + String jdbcUrl = config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL); + Matcher typeMatcher = TYPE_PATTERN.matcher(jdbcUrl); + if (typeMatcher.find()) { + String jdbcType = typeMatcher.group(1); + if (jdbcType.equalsIgnoreCase("mysql")) { + converter = new MySQLExceptionConverter(); + } else if (jdbcType.equalsIgnoreCase("h2")) { + converter = new H2ExceptionConverter(); + } else { + throw new IllegalArgumentException(String.format("Unsupported jdbc type: %s", jdbcType)); + } + } else { + throw new IllegalArgumentException( + String.format("Cannot find jdbc type in jdbc url: %s", jdbcUrl)); + } + } + } + + public static SQLExceptionConverter getConverter() { + Preconditions.checkState(converter != null, "Exception converter is not initialized."); + return converter; + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java index 2da33ea27df..a075dbf8d39 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java @@ -112,7 +112,7 @@ public void insertCatalog(CatalogEntity catalogEntity, boolean overwrite) { } }); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.CATALOG, catalogEntity.nameIdentifier().toString()); throw re; } @@ -147,7 +147,7 @@ public CatalogEntity updateCatalog( POConverters.updateCatalogPOWithVersion(oldCatalogPO, newEntity, metalakeId), oldCatalogPO)); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.CATALOG, newEntity.nameIdentifier().toString()); throw re; } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index c30a738c016..eb8b1924ac2 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -122,7 +122,7 @@ public void insertFileset(FilesetEntity filesetEntity, boolean overwrite) { } })); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.FILESET, filesetEntity.nameIdentifier().toString()); throw re; } @@ -177,7 +177,7 @@ public FilesetEntity updateFileset( mapper -> mapper.updateFilesetMeta(newFilesetPO, oldFilesetPO)); } } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.FILESET, newEntity.nameIdentifier().toString()); throw re; } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java index 505762ba285..7c76ab4ebb5 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java @@ -90,7 +90,7 @@ public void insertMetalake(BaseMetalake baseMetalake, boolean overwrite) { } }); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.METALAKE, baseMetalake.nameIdentifier().toString()); throw re; } @@ -125,7 +125,7 @@ public BaseMetalake updateMetalake( MetalakeMetaMapper.class, mapper -> mapper.updateMetalakeMeta(newMetalakePO, oldMetalakePO)); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.METALAKE, newMetalakeEntity.nameIdentifier().toString()); throw re; } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java index fddbd4a6029..c1b8ba490d2 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java @@ -109,7 +109,7 @@ public void insertSchema(SchemaEntity schemaEntity, boolean overwrite) { } }); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.SCHEMA, schemaEntity.nameIdentifier().toString()); throw re; } @@ -142,7 +142,7 @@ public SchemaEntity updateSchema( mapper.updateSchemaMeta( POConverters.updateSchemaPOWithVersion(oldSchemaPO, newEntity), oldSchemaPO)); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.SCHEMA, newEntity.nameIdentifier().toString()); throw re; } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java index afb857f4e79..fc6a03db757 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java @@ -102,7 +102,7 @@ public void insertTable(TableEntity tableEntity, boolean overwrite) { } }); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.TABLE, tableEntity.nameIdentifier().toString()); throw re; } @@ -135,7 +135,7 @@ public TableEntity updateTable( mapper.updateTableMeta( POConverters.updateTablePOWithVersion(oldTablePO, newEntity), oldTablePO)); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.TABLE, newEntity.nameIdentifier().toString()); throw re; } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java index 76d76d30d3f..cc60e266f2d 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java @@ -53,7 +53,7 @@ public void insertTopic(TopicEntity topicEntity, boolean overwrite) { }); // TODO: insert topic dataLayout version after supporting it } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.TOPIC, topicEntity.nameIdentifier().toString()); throw re; } @@ -97,7 +97,7 @@ public TopicEntity updateTopic( mapper.updateTopicMeta( POConverters.updateTopicPOWithVersion(oldTopicPO, newEntity), oldTopicPO)); } catch (RuntimeException re) { - ExceptionUtils.checkSQLConstraintException( + ExceptionUtils.checkSQLException( re, Entity.EntityType.TOPIC, newEntity.nameIdentifier().toString()); throw re; } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/ExceptionUtils.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/ExceptionUtils.java index bf37465a249..f11b3c2fc77 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/ExceptionUtils.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/ExceptionUtils.java @@ -5,21 +5,17 @@ package com.datastrato.gravitino.storage.relational.utils; import com.datastrato.gravitino.Entity; -import com.datastrato.gravitino.exceptions.AlreadyExistsException; -import java.sql.SQLIntegrityConstraintViolationException; +import com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory; +import java.sql.SQLException; public class ExceptionUtils { private ExceptionUtils() {} - public static void checkSQLConstraintException( + public static void checkSQLException( RuntimeException re, Entity.EntityType type, String entityName) { - if (re.getCause() != null - && re.getCause() instanceof SQLIntegrityConstraintViolationException) { - // TODO We should make more fine-grained exception judgments - // Usually throwing `SQLIntegrityConstraintViolationException` means that - // SQL violates the constraints of `primary key` and `unique key`. - // We simply think that the entity already exists at this time. - throw new AlreadyExistsException("%s entity: %s already exists", type.name(), entityName); + if (re.getCause() instanceof SQLException) { + throw SQLExceptionConverterFactory.getConverter() + .toGravitinoException((SQLException) re.getCause(), type, entityName); } } } diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java new file mode 100644 index 00000000000..965e6043678 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java @@ -0,0 +1,469 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational; + +import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER; +import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE; +import static com.datastrato.gravitino.Configs.ENTITY_STORE; +import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; +import com.datastrato.gravitino.file.Fileset; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.meta.FilesetEntity; +import com.datastrato.gravitino.meta.SchemaEntity; +import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.meta.TableEntity; +import com.datastrato.gravitino.meta.TopicEntity; +import com.datastrato.gravitino.storage.RandomIdGenerator; +import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import com.google.common.collect.ImmutableMap; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import org.apache.commons.io.IOUtils; +import org.apache.ibatis.session.SqlSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestJDBCBackend { + private static final String JDBC_STORE_PATH = + "/tmp/gravitino_jdbc_entityStore_" + UUID.randomUUID().toString().replace("-", ""); + private static final String DB_DIR = JDBC_STORE_PATH + "/testdb"; + private static final Config config = Mockito.mock(Config.class); + public static final ImmutableMap RELATIONAL_BACKENDS = + ImmutableMap.of( + Configs.DEFAULT_ENTITY_RELATIONAL_STORE, JDBCBackend.class.getCanonicalName()); + public static RelationalBackend backend; + + @BeforeAll + public static void setup() { + File dir = new File(DB_DIR); + if (dir.exists() || !dir.isDirectory()) { + dir.delete(); + } + dir.mkdirs(); + Mockito.when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE); + Mockito.when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL)) + .thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", DB_DIR)); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("root"); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("123"); + Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver"); + + String backendName = config.get(ENTITY_RELATIONAL_STORE); + String className = + RELATIONAL_BACKENDS.getOrDefault(backendName, Configs.DEFAULT_ENTITY_RELATIONAL_STORE); + + try { + backend = (RelationalBackend) Class.forName(className).getDeclaredConstructor().newInstance(); + backend.initialize(config); + } catch (Exception e) { + throw new RuntimeException( + "Failed to create and initialize RelationalBackend by name: " + backendName, e); + } + + prepareJdbcTable(); + } + + @AfterAll + public static void tearDown() throws IOException { + dropAllTables(); + File dir = new File(DB_DIR); + if (dir.exists()) { + dir.delete(); + } + backend.close(); + } + + @BeforeEach + public void init() { + truncateAllTables(); + } + + private static void prepareJdbcTable() { + // Read the ddl sql to create table + String scriptPath = "h2/schema-h2.sql"; + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); + Connection connection = sqlSession.getConnection(); + Statement statement = connection.createStatement()) { + StringBuilder ddlBuilder = new StringBuilder(); + IOUtils.readLines( + Objects.requireNonNull( + TestJDBCBackend.class.getClassLoader().getResourceAsStream(scriptPath)), + StandardCharsets.UTF_8) + .forEach(line -> ddlBuilder.append(line).append("\n")); + statement.execute(ddlBuilder.toString()); + } catch (Exception e) { + throw new IllegalStateException("Create tables failed", e); + } + } + + private static void truncateAllTables() { + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { + try (Connection connection = sqlSession.getConnection()) { + try (Statement statement = connection.createStatement()) { + String query = "SHOW TABLES"; + List tableList = new ArrayList<>(); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + tableList.add(rs.getString(1)); + } + } + for (String table : tableList) { + statement.execute("TRUNCATE TABLE " + table); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Truncate table failed", e); + } + } + + private static void dropAllTables() { + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { + try (Connection connection = sqlSession.getConnection()) { + try (Statement statement = connection.createStatement()) { + String query = "SHOW TABLES"; + List tableList = new ArrayList<>(); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + tableList.add(rs.getString(1)); + } + } + for (String table : tableList) { + statement.execute("DROP TABLE " + table); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Drop table failed", e); + } + } + + @Test + public void testInsertAlreadyExistsException() { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + + BaseMetalake metalake = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", auditInfo); + BaseMetalake metalakeCopy = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", auditInfo); + backend.insert(metalake, false); + assertThrows(AlreadyExistsException.class, () -> backend.insert(metalakeCopy, false)); + + CatalogEntity catalog = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("metalake"), + "catalog", + auditInfo); + CatalogEntity catalogCopy = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("metalake"), + "catalog", + auditInfo); + backend.insert(catalog, false); + assertThrows(AlreadyExistsException.class, () -> backend.insert(catalogCopy, false)); + + SchemaEntity schema = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("metalake", "catalog"), + "schema", + auditInfo); + SchemaEntity schemaCopy = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("metalake", "catalog"), + "schema", + auditInfo); + backend.insert(schema, false); + assertThrows(AlreadyExistsException.class, () -> backend.insert(schemaCopy, false)); + + TableEntity table = + createTableEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofTable("metalake", "catalog", "schema"), + "table", + auditInfo); + TableEntity tableCopy = + createTableEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofTable("metalake", "catalog", "schema"), + "table", + auditInfo); + backend.insert(table, false); + assertThrows(AlreadyExistsException.class, () -> backend.insert(tableCopy, false)); + + FilesetEntity fileset = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset", + auditInfo); + FilesetEntity filesetCopy = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset", + auditInfo); + backend.insert(fileset, false); + assertThrows(AlreadyExistsException.class, () -> backend.insert(filesetCopy, false)); + + TopicEntity topic = + createTopicEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "topic", + auditInfo); + TopicEntity topicCopy = + createTopicEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "topic", + auditInfo); + backend.insert(topic, false); + assertThrows(AlreadyExistsException.class, () -> backend.insert(topicCopy, false)); + } + + @Test + public void testUpdateAlreadyExistsException() { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + + BaseMetalake metalake = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", auditInfo); + BaseMetalake metalakeCopy = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake1", auditInfo); + backend.insert(metalake, false); + backend.insert(metalakeCopy, false); + assertThrows( + AlreadyExistsException.class, + () -> + backend.update( + metalakeCopy.nameIdentifier(), + Entity.EntityType.METALAKE, + e -> createBaseMakeLake(metalakeCopy.id(), "metalake", auditInfo))); + + CatalogEntity catalog = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("metalake"), + "catalog", + auditInfo); + CatalogEntity catalogCopy = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("metalake"), + "catalog1", + auditInfo); + backend.insert(catalog, false); + backend.insert(catalogCopy, false); + assertThrows( + AlreadyExistsException.class, + () -> + backend.update( + catalogCopy.nameIdentifier(), + Entity.EntityType.CATALOG, + e -> + createCatalog( + catalogCopy.id(), catalogCopy.namespace(), "catalog", auditInfo))); + + SchemaEntity schema = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("metalake", "catalog"), + "schema", + auditInfo); + SchemaEntity schemaCopy = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("metalake", "catalog"), + "schema1", + auditInfo); + backend.insert(schema, false); + backend.insert(schemaCopy, false); + assertThrows( + AlreadyExistsException.class, + () -> + backend.update( + schemaCopy.nameIdentifier(), + Entity.EntityType.SCHEMA, + e -> + createSchemaEntity( + schemaCopy.id(), schemaCopy.namespace(), "schema", auditInfo))); + + TableEntity table = + createTableEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofTable("metalake", "catalog", "schema"), + "table", + auditInfo); + TableEntity tableCopy = + createTableEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofTable("metalake", "catalog", "schema"), + "table1", + auditInfo); + backend.insert(table, false); + backend.insert(tableCopy, false); + assertThrows( + AlreadyExistsException.class, + () -> + backend.update( + tableCopy.nameIdentifier(), + Entity.EntityType.TABLE, + e -> createTableEntity(tableCopy.id(), tableCopy.namespace(), "table", auditInfo))); + + FilesetEntity fileset = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset", + auditInfo); + FilesetEntity filesetCopy = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset1", + auditInfo); + backend.insert(fileset, false); + backend.insert(filesetCopy, false); + assertThrows( + AlreadyExistsException.class, + () -> + backend.update( + filesetCopy.nameIdentifier(), + Entity.EntityType.FILESET, + e -> + createFilesetEntity( + filesetCopy.id(), filesetCopy.namespace(), "fileset", auditInfo))); + + TopicEntity topic = + createTopicEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "topic", + auditInfo); + TopicEntity topicCopy = + createTopicEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "topic1", + auditInfo); + backend.insert(topic, false); + backend.insert(topicCopy, false); + assertThrows( + AlreadyExistsException.class, + () -> + backend.update( + topicCopy.nameIdentifier(), + Entity.EntityType.TOPIC, + e -> createTopicEntity(topicCopy.id(), topicCopy.namespace(), "topic", auditInfo))); + } + + public static BaseMetalake createBaseMakeLake(Long id, String name, AuditInfo auditInfo) { + return BaseMetalake.builder() + .withId(id) + .withName(name) + .withAuditInfo(auditInfo) + .withComment("") + .withProperties(null) + .withVersion(SchemaVersion.V_0_1) + .build(); + } + + public static CatalogEntity createCatalog( + Long id, Namespace namespace, String name, AuditInfo auditInfo) { + return CatalogEntity.builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withType(Catalog.Type.RELATIONAL) + .withProvider("test") + .withComment("") + .withProperties(null) + .withAuditInfo(auditInfo) + .build(); + } + + public static SchemaEntity createSchemaEntity( + Long id, Namespace namespace, String name, AuditInfo auditInfo) { + return SchemaEntity.builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withComment("") + .withProperties(null) + .withAuditInfo(auditInfo) + .build(); + } + + public static TableEntity createTableEntity( + Long id, Namespace namespace, String name, AuditInfo auditInfo) { + return TableEntity.builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withAuditInfo(auditInfo) + .build(); + } + + public static FilesetEntity createFilesetEntity( + Long id, Namespace namespace, String name, AuditInfo auditInfo) { + return FilesetEntity.builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withFilesetType(Fileset.Type.MANAGED) + .withStorageLocation("/tmp") + .withComment("") + .withProperties(null) + .withAuditInfo(auditInfo) + .build(); + } + + public static TopicEntity createTopicEntity( + Long id, Namespace namespace, String name, AuditInfo auditInfo) { + return TopicEntity.builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withComment("test comment") + .withProperties(ImmutableMap.of("key", "value")) + .withAuditInfo(auditInfo) + .build(); + } +}