Skip to content

Commit

Permalink
Hive schema entity serde and store support
Browse files Browse the repository at this point in the history
  • Loading branch information
mchades committed Aug 10, 2023
1 parent 46b1cb6 commit 7501630
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 89 deletions.
1 change: 1 addition & 0 deletions catalog-hive/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ dependencies {
testImplementation(libs.slf4j.jdk14)
testImplementation(libs.junit.jupiter.api)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.mockito.core)
}

tasks.test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
*/
package com.datastrato.graviton.catalog.hive;

import static com.datastrato.graviton.Entity.EntityType.SCHEMA;
import static com.datastrato.graviton.catalog.hive.HiveTable.HMS_TABLE_COMMENT;
import static com.datastrato.graviton.catalog.hive.HiveTable.SUPPORT_TABLE_TYPES;

import com.datastrato.graviton.EntityAlreadyExistsException;
import com.datastrato.graviton.EntityStore;
import com.datastrato.graviton.GravitonEnv;
import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.NoSuchEntityException;
import com.datastrato.graviton.catalog.CatalogOperations;
import com.datastrato.graviton.catalog.hive.converter.ToHiveType;
import com.datastrato.graviton.exceptions.NoSuchCatalogException;
Expand Down Expand Up @@ -148,30 +153,33 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map<String,
String.format("Cannot support invalid namespace in Hive Metastore: %s", ident.namespace()));

try {
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveSchema hiveSchema =
new HiveSchema.Builder()
.withId(1L /*TODO. Use ID generator*/)
.withCatalogId(entity.getId())
.withName(ident.name())
.withNamespace(ident.namespace())
.withComment(comment)
.withProperties(metadata)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withConf(hiveConf)
.build();

clientPool.run(
client -> {
client.createDatabase(hiveSchema.toInnerDB());
return null;
});

// TODO. We should also store the customized HiveSchema entity fields into our own
// underlying storage, like id, auditInfo, etc.
store.executeInTransaction(
() -> {
HiveSchema createdSchema =
new HiveSchema.Builder()
.withId(1L /*TODO. Use ID generator*/)
.withCatalogId(entity.getId())
.withName(ident.name())
.withNamespace(ident.namespace())
.withComment(comment)
.withProperties(metadata)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withConf(hiveConf)
.build();
store.put(createdSchema, false);
clientPool.run(
client -> {
client.createDatabase(createdSchema.toInnerDB());
return null;
});
return createdSchema;
});

LOG.info("Created Hive schema (database) {} in Hive Metastore", ident.name());

Expand All @@ -182,11 +190,18 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map<String,
String.format(
"Hive schema (database) '%s' already exists in Hive Metastore", ident.name()));

} catch (EntityAlreadyExistsException e) {
throw new SchemaAlreadyExistsException(
String.format(
"Hive schema (database) '%s' already exists in Graviton store", ident.name()));
} catch (TException e) {
throw new RuntimeException(
"Failed to create Hive schema (database) " + ident.name() + " in Hive Metastore", e);

} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to create Hive schema (database) " + ident.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand All @@ -211,20 +226,15 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
Database database = clientPool.run(client -> client.getDatabase(ident.name()));
HiveSchema.Builder builder = new HiveSchema.Builder();

// TODO. We should also fetch the customized HiveSchema entity fields from our own
// underlying storage, like id, auditInfo, etc.
EntityStore store = GravitonEnv.getInstance().entityStore();
BaseSchema baseSchema = store.get(ident, SCHEMA, BaseSchema.CommonSchema.class);

builder =
builder
.withId(1L /* TODO. Fetch id from underlying storage */)
.withCatalogId(entity.getId())
.withId(baseSchema.getId())
.withCatalogId(baseSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(
/* TODO. Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withAuditInfo(baseSchema.auditInfo())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(database, builder);

Expand All @@ -233,19 +243,26 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
return hiveSchema;

} catch (NoSuchObjectException | UnknownDBException e) {
deleteSchemaFromStore(ident);
throw new NoSuchSchemaException(
String.format(
"Hive schema (database) does not exist: %s in Hive Metastore", ident.name()),
e);

// TODO. We should also delete Hive schema (database) from our own underlying storage

} catch (NoSuchEntityException e) {
throw new NoSuchSchemaException(
String.format(
"Hive schema (database) does not exist: %s in Graviton store", ident.name()),
e);
} catch (TException e) {
throw new RuntimeException(
"Failed to load Hive schema (database) " + ident.name() + " from Hive Metastore", e);

} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (IOException ioe) {
LOG.error("Failed to load hive schema {}", ident, ioe);
throw new RuntimeException(ioe);
}
}

Expand Down Expand Up @@ -293,38 +310,61 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
Database alteredDatabase = database.deepCopy();
alteredDatabase.setParameters(metadata);

clientPool.run(
client -> {
client.alterDatabase(ident.name(), alteredDatabase);
return null;
});

// TODO. We should also update the customized HiveSchema entity fields into our own if
// necessary
HiveSchema.Builder builder = new HiveSchema.Builder();
builder =
builder
.withId(1L /* TODO. Fetch id from underlying storage */)
.withCatalogId(entity.getId())
.withNamespace(ident.namespace())
.withAuditInfo(
/* TODO. Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);
// update store transactionally
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveSchema alteredHiveSchema =
store.executeInTransaction(
() -> {
BaseSchema.CommonSchema oldSchema =
store.get(ident, SCHEMA, BaseSchema.CommonSchema.class);
HiveSchema.Builder builder = new HiveSchema.Builder();
builder =
builder
.withId(oldSchema.getId())
.withCatalogId(oldSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(oldSchema.auditInfo().creator())
.withCreateTime(oldSchema.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);

// To be on the safe side, here uses delete before put (although hive schema does
// not support rename yet)
store.delete(ident, SCHEMA);
store.put(hiveSchema, false);
clientPool.run(
client -> {
client.alterDatabase(ident.name(), alteredDatabase);
return null;
});
return hiveSchema;
});

LOG.info("Altered Hive schema (database) {} in Hive Metastore", ident.name());
// todo(xun): hive does not support renaming database name directly,
// perhaps we can use namespace to mapping the database names indirectly

return hiveSchema;
return alteredHiveSchema;

} catch (NoSuchObjectException e) {
throw new NoSuchSchemaException(
String.format("Hive schema (database) %s does not exist in Hive Metastore", ident.name()),
e);
} catch (EntityAlreadyExistsException e) {
throw new NoSuchSchemaException(
"The new Hive schema (database) name already exist in Graviton store", e);
} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to alter Hive schema (database) " + ident.name() + " in Hive metastore", e);
} catch (IOException e) {
throw new RuntimeException(
"Failed to alter Hive schema (database) " + ident.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand All @@ -348,15 +388,30 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
return false;
}

EntityStore store = GravitonEnv.getInstance().entityStore();
if (!cascade) {
if (listSchemas(ident.namespace()).length > 0) {
throw new NonEmptySchemaException(
String.format(
"Hive schema (database) %s is not empty. One or more tables exist in Hive metastore.",
ident.name()));
}
// TODO(minghuang): check if there are tables in Graviton store after we support hive table
// serde
}

try {
clientPool.run(
client -> {
client.dropDatabase(ident.name(), false, false, cascade);
store.executeInTransaction(
() -> {
store.delete(ident, SCHEMA);
clientPool.run(
client -> {
client.dropDatabase(ident.name(), false, false, cascade);
return null;
});
return null;
});

// TODO. we should also delete the Hive schema (database) from our own underlying storage

LOG.info("Dropped Hive schema (database) {}", ident.name());
return true;

Expand All @@ -367,18 +422,31 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
e);

} catch (NoSuchObjectException e) {
deleteSchemaFromStore(ident);
LOG.warn("Hive schema (database) {} does not exist in Hive Metastore", ident.name());
return false;

} catch (TException e) {
throw new RuntimeException(
"Failed to drop Hive schema (database) " + ident.name() + " in Hive Metastore", e);

} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to drop Hive schema (database) " + ident.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void deleteSchemaFromStore(NameIdentifier ident) {
EntityStore store = GravitonEnv.getInstance().entityStore();
try {
store.delete(ident, SCHEMA);
} catch (IOException ex) {
LOG.error("Failed to delete hive schema {} from Graviton store", ident, ex);
}
}

/**
* Lists all the tables under the specified namespace.
*
Expand Down Expand Up @@ -542,7 +610,7 @@ public Table createTable(
/**
* Not supported in this implementation. Throws UnsupportedOperationException.
*
* @param ident The identifier of the table to alter.
* @param tableIdent The identifier of the table to alter.
* @param changes The changes to apply to the table.
* @return This method always throws UnsupportedOperationException.
* @throws NoSuchTableException This exception will not be thrown in this method.
Expand Down Expand Up @@ -778,12 +846,6 @@ public boolean purgeTable(NameIdentifier tableIdent) throws UnsupportedOperation
return dropHiveTable(tableIdent, true, true);
}

/**
* Checks if the given namespace is a valid namespace for the Hive schema.
*
* @param namespace The namespace to validate.
* @return true if the namespace is valid; otherwise, false.
*/
private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boolean ifPurge) {
Preconditions.checkArgument(!tableIdent.name().isEmpty(), "Cannot drop table with empty name");

Expand Down
Loading

0 comments on commit 7501630

Please sign in to comment.