From bb2919574123b64af156e9ac0b46c58969f08a1b Mon Sep 17 00:00:00 2001 From: mchades Date: Mon, 14 Aug 2023 17:33:35 +0800 Subject: [PATCH] [#209]feat(catalog): Hive table entity serde and store support (#233) ### What changes were proposed in this pull request? Serialize and deserialize the Hive table entity as a **Table** proto message in the Graviton store whenever a Hive table is created, loaded, altered, or dropped. ### Why are the changes needed? So that the additional entity information (id, schema id, name, audit info) can be persisted in our own storage. Fix: #209 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests were added. --- .../catalog/hive/HiveCatalogOperations.java | 188 +++++++++++------- .../graviton/catalog/hive/HiveTableTest.java | 53 ++++- .../graviton/meta/rel/BaseTable.java | 28 ++- .../graviton/proto/ProtoEntitySerDe.java | 38 ++-- .../graviton/proto/TableEntitySerde.java | 29 +++ meta/src/main/proto/graviton_meta.proto | 7 + 6 files changed, 243 insertions(+), 100 deletions(-) create mode 100644 core/src/main/java/com/datastrato/graviton/proto/TableEntitySerde.java diff --git a/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java b/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java index 20827c9fcba..5aa79776e1c 100644 --- a/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java +++ b/catalog-hive/src/main/java/com/datastrato/graviton/catalog/hive/HiveCatalogOperations.java @@ -35,6 +35,7 @@ import com.datastrato.graviton.rel.TableChange; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import java.io.IOException; import java.time.Instant; import java.util.List; @@ -402,6 +403,7 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty EntityStore store = GravitonEnv.getInstance().entityStore(); Namespace schemaNamespace = Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())); 
+ List tables = Lists.newArrayList(); if (!cascade) { if (listTables(schemaNamespace).length > 0) { throw new NonEmptySchemaException( @@ -409,19 +411,25 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty "Hive schema (database) %s is not empty. One or more tables exist in Hive metastore.", ident.name())); } - // TODO(minghuang): check if there are tables in Graviton store after we support hive table - // serde + + try { + tables.addAll(store.list(schemaNamespace, BaseTable.class, TABLE)); + } catch (IOException e) { + throw new RuntimeException("Failed to list table from Graviton store", e); + } + if (!tables.isEmpty()) { + throw new NonEmptySchemaException( + String.format( + "Hive schema (database) %s is not empty. One or more tables exist in Graviton store.", + ident.name())); + } } try { store.executeInTransaction( () -> { store.delete(ident, SCHEMA); - for (BaseTable t : - store.list( - Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())), - HiveTable.class, - TABLE)) { + for (BaseTable t : tables) { store.delete(NameIdentifier.of(schemaNamespace, t.name()), TABLE); } clientPool.run( @@ -531,38 +539,37 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException { String.format( "Cannot support invalid namespace in Hive Metastore: %s", schemaIdent.namespace())); - HiveSchema schema = loadSchema(schemaIdent); try { org.apache.hadoop.hive.metastore.api.Table hiveTable = clientPool.run(c -> c.getTable(schemaIdent.name(), tableIdent.name())); HiveTable.Builder builder = new HiveTable.Builder(); - // TODO: We should also fetch the customized HiveTable entity fields from our own - // underlying storage, like id, auditInfo, etc. 
+ EntityStore store = GravitonEnv.getInstance().entityStore(); + BaseTable baseTable = store.get(tableIdent, TABLE, BaseTable.class); builder = builder - .withId(1L /* TODO: Fetch id from underlying storage */) - .withSchemaId((Long) schema.fields().get(BaseSchema.ID)) + .withId(baseTable.getId()) + .withSchemaId(baseTable.getSchemaId()) .withName(tableIdent.name()) .withNameSpace(tableIdent.namespace()) - .withAuditInfo( - /* TODO: Fetch audit info from underlying storage */ - new AuditInfo.Builder() - .withCreator(currentUser()) - .withCreateTime(Instant.now()) - .build()); + .withAuditInfo(baseTable.auditInfo()); HiveTable table = HiveTable.fromInnerTable(hiveTable, builder); LOG.info("Loaded Hive table {} from Hive Metastore ", tableIdent.name()); return table; - } catch (TException e) { + } catch (NoSuchObjectException e) { throw new NoSuchTableException( String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } catch (InterruptedException | TException e) { + throw new RuntimeException( + "Failed to load Hive table " + tableIdent.name() + " from Hive metastore", e); + + } catch (IOException e) { + throw new RuntimeException( + "Failed to load Hive table " + tableIdent.name() + " from Graviton store", e); } } @@ -593,40 +600,47 @@ public Table createTable( try { HiveSchema schema = loadSchema(schemaIdent); - HiveTable table = - new HiveTable.Builder() - .withId(1L /* TODO: Use ID generator*/) - .withSchemaId((Long) schema.fields().get(BaseSchema.ID)) - .withName(tableIdent.name()) - .withNameSpace(tableIdent.namespace()) - .withColumns(columns) - .withComment(comment) - .withProperties(properties) - .withAuditInfo( - new AuditInfo.Builder() - .withCreator(currentUser()) - .withCreateTime(Instant.now()) - .build()) - .build(); - clientPool.run( - c -> { - c.createTable(table.toInnerTable()); - return null; - }); - - // TODO. 
We should also store the customized HiveTable entity fields into our own - // underlying storage, like id, auditInfo, etc. + EntityStore store = GravitonEnv.getInstance().entityStore(); + HiveTable hiveTable = + store.executeInTransaction( + () -> { + HiveTable createdTable = + new HiveTable.Builder() + .withId(1L /* TODO: Use ID generator*/) + .withSchemaId(schema.getId()) + .withName(tableIdent.name()) + .withNameSpace(tableIdent.namespace()) + .withColumns(columns) + .withComment(comment) + .withProperties(properties) + .withAuditInfo( + new AuditInfo.Builder() + .withCreator(currentUser()) + .withCreateTime(Instant.now()) + .build()) + .build(); + store.put(createdTable, false); + clientPool.run( + c -> { + c.createTable(createdTable.toInnerTable()); + return null; + }); + return createdTable; + }); LOG.info("Created Hive table {} in Hive Metastore", tableIdent.name()); - return table; + return hiveTable; - } catch (AlreadyExistsException e) { + } catch (AlreadyExistsException | EntityAlreadyExistsException e) { throw new TableAlreadyExistsException("Table already exists: " + tableIdent.name(), e); - } catch (TException e) { + } catch (TException | InterruptedException e) { throw new RuntimeException( "Failed to create Hive table " + tableIdent.name() + " in Hive Metastore", e); - } catch (InterruptedException e) { + } catch (IOException e) { + throw new RuntimeException( + "Failed to create Hive table " + tableIdent.name() + " in Graviton store", e); + } catch (Exception e) { throw new RuntimeException(e); } } @@ -704,36 +718,49 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... 
changes) } } - clientPool.run( - c -> { - c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable); - return null; - }); + EntityStore store = GravitonEnv.getInstance().entityStore(); + HiveTable updatedTable = + store.executeInTransaction( + () -> { + HiveTable.Builder builder = new HiveTable.Builder(); + builder = + builder + .withId(table.getId()) + .withSchemaId(table.getSchemaId()) + .withName(alteredHiveTable.getTableName()) + .withNameSpace(tableIdent.namespace()) + .withAuditInfo( + new AuditInfo.Builder() + .withCreator(table.auditInfo().creator()) + .withCreateTime(table.auditInfo().createTime()) + .withLastModifier(currentUser()) + .withLastModifiedTime(Instant.now()) + .build()); + HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder); + store.delete(tableIdent, TABLE); + store.put(alteredTable, false); + clientPool.run( + c -> { + c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable); + return null; + }); + return alteredTable; + }); - // TODO(@Minghuang). 
We should also update the customized HiveTable entity fields into our own - // if necessary - HiveTable.Builder builder = new HiveTable.Builder(); - builder = - builder - .withId((Long) table.fields().get(BaseTable.ID)) - .withSchemaId((Long) table.fields().get(BaseTable.SCHEMA_ID)) - .withName(alteredHiveTable.getTableName()) - .withNameSpace(tableIdent.namespace()) - .withAuditInfo( - /* TODO(@Minghuang): Fetch audit info from underlying storage */ - new AuditInfo.Builder() - .withCreator(currentUser()) - .withCreateTime(Instant.now()) - .build()); - HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder); LOG.info("Altered Hive table {} in Hive Metastore", tableIdent.name()); - return alteredTable; + return updatedTable; } catch (NoSuchObjectException e) { throw new NoSuchTableException( String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e); } catch (TException | InterruptedException e) { + throw new RuntimeException( + "Failed to alter Hive table " + tableIdent.name() + " in Hive metastore", e); + } catch (IOException e) { + throw new RuntimeException( + "Failed to alter Hive table " + tableIdent.name() + " in Graviton store", e); + } catch (Exception e) { throw new RuntimeException(e); } } @@ -886,24 +913,31 @@ private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boo "Cannot support invalid namespace in Hive Metastore: %s", schemaIdent.namespace())); try { - clientPool.run( - c -> { - c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge); + EntityStore store = GravitonEnv.getInstance().entityStore(); + store.executeInTransaction( + () -> { + store.delete(tableIdent, TABLE); + clientPool.run( + c -> { + c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge); + return null; + }); return null; }); - // TODO. 
we should also delete the Hive table from our own underlying storage - LOG.info("Dropped Hive table {}", tableIdent.name()); return true; } catch (NoSuchObjectException e) { LOG.warn("Hive table {} does not exist in Hive Metastore", tableIdent.name()); return false; - } catch (TException e) { + } catch (TException | InterruptedException e) { throw new RuntimeException( "Failed to drop Hive table " + tableIdent.name() + " in Hive Metastore", e); - } catch (InterruptedException e) { + } catch (IOException e) { + throw new RuntimeException( + "Failed to drop Hive table " + tableIdent.name() + " in Graviton store", e); + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveTableTest.java b/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveTableTest.java index bf7e178e3ae..4383d929ebf 100644 --- a/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveTableTest.java +++ b/catalog-hive/src/test/java/com/datastrato/graviton/catalog/hive/HiveTableTest.java @@ -4,6 +4,16 @@ */ package com.datastrato.graviton.catalog.hive; +import static com.datastrato.graviton.Configs.DEFUALT_ENTITY_KV_STORE; +import static com.datastrato.graviton.Configs.ENTITY_KV_STORE; +import static com.datastrato.graviton.Configs.ENTITY_STORE; +import static com.datastrato.graviton.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH; +import static com.datastrato.graviton.Entity.EntityType.TABLE; + +import com.datastrato.graviton.Config; +import com.datastrato.graviton.Configs; +import com.datastrato.graviton.EntityStore; +import com.datastrato.graviton.GravitonEnv; import com.datastrato.graviton.NameIdentifier; import com.datastrato.graviton.Namespace; import com.datastrato.graviton.catalog.hive.miniHMS.MiniHiveMetastoreService; @@ -15,13 +25,17 @@ import com.datastrato.graviton.rel.TableChange; import com.google.common.collect.Maps; import io.substrait.type.TypeCreator; +import java.io.IOException; 
import java.time.Instant; import java.util.Arrays; import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class HiveTableTest extends MiniHiveMetastoreService { @@ -30,21 +44,47 @@ public class HiveTableTest extends MiniHiveMetastoreService { private static final String HIVE_CATALOG_NAME = "test_catalog"; private static final String HIVE_SCHEMA_NAME = "test_schema"; private static final String HIVE_COMMENT = "test_comment"; + private static final String ROCKS_DB_STORE_PATH = "/tmp/graviton/test_hive_table"; private static HiveCatalog hiveCatalog; private static HiveSchema hiveSchema; + private static EntityStore store; + @BeforeAll private static void setup() { + mockStore(); initHiveCatalog(); initHiveSchema(); } + @AfterAll + private static void tearDown() { + try { + FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH)); + } catch (Exception e) { + // Ignore + } + } + @AfterEach private void resetSchema() { hiveCatalog.asSchemas().dropSchema(hiveSchema.nameIdentifier(), true); initHiveSchema(); } + private static void mockStore() { + Config config = Mockito.mock(Config.class); + Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv"); + Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFUALT_ENTITY_KV_STORE); + Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto"); + Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH); + Mockito.when(config.get(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS)) + .thenReturn(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS.getDefaultValue()); + + GravitonEnv.getInstance().initialize(config); + store = GravitonEnv.getInstance().entityStore(); + } + private static void initHiveSchema() { NameIdentifier schemaIdent = 
NameIdentifier.of(META_LAKE_NAME, hiveCatalog.name(), HIVE_SCHEMA_NAME); @@ -76,8 +116,7 @@ private static void initHiveCatalog() { } @Test - public void testCreateHiveTable() { - Long tableId = 1L; + public void testCreateHiveTable() throws IOException { String hiveTableName = "test_hive_table"; NameIdentifier tableIdentifier = NameIdentifier.of(META_LAKE_NAME, hiveCatalog.name(), hiveSchema.name(), hiveTableName); @@ -111,6 +150,7 @@ public void testCreateHiveTable() { NameIdentifier[] tableIdents = hiveCatalog.asTableCatalog().listTables(tableIdentifier.namespace()); Assertions.assertTrue(Arrays.asList(tableIdents).contains(tableIdentifier)); + Assertions.assertTrue(store.exists(tableIdentifier, TABLE)); // Test exception Throwable exception = @@ -134,8 +174,7 @@ public void testCreateHiveTable() { } @Test - public void testDropHiveTable() { - Long tableId = 1L; + public void testDropHiveTable() throws IOException { NameIdentifier tableIdentifier = NameIdentifier.of(META_LAKE_NAME, hiveCatalog.name(), hiveSchema.name(), genRandomName()); Map properties = Maps.newHashMap(); @@ -161,6 +200,7 @@ public void testDropHiveTable() { Assertions.assertTrue(hiveCatalog.asTableCatalog().tableExists(tableIdentifier)); hiveCatalog.asTableCatalog().dropTable(tableIdentifier); Assertions.assertFalse(hiveCatalog.asTableCatalog().tableExists(tableIdentifier)); + Assertions.assertFalse(store.exists(tableIdentifier, TABLE)); } @Test @@ -182,7 +222,7 @@ public void testListTableException() { } @Test - public void testAlterHiveTable() { + public void testAlterHiveTable() throws IOException { // create a table with random name NameIdentifier tableIdentifier = NameIdentifier.of(META_LAKE_NAME, hiveCatalog.name(), hiveSchema.name(), genRandomName()); @@ -240,6 +280,9 @@ public void testAlterHiveTable() { Assertions.assertFalse(alteredTable.properties().containsKey("key1")); Assertions.assertEquals(alteredTable.properties().get("key2"), "val2_new"); + 
Assertions.assertFalse(store.exists(tableIdentifier, TABLE)); + Assertions.assertTrue(store.exists(((HiveTable) alteredTable).nameIdentifier(), TABLE)); + Column[] expected = new Column[] { new HiveColumn.Builder() diff --git a/core/src/main/java/com/datastrato/graviton/meta/rel/BaseTable.java b/core/src/main/java/com/datastrato/graviton/meta/rel/BaseTable.java index 5619f484454..c72a5858d23 100644 --- a/core/src/main/java/com/datastrato/graviton/meta/rel/BaseTable.java +++ b/core/src/main/java/com/datastrato/graviton/meta/rel/BaseTable.java @@ -5,7 +5,6 @@ package com.datastrato.graviton.meta.rel; -import com.datastrato.graviton.Audit; import com.datastrato.graviton.Entity; import com.datastrato.graviton.Field; import com.datastrato.graviton.HasIdentifier; @@ -17,12 +16,13 @@ import java.util.Map; import javax.annotation.Nullable; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.ToString; /** An abstract class representing a base table in a relational database. */ @EqualsAndHashCode @ToString -public abstract class BaseTable implements Table, Entity, HasIdentifier { +public class BaseTable implements Table, Entity, HasIdentifier { public static final Field ID = Field.required("id", Long.class, "The table's unique identifier"); public static final Field SCHEMA_ID = @@ -34,12 +34,14 @@ public abstract class BaseTable implements Table, Entity, HasIdentifier { Field.optional("properties", Map.class, "The associated properties of the table"); public static final Field AUDIT_INFO = Field.required("audit_info", AuditInfo.class, "The audit details of the table"); + + // we do not save columns in Graviton store, so it is an optional field public static final Field COLUMNS = - Field.required("columns", Column[].class, "The columns that make up the table"); + Field.optional("columns", Column[].class, "The columns that make up the table"); - protected Long id; + @Getter protected Long id; - protected Long schemaId; + @Getter protected Long schemaId; 
protected String name; @@ -78,7 +80,7 @@ public Map fields() { * @return The audit details of the table. */ @Override - public Audit auditInfo() { + public AuditInfo auditInfo() { return auditInfo; } @@ -300,4 +302,18 @@ private SELF self() { protected abstract T internalBuild(); } + + public static class TableBuilder extends BaseTableBuilder { + @Override + protected BaseTable internalBuild() { + BaseTable table = new BaseTable(); + table.id = id; + table.schemaId = schemaId; + table.name = name; + table.comment = comment; + table.properties = properties; + table.auditInfo = auditInfo; + return table; + } + } } diff --git a/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java b/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java index 42296bb26dd..4e389782cdc 100644 --- a/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java +++ b/core/src/main/java/com/datastrato/graviton/proto/ProtoEntitySerDe.java @@ -18,17 +18,29 @@ public class ProtoEntitySerDe implements EntitySerDe { // The implementation of different entities should also register its class to this map, // otherwise ProtoEntitySerDe will not be able to deserialize the entity. 
private static final Map ENTITY_TO_SERDE = - ImmutableMap.of( - "com.datastrato.graviton.meta.AuditInfo", - "com.datastrato.graviton.proto.AuditInfoSerDe", - "com.datastrato.graviton.meta.BaseMetalake", - "com.datastrato.graviton.proto.BaseMetalakeSerDe", - "com.datastrato.graviton.meta.CatalogEntity", - "com.datastrato.graviton.proto.CatalogEntitySerDe", - "com.datastrato.graviton.meta.rel.BaseSchema", - "com.datastrato.graviton.proto.SchemaEntitySerDe", - "com.datastrato.graviton.catalog.hive.HiveSchema", - "com.datastrato.graviton.proto.SchemaEntitySerDe"); + ImmutableMap.builder() + .put( + "com.datastrato.graviton.meta.AuditInfo", + "com.datastrato.graviton.proto.AuditInfoSerDe") + .put( + "com.datastrato.graviton.meta.BaseMetalake", + "com.datastrato.graviton.proto.BaseMetalakeSerDe") + .put( + "com.datastrato.graviton.meta.CatalogEntity", + "com.datastrato.graviton.proto.CatalogEntitySerDe") + .put( + "com.datastrato.graviton.meta.rel.BaseSchema", + "com.datastrato.graviton.proto.SchemaEntitySerDe") + .put( + "com.datastrato.graviton.catalog.hive.HiveSchema", + "com.datastrato.graviton.proto.SchemaEntitySerDe") + .put( + "com.datastrato.graviton.meta.rel.BaseTable", + "com.datastrato.graviton.proto.TableEntitySerde") + .put( + "com.datastrato.graviton.catalog.hive.HiveTable", + "com.datastrato.graviton.proto.TableEntitySerde") + .build(); private static final Map ENTITY_TO_PROTO = ImmutableMap.of( @@ -39,7 +51,9 @@ public class ProtoEntitySerDe implements EntitySerDe { "com.datastrato.graviton.meta.CatalogEntity", "com.datastrato.graviton.proto.Catalog", "com.datastrato.graviton.meta.rel.BaseSchema", - "com.datastrato.graviton.proto.Schema"); + "com.datastrato.graviton.proto.Schema", + "com.datastrato.graviton.meta.rel.BaseTable", + "com.datastrato.graviton.proto.Table"); private final Map, ProtoSerDe> entityToSerDe; diff --git a/core/src/main/java/com/datastrato/graviton/proto/TableEntitySerde.java 
b/core/src/main/java/com/datastrato/graviton/proto/TableEntitySerde.java new file mode 100644 index 00000000000..02c6c96846d --- /dev/null +++ b/core/src/main/java/com/datastrato/graviton/proto/TableEntitySerde.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.graviton.proto; + +import com.datastrato.graviton.meta.rel.BaseTable; + +public class TableEntitySerde implements ProtoSerDe { + @Override + public Table serialize(BaseTable tableEntity) { + return Table.newBuilder() + .setId(tableEntity.getId()) + .setSchemaId(tableEntity.getSchemaId()) + .setName(tableEntity.name()) + .setAuditInfo(new AuditInfoSerDe().serialize(tableEntity.auditInfo())) + .build(); + } + + @Override + public BaseTable deserialize(Table p) { + return new BaseTable.TableBuilder() + .withId(p.getId()) + .withSchemaId(p.getSchemaId()) + .withName(p.getName()) + .withAuditInfo(new AuditInfoSerDe().deserialize(p.getAuditInfo())) + .build(); + } +} diff --git a/meta/src/main/proto/graviton_meta.proto b/meta/src/main/proto/graviton_meta.proto index 9be83f30b43..f46282b3602 100644 --- a/meta/src/main/proto/graviton_meta.proto +++ b/meta/src/main/proto/graviton_meta.proto @@ -62,4 +62,11 @@ message Schema { uint64 catalog_id = 2; string name = 3; AuditInfo audit_info = 4; +} + +message Table { + uint64 id = 1; + uint64 schema_id = 2; + string name = 3; + AuditInfo audit_info = 4; } \ No newline at end of file