Skip to content

Commit

Permalink
[#209]feat(catalog): Hive table entity serde and store support (#233)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

SerDe Hive table entity as **Table Proto** in Graviton store while
operating Hive table

### Why are the changes needed?

we could store the additional entity information to our own storage.

Fix: #209 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
UTs added
  • Loading branch information
mchades authored Aug 14, 2023
1 parent bf288fa commit bb29195
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.datastrato.graviton.rel.TableChange;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -402,26 +403,33 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
EntityStore store = GravitonEnv.getInstance().entityStore();
Namespace schemaNamespace =
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name()));
List<BaseTable> tables = Lists.newArrayList();
if (!cascade) {
if (listTables(schemaNamespace).length > 0) {
throw new NonEmptySchemaException(
String.format(
"Hive schema (database) %s is not empty. One or more tables exist in Hive metastore.",
ident.name()));
}
// TODO(minghuang): check if there are tables in Graviton store after we support hive table
// serde

try {
tables.addAll(store.list(schemaNamespace, BaseTable.class, TABLE));
} catch (IOException e) {
throw new RuntimeException("Failed to list table from Graviton store", e);
}
if (!tables.isEmpty()) {
throw new NonEmptySchemaException(
String.format(
"Hive schema (database) %s is not empty. One or more tables exist in Graviton store.",
ident.name()));
}
}

try {
store.executeInTransaction(
() -> {
store.delete(ident, SCHEMA);
for (BaseTable t :
store.list(
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())),
HiveTable.class,
TABLE)) {
for (BaseTable t : tables) {
store.delete(NameIdentifier.of(schemaNamespace, t.name()), TABLE);
}
clientPool.run(
Expand Down Expand Up @@ -531,38 +539,37 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
String.format(
"Cannot support invalid namespace in Hive Metastore: %s", schemaIdent.namespace()));

HiveSchema schema = loadSchema(schemaIdent);
try {
org.apache.hadoop.hive.metastore.api.Table hiveTable =
clientPool.run(c -> c.getTable(schemaIdent.name(), tableIdent.name()));
HiveTable.Builder builder = new HiveTable.Builder();

// TODO: We should also fetch the customized HiveTable entity fields from our own
// underlying storage, like id, auditInfo, etc.
EntityStore store = GravitonEnv.getInstance().entityStore();
BaseTable baseTable = store.get(tableIdent, TABLE, BaseTable.class);

builder =
builder
.withId(1L /* TODO: Fetch id from underlying storage */)
.withSchemaId((Long) schema.fields().get(BaseSchema.ID))
.withId(baseTable.getId())
.withSchemaId(baseTable.getSchemaId())
.withName(tableIdent.name())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(
/* TODO: Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build());
.withAuditInfo(baseTable.auditInfo());
HiveTable table = HiveTable.fromInnerTable(hiveTable, builder);

LOG.info("Loaded Hive table {} from Hive Metastore ", tableIdent.name());

return table;
} catch (TException e) {
} catch (NoSuchObjectException e) {
throw new NoSuchTableException(
String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e);

} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (InterruptedException | TException e) {
throw new RuntimeException(
"Failed to load Hive table " + tableIdent.name() + " from Hive metastore", e);

} catch (IOException e) {
throw new RuntimeException(
"Failed to load Hive table " + tableIdent.name() + " from Graviton store", e);
}
}

Expand Down Expand Up @@ -593,40 +600,47 @@ public Table createTable(
try {
HiveSchema schema = loadSchema(schemaIdent);

HiveTable table =
new HiveTable.Builder()
.withId(1L /* TODO: Use ID generator*/)
.withSchemaId((Long) schema.fields().get(BaseSchema.ID))
.withName(tableIdent.name())
.withNameSpace(tableIdent.namespace())
.withColumns(columns)
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.build();
clientPool.run(
c -> {
c.createTable(table.toInnerTable());
return null;
});

// TODO. We should also store the customized HiveTable entity fields into our own
// underlying storage, like id, auditInfo, etc.
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveTable hiveTable =
store.executeInTransaction(
() -> {
HiveTable createdTable =
new HiveTable.Builder()
.withId(1L /* TODO: Use ID generator*/)
.withSchemaId(schema.getId())
.withName(tableIdent.name())
.withNameSpace(tableIdent.namespace())
.withColumns(columns)
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.build();
store.put(createdTable, false);
clientPool.run(
c -> {
c.createTable(createdTable.toInnerTable());
return null;
});
return createdTable;
});

LOG.info("Created Hive table {} in Hive Metastore", tableIdent.name());

return table;
return hiveTable;

} catch (AlreadyExistsException e) {
} catch (AlreadyExistsException | EntityAlreadyExistsException e) {
throw new TableAlreadyExistsException("Table already exists: " + tableIdent.name(), e);
} catch (TException e) {
} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to create Hive table " + tableIdent.name() + " in Hive Metastore", e);
} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to create Hive table " + tableIdent.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -704,36 +718,49 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
}
}

clientPool.run(
c -> {
c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable);
return null;
});
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveTable updatedTable =
store.executeInTransaction(
() -> {
HiveTable.Builder builder = new HiveTable.Builder();
builder =
builder
.withId(table.getId())
.withSchemaId(table.getSchemaId())
.withName(alteredHiveTable.getTableName())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(table.auditInfo().creator())
.withCreateTime(table.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build());
HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder);
store.delete(tableIdent, TABLE);
store.put(alteredTable, false);
clientPool.run(
c -> {
c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable);
return null;
});
return alteredTable;
});

// TODO(@Minghuang). We should also update the customized HiveTable entity fields into our own
// if necessary
HiveTable.Builder builder = new HiveTable.Builder();
builder =
builder
.withId((Long) table.fields().get(BaseTable.ID))
.withSchemaId((Long) table.fields().get(BaseTable.SCHEMA_ID))
.withName(alteredHiveTable.getTableName())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(
/* TODO(@Minghuang): Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build());
HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder);
LOG.info("Altered Hive table {} in Hive Metastore", tableIdent.name());

return alteredTable;
return updatedTable;

} catch (NoSuchObjectException e) {
throw new NoSuchTableException(
String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e);
} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to alter Hive table " + tableIdent.name() + " in Hive metastore", e);
} catch (IOException e) {
throw new RuntimeException(
"Failed to alter Hive table " + tableIdent.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -886,24 +913,31 @@ private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boo
"Cannot support invalid namespace in Hive Metastore: %s", schemaIdent.namespace()));

try {
clientPool.run(
c -> {
c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge);
EntityStore store = GravitonEnv.getInstance().entityStore();
store.executeInTransaction(
() -> {
store.delete(tableIdent, TABLE);
clientPool.run(
c -> {
c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge);
return null;
});
return null;
});

// TODO. we should also delete the Hive table from our own underlying storage

LOG.info("Dropped Hive table {}", tableIdent.name());
return true;

} catch (NoSuchObjectException e) {
LOG.warn("Hive table {} does not exist in Hive Metastore", tableIdent.name());
return false;
} catch (TException e) {
} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to drop Hive table " + tableIdent.name() + " in Hive Metastore", e);
} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to drop Hive table " + tableIdent.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down
Loading

0 comments on commit bb29195

Please sign in to comment.