diff --git a/catalog-hive/build.gradle.kts b/catalog-hive/build.gradle.kts index ad6a716356b..03af7edf7b4 100644 --- a/catalog-hive/build.gradle.kts +++ b/catalog-hive/build.gradle.kts @@ -76,4 +76,5 @@ dependencies { testImplementation(libs.slf4j.jdk14) testImplementation(libs.junit.jupiter.api) testRuntimeOnly(libs.junit.jupiter.engine) + testImplementation(libs.mockito.core) } diff --git a/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java b/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java index 101fbe5eb28..20827c9fcba 100644 --- a/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java +++ b/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java @@ -4,11 +4,17 @@ */ package com.datastrato.graviton.catalog.hive; +import static com.datastrato.graviton.Entity.EntityType.SCHEMA; +import static com.datastrato.graviton.Entity.EntityType.TABLE; import static com.datastrato.graviton.catalog.hive.HiveTable.HMS_TABLE_COMMENT; import static com.datastrato.graviton.catalog.hive.HiveTable.SUPPORT_TABLE_TYPES; +import com.datastrato.graviton.EntityAlreadyExistsException; +import com.datastrato.graviton.EntityStore; +import com.datastrato.graviton.GravitonEnv; import com.datastrato.graviton.NameIdentifier; import com.datastrato.graviton.Namespace; +import com.datastrato.graviton.NoSuchEntityException; import com.datastrato.graviton.catalog.CatalogOperations; import com.datastrato.graviton.catalog.hive.converter.ToHiveType; import com.datastrato.graviton.exceptions.NoSuchCatalogException; @@ -34,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; @@ -148,30 +155,33 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map { - client.createDatabase(hiveSchema.toInnerDB()); - return null; - }); - - // TODO. We should also store the customized HiveSchema entity fields into our own - // underlying storage, like id, auditInfo, etc. + store.executeInTransaction( + () -> { + HiveSchema createdSchema = + new HiveSchema.Builder() + .withId(1L /*TODO. Use ID generator*/) + .withCatalogId(entity.getId()) + .withName(ident.name()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withProperties(metadata) + .withAuditInfo( + new AuditInfo.Builder() + .withCreator(currentUser()) + .withCreateTime(Instant.now()) + .build()) + .withConf(hiveConf) + .build(); + store.put(createdSchema, false); + clientPool.run( + client -> { + client.createDatabase(createdSchema.toInnerDB()); + return null; + }); + return createdSchema; + }); LOG.info("Created Hive schema (database) {} in Hive Metastore", ident.name()); @@ -180,13 +190,24 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map client.getDatabase(ident.name())); HiveSchema.Builder builder = new HiveSchema.Builder(); - // TODO. We should also fetch the customized HiveSchema entity fields from our own - // underlying storage, like id, auditInfo, etc. + EntityStore store = GravitonEnv.getInstance().entityStore(); + BaseSchema baseSchema = store.get(ident, SCHEMA, BaseSchema.class); builder = builder - .withId(1L /* TODO. Fetch id from underlying storage */) - .withCatalogId(entity.getId()) + .withId(baseSchema.getId()) + .withCatalogId(baseSchema.getCatalogId()) .withNamespace(ident.namespace()) - .withAuditInfo( - /* TODO. Fetch audit info from underlying storage */ - new AuditInfo.Builder() - .withCreator(currentUser()) - .withCreateTime(Instant.now()) - .build()) + .withAuditInfo(baseSchema.auditInfo()) .withConf(hiveConf); HiveSchema hiveSchema = HiveSchema.fromInnerDB(database, builder); @@ -233,12 +249,17 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException return hiveSchema; } catch (NoSuchObjectException | UnknownDBException e) { + deleteSchemaFromStore(ident); throw new NoSuchSchemaException( String.format( "Hive schema (database) does not exist: %s in Hive Metastore", ident.name()), e); - // TODO. We should also delete Hive schema (database) from our own underlying storage + } catch (NoSuchEntityException e) { + throw new NoSuchSchemaException( + String.format( + "Hive schema (database) does not exist: %s in Graviton store", ident.name()), + e); } catch (TException e) { throw new RuntimeException( @@ -246,6 +267,10 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException } catch (InterruptedException e) { throw new RuntimeException(e); + + } catch (IOException ioe) { + LOG.error("Failed to load hive schema {}", ident, ioe); + throw new RuntimeException(ioe); } } @@ -293,38 +318,64 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes) Database alteredDatabase = database.deepCopy(); alteredDatabase.setParameters(metadata); - clientPool.run( - client -> { - client.alterDatabase(ident.name(), alteredDatabase); - return null; - }); - - // TODO. We should also update the customized HiveSchema entity fields into our own if - // necessary - HiveSchema.Builder builder = new HiveSchema.Builder(); - builder = - builder - .withId(1L /* TODO. Fetch id from underlying storage */) - .withCatalogId(entity.getId()) - .withNamespace(ident.namespace()) - .withAuditInfo( - /* TODO. Fetch audit info from underlying storage */ - new AuditInfo.Builder() - .withCreator(currentUser()) - .withCreateTime(Instant.now()) - .withLastModifier(currentUser()) - .withLastModifiedTime(Instant.now()) - .build()) - .withConf(hiveConf); - HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder); + // update store transactionally + EntityStore store = GravitonEnv.getInstance().entityStore(); + HiveSchema alteredHiveSchema = + store.executeInTransaction( + () -> { + BaseSchema oldSchema = store.get(ident, SCHEMA, BaseSchema.class); + HiveSchema.Builder builder = new HiveSchema.Builder(); + builder = + builder + .withId(oldSchema.getId()) + .withCatalogId(oldSchema.getCatalogId()) + .withNamespace(ident.namespace()) + .withAuditInfo( + new AuditInfo.Builder() + .withCreator(oldSchema.auditInfo().creator()) + .withCreateTime(oldSchema.auditInfo().createTime()) + .withLastModifier(currentUser()) + .withLastModifiedTime(Instant.now()) + .build()) + .withConf(hiveConf); + HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder); + + // To be on the safe side, here uses delete before put (although hive schema does + // not support rename yet) + store.delete(ident, SCHEMA); + store.put(hiveSchema, false); + clientPool.run( + client -> { + client.alterDatabase(ident.name(), alteredDatabase); + return null; + }); + return hiveSchema; + }); LOG.info("Altered Hive schema (database) {} in Hive Metastore", ident.name()); // todo(xun): hive does not support renaming database name directly, // perhaps we can use namespace to mapping the database names indirectly - return hiveSchema; + return alteredHiveSchema; + + } catch (NoSuchObjectException e) { + throw new NoSuchSchemaException( + String.format("Hive schema (database) %s does not exist in Hive Metastore", ident.name()), + e); + + } catch (EntityAlreadyExistsException e) { + throw new NoSuchSchemaException( + "The new Hive schema (database) name already exist in Graviton store", e); } catch (TException | InterruptedException e) { + throw new RuntimeException( + "Failed to alter Hive schema (database) " + ident.name() + " in Hive metastore", e); + + } catch (IOException e) { + throw new RuntimeException( + "Failed to alter Hive schema (database) " + ident.name() + " in Graviton store", e); + + } catch (Exception e) { throw new RuntimeException(e); } } @@ -348,15 +399,39 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty return false; } + EntityStore store = GravitonEnv.getInstance().entityStore(); + Namespace schemaNamespace = + Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())); + if (!cascade) { + if (listTables(schemaNamespace).length > 0) { + throw new NonEmptySchemaException( + String.format( + "Hive schema (database) %s is not empty. One or more tables exist in Hive metastore.", + ident.name())); + } + // TODO(minghuang): check if there are tables in Graviton store after we support hive table + // serde + } + try { - clientPool.run( - client -> { - client.dropDatabase(ident.name(), false, false, cascade); + store.executeInTransaction( + () -> { + store.delete(ident, SCHEMA); + for (BaseTable t : + store.list( + Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())), + HiveTable.class, + TABLE)) { + store.delete(NameIdentifier.of(schemaNamespace, t.name()), TABLE); + } + clientPool.run( + client -> { + client.dropDatabase(ident.name(), false, false, cascade); + return null; + }); return null; }); - // TODO. we should also delete the Hive schema (database) from our own underlying storage - LOG.info("Dropped Hive schema (database) {}", ident.name()); return true; @@ -367,6 +442,7 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty e); } catch (NoSuchObjectException e) { + deleteSchemaFromStore(ident); LOG.warn("Hive schema (database) {} does not exist in Hive Metastore", ident.name()); return false; @@ -374,11 +450,24 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty throw new RuntimeException( "Failed to drop Hive schema (database) " + ident.name() + " in Hive Metastore", e); - } catch (InterruptedException e) { + } catch (IOException e) { + throw new RuntimeException( + "Failed to drop Hive schema (database) " + ident.name() + " in Graviton store", e); + + } catch (Exception e) { throw new RuntimeException(e); } } + private void deleteSchemaFromStore(NameIdentifier ident) { + EntityStore store = GravitonEnv.getInstance().entityStore(); + try { + store.delete(ident, SCHEMA); + } catch (IOException ex) { + LOG.error("Failed to delete hive schema {} from Graviton store", ident, ex); + } + } + /** * Lists all the tables under the specified namespace. * @@ -415,9 +504,11 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep } catch (UnknownDBException e) { throw new NoSuchSchemaException( "Schema (database) does not exist " + namespace + " in Hive Metastore"); + } catch (TException e) { throw new RuntimeException( "Failed to list all tables under the namespace : " + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -469,6 +560,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException { } catch (TException e) { throw new NoSuchTableException( String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e); + } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveSchemaTest.java b/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveSchemaTest.java index 4c2ca83df67..a2c0aae6b8a 100644 --- a/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveSchemaTest.java +++ b/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveSchemaTest.java @@ -4,21 +4,65 @@ */ package com.datastrato.graviton.catalog.hive; +import static com.datastrato.graviton.Configs.DEFUALT_ENTITY_KV_STORE; +import static com.datastrato.graviton.Configs.ENTITY_KV_STORE; +import static com.datastrato.graviton.Configs.ENTITY_STORE; +import static com.datastrato.graviton.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH; +import static com.datastrato.graviton.Entity.EntityType.SCHEMA; + +import com.datastrato.graviton.Config; +import com.datastrato.graviton.Configs; +import com.datastrato.graviton.EntityStore; +import com.datastrato.graviton.GravitonEnv; import com.datastrato.graviton.NameIdentifier; import com.datastrato.graviton.Namespace; import com.datastrato.graviton.catalog.hive.miniHMS.MiniHiveMetastoreService; +import com.datastrato.graviton.exceptions.SchemaAlreadyExistsException; import com.datastrato.graviton.meta.AuditInfo; import com.datastrato.graviton.meta.CatalogEntity; import com.datastrato.graviton.rel.Schema; import com.datastrato.graviton.rel.SchemaChange; import com.google.common.collect.Maps; +import java.io.IOException; import java.time.Instant; import java.util.Arrays; import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class HiveSchemaTest extends MiniHiveMetastoreService { + + private static final String ROCKS_DB_STORE_PATH = "/tmp/graviton/test_hive_schema"; + + private static EntityStore store; + + @BeforeAll + private static void setup() { + Config config = Mockito.mock(Config.class); + Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv"); + Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFUALT_ENTITY_KV_STORE); + Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto"); + Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH); + Mockito.when(config.get(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS)) + .thenReturn(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS.getDefaultValue()); + + GravitonEnv.getInstance().initialize(config); + store = GravitonEnv.getInstance().entityStore(); + } + + @AfterAll + private static void tearDown() { + try { + FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH)); + } catch (Exception e) { + // Ignore + } + } + HiveCatalog initHiveCatalog() { AuditInfo auditInfo = new AuditInfo.Builder().withCreator("creator").withCreateTime(Instant.now()).build(); @@ -41,7 +85,7 @@ HiveCatalog initHiveCatalog() { } @Test - public void testCreateHiveSchema() { + public void testCreateHiveSchema() throws IOException { HiveCatalog hiveCatalog = initHiveCatalog(); NameIdentifier ident = NameIdentifier.of("metalake", hiveCatalog.name(), genRandomName()); @@ -59,6 +103,7 @@ public void testCreateHiveSchema() { NameIdentifier[] idents = hiveCatalog.asSchemas().listSchemas(ident.namespace()); Assertions.assertTrue(Arrays.asList(idents).contains(ident)); + Assertions.assertTrue(store.exists(ident, SCHEMA)); // Test illegal identifier NameIdentifier ident1 = NameIdentifier.of("metalake", hiveCatalog.name()); @@ -70,6 +115,15 @@ public void testCreateHiveSchema() { }); Assertions.assertTrue( exception.getMessage().contains("Cannot support invalid namespace in Hive Metastore")); + + // Test schema already exists + exception = + Assertions.assertThrows( + SchemaAlreadyExistsException.class, + () -> { + hiveCatalog.asSchemas().createSchema(ident, comment, properties); + }); + Assertions.assertTrue(exception.getMessage().contains("already exists in Graviton store")); } @Test @@ -111,7 +165,7 @@ public void testAlterSchema() { } @Test - public void testDropSchema() { + public void testDropSchema() throws IOException { HiveCatalog hiveCatalog = initHiveCatalog(); NameIdentifier ident = NameIdentifier.of("metalake", hiveCatalog.name(), genRandomName()); @@ -124,5 +178,6 @@ public void testDropSchema() { Assertions.assertTrue(hiveCatalog.asSchemas().schemaExists(ident)); hiveCatalog.asSchemas().dropSchema(ident, true); Assertions.assertFalse(hiveCatalog.asSchemas().schemaExists(ident)); + Assertions.assertFalse(store.exists(ident, SCHEMA)); } } diff --git a/core/src/main/java/com/datastrato/graviton/GravitonEnv.java b/core/src/main/java/com/datastrato/graviton/GravitonEnv.java index 0daca2aa941..962e1d27de8 100644 --- a/core/src/main/java/com/datastrato/graviton/GravitonEnv.java +++ b/core/src/main/java/com/datastrato/graviton/GravitonEnv.java @@ -7,6 +7,7 @@ import com.datastrato.graviton.catalog.CatalogManager; import com.datastrato.graviton.catalog.CatalogOperationDispatcher; import com.datastrato.graviton.meta.MetalakeManager; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +97,7 @@ public EntitySerDe entitySerDe() { * @return The EntityStore instance. */ public EntityStore entityStore() { + Preconditions.checkNotNull(entityStore, "GravitonEnv is not initialized."); return entityStore; } diff --git a/core/src/main/java/com/datastrato/graviton/meta/rel/BaseSchema.java b/core/src/main/java/com/datastrato/graviton/meta/rel/BaseSchema.java index 64730614ae2..f51fbd801e2 100644 --- a/core/src/main/java/com/datastrato/graviton/meta/rel/BaseSchema.java +++ b/core/src/main/java/com/datastrato/graviton/meta/rel/BaseSchema.java @@ -16,12 +16,13 @@ import java.util.Map; import javax.annotation.Nullable; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.ToString; /** An abstract class representing a base schema in a relational database. */ @EqualsAndHashCode @ToString -public abstract class BaseSchema implements Schema, Entity, HasIdentifier { +public class BaseSchema implements Schema, Entity, HasIdentifier { public static final Field ID = Field.required("id", Long.class, "The schema's unique identifier"); public static final Field CATALOG_ID = @@ -34,15 +35,15 @@ public abstract class BaseSchema implements Schema, Entity, HasIdentifier { public static final Field AUDIT_INFO = Field.required("audit_info", AuditInfo.class, "The audit details of the schema"); - protected Long id; + @Getter protected Long id; - protected Long catalogId; + @Getter protected Long catalogId; protected String name; - @Nullable protected String comment; + @Nullable @Getter protected String comment; - @Nullable protected Map properties; + @Nullable @Getter protected Map properties; protected AuditInfo auditInfo; @@ -269,4 +270,20 @@ private SELF self() { protected abstract T internalBuild(); } + + public static class SchemaBuilder extends BaseSchemaBuilder { + + @Override + protected BaseSchema internalBuild() { + BaseSchema baseSchema = new BaseSchema(); + baseSchema.id = id; + baseSchema.catalogId = catalogId; + baseSchema.name = name; + baseSchema.comment = comment; + baseSchema.properties = properties; + baseSchema.auditInfo = auditInfo; + + return baseSchema; + } + } } diff --git a/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java b/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java index a6a11b07e4a..42296bb26dd 100644 --- a/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java +++ b/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java @@ -24,13 +24,22 @@ public class ProtoEntitySerDe implements EntitySerDe { "com.datastrato.graviton.meta.BaseMetalake", "com.datastrato.graviton.proto.BaseMetalakeSerDe", "com.datastrato.graviton.meta.CatalogEntity", - "com.datastrato.graviton.proto.CatalogEntitySerDe"); + "com.datastrato.graviton.proto.CatalogEntitySerDe", + "com.datastrato.graviton.meta.rel.BaseSchema", + "com.datastrato.graviton.proto.SchemaEntitySerDe", + "com.datastrato.graviton.catalog.hive.HiveSchema", + "com.datastrato.graviton.proto.SchemaEntitySerDe"); private static final Map ENTITY_TO_PROTO = ImmutableMap.of( - "com.datastrato.graviton.meta.AuditInfo", "com.datastrato.graviton.proto.AuditInfo", - "com.datastrato.graviton.meta.BaseMetalake", "com.datastrato.graviton.proto.Metalake", - "com.datastrato.graviton.meta.CatalogEntity", "com.datastrato.graviton.proto.Catalog"); + "com.datastrato.graviton.meta.AuditInfo", + "com.datastrato.graviton.proto.AuditInfo", + "com.datastrato.graviton.meta.BaseMetalake", + "com.datastrato.graviton.proto.Metalake", + "com.datastrato.graviton.meta.CatalogEntity", + "com.datastrato.graviton.proto.Catalog", + "com.datastrato.graviton.meta.rel.BaseSchema", + "com.datastrato.graviton.proto.Schema"); private final Map, ProtoSerDe> entityToSerDe; diff --git a/core/src/main/java/com/datastrato/graviton/proto/SchemaEntitySerDe.java b/core/src/main/java/com/datastrato/graviton/proto/SchemaEntitySerDe.java new file mode 100644 index 00000000000..1cf18b51f87 --- /dev/null +++ b/core/src/main/java/com/datastrato/graviton/proto/SchemaEntitySerDe.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.proto; + +import com.datastrato.graviton.meta.rel.BaseSchema; + +public class SchemaEntitySerDe implements ProtoSerDe { + @Override + public Schema serialize(BaseSchema schemaEntity) { + return Schema.newBuilder() + .setId(schemaEntity.getId()) + .setCatalogId(schemaEntity.getCatalogId()) + .setName(schemaEntity.name()) + .setAuditInfo(new AuditInfoSerDe().serialize(schemaEntity.auditInfo())) + .build(); + } + + @Override + public BaseSchema deserialize(Schema p) { + return new BaseSchema.SchemaBuilder() + .withId(p.getId()) + .withCatalogId(p.getCatalogId()) + .withName(p.getName()) + .withAuditInfo(new AuditInfoSerDe().deserialize(p.getAuditInfo())) + .build(); + } +} diff --git a/meta/src/main/proto/graviton_meta.proto b/meta/src/main/proto/graviton_meta.proto index 2c055e4864c..9be83f30b43 100644 --- a/meta/src/main/proto/graviton_meta.proto +++ b/meta/src/main/proto/graviton_meta.proto @@ -55,4 +55,11 @@ message Catalog { optional string comment = 5; map properties = 6; AuditInfo audit_info = 7; +} + +message Schema { + uint64 id = 1; + uint64 catalog_id = 2; + string name = 3; + AuditInfo audit_info = 4; } \ No newline at end of file