[#2455] feat(core): Support pluggable catalog operations #2477

Merged: 11 commits (Mar 14, 2024)

@@ -24,8 +24,7 @@ public String shortName() {

@Override
protected CatalogOperations newOps(Map<String, String> config) {
- HadoopCatalogOperations ops = new HadoopCatalogOperations(entity());
- ops.initialize(config);
+ HadoopCatalogOperations ops = new HadoopCatalogOperations();
return ops;
}
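
With this change, newOps only constructs the operations object; initialization now happens in the framework, which is what makes the implementation pluggable. As a rough illustration, the loading side in BaseCatalog might look like the sketch below. This is not the PR's verbatim code: the ops() method, the lazy double-checked locking, the property key's string value, and the getProperties() accessor are assumptions, and CatalogOperations/CatalogEntity are the types shown elsewhere in this diff.

```java
import java.util.Map;

public abstract class BaseCatalog<T extends BaseCatalog<T>> {

  // The constant is real in this PR (see the tests below); the string
  // value "ops-impl" is an assumption for illustration.
  public static final String CATALOG_OPERATION_IMPL = "ops-impl";

  private volatile CatalogOperations ops;

  // Providers build their default operations object but no longer
  // initialize it themselves.
  protected abstract CatalogOperations newOps(Map<String, String> config);

  protected abstract CatalogEntity entity();

  public CatalogOperations ops() {
    if (ops == null) {
      synchronized (this) {
        if (ops == null) {
          Map<String, String> config = entity().getProperties(); // accessor name assumed
          CatalogOperations candidate = createOps(config);
          // The framework performs the two-argument initialize introduced by this PR.
          candidate.initialize(config, entity());
          ops = candidate;
        }
      }
    }
    return ops;
  }

  private CatalogOperations createOps(Map<String, String> config) {
    String customImpl = config.get(CATALOG_OPERATION_IMPL);
    if (customImpl == null) {
      return newOps(config); // fall back to the provider's built-in operations
    }
    try {
      // A custom implementation must expose a public no-arg constructor.
      return (CatalogOperations)
          Class.forName(customImpl).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed to load custom catalog operations: " + customImpl, e);
    }
  }
}
```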

@@ -68,7 +68,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
private static final HadoopFilesetPropertiesMetadata FILESET_PROPERTIES_METADATA =
new HadoopFilesetPropertiesMetadata();

- private final CatalogEntity entity;
+ private CatalogEntity entity;

private final EntityStore store;

@@ -77,17 +77,17 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
@VisibleForTesting Optional<Path> catalogStorageLocation;

// For testing only.
- HadoopCatalogOperations(CatalogEntity entity, EntityStore store) {
- this.entity = entity;
+ HadoopCatalogOperations(EntityStore store) {
this.store = store;
}

- public HadoopCatalogOperations(CatalogEntity entity) {
- this(entity, GravitinoEnv.getInstance().entityStore());
+ public HadoopCatalogOperations() {
+ this(GravitinoEnv.getInstance().entityStore());
}

@Override
- public void initialize(Map<String, String> config) throws RuntimeException {
+ public void initialize(Map<String, String> config, CatalogEntity entity) throws RuntimeException {
+ this.entity = entity;

(FANNG1 marked one review conversation on this line as resolved.)
Member: Could we introduce BaseCatalogOperations to extract common behavior like this?

Contributor (author): It's common to extract the entity store and properties, but the main logic is catalog-specific; I'm not sure whether it's worth doing.

// Initialize Hadoop Configuration.
this.hadoopConf = new Configuration();
Map<String, String> bypassConfigs =
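
For reference, a hypothetical BaseCatalogOperations along the lines of the suggestion above could centralize exactly the shared bookkeeping the author mentions. This class is not part of the PR; its name comes from the review comment, and the doInitialize hook is invented for illustration.

```java
import java.util.Map;

// Hypothetical base class sketched from the review discussion; not part of this PR.
public abstract class BaseCatalogOperations implements CatalogOperations {

  protected CatalogEntity entity;
  protected final EntityStore store;

  protected BaseCatalogOperations(EntityStore store) {
    this.store = store;
  }

  @Override
  public void initialize(Map<String, String> config, CatalogEntity entity)
      throws RuntimeException {
    this.entity = entity;
    doInitialize(config); // catalog-specific setup (Hadoop conf, Hive client pool, ...)
  }

  // Subclasses keep only the logic that genuinely differs per catalog.
  protected abstract void doInitialize(Map<String, String> config);
}
```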

@@ -98,22 +98,22 @@ public static void tearDown() throws IOException {
@Test
public void testHadoopCatalogConfiguration() {
Map<String, String> emptyProps = Maps.newHashMap();
- HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store);
- ops.initialize(emptyProps);
+ HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
+ ops.initialize(emptyProps, null);
Configuration conf = ops.hadoopConf;
String value = conf.get("fs.defaultFS");
Assertions.assertEquals("file:///", value);

emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", "hdfs://localhost:9000");
- ops.initialize(emptyProps);
+ ops.initialize(emptyProps, null);
Configuration conf1 = ops.hadoopConf;
String value1 = conf1.get("fs.defaultFS");
Assertions.assertEquals("hdfs://localhost:9000", value1);

Assertions.assertFalse(ops.catalogStorageLocation.isPresent());

emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/catalog");
- ops.initialize(emptyProps);
+ ops.initialize(emptyProps, null);
Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
Path expectedPath = new Path("file:///tmp/catalog");
Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());

@@ -198,8 +198,8 @@ public void testLoadSchema() throws IOException {

Assertions.assertEquals(name, schema.name());

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name));
Assertions.assertEquals(name, schema1.name());
Assertions.assertEquals(comment, schema1.comment());

@@ -226,8 +226,8 @@ public void testListSchema() throws IOException {
createSchema(name, comment, null, null);
createSchema(name1, comment1, null, null);

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
Set<NameIdentifier> idents =
Arrays.stream(ops.listSchemas(Namespace.of("m1", "c1"))).collect(Collectors.toSet());
Assertions.assertTrue(idents.size() >= 2);

@@ -244,8 +244,8 @@ public void testAlterSchema() throws IOException {
Schema schema = createSchema(name, comment, catalogPath, null);
Assertions.assertEquals(name, schema.name());

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name));
Assertions.assertEquals(name, schema1.name());
Assertions.assertEquals(comment, schema1.comment());

@@ -294,8 +294,8 @@ public void testDropSchema() throws IOException {
Assertions.assertEquals(name, schema.name());
NameIdentifier id = NameIdentifier.ofSchema("m1", "c1", name);

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath));
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath), null);
Schema schema1 = ops.loadSchema(id);
Assertions.assertEquals(name, schema1.name());
Assertions.assertEquals(comment, schema1.comment());

@@ -348,8 +348,8 @@ public void testCreateLoadAndDeleteFilesetWithLocations(
}

NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName);
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(catalogProps);
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(catalogProps, null);
if (!ops.schemaExists(schemaIdent)) {
createSchema(schemaName, comment, catalogPath, schemaPath);
}

@@ -400,8 +400,8 @@ public void testCreateFilesetWithExceptions() throws IOException {
+ " when it's catalog and schema "
+ "location are not set",
exception.getMessage());
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
Throwable e =
Assertions.assertThrows(
NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent));

@@ -416,8 +416,8 @@ public void testCreateFilesetWithExceptions() throws IOException {
Assertions.assertEquals(
"Storage location must be set for external fileset " + filesetIdent,
exception1.getMessage());
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
Throwable e =
Assertions.assertThrows(
NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent));

@@ -436,8 +436,8 @@ public void testListFilesets() throws IOException {
createFileset(fileset, schemaName, comment, Fileset.Type.MANAGED, null, null);
}

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
Set<NameIdentifier> idents =
Arrays.stream(ops.listFilesets(Namespace.of("m1", "c1", schemaName)))
.collect(Collectors.toSet());

@@ -467,8 +467,8 @@ public void testRenameFileset(
}

NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName);
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(catalogProps);
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(catalogProps, null);
if (!ops.schemaExists(schemaIdent)) {
createSchema(schemaName, comment, catalogPath, schemaPath);
}

@@ -513,8 +513,8 @@ public void testAlterFilesetProperties() throws IOException {
FilesetChange change1 = FilesetChange.setProperty("k1", "v1");
FilesetChange change2 = FilesetChange.removeProperty("k1");

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);

Fileset fileset1 = ops.alterFileset(filesetIdent, change1);

@@ -578,8 +578,8 @@ public void testUpdateFilesetComment() throws IOException {
Fileset fileset = createFileset(name, schemaName, comment, Fileset.Type.MANAGED, null, null);

FilesetChange change1 = FilesetChange.updateComment("comment26_new");
- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(Maps.newHashMap());
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(Maps.newHashMap(), null);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);

Fileset fileset1 = ops.alterFileset(filesetIdent, change1);

@@ -830,8 +830,8 @@ private Schema createSchema(String name, String comment, String catalogPath, Str
props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
}

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(props);
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(props, null);

NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", name);
Map<String, String> schemaProps = Maps.newHashMap();

@@ -859,8 +859,8 @@ private Fileset createFileset(
props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
}

- try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) {
- ops.initialize(props);
+ try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) {
+ ops.initialize(props, null);

NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
Map<String, String> filesetProps = Maps.newHashMap();

@@ -33,8 +33,7 @@ public String shortName() {
*/
@Override
protected CatalogOperations newOps(Map<String, String> config) {
- HiveCatalogOperations ops = new HiveCatalogOperations(entity());
- ops.initialize(config);
+ HiveCatalogOperations ops = new HiveCatalogOperations();
return ops;
}

@@ -89,7 +89,7 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas

@VisibleForTesting HiveConf hiveConf;

- private final CatalogEntity entity;
+ private CatalogEntity entity;

private HiveTablePropertiesMetadata tablePropertiesMetadata;

@@ -111,23 +111,16 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas
PRINCIPAL,
ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);

- /**
-  * Constructs a new instance of HiveCatalogOperations.
-  *
-  * @param entity The catalog entity associated with this operations instance.
-  */
- public HiveCatalogOperations(CatalogEntity entity) {
- this.entity = entity;
- }
-
/**
* Initializes the Hive catalog operations with the provided configuration.
*
* @param conf The configuration map for the Hive catalog operations.
+ * @param entity The catalog entity associated with this operations instance.
* @throws RuntimeException if initialization fails.
*/
@Override
- public void initialize(Map<String, String> conf) throws RuntimeException {
+ public void initialize(Map<String, String> conf, CatalogEntity entity) throws RuntimeException {
+ this.entity = entity;
this.tablePropertiesMetadata = new HiveTablePropertiesMetadata();
this.catalogPropertiesMetadata = new HiveCatalogPropertiesMeta();
this.schemaPropertiesMetadata = new HiveSchemaPropertiesMetadata();
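
Both implementations now follow the same contract. The CatalogOperations interface itself is not part of this diff, but the shape implied by the two implementations and by the try-with-resources usage in the tests is roughly the following sketch (not the interface's verbatim source; the Closeable supertype is inferred from the tests):

```java
import java.io.Closeable;
import java.util.Map;

// Implied contract: construct via a no-arg (or test-only) constructor, then
// let the framework supply both the config and the owning CatalogEntity.
public interface CatalogOperations extends Closeable {

  void initialize(Map<String, String> config, CatalogEntity entity) throws RuntimeException;
}
```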

@@ -39,8 +39,8 @@ public void testListDatabases() throws TException, InterruptedException {
Map<String, String> conf = Maps.newHashMap();
metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue()));

- try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) {
- ops.initialize(conf);
+ try (HiveCatalogOperations ops = new HiveCatalogOperations()) {
+ ops.initialize(conf, entity);
List<String> dbs = ops.clientPool.run(IMetaStoreClient::getAllDatabases);
Assertions.assertEquals(2, dbs.size());
Assertions.assertTrue(dbs.contains("default"));

@@ -68,8 +68,8 @@ void testCatalogProperty() {

metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue()));

- try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) {
- ops.initialize(conf);
+ try (HiveCatalogOperations ops = new HiveCatalogOperations()) {
+ ops.initialize(conf, entity);
PropertiesMetadata metadata = ops.catalogPropertiesMetadata();

Assertions.assertDoesNotThrow(

@@ -16,6 +16,7 @@
import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.PRINCIPAL;

import com.datastrato.gravitino.Catalog;
+ import com.datastrato.gravitino.catalog.BaseCatalog;
import com.datastrato.gravitino.catalog.PropertyEntry;
import com.google.common.collect.Maps;
import java.util.Map;

@@ -29,47 +30,48 @@ class TestHiveCatalogOperations {
void testGetClientPoolSize() {
Map<String, String> maps = Maps.newHashMap();
maps.put(CLIENT_POOL_SIZE, "10");
- HiveCatalogOperations op = new HiveCatalogOperations(null);
- op.initialize(maps);
+ HiveCatalogOperations op = new HiveCatalogOperations();
+ op.initialize(maps, null);
Assertions.assertEquals(10, op.getClientPoolSize(maps));

maps.clear();
maps.put(CLIENT_POOL_SIZE + "_wrong_mark", "10");
- op = new HiveCatalogOperations(null);
- op.initialize(maps);
+ op = new HiveCatalogOperations();
+ op.initialize(maps, null);
Assertions.assertNotEquals(10, op.getClientPoolSize(maps));

maps.put(CLIENT_POOL_SIZE, "1");
- op = new HiveCatalogOperations(null);
- op.initialize(maps);
+ op = new HiveCatalogOperations();
+ op.initialize(maps, null);
Assertions.assertEquals(1, op.getClientPoolSize(maps));
}

@Test
void testInitialize() throws NoSuchFieldException, IllegalAccessException {
Map<String, String> properties = Maps.newHashMap();
- HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(null);
- hiveCatalogOperations.initialize(properties);
+ HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations();
+ hiveCatalogOperations.initialize(properties, null);
String v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces");
Assertions.assertEquals("10", v);

// Test If we can override the value in hive-site.xml
properties.put(CATALOG_BYPASS_PREFIX + "mapreduce.job.reduces", "20");
- hiveCatalogOperations.initialize(properties);
+ hiveCatalogOperations.initialize(properties, null);
v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces");
Assertions.assertEquals("20", v);
}

@Test
void testPropertyMeta() {
- HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(null);
- hiveCatalogOperations.initialize(Maps.newHashMap());
+ HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations();
+ hiveCatalogOperations.initialize(Maps.newHashMap(), null);

Map<String, PropertyEntry<?>> propertyEntryMap =
hiveCatalogOperations.catalogPropertiesMetadata().propertyEntries();
- Assertions.assertEquals(11, propertyEntryMap.size());
+ Assertions.assertEquals(12, propertyEntryMap.size());
Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS));
Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE));
+ Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL));
Assertions.assertTrue(propertyEntryMap.containsKey(CLIENT_POOL_SIZE));
Assertions.assertTrue(propertyEntryMap.containsKey(IMPERSONATION_ENABLE));

@@ -98,8 +100,8 @@ void testPropertyOverwrite() {
maps.put(METASTORE_URIS, "url1");
maps.put(ConfVars.METASTOREURIS.varname, "url2");
maps.put(CATALOG_BYPASS_PREFIX + ConfVars.METASTOREURIS.varname, "url3");
- HiveCatalogOperations op = new HiveCatalogOperations(null);
- op.initialize(maps);
+ HiveCatalogOperations op = new HiveCatalogOperations();
+ op.initialize(maps, null);

Assertions.assertEquals("v2", op.hiveConf.get("a.b"));
Assertions.assertEquals("v4", op.hiveConf.get("c.d"));

@@ -32,6 +32,8 @@
import com.datastrato.gravitino.MetalakeChange;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.auth.AuthConstants;
+ import com.datastrato.gravitino.catalog.BaseCatalog;
+ import com.datastrato.gravitino.catalog.hive.HiveCatalogOperations;
import com.datastrato.gravitino.catalog.hive.HiveClientPool;
import com.datastrato.gravitino.catalog.hive.HiveSchemaPropertiesMetadata;
import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;

@@ -1364,4 +1366,29 @@ public void testPurgeHiveExternalTable() throws TException, InterruptedException
Assertions.assertTrue(
hdfs.listStatus(tableDirectory).length > 0, "The table should not be empty");
}

+ @Test
+ void testCustomCatalogOperations() {
+ String catalogName = "custom_catalog";
+ Assertions.assertDoesNotThrow(
+ () -> createCatalogWithCustomOperation(catalogName, HiveCatalogOperations.class.getName()));
+ Assertions.assertThrowsExactly(
+ RuntimeException.class,
+ () ->
+ createCatalogWithCustomOperation(
+ catalogName + "_not_exists", "com.datastrato.gravitino.catalog.not.exists"));
+ }
+
+ private static void createCatalogWithCustomOperation(String catalogName, String customImpl) {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(METASTORE_URIS, HIVE_METASTORE_URIS);
+ properties.put(BaseCatalog.CATALOG_OPERATION_IMPL, customImpl);
+
+ metalake.createCatalog(
+ NameIdentifier.of(metalakeName, catalogName),
+ Catalog.Type.RELATIONAL,
+ provider,
+ "comment",
+ properties);
+ }
}
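
The integration test above exercises the new property end to end: a loadable class name succeeds, while a nonexistent one fails with a RuntimeException. A user-supplied implementation needs only a public no-arg constructor and the two-argument initialize. A hypothetical example follows; the package, class name, CatalogEntity import path, and the entity.name() accessor are assumptions, and it presumes HiveCatalogOperations is subclassable from user code.

```java
package com.example.gravitino;

import com.datastrato.gravitino.catalog.hive.HiveCatalogOperations;
import com.datastrato.gravitino.meta.CatalogEntity;
import java.util.Map;

// Hypothetical plug-in that decorates the built-in Hive behavior.
public class AuditedHiveCatalogOperations extends HiveCatalogOperations {

  // The pluggable loader instantiates this class reflectively, so the
  // public no-arg constructor is required.
  public AuditedHiveCatalogOperations() {
    super();
  }

  @Override
  public void initialize(Map<String, String> config, CatalogEntity entity)
      throws RuntimeException {
    super.initialize(config, entity);
    // Custom setup goes here, e.g. registering an audit hook for this catalog.
    System.out.println(
        "Initialized audited Hive operations for catalog: "
            + (entity == null ? "<none>" : entity.name()));
  }
}
```

Setting BaseCatalog.CATALOG_OPERATION_IMPL to com.example.gravitino.AuditedHiveCatalogOperations at catalog creation, as in the test above, would then swap this class in for the provider default.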