Skip to content

Commit

Permalink
[#91]feat(catalog): Hive schema entity serde and store support (#208)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

1. Add the `CommonSchema` class and the `Schema` proto for serde.
2. Serialize, deserialize, and store `HiveSchema` as `CommonSchema`.
3. Make the Hive schema operations support the Graviton store.

### Why are the changes needed?

With these changes, we can store the additional entity information in our own storage.

Fix: #91 

### Does this PR introduce _any_ user-facing change?

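Yes. Hive schema operations now also check the Graviton store: for example, creating a schema fails if it already exists in the Graviton store, and loading a schema fails if it does not exist there.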

### How was this patch tested?
Unit tests added.
  • Loading branch information
mchades authored Aug 14, 2023
1 parent 3053303 commit bf288fa
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 78 deletions.
1 change: 1 addition & 0 deletions catalog-hive/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ dependencies {
testImplementation(libs.slf4j.jdk14)
testImplementation(libs.junit.jupiter.api)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.mockito.core)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
*/
package com.datastrato.graviton.catalog.hive;

import static com.datastrato.graviton.Entity.EntityType.SCHEMA;
import static com.datastrato.graviton.Entity.EntityType.TABLE;
import static com.datastrato.graviton.catalog.hive.HiveTable.HMS_TABLE_COMMENT;
import static com.datastrato.graviton.catalog.hive.HiveTable.SUPPORT_TABLE_TYPES;

import com.datastrato.graviton.EntityAlreadyExistsException;
import com.datastrato.graviton.EntityStore;
import com.datastrato.graviton.GravitonEnv;
import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.NoSuchEntityException;
import com.datastrato.graviton.catalog.CatalogOperations;
import com.datastrato.graviton.catalog.hive.converter.ToHiveType;
import com.datastrato.graviton.exceptions.NoSuchCatalogException;
Expand All @@ -34,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
Expand Down Expand Up @@ -148,30 +155,33 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map<String,
String.format("Cannot support invalid namespace in Hive Metastore: %s", ident.namespace()));

try {
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveSchema hiveSchema =
new HiveSchema.Builder()
.withId(1L /*TODO. Use ID generator*/)
.withCatalogId(entity.getId())
.withName(ident.name())
.withNamespace(ident.namespace())
.withComment(comment)
.withProperties(metadata)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withConf(hiveConf)
.build();

clientPool.run(
client -> {
client.createDatabase(hiveSchema.toInnerDB());
return null;
});

// TODO. We should also store the customized HiveSchema entity fields into our own
// underlying storage, like id, auditInfo, etc.
store.executeInTransaction(
() -> {
HiveSchema createdSchema =
new HiveSchema.Builder()
.withId(1L /*TODO. Use ID generator*/)
.withCatalogId(entity.getId())
.withName(ident.name())
.withNamespace(ident.namespace())
.withComment(comment)
.withProperties(metadata)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withConf(hiveConf)
.build();
store.put(createdSchema, false);
clientPool.run(
client -> {
client.createDatabase(createdSchema.toInnerDB());
return null;
});
return createdSchema;
});

LOG.info("Created Hive schema (database) {} in Hive Metastore", ident.name());

Expand All @@ -180,13 +190,24 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map<String,
} catch (AlreadyExistsException e) {
throw new SchemaAlreadyExistsException(
String.format(
"Hive schema (database) '%s' already exists in Hive Metastore", ident.name()));
"Hive schema (database) '%s' already exists in Hive Metastore", ident.name()),
e);

} catch (EntityAlreadyExistsException e) {
throw new SchemaAlreadyExistsException(
String.format(
"Hive schema (database) '%s' already exists in Graviton store", ident.name()),
e);

} catch (TException e) {
throw new RuntimeException(
"Failed to create Hive schema (database) " + ident.name() + " in Hive Metastore", e);

} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to create Hive schema (database) " + ident.name() + " in Graviton store", e);

} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand All @@ -211,20 +232,15 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
Database database = clientPool.run(client -> client.getDatabase(ident.name()));
HiveSchema.Builder builder = new HiveSchema.Builder();

// TODO. We should also fetch the customized HiveSchema entity fields from our own
// underlying storage, like id, auditInfo, etc.
EntityStore store = GravitonEnv.getInstance().entityStore();
BaseSchema baseSchema = store.get(ident, SCHEMA, BaseSchema.class);

builder =
builder
.withId(1L /* TODO. Fetch id from underlying storage */)
.withCatalogId(entity.getId())
.withId(baseSchema.getId())
.withCatalogId(baseSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(
/* TODO. Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withAuditInfo(baseSchema.auditInfo())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(database, builder);

Expand All @@ -233,19 +249,28 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
return hiveSchema;

} catch (NoSuchObjectException | UnknownDBException e) {
deleteSchemaFromStore(ident);
throw new NoSuchSchemaException(
String.format(
"Hive schema (database) does not exist: %s in Hive Metastore", ident.name()),
e);

// TODO. We should also delete Hive schema (database) from our own underlying storage
} catch (NoSuchEntityException e) {
throw new NoSuchSchemaException(
String.format(
"Hive schema (database) does not exist: %s in Graviton store", ident.name()),
e);

} catch (TException e) {
throw new RuntimeException(
"Failed to load Hive schema (database) " + ident.name() + " from Hive Metastore", e);

} catch (InterruptedException e) {
throw new RuntimeException(e);

} catch (IOException ioe) {
LOG.error("Failed to load hive schema {}", ident, ioe);
throw new RuntimeException(ioe);
}
}

Expand Down Expand Up @@ -293,38 +318,64 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
Database alteredDatabase = database.deepCopy();
alteredDatabase.setParameters(metadata);

clientPool.run(
client -> {
client.alterDatabase(ident.name(), alteredDatabase);
return null;
});

// TODO. We should also update the customized HiveSchema entity fields into our own if
// necessary
HiveSchema.Builder builder = new HiveSchema.Builder();
builder =
builder
.withId(1L /* TODO. Fetch id from underlying storage */)
.withCatalogId(entity.getId())
.withNamespace(ident.namespace())
.withAuditInfo(
/* TODO. Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);
// update store transactionally
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveSchema alteredHiveSchema =
store.executeInTransaction(
() -> {
BaseSchema oldSchema = store.get(ident, SCHEMA, BaseSchema.class);
HiveSchema.Builder builder = new HiveSchema.Builder();
builder =
builder
.withId(oldSchema.getId())
.withCatalogId(oldSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(oldSchema.auditInfo().creator())
.withCreateTime(oldSchema.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);

// To be on the safe side, here uses delete before put (although hive schema does
// not support rename yet)
store.delete(ident, SCHEMA);
store.put(hiveSchema, false);
clientPool.run(
client -> {
client.alterDatabase(ident.name(), alteredDatabase);
return null;
});
return hiveSchema;
});

LOG.info("Altered Hive schema (database) {} in Hive Metastore", ident.name());
// todo(xun): hive does not support renaming database name directly,
// perhaps we can use namespace to mapping the database names indirectly

return hiveSchema;
return alteredHiveSchema;

} catch (NoSuchObjectException e) {
throw new NoSuchSchemaException(
String.format("Hive schema (database) %s does not exist in Hive Metastore", ident.name()),
e);

} catch (EntityAlreadyExistsException e) {
throw new NoSuchSchemaException(
"The new Hive schema (database) name already exist in Graviton store", e);

} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to alter Hive schema (database) " + ident.name() + " in Hive metastore", e);

} catch (IOException e) {
throw new RuntimeException(
"Failed to alter Hive schema (database) " + ident.name() + " in Graviton store", e);

} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand All @@ -348,15 +399,39 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
return false;
}

EntityStore store = GravitonEnv.getInstance().entityStore();
Namespace schemaNamespace =
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name()));
if (!cascade) {
if (listTables(schemaNamespace).length > 0) {
throw new NonEmptySchemaException(
String.format(
"Hive schema (database) %s is not empty. One or more tables exist in Hive metastore.",
ident.name()));
}
// TODO(minghuang): check if there are tables in Graviton store after we support hive table
// serde
}

try {
clientPool.run(
client -> {
client.dropDatabase(ident.name(), false, false, cascade);
store.executeInTransaction(
() -> {
store.delete(ident, SCHEMA);
for (BaseTable t :
store.list(
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())),
HiveTable.class,
TABLE)) {
store.delete(NameIdentifier.of(schemaNamespace, t.name()), TABLE);
}
clientPool.run(
client -> {
client.dropDatabase(ident.name(), false, false, cascade);
return null;
});
return null;
});

// TODO. we should also delete the Hive schema (database) from our own underlying storage

LOG.info("Dropped Hive schema (database) {}", ident.name());
return true;

Expand All @@ -367,18 +442,32 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
e);

} catch (NoSuchObjectException e) {
deleteSchemaFromStore(ident);
LOG.warn("Hive schema (database) {} does not exist in Hive Metastore", ident.name());
return false;

} catch (TException e) {
throw new RuntimeException(
"Failed to drop Hive schema (database) " + ident.name() + " in Hive Metastore", e);

} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to drop Hive schema (database) " + ident.name() + " in Graviton store", e);

} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
 * Removes the schema entity identified by {@code ident} from the Graviton entity store.
 *
 * <p>This is a best-effort operation: an {@link IOException} raised by the store is logged at
 * error level and is not propagated to the caller.
 *
 * @param ident the identifier of the schema entity to delete from the store
 */
private void deleteSchemaFromStore(NameIdentifier ident) {
  try {
    GravitonEnv.getInstance().entityStore().delete(ident, SCHEMA);
  } catch (IOException ioe) {
    // Swallowed on purpose; the failure is only recorded in the log.
    LOG.error("Failed to delete hive schema {} from Graviton store", ident, ioe);
  }
}

/**
* Lists all the tables under the specified namespace.
*
Expand Down Expand Up @@ -415,9 +504,11 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep
} catch (UnknownDBException e) {
throw new NoSuchSchemaException(
"Schema (database) does not exist " + namespace + " in Hive Metastore");

} catch (TException e) {
throw new RuntimeException(
"Failed to list all tables under the namespace : " + namespace + " in Hive Metastore", e);

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -469,6 +560,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
} catch (TException e) {
throw new NoSuchTableException(
String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e);

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit bf288fa

Please sign in to comment.