[#2455] feat(core): Support pluggable catalog operations (#2477)
### What changes were proposed in this pull request?

1. Add a (somewhat hacky) catalog config property `ops-impl` to specify a custom catalog
operations implementation.
2. Make some changes to `CatalogOperations`:
    a. Move `entity` from the XXCatalogOperations constructors to `initialize()`, so that a
       custom catalog operations class can be constructed easily via reflection (a sketch is
       shown below).
    b. Move the `CatalogOperations#initialize()` call from `XXCatalogOperations#newOps()` to
       `BaseCatalog#ops()`, because `initialize()` is part of the API contract and should be
       called explicitly by the framework, not by each specific XXCatalogOperations.
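
For illustration, a custom operations class under the new contract might look roughly like the sketch below. This is a hypothetical example, not code from this PR: the class name `MyHiveCatalogOperations` is made up, extending `HiveCatalogOperations` is only a shortcut to keep the sketch short (a from-scratch class would implement `CatalogOperations` directly), and the import paths are assumed. It relies only on what this PR establishes: a public no-arg constructor so `BaseCatalog` can create the instance reflectively, and the reshaped `initialize(Map, CatalogEntity)` hook.

```java
import com.datastrato.gravitino.catalog.hive.HiveCatalogOperations;
import com.datastrato.gravitino.meta.CatalogEntity;
import java.util.Map;

// Hypothetical user-supplied catalog operations class.
public class MyHiveCatalogOperations extends HiveCatalogOperations {

  // A public no-arg constructor is what allows BaseCatalog to instantiate the class
  // reflectively from the `ops-impl` catalog property.
  public MyHiveCatalogOperations() {}

  @Override
  public void initialize(Map<String, String> conf, CatalogEntity entity) throws RuntimeException {
    // The CatalogEntity now arrives here instead of through the constructor.
    super.initialize(conf, entity);
    // Custom setup (extra clients, validation, ...) could go here.
  }
}
```

The catalog would then be created with the new `ops-impl` property (the `BaseCatalog.CATALOG_OPERATION_IMPL` constant added by this PR) pointing at this class, e.g. `properties.put("ops-impl", MyHiveCatalogOperations.class.getName())`, which is what `testCustomCatalogOperations` in the IT below exercises using `HiveCatalogOperations` itself.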

### Why are the changes needed?

Fix: #2455 

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT and IT
FANNG1 authored Mar 14, 2024
1 parent 8c58d67 commit 94912a7
Showing 24 changed files with 238 additions and 106 deletions.
@@ -24,8 +24,7 @@ public String shortName() {
 
   @Override
   protected CatalogOperations newOps(Map<String, String> config) {
-    HadoopCatalogOperations ops = new HadoopCatalogOperations(entity());
-    ops.initialize(config);
+    HadoopCatalogOperations ops = new HadoopCatalogOperations();
     return ops;
   }
 
@@ -68,7 +68,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
   private static final HadoopFilesetPropertiesMetadata FILESET_PROPERTIES_METADATA =
       new HadoopFilesetPropertiesMetadata();
 
-  private final CatalogEntity entity;
+  private CatalogEntity entity;
 
   private final EntityStore store;
 
@@ -77,17 +77,17 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
   @VisibleForTesting Optional<Path> catalogStorageLocation;
 
   // For testing only.
-  HadoopCatalogOperations(CatalogEntity entity, EntityStore store) {
-    this.entity = entity;
+  HadoopCatalogOperations(EntityStore store) {
     this.store = store;
   }
 
-  public HadoopCatalogOperations(CatalogEntity entity) {
-    this(entity, GravitinoEnv.getInstance().entityStore());
+  public HadoopCatalogOperations() {
+    this(GravitinoEnv.getInstance().entityStore());
   }
 
   @Override
-  public void initialize(Map<String, String> config) throws RuntimeException {
+  public void initialize(Map<String, String> config, CatalogEntity entity) throws RuntimeException {
+    this.entity = entity;
     // Initialize Hadoop Configuration.
     this.hadoopConf = new Configuration();
     Map<String, String> bypassConfigs =
@@ -98,22 +98,22 @@ public static void tearDown() throws IOException {
   @Test
   public void testHadoopCatalogConfiguration() {
     Map<String, String> emptyProps = Maps.newHashMap();
-    HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store);
-    ops.initialize(emptyProps);
+    HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
+    ops.initialize(emptyProps, null);
     Configuration conf = ops.hadoopConf;
     String value = conf.get("fs.defaultFS");
     Assertions.assertEquals("file:///", value);
 
     emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", "hdfs://localhost:9000");
-    ops.initialize(emptyProps);
+    ops.initialize(emptyProps, null);
     Configuration conf1 = ops.hadoopConf;
     String value1 = conf1.get("fs.defaultFS");
     Assertions.assertEquals("hdfs://localhost:9000", value1);
 
     Assertions.assertFalse(ops.catalogStorageLocation.isPresent());
 
     emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/catalog");
-    ops.initialize(emptyProps);
+    ops.initialize(emptyProps, null);
     Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
     Path expectedPath = new Path("file:///tmp/catalog");
     Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
@@ -198,8 +198,8 @@ public void testLoadSchema() throws IOException {
 
     Assertions.assertEquals(name, schema.name());
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name));
       Assertions.assertEquals(name, schema1.name());
       Assertions.assertEquals(comment, schema1.comment());
@@ -226,8 +226,8 @@ public void testListSchema() throws IOException {
     createSchema(name, comment, null, null);
     createSchema(name1, comment1, null, null);
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       Set<NameIdentifier> idents =
           Arrays.stream(ops.listSchemas(Namespace.of("m1", "c1"))).collect(Collectors.toSet());
       Assertions.assertTrue(idents.size() >= 2);
@@ -244,8 +244,8 @@ public void testAlterSchema() throws IOException {
     Schema schema = createSchema(name, comment, catalogPath, null);
     Assertions.assertEquals(name, schema.name());
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name));
       Assertions.assertEquals(name, schema1.name());
       Assertions.assertEquals(comment, schema1.comment());
@@ -294,8 +294,8 @@ public void testDropSchema() throws IOException {
     Assertions.assertEquals(name, schema.name());
     NameIdentifier id = NameIdentifier.ofSchema("m1", "c1", name);
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath));
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath), null);
       Schema schema1 = ops.loadSchema(id);
       Assertions.assertEquals(name, schema1.name());
       Assertions.assertEquals(comment, schema1.comment());
@@ -348,8 +348,8 @@ public void testCreateLoadAndDeleteFilesetWithLocations(
     }
 
     NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName);
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(catalogProps);
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(catalogProps, null);
       if (!ops.schemaExists(schemaIdent)) {
         createSchema(schemaName, comment, catalogPath, schemaPath);
       }
@@ -400,8 +400,8 @@ public void testCreateFilesetWithExceptions() throws IOException {
             + " when it's catalog and schema "
             + "location are not set",
         exception.getMessage());
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       Throwable e =
           Assertions.assertThrows(
               NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent));
@@ -416,8 +416,8 @@ public void testCreateFilesetWithExceptions() throws IOException {
     Assertions.assertEquals(
         "Storage location must be set for external fileset " + filesetIdent,
         exception1.getMessage());
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       Throwable e =
           Assertions.assertThrows(
               NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent));
@@ -436,8 +436,8 @@ public void testListFilesets() throws IOException {
       createFileset(fileset, schemaName, comment, Fileset.Type.MANAGED, null, null);
     }
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       Set<NameIdentifier> idents =
           Arrays.stream(ops.listFilesets(Namespace.of("m1", "c1", schemaName)))
               .collect(Collectors.toSet());
@@ -467,8 +467,8 @@ public void testRenameFileset(
     }
 
     NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName);
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(catalogProps);
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(catalogProps, null);
       if (!ops.schemaExists(schemaIdent)) {
         createSchema(schemaName, comment, catalogPath, schemaPath);
       }
@@ -513,8 +513,8 @@ public void testAlterFilesetProperties() throws IOException {
     FilesetChange change1 = FilesetChange.setProperty("k1", "v1");
     FilesetChange change2 = FilesetChange.removeProperty("k1");
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
 
       Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -578,8 +578,8 @@ public void testUpdateFilesetComment() throws IOException {
     Fileset fileset = createFileset(name, schemaName, comment, Fileset.Type.MANAGED, null, null);
 
     FilesetChange change1 = FilesetChange.updateComment("comment26_new");
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(Maps.newHashMap());
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), null);
       NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
 
       Fileset fileset1 = ops.alterFileset(filesetIdent, change1);
@@ -830,8 +830,8 @@ private Schema createSchema(String name, String comment, String catalogPath, Str
       props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
     }
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(props);
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(props, null);
 
       NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", name);
       Map<String, String> schemaProps = Maps.newHashMap();
@@ -859,8 +859,8 @@ private Fileset createFileset(
       props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
     }
 
-    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
-      ops.initialize(props);
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+      ops.initialize(props, null);
 
       NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
       Map<String, String> filesetProps = Maps.newHashMap();
@@ -33,8 +33,7 @@ public String shortName() {
    */
   @Override
   protected CatalogOperations newOps(Map<String, String> config) {
-    HiveCatalogOperations ops = new HiveCatalogOperations(entity());
-    ops.initialize(config);
+    HiveCatalogOperations ops = new HiveCatalogOperations();
     return ops;
   }
 
@@ -89,7 +89,7 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas
 
   @VisibleForTesting HiveConf hiveConf;
 
-  private final CatalogEntity entity;
+  private CatalogEntity entity;
 
   private HiveTablePropertiesMetadata tablePropertiesMetadata;
 
@@ -111,23 +111,16 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas
           PRINCIPAL,
           ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
 
-  /**
-   * Constructs a new instance of HiveCatalogOperations.
-   *
-   * @param entity The catalog entity associated with this operations instance.
-   */
-  public HiveCatalogOperations(CatalogEntity entity) {
-    this.entity = entity;
-  }
-
   /**
    * Initializes the Hive catalog operations with the provided configuration.
    *
    * @param conf The configuration map for the Hive catalog operations.
+   * @param entity The catalog entity associated with this operations instance.
    * @throws RuntimeException if initialization fails.
    */
   @Override
-  public void initialize(Map<String, String> conf) throws RuntimeException {
+  public void initialize(Map<String, String> conf, CatalogEntity entity) throws RuntimeException {
+    this.entity = entity;
     this.tablePropertiesMetadata = new HiveTablePropertiesMetadata();
     this.catalogPropertiesMetadata = new HiveCatalogPropertiesMeta();
     this.schemaPropertiesMetadata = new HiveSchemaPropertiesMetadata();
@@ -39,8 +39,8 @@ public void testListDatabases() throws TException, InterruptedException {
     Map<String, String> conf = Maps.newHashMap();
     metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue()));
 
-    try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) {
-      ops.initialize(conf);
+    try (HiveCatalogOperations ops = new HiveCatalogOperations()) {
+      ops.initialize(conf, entity);
       List<String> dbs = ops.clientPool.run(IMetaStoreClient::getAllDatabases);
       Assertions.assertEquals(2, dbs.size());
       Assertions.assertTrue(dbs.contains("default"));
@@ -68,8 +68,8 @@ void testCatalogProperty() {
 
     metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue()));
 
-    try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) {
-      ops.initialize(conf);
+    try (HiveCatalogOperations ops = new HiveCatalogOperations()) {
+      ops.initialize(conf, entity);
       PropertiesMetadata metadata = ops.catalogPropertiesMetadata();
 
       Assertions.assertDoesNotThrow(
@@ -16,6 +16,7 @@
 import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.PRINCIPAL;
 
 import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.catalog.BaseCatalog;
 import com.datastrato.gravitino.catalog.PropertyEntry;
 import com.google.common.collect.Maps;
 import java.util.Map;
@@ -29,47 +30,48 @@ class TestHiveCatalogOperations {
   void testGetClientPoolSize() {
     Map<String, String> maps = Maps.newHashMap();
     maps.put(CLIENT_POOL_SIZE, "10");
-    HiveCatalogOperations op = new HiveCatalogOperations(null);
-    op.initialize(maps);
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(maps, null);
     Assertions.assertEquals(10, op.getClientPoolSize(maps));
 
     maps.clear();
     maps.put(CLIENT_POOL_SIZE + "_wrong_mark", "10");
-    op = new HiveCatalogOperations(null);
-    op.initialize(maps);
+    op = new HiveCatalogOperations();
+    op.initialize(maps, null);
     Assertions.assertNotEquals(10, op.getClientPoolSize(maps));
 
     maps.put(CLIENT_POOL_SIZE, "1");
-    op = new HiveCatalogOperations(null);
-    op.initialize(maps);
+    op = new HiveCatalogOperations();
+    op.initialize(maps, null);
     Assertions.assertEquals(1, op.getClientPoolSize(maps));
   }
 
   @Test
   void testInitialize() throws NoSuchFieldException, IllegalAccessException {
     Map<String, String> properties = Maps.newHashMap();
-    HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(null);
-    hiveCatalogOperations.initialize(properties);
+    HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations();
+    hiveCatalogOperations.initialize(properties, null);
    String v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces");
     Assertions.assertEquals("10", v);
 
     // Test If we can override the value in hive-site.xml
     properties.put(CATALOG_BYPASS_PREFIX + "mapreduce.job.reduces", "20");
-    hiveCatalogOperations.initialize(properties);
+    hiveCatalogOperations.initialize(properties, null);
     v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces");
     Assertions.assertEquals("20", v);
   }
 
   @Test
   void testPropertyMeta() {
-    HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(null);
-    hiveCatalogOperations.initialize(Maps.newHashMap());
+    HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations();
+    hiveCatalogOperations.initialize(Maps.newHashMap(), null);
 
     Map<String, PropertyEntry<?>> propertyEntryMap =
         hiveCatalogOperations.catalogPropertiesMetadata().propertyEntries();
-    Assertions.assertEquals(11, propertyEntryMap.size());
+    Assertions.assertEquals(12, propertyEntryMap.size());
     Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS));
     Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE));
+    Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL));
     Assertions.assertTrue(propertyEntryMap.containsKey(CLIENT_POOL_SIZE));
     Assertions.assertTrue(propertyEntryMap.containsKey(IMPERSONATION_ENABLE));
 
@@ -98,8 +100,8 @@ void testPropertyOverwrite() {
     maps.put(METASTORE_URIS, "url1");
     maps.put(ConfVars.METASTOREURIS.varname, "url2");
     maps.put(CATALOG_BYPASS_PREFIX + ConfVars.METASTOREURIS.varname, "url3");
-    HiveCatalogOperations op = new HiveCatalogOperations(null);
-    op.initialize(maps);
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(maps, null);
 
     Assertions.assertEquals("v2", op.hiveConf.get("a.b"));
     Assertions.assertEquals("v4", op.hiveConf.get("c.d"));
@@ -32,6 +32,8 @@
 import com.datastrato.gravitino.MetalakeChange;
 import com.datastrato.gravitino.NameIdentifier;
 import com.datastrato.gravitino.auth.AuthConstants;
+import com.datastrato.gravitino.catalog.BaseCatalog;
+import com.datastrato.gravitino.catalog.hive.HiveCatalogOperations;
 import com.datastrato.gravitino.catalog.hive.HiveClientPool;
 import com.datastrato.gravitino.catalog.hive.HiveSchemaPropertiesMetadata;
 import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
@@ -1364,4 +1366,29 @@ public void testPurgeHiveExternalTable() throws TException, InterruptedException
     Assertions.assertTrue(
         hdfs.listStatus(tableDirectory).length > 0, "The table should not be empty");
   }
+
+  @Test
+  void testCustomCatalogOperations() {
+    String catalogName = "custom_catalog";
+    Assertions.assertDoesNotThrow(
+        () -> createCatalogWithCustomOperation(catalogName, HiveCatalogOperations.class.getName()));
+    Assertions.assertThrowsExactly(
+        RuntimeException.class,
+        () ->
+            createCatalogWithCustomOperation(
+                catalogName + "_not_exists", "com.datastrato.gravitino.catalog.not.exists"));
+  }
+
+  private static void createCatalogWithCustomOperation(String catalogName, String customImpl) {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(METASTORE_URIS, HIVE_METASTORE_URIS);
+    properties.put(BaseCatalog.CATALOG_OPERATION_IMPL, customImpl);
+
+    metalake.createCatalog(
+        NameIdentifier.of(metalakeName, catalogName),
+        Catalog.Type.RELATIONAL,
+        provider,
+        "comment",
+        properties);
+  }
 }