Skip to content

Commit

Permalink
[#5760][#5780] fix(catalog): fix drop catalog error (#5761)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

 before drop the catalog, check all schemas are avaliable

### Why are the changes needed?

some schemas are dropped externally, but still exist in the entity
store, those schemas are invalid

Fix: #5760 
Fix: #5780 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

CI pass
  • Loading branch information
mchades authored Dec 16, 2024
1 parent 8732175 commit fce1bd9
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptyCatalogException;
import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.integration.test.container.ContainerSuite;
Expand Down Expand Up @@ -136,8 +137,8 @@ public void startup() throws IOException, SQLException {

mysqlService = new MysqlService(MYSQL_CONTAINER, TEST_DB_NAME);
createMetalake();
createCatalog();
createSchema();
catalog = createCatalog(catalogName);
createSchema(catalog, schemaName);
}

@AfterAll
Expand All @@ -153,7 +154,7 @@ public void stop() {
@AfterEach
public void resetSchema() {
clearTableAndSchema();
createSchema();
createSchema(catalog, schemaName);
}

private void clearTableAndSchema() {
Expand All @@ -176,7 +177,7 @@ private void createMetalake() {
metalake = loadMetalake;
}

private void createCatalog() throws SQLException {
private Catalog createCatalog(String catalogName) throws SQLException {
Map<String, String> catalogProperties = Maps.newHashMap();

catalogProperties.put(
Expand All @@ -196,10 +197,10 @@ private void createCatalog() throws SQLException {
Catalog loadCatalog = metalake.loadCatalog(catalogName);
Assertions.assertEquals(createdCatalog, loadCatalog);

catalog = loadCatalog;
return loadCatalog;
}

private void createSchema() {
private void createSchema(Catalog catalog, String schemaName) {
Map<String, String> prop = Maps.newHashMap();

Schema createdSchema = catalog.asSchemas().createSchema(schemaName, schema_comment, prop);
Expand Down Expand Up @@ -257,6 +258,25 @@ private Map<String, String> createProperties() {
return properties;
}

@Test
void testDropCatalog() throws SQLException {
// test drop catalog with legacy entity
String catalogName = GravitinoITUtils.genRandomName("drop_catalog_it");
Catalog catalog = createCatalog(catalogName);
String schemaName = GravitinoITUtils.genRandomName("drop_catalog_it");
createSchema(catalog, schemaName);

metalake.disableCatalog(catalogName);
Assertions.assertThrows(
NonEmptyCatalogException.class, () -> metalake.dropCatalog(catalogName));

// drop database externally
String sql = String.format("DROP DATABASE %s", schemaName);
mysqlService.executeQuery(sql);

Assertions.assertTrue(metalake.dropCatalog(catalogName));
}

@Test
void testTestConnection() throws SQLException {
Map<String, String> catalogProperties = Maps.newHashMap();
Expand Down
87 changes: 73 additions & 14 deletions core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -643,24 +643,25 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
"Catalog %s is in use, please disable it first or use force option", ident);
}

List<SchemaEntity> schemas =
store.list(
Namespace.of(ident.namespace().level(0), ident.name()),
SchemaEntity.class,
EntityType.SCHEMA);
Namespace schemaNamespace = Namespace.of(ident.namespace().level(0), ident.name());
CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);

List<SchemaEntity> schemaEntities =
store.list(schemaNamespace, SchemaEntity.class, EntityType.SCHEMA);
CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG, CatalogEntity.class);

if (!schemas.isEmpty() && !force) {
// the Kafka catalog is special, it includes a default schema
if (!catalogEntity.getProvider().equals("kafka") || schemas.size() > 1) {
throw new NonEmptyCatalogException(
"Catalog %s has schemas, please drop them first or use force option", ident);
}
if (containsUserCreatedSchemas(schemaEntities, catalogEntity, catalogWrapper) && !force) {
throw new NonEmptyCatalogException(
"Catalog %s has schemas, please drop them first or use force option", ident);
}

CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
if (includeManagedEntities(catalogEntity)) {
schemas.forEach(
// code reach here in two cases:
// 1. the catalog does not have available schemas
// 2. the catalog has available schemas, and force is true
// for case 1, the forEach block can drop them without any side effect
// for case 2, the forEach block will drop all managed sub-entities
schemaEntities.forEach(
schema -> {
try {
catalogWrapper.doWithSchemaOps(
Expand All @@ -677,11 +678,69 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
} catch (NoSuchMetalakeException | NoSuchCatalogException ignored) {
return false;

} catch (IOException e) {
} catch (GravitinoRuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Check if the given list of schema entities contains any currently existing user-created
* schemas.
*
* <p>This method determines if there are valid user-created schemas by comparing the provided
* schema entities with the actual schemas currently existing in the external data source. It
* excludes:
*
* <ul>
* <li>1. Automatically generated schemas (such as Kafka catalog's "default" schema or
* JDBC-PostgreSQL catalog's "public" schema).
* <li>2. Schemas that have been dropped externally but still exist in the entity store.
* </ul>
*
* @param schemaEntities The list of schema entities to check.
* @param catalogEntity The catalog entity to which the schemas belong.
* @param catalogWrapper The catalog wrapper for the catalog.
* @return True if the list of schema entities contains any valid user-created schemas, false
* otherwise.
* @throws Exception If an error occurs while checking the schemas.
*/
private boolean containsUserCreatedSchemas(
List<SchemaEntity> schemaEntities, CatalogEntity catalogEntity, CatalogWrapper catalogWrapper)
throws Exception {
if (schemaEntities.isEmpty()) {
return false;
}

if (schemaEntities.size() == 1) {
if ("kafka".equals(catalogEntity.getProvider())) {
return false;

} else if ("jdbc-postgresql".equals(catalogEntity.getProvider())) {
// PostgreSQL catalog includes the "public" schema, see
// https://github.com/apache/gravitino/issues/2314
return !schemaEntities.get(0).name().equals("public");
}
}

NameIdentifier[] allSchemas =
catalogWrapper.doWithSchemaOps(
schemaOps ->
schemaOps.listSchemas(
Namespace.of(catalogEntity.namespace().level(0), catalogEntity.name())));
if (allSchemas.length == 0) {
return false;
}

Set<String> availableSchemaNames =
Arrays.stream(allSchemas).map(NameIdentifier::name).collect(Collectors.toSet());

// some schemas are dropped externally, but still exist in the entity store, those schemas are
// invalid
return schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
}

private boolean includeManagedEntities(CatalogEntity catalogEntity) {
return catalogEntity.getType().equals(FILESET);
}
Expand Down

0 comments on commit fce1bd9

Please sign in to comment.