[#5739] feat(model-catalog): Implement the model catalog logic (#5848)
### What changes were proposed in this pull request?

This PR adds the model catalog implementation.

### Why are the changes needed?

This is part of the work to support model management in Gravitino.

Fix: #5739 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UTs to cover the changes.
jerryshao authored Dec 18, 2024
1 parent 5e9919e commit a4190e1
Showing 24 changed files with 1,812 additions and 187 deletions.
20 changes: 12 additions & 8 deletions api/src/main/java/org/apache/gravitino/model/ModelCatalog.java
@@ -79,12 +79,11 @@ default boolean modelExists(NameIdentifier ident) {
* @param properties The properties of the model. The properties are optional and can be null or
* empty.
* @return The registered model object.
* @throws NoSuchSchemaException If the schema does not exist.
* @throws ModelAlreadyExistsException If the model already registered.
*/
default Model registerModel(NameIdentifier ident, String comment, Map<String, String> properties)
throws ModelAlreadyExistsException {
return registerModel(ident, null, new String[0], comment, properties);
}
Model registerModel(NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchSchemaException, ModelAlreadyExistsException;

/**
* Register a model in the catalog if the model is not existed, otherwise the {@link
@@ -99,16 +98,22 @@ default Model registerModel(NameIdentifier ident, String comment, Map<String, St
* @param properties The properties of the model. The properties are optional and can be null or
* empty.
* @return The registered model object.
* @throws NoSuchSchemaException If the schema does not exist when register a model.
* @throws ModelAlreadyExistsException If the model already registered.
* @throws ModelVersionAliasesAlreadyExistException If the aliases already exist in the model.
*/
Model registerModel(
default Model registerModel(
NameIdentifier ident,
String uri,
String[] aliases,
String comment,
Map<String, String> properties)
throws ModelAlreadyExistsException, ModelVersionAliasesAlreadyExistException;
throws NoSuchSchemaException, ModelAlreadyExistsException,
ModelVersionAliasesAlreadyExistException {
Model model = registerModel(ident, comment, properties);
linkModelVersion(ident, uri, aliases, comment, properties);
return model;
}

/**
* Delete the model from the catalog. If the model does not exist, return false. Otherwise, return
@@ -197,11 +202,10 @@ default boolean modelVersionExists(NameIdentifier ident, String alias) {
* @param comment The comment of the model version. The comment is optional and can be null.
* @param properties The properties of the model version. The properties are optional and can be
* null or empty.
* @return The model version object.
* @throws NoSuchModelException If the model does not exist.
* @throws ModelVersionAliasesAlreadyExistException If the aliases already exist in the model.
*/
ModelVersion linkModelVersion(
void linkModelVersion(
NameIdentifier ident,
String uri,
String[] aliases,
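The net effect of these hunks is that the two-argument `registerModel` becomes the abstract method a model catalog must implement, the five-argument variant becomes a default method that registers the model and then links its first version, and `linkModelVersion` now returns `void`. A minimal caller-side sketch of the reshaped interface follows; the schema/model names, URIs, aliases, and the `modelCatalog` handle are illustrative assumptions, not part of this PR:

```java
import java.util.Collections;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.model.Model;
import org.apache.gravitino.model.ModelCatalog;

class ModelCatalogUsageSketch {

  // `modelCatalog` is a hypothetical handle to a catalog that supports models.
  void registerExamples(ModelCatalog modelCatalog) {
    // Two-argument form (now abstract): registers only the model entity, throwing
    // NoSuchSchemaException / ModelAlreadyExistsException as documented above.
    Model bare =
        modelCatalog.registerModel(
            NameIdentifier.of("my_schema", "bare_model"), "no versions yet", Collections.emptyMap());

    // Five-argument form (now a default method): registers the model and then
    // links an initial version under the given URI and aliases.
    Model withVersion =
        modelCatalog.registerModel(
            NameIdentifier.of("my_schema", "scored_model"),
            "hdfs://models/scored_model/v0",
            new String[] {"latest"},
            "model with an initial version",
            Collections.singletonMap("owner", "ml-team"));

    // linkModelVersion now returns void; callers fetch the version separately if needed.
    modelCatalog.linkModelVersion(
        NameIdentifier.of("my_schema", "scored_model"),
        "hdfs://models/scored_model/v1",
        new String[] {"candidate"},
        "second version",
        Collections.emptyMap());
  }
}
```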
@@ -44,12 +44,12 @@
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.catalog.ManagedSchemaOperations;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
@@ -74,7 +74,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas, FilesetCatalog {
public class HadoopCatalogOperations extends ManagedSchemaOperations
implements CatalogOperations, FilesetCatalog {
private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
private static final String SLASH = "/";
@@ -104,7 +105,8 @@ public HadoopCatalogOperations() {
this(GravitinoEnv.getInstance().entityStore());
}

public EntityStore getStore() {
@Override
public EntityStore store() {
return store;
}

@@ -451,19 +453,6 @@ public String getFileLocation(NameIdentifier ident, String subPath)
return fileLocation;
}

@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
try {
List<SchemaEntity> schemas =
store.list(namespace, SchemaEntity.class, Entity.EntityType.SCHEMA);
return schemas.stream()
.map(s -> NameIdentifier.of(namespace, s.name()))
.toArray(NameIdentifier[]::new);
} catch (IOException e) {
throw new RuntimeException("Failed to list schemas under namespace " + namespace, e);
}
}

@Override
public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {
@@ -496,53 +485,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, Str
}
}

StringIdentifier stringId = StringIdentifier.fromProperties(properties);
Preconditions.checkNotNull(stringId, "Property String identifier should not be null");

SchemaEntity schemaEntity =
SchemaEntity.builder()
.withName(ident.name())
.withId(stringId.id())
.withNamespace(ident.namespace())
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();
try {
store.put(schemaEntity, true /* overwrite */);
} catch (IOException ioe) {
throw new RuntimeException("Failed to create schema " + ident, ioe);
}

return HadoopSchema.builder()
.withName(ident.name())
.withComment(comment)
.withProperties(schemaEntity.properties())
.withAuditInfo(schemaEntity.auditInfo())
.build();
}

@Override
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
try {
SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);

return HadoopSchema.builder()
.withName(ident.name())
.withComment(schemaEntity.comment())
.withProperties(schemaEntity.properties())
.withAuditInfo(schemaEntity.auditInfo())
.build();

} catch (NoSuchEntityException exception) {
throw new NoSuchSchemaException(exception, SCHEMA_DOES_NOT_EXIST_MSG, ident);
} catch (IOException ioe) {
throw new RuntimeException("Failed to load schema " + ident, ioe);
}
return super.createSchema(ident, comment, properties);
}

@Override
@@ -556,32 +499,7 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
throw new RuntimeException("Failed to check if schema " + ident + " exists", ioe);
}

try {
SchemaEntity entity =
store.update(
ident,
SchemaEntity.class,
Entity.EntityType.SCHEMA,
schemaEntity -> updateSchemaEntity(ident, schemaEntity, changes));

return HadoopSchema.builder()
.withName(ident.name())
.withComment(entity.comment())
.withProperties(entity.properties())
.withAuditInfo(entity.auditInfo())
.build();

} catch (IOException ioe) {
throw new RuntimeException("Failed to update schema " + ident, ioe);
} catch (NoSuchEntityException nsee) {
throw new NoSuchSchemaException(nsee, SCHEMA_DOES_NOT_EXIST_MSG, ident);
} catch (AlreadyExistsException aee) {
throw new RuntimeException(
"Schema with the same name "
+ ident.name()
+ " already exists, this is unexpected because schema doesn't support rename",
aee);
}
return super.alterSchema(ident, changes);
}

@Override
@@ -600,6 +518,16 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
throw new NonEmptySchemaException("Schema %s is not empty", ident);
}

SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
Map<String, String> properties =
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
Path schemaPath = getSchemaPath(ident.name(), properties);

boolean dropped = super.dropSchema(ident, cascade);
if (!dropped) {
return false;
}

// Delete all the managed filesets no matter whether the storage location is under the
// schema path or not.
// The reason why we delete the managed fileset's storage location one by one is because we
@@ -635,30 +563,21 @@ }
}
});

SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
Map<String, String> properties =
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());

// Delete the schema path if it exists and is empty.
Path schemaPath = getSchemaPath(ident.name(), properties);
// Nothing to delete if the schema path is not set.
if (schemaPath == null) {
return false;
}

FileSystem fs = getFileSystem(schemaPath, conf);
// Nothing to delete if the schema path does not exist.
if (!fs.exists(schemaPath)) {
return false;
}

FileStatus[] statuses = fs.listStatus(schemaPath);
if (statuses.length == 0) {
if (fs.delete(schemaPath, true)) {
LOG.info("Deleted schema {} location {}", ident, schemaPath);
} else {
LOG.warn("Failed to delete schema {} location {}", ident, schemaPath);
return false;
if (schemaPath != null) {
FileSystem fs = getFileSystem(schemaPath, conf);
if (fs.exists(schemaPath)) {
FileStatus[] statuses = fs.listStatus(schemaPath);
if (statuses.length == 0) {
if (fs.delete(schemaPath, true)) {
LOG.info("Deleted schema {} location {}", ident, schemaPath);
} else {
LOG.warn(
"Failed to delete schema {} because it has files/folders under location {}",
ident,
schemaPath);
}
}
}
}

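This file's changes all follow from one refactor: HadoopCatalogOperations stops implementing SupportsSchemas by hand and instead extends the shared ManagedSchemaOperations base class, so the hand-rolled listSchemas/loadSchema bodies disappear and createSchema/alterSchema/dropSchema delegate to super after the Hadoop-specific path handling. A rough sketch of what a subclass now has to provide, assuming (as the diff suggests) that store() is the hook the base class relies on; the class name below is illustrative:

```java
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.catalog.ManagedSchemaOperations;

// Sketch of the inheritance shape after the refactor: the subclass supplies the
// entity store and inherits the store-backed schema CRUD from the base class.
// (The real HadoopCatalogOperations also implements CatalogOperations and
// FilesetCatalog; those methods are omitted here.)
class StoreBackedSchemaOperationsSketch extends ManagedSchemaOperations {

  private final EntityStore store;

  StoreBackedSchemaOperationsSketch(EntityStore store) {
    this.store = store;
  }

  // The base class reaches the entity store through this hook instead of the
  // removed catalog-specific getStore() accessor.
  @Override
  public EntityStore store() {
    return store;
  }
}
```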
@@ -107,7 +107,7 @@ public boolean dropFileset(NameIdentifier ident) {
try {
filesetEntity =
hadoopCatalogOperations
.getStore()
.store()
.get(ident, Entity.EntityType.FILESET, FilesetEntity.class);
} catch (NoSuchEntityException e) {
LOG.warn("Fileset {} does not exist", ident);
@@ -143,9 +143,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, Str
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
try {
SchemaEntity schemaEntity =
hadoopCatalogOperations
.getStore()
.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
hadoopCatalogOperations.store().get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
Map<String, String> properties =
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());

@@ -446,6 +446,7 @@ public void testDropSchema() throws IOException {
Assertions.assertFalse(fs.exists(schemaPath));

// Test drop non-empty schema with cascade = false
createSchema(name, comment, catalogPath, null);
Fileset fs1 = createFileset("fs1", name, "comment", Fileset.Type.MANAGED, catalogPath, null);
Path fs1Path = new Path(fs1.storageLocation());

@@ -459,6 +460,7 @@
Assertions.assertFalse(fs.exists(fs1Path));

// Test drop both managed and external filesets
createSchema(name, comment, catalogPath, null);
Fileset fs2 = createFileset("fs2", name, "comment", Fileset.Type.MANAGED, catalogPath, null);
Path fs2Path = new Path(fs2.storageLocation());

@@ -472,6 +474,7 @@
Assertions.assertTrue(fs.exists(fs3Path));

// Test drop schema with different storage location
createSchema(name, comment, catalogPath, null);
Path fs4Path = new Path(TEST_ROOT_PATH + "/fs4");
createFileset("fs4", name, "comment", Fileset.Type.MANAGED, catalogPath, fs4Path.toString());
ops.dropSchema(id, true);
10 changes: 8 additions & 2 deletions catalogs/catalog-model/build.gradle.kts
@@ -40,11 +40,17 @@ dependencies {
exclude(group = "*")
}

compileOnly(libs.guava)

implementation(libs.guava)
implementation(libs.slf4j.api)

testImplementation(project(":clients:client-java"))
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))

testImplementation(libs.bundles.log4j)
testImplementation(libs.commons.io)
testImplementation(libs.commons.lang3)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.inline)
testImplementation(libs.junit.jupiter.api)
@@ -20,12 +20,14 @@

import java.util.Map;
import org.apache.gravitino.CatalogProvider;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.capability.Capability;

public class ModelCatalog extends BaseCatalog<ModelCatalog> {
public class ModelCatalogImpl extends BaseCatalog<ModelCatalogImpl> {

private static final ModelCatalogPropertiesMetadata CATALOG_PROPERTIES_META =
new ModelCatalogPropertiesMetadata();
@@ -43,7 +45,8 @@ public String shortName() {

@Override
protected CatalogOperations newOps(Map<String, String> config) {
return null;
EntityStore store = GravitinoEnv.getInstance().entityStore();
return new ModelCatalogOperations(store);
}

@Override
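With newOps() wired to the global entity store, the model catalog is backed by Gravitino's own metadata storage rather than an external system. One practical consequence, sketched below as a hypothetical test-side helper (assuming ModelCatalogOperations needs nothing beyond the store passed to its constructor, as in the hunk above), is that unit tests can build the operations directly against a mocked or in-memory EntityStore:

```java
import static org.mockito.Mockito.mock;

import org.apache.gravitino.EntityStore;

// ModelCatalogOperations is assumed to be on the classpath (same catalog-model module);
// its package is not shown in this diff.
class ModelCatalogOperationsTestSketch {

  // Hypothetical helper: constructs the operations against a mocked store so model
  // register/link/delete paths can be exercised without a running Gravitino server.
  ModelCatalogOperations newOpsForTest() {
    EntityStore store = mock(EntityStore.class);
    // Depending on what a test exercises, the mock may need stubbing
    // (e.g. store.put(...) / store.get(...)) before use.
    return new ModelCatalogOperations(store);
  }
}
```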