Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#91]feat(catalog): Hive schema entity serde and store support #208

Merged
merged 6 commits into from
Aug 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions catalog-hive/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -76,4 +76,5 @@ dependencies {
testImplementation(libs.slf4j.jdk14)
testImplementation(libs.junit.jupiter.api)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.mockito.core)
}
Original file line number Diff line number Diff line change
@@ -4,11 +4,17 @@
*/
package com.datastrato.graviton.catalog.hive;

import static com.datastrato.graviton.Entity.EntityType.SCHEMA;
import static com.datastrato.graviton.Entity.EntityType.TABLE;
import static com.datastrato.graviton.catalog.hive.HiveTable.HMS_TABLE_COMMENT;
import static com.datastrato.graviton.catalog.hive.HiveTable.SUPPORT_TABLE_TYPES;

import com.datastrato.graviton.EntityAlreadyExistsException;
import com.datastrato.graviton.EntityStore;
import com.datastrato.graviton.GravitonEnv;
import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.NoSuchEntityException;
import com.datastrato.graviton.catalog.CatalogOperations;
import com.datastrato.graviton.catalog.hive.converter.ToHiveType;
import com.datastrato.graviton.exceptions.NoSuchCatalogException;
@@ -34,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -148,30 +155,33 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map<String,
String.format("Cannot support invalid namespace in Hive Metastore: %s", ident.namespace()));

try {
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveSchema hiveSchema =
new HiveSchema.Builder()
.withId(1L /*TODO. Use ID generator*/)
.withCatalogId(entity.getId())
.withName(ident.name())
.withNamespace(ident.namespace())
.withComment(comment)
.withProperties(metadata)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withConf(hiveConf)
.build();

clientPool.run(
client -> {
client.createDatabase(hiveSchema.toInnerDB());
return null;
});

// TODO. We should also store the customized HiveSchema entity fields into our own
// underlying storage, like id, auditInfo, etc.
store.executeInTransaction(
() -> {
HiveSchema createdSchema =
new HiveSchema.Builder()
.withId(1L /*TODO. Use ID generator*/)
.withCatalogId(entity.getId())
.withName(ident.name())
.withNamespace(ident.namespace())
.withComment(comment)
.withProperties(metadata)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withConf(hiveConf)
.build();
store.put(createdSchema, false);
clientPool.run(
mchades marked this conversation as resolved.
Show resolved Hide resolved
client -> {
client.createDatabase(createdSchema.toInnerDB());
return null;
});
return createdSchema;
});

LOG.info("Created Hive schema (database) {} in Hive Metastore", ident.name());

@@ -180,13 +190,24 @@ public HiveSchema createSchema(NameIdentifier ident, String comment, Map<String,
} catch (AlreadyExistsException e) {
throw new SchemaAlreadyExistsException(
String.format(
"Hive schema (database) '%s' already exists in Hive Metastore", ident.name()));
"Hive schema (database) '%s' already exists in Hive Metastore", ident.name()),
e);

} catch (EntityAlreadyExistsException e) {
throw new SchemaAlreadyExistsException(
String.format(
"Hive schema (database) '%s' already exists in Graviton store", ident.name()),
e);

} catch (TException e) {
throw new RuntimeException(
"Failed to create Hive schema (database) " + ident.name() + " in Hive Metastore", e);

} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to create Hive schema (database) " + ident.name() + " in Graviton store", e);

} catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -211,20 +232,15 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
Database database = clientPool.run(client -> client.getDatabase(ident.name()));
HiveSchema.Builder builder = new HiveSchema.Builder();

// TODO. We should also fetch the customized HiveSchema entity fields from our own
// underlying storage, like id, auditInfo, etc.
EntityStore store = GravitonEnv.getInstance().entityStore();
BaseSchema baseSchema = store.get(ident, SCHEMA, BaseSchema.class);

builder =
builder
.withId(1L /* TODO. Fetch id from underlying storage */)
.withCatalogId(entity.getId())
.withId(baseSchema.getId())
.withCatalogId(baseSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(
/* TODO. Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.withAuditInfo(baseSchema.auditInfo())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(database, builder);

@@ -233,19 +249,28 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
return hiveSchema;

} catch (NoSuchObjectException | UnknownDBException e) {
deleteSchemaFromStore(ident);
throw new NoSuchSchemaException(
String.format(
"Hive schema (database) does not exist: %s in Hive Metastore", ident.name()),
e);

// TODO. We should also delete Hive schema (database) from our own underlying storage
} catch (NoSuchEntityException e) {
throw new NoSuchSchemaException(
String.format(
"Hive schema (database) does not exist: %s in Graviton store", ident.name()),
e);

} catch (TException e) {
throw new RuntimeException(
"Failed to load Hive schema (database) " + ident.name() + " from Hive Metastore", e);

} catch (InterruptedException e) {
throw new RuntimeException(e);

} catch (IOException ioe) {
LOG.error("Failed to load hive schema {}", ident, ioe);
throw new RuntimeException(ioe);
}
}

@@ -293,38 +318,64 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
Database alteredDatabase = database.deepCopy();
alteredDatabase.setParameters(metadata);

clientPool.run(
client -> {
client.alterDatabase(ident.name(), alteredDatabase);
return null;
});

// TODO. We should also update the customized HiveSchema entity fields into our own if
// necessary
HiveSchema.Builder builder = new HiveSchema.Builder();
builder =
builder
.withId(1L /* TODO. Fetch id from underlying storage */)
.withCatalogId(entity.getId())
.withNamespace(ident.namespace())
.withAuditInfo(
/* TODO. Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);
// update store transactionally
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveSchema alteredHiveSchema =
store.executeInTransaction(
() -> {
BaseSchema oldSchema = store.get(ident, SCHEMA, BaseSchema.class);
HiveSchema.Builder builder = new HiveSchema.Builder();
builder =
builder
.withId(oldSchema.getId())
.withCatalogId(oldSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(oldSchema.auditInfo().creator())
.withCreateTime(oldSchema.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);

// To be on the safe side, here uses delete before put (although hive schema does
// not support rename yet)
store.delete(ident, SCHEMA);
store.put(hiveSchema, false);
jerryshao marked this conversation as resolved.
Show resolved Hide resolved
clientPool.run(
client -> {
client.alterDatabase(ident.name(), alteredDatabase);
return null;
});
return hiveSchema;
});

LOG.info("Altered Hive schema (database) {} in Hive Metastore", ident.name());
// todo(xun): hive does not support renaming database name directly,
// perhaps we can use namespace to mapping the database names indirectly

return hiveSchema;
return alteredHiveSchema;

} catch (NoSuchObjectException e) {
throw new NoSuchSchemaException(
String.format("Hive schema (database) %s does not exist in Hive Metastore", ident.name()),
e);

} catch (EntityAlreadyExistsException e) {
throw new NoSuchSchemaException(
"The new Hive schema (database) name already exist in Graviton store", e);

} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to alter Hive schema (database) " + ident.name() + " in Hive metastore", e);

} catch (IOException e) {
throw new RuntimeException(
"Failed to alter Hive schema (database) " + ident.name() + " in Graviton store", e);

} catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -348,15 +399,39 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
return false;
}

EntityStore store = GravitonEnv.getInstance().entityStore();
Namespace schemaNamespace =
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name()));
if (!cascade) {
if (listTables(schemaNamespace).length > 0) {
mchades marked this conversation as resolved.
Show resolved Hide resolved
throw new NonEmptySchemaException(
String.format(
"Hive schema (database) %s is not empty. One or more tables exist in Hive metastore.",
ident.name()));
}
// TODO(minghuang): check if there are tables in Graviton store after we support hive table
// serde
}

try {
clientPool.run(
client -> {
client.dropDatabase(ident.name(), false, false, cascade);
store.executeInTransaction(
() -> {
store.delete(ident, SCHEMA);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to delete all tables under this schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, delete tables operation added.

for (BaseTable t :
store.list(
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())),
HiveTable.class,
TABLE)) {
store.delete(NameIdentifier.of(schemaNamespace, t.name()), TABLE);
}
clientPool.run(
client -> {
client.dropDatabase(ident.name(), false, false, cascade);
return null;
});
return null;
});

// TODO. we should also delete the Hive schema (database) from our own underlying storage

LOG.info("Dropped Hive schema (database) {}", ident.name());
return true;

@@ -367,18 +442,32 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
e);

} catch (NoSuchObjectException e) {
deleteSchemaFromStore(ident);
LOG.warn("Hive schema (database) {} does not exist in Hive Metastore", ident.name());
return false;

} catch (TException e) {
throw new RuntimeException(
"Failed to drop Hive schema (database) " + ident.name() + " in Hive Metastore", e);

} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to drop Hive schema (database) " + ident.name() + " in Graviton store", e);

} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void deleteSchemaFromStore(NameIdentifier ident) {
EntityStore store = GravitonEnv.getInstance().entityStore();
try {
store.delete(ident, SCHEMA);
} catch (IOException ex) {
LOG.error("Failed to delete hive schema {} from Graviton store", ident, ex);
}
}

/**
* Lists all the tables under the specified namespace.
*
@@ -415,9 +504,11 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep
} catch (UnknownDBException e) {
throw new NoSuchSchemaException(
"Schema (database) does not exist " + namespace + " in Hive Metastore");

} catch (TException e) {
throw new RuntimeException(
"Failed to list all tables under the namespace : " + namespace + " in Hive Metastore", e);

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -469,6 +560,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
} catch (TException e) {
throw new NoSuchTableException(
String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e);

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Loading