From 94912a71ab70b9084aa2021689fbff413fb47946 Mon Sep 17 00:00:00 2001 From: FANNG Date: Thu, 14 Mar 2024 23:46:50 +0800 Subject: [PATCH] [#2455] feat(core): Support pluggable catalog operations (#2477) ### What changes were proposed in this pull request? 1. add a hacky catalog config 'ops-impl' to specify the custom catalog operation 2. some changes to CatalogOperation a. move `entity` from XXCatalogOperation constructor to initialize(), to construct custom catalog operation using reflect easily. b. move CatalogOperation#initialize() from XXCatalogOperation#newOps() to BaseCatalog#Ops(), because initialize is the API, it should be called explicitly by the framework not by specific XXCatalogOperation. ### Why are the changes needed? Fix: #2455 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? UT and IT --- .../catalog/hadoop/HadoopCatalog.java | 3 +- .../hadoop/HadoopCatalogOperations.java | 12 ++-- .../hadoop/TestHadoopCatalogOperations.java | 60 +++++++++---------- .../gravitino/catalog/hive/HiveCatalog.java | 3 +- .../catalog/hive/HiveCatalogOperations.java | 15 ++--- .../catalog/hive/TestHiveCatalog.java | 8 +-- .../hive/TestHiveCatalogOperations.java | 30 +++++----- .../hive/integration/test/CatalogHiveIT.java | 27 +++++++++ .../gravitino/catalog/jdbc/JdbcCatalog.java | 2 - .../catalog/jdbc/JdbcCatalogOperations.java | 9 ++- .../lakehouse/iceberg/IcebergCatalog.java | 3 +- .../iceberg/IcebergCatalogOperations.java | 15 ++--- .../lakehouse/iceberg/TestIcebergCatalog.java | 6 +- .../lakehouse/iceberg/TestIcebergSchema.java | 4 +- .../lakehouse/iceberg/TestIcebergTable.java | 4 +- .../gravitino/catalog/kafka/KafkaCatalog.java | 3 +- .../catalog/kafka/KafkaCatalogOperations.java | 8 +-- .../gravitino/catalog/BaseCatalog.java | 32 +++++++++- .../BaseCatalogPropertiesMetadata.java | 7 +++ .../gravitino/catalog/CatalogOperations.java | 4 +- .../gravitino/TestCatalogOperations.java | 4 +- .../catalog/DummyCatalogOperations.java | 45 ++++++++++++++ .../gravitino/catalog/TestBaseCatalog.java | 36 +++++++++++ .../server/web/rest/TestCatalog.java | 4 +- 24 files changed, 238 insertions(+), 106 deletions(-) create mode 100644 core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java create mode 100644 core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java index 6aef14bb74e..2ec9b26b827 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java @@ -24,8 +24,7 @@ public String shortName() { @Override protected CatalogOperations newOps(Map config) { - HadoopCatalogOperations ops = new HadoopCatalogOperations(entity()); - ops.initialize(config); + HadoopCatalogOperations ops = new HadoopCatalogOperations(); return ops; } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index d0e48745eb1..22281931112 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -68,7 +68,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem private static final HadoopFilesetPropertiesMetadata FILESET_PROPERTIES_METADATA = new HadoopFilesetPropertiesMetadata(); - private final CatalogEntity entity; + private CatalogEntity entity; private final EntityStore store; @@ -77,17 +77,17 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem @VisibleForTesting Optional catalogStorageLocation; // For testing only. - HadoopCatalogOperations(CatalogEntity entity, EntityStore store) { - this.entity = entity; + HadoopCatalogOperations(EntityStore store) { this.store = store; } - public HadoopCatalogOperations(CatalogEntity entity) { - this(entity, GravitinoEnv.getInstance().entityStore()); + public HadoopCatalogOperations() { + this(GravitinoEnv.getInstance().entityStore()); } @Override - public void initialize(Map config) throws RuntimeException { + public void initialize(Map config, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // Initialize Hadoop Configuration. this.hadoopConf = new Configuration(); Map bypassConfigs = diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java index e3aadbc06c3..8cb67450a56 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java @@ -98,14 +98,14 @@ public static void tearDown() throws IOException { @Test public void testHadoopCatalogConfiguration() { Map emptyProps = Maps.newHashMap(); - HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store); - ops.initialize(emptyProps); + HadoopCatalogOperations ops = new HadoopCatalogOperations(store); + ops.initialize(emptyProps, null); Configuration conf = ops.hadoopConf; String value = conf.get("fs.defaultFS"); Assertions.assertEquals("file:///", value); emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", "hdfs://localhost:9000"); - ops.initialize(emptyProps); + ops.initialize(emptyProps, null); Configuration conf1 = ops.hadoopConf; String value1 = conf1.get("fs.defaultFS"); Assertions.assertEquals("hdfs://localhost:9000", value1); @@ -113,7 +113,7 @@ public void testHadoopCatalogConfiguration() { Assertions.assertFalse(ops.catalogStorageLocation.isPresent()); emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/catalog"); - ops.initialize(emptyProps); + ops.initialize(emptyProps, null); Assertions.assertTrue(ops.catalogStorageLocation.isPresent()); Path expectedPath = new Path("file:///tmp/catalog"); Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get()); @@ -198,8 +198,8 @@ public void testLoadSchema() throws IOException { Assertions.assertEquals(name, schema.name()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name)); Assertions.assertEquals(name, schema1.name()); Assertions.assertEquals(comment, schema1.comment()); @@ -226,8 +226,8 @@ public void testListSchema() throws IOException { createSchema(name, comment, null, null); createSchema(name1, comment1, null, null); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Set idents = Arrays.stream(ops.listSchemas(Namespace.of("m1", "c1"))).collect(Collectors.toSet()); Assertions.assertTrue(idents.size() >= 2); @@ -244,8 +244,8 @@ public void testAlterSchema() throws IOException { Schema schema = createSchema(name, comment, catalogPath, null); Assertions.assertEquals(name, schema.name()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name)); Assertions.assertEquals(name, schema1.name()); Assertions.assertEquals(comment, schema1.comment()); @@ -294,8 +294,8 @@ public void testDropSchema() throws IOException { Assertions.assertEquals(name, schema.name()); NameIdentifier id = NameIdentifier.ofSchema("m1", "c1", name); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath)); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath), null); Schema schema1 = ops.loadSchema(id); Assertions.assertEquals(name, schema1.name()); Assertions.assertEquals(comment, schema1.comment()); @@ -348,8 +348,8 @@ public void testCreateLoadAndDeleteFilesetWithLocations( } NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(catalogProps); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(catalogProps, null); if (!ops.schemaExists(schemaIdent)) { createSchema(schemaName, comment, catalogPath, schemaPath); } @@ -400,8 +400,8 @@ public void testCreateFilesetWithExceptions() throws IOException { + " when it's catalog and schema " + "location are not set", exception.getMessage()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Throwable e = Assertions.assertThrows( NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent)); @@ -416,8 +416,8 @@ public void testCreateFilesetWithExceptions() throws IOException { Assertions.assertEquals( "Storage location must be set for external fileset " + filesetIdent, exception1.getMessage()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Throwable e = Assertions.assertThrows( NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent)); @@ -436,8 +436,8 @@ public void testListFilesets() throws IOException { createFileset(fileset, schemaName, comment, Fileset.Type.MANAGED, null, null); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Set idents = Arrays.stream(ops.listFilesets(Namespace.of("m1", "c1", schemaName))) .collect(Collectors.toSet()); @@ -467,8 +467,8 @@ public void testRenameFileset( } NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(catalogProps); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(catalogProps, null); if (!ops.schemaExists(schemaIdent)) { createSchema(schemaName, comment, catalogPath, schemaPath); } @@ -513,8 +513,8 @@ public void testAlterFilesetProperties() throws IOException { FilesetChange change1 = FilesetChange.setProperty("k1", "v1"); FilesetChange change2 = FilesetChange.removeProperty("k1"); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Fileset fileset1 = ops.alterFileset(filesetIdent, change1); @@ -578,8 +578,8 @@ public void testUpdateFilesetComment() throws IOException { Fileset fileset = createFileset(name, schemaName, comment, Fileset.Type.MANAGED, null, null); FilesetChange change1 = FilesetChange.updateComment("comment26_new"); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Fileset fileset1 = ops.alterFileset(filesetIdent, change1); @@ -830,8 +830,8 @@ private Schema createSchema(String name, String comment, String catalogPath, Str props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(props); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(props, null); NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", name); Map schemaProps = Maps.newHashMap(); @@ -859,8 +859,8 @@ private Fileset createFileset( props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(props); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(props, null); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Map filesetProps = Maps.newHashMap(); diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java index 8e12ba0beb3..57b85d03737 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java @@ -33,8 +33,7 @@ public String shortName() { */ @Override protected CatalogOperations newOps(Map config) { - HiveCatalogOperations ops = new HiveCatalogOperations(entity()); - ops.initialize(config); + HiveCatalogOperations ops = new HiveCatalogOperations(); return ops; } diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java index 48dac74a301..08d1c227a9a 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java @@ -89,7 +89,7 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas @VisibleForTesting HiveConf hiveConf; - private final CatalogEntity entity; + private CatalogEntity entity; private HiveTablePropertiesMetadata tablePropertiesMetadata; @@ -111,23 +111,16 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas PRINCIPAL, ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname); - /** - * Constructs a new instance of HiveCatalogOperations. - * - * @param entity The catalog entity associated with this operations instance. - */ - public HiveCatalogOperations(CatalogEntity entity) { - this.entity = entity; - } - /** * Initializes the Hive catalog operations with the provided configuration. * * @param conf The configuration map for the Hive catalog operations. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if initialization fails. */ @Override - public void initialize(Map conf) throws RuntimeException { + public void initialize(Map conf, CatalogEntity entity) throws RuntimeException { + this.entity = entity; this.tablePropertiesMetadata = new HiveTablePropertiesMetadata(); this.catalogPropertiesMetadata = new HiveCatalogPropertiesMeta(); this.schemaPropertiesMetadata = new HiveSchemaPropertiesMetadata(); diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java index 715cba578b3..ab349b7e256 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java @@ -39,8 +39,8 @@ public void testListDatabases() throws TException, InterruptedException { Map conf = Maps.newHashMap(); metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue())); - try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) { - ops.initialize(conf); + try (HiveCatalogOperations ops = new HiveCatalogOperations()) { + ops.initialize(conf, entity); List dbs = ops.clientPool.run(IMetaStoreClient::getAllDatabases); Assertions.assertEquals(2, dbs.size()); Assertions.assertTrue(dbs.contains("default")); @@ -68,8 +68,8 @@ void testCatalogProperty() { metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue())); - try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) { - ops.initialize(conf); + try (HiveCatalogOperations ops = new HiveCatalogOperations()) { + ops.initialize(conf, entity); PropertiesMetadata metadata = ops.catalogPropertiesMetadata(); Assertions.assertDoesNotThrow( diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java index 09cb7670a79..5833931a048 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java @@ -16,6 +16,7 @@ import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.PRINCIPAL; import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.catalog.BaseCatalog; import com.datastrato.gravitino.catalog.PropertyEntry; import com.google.common.collect.Maps; import java.util.Map; @@ -29,47 +30,48 @@ class TestHiveCatalogOperations { void testGetClientPoolSize() { Map maps = Maps.newHashMap(); maps.put(CLIENT_POOL_SIZE, "10"); - HiveCatalogOperations op = new HiveCatalogOperations(null); - op.initialize(maps); + HiveCatalogOperations op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertEquals(10, op.getClientPoolSize(maps)); maps.clear(); maps.put(CLIENT_POOL_SIZE + "_wrong_mark", "10"); - op = new HiveCatalogOperations(null); - op.initialize(maps); + op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertNotEquals(10, op.getClientPoolSize(maps)); maps.put(CLIENT_POOL_SIZE, "1"); - op = new HiveCatalogOperations(null); - op.initialize(maps); + op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertEquals(1, op.getClientPoolSize(maps)); } @Test void testInitialize() throws NoSuchFieldException, IllegalAccessException { Map properties = Maps.newHashMap(); - HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(null); - hiveCatalogOperations.initialize(properties); + HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(); + hiveCatalogOperations.initialize(properties, null); String v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces"); Assertions.assertEquals("10", v); // Test If we can override the value in hive-site.xml properties.put(CATALOG_BYPASS_PREFIX + "mapreduce.job.reduces", "20"); - hiveCatalogOperations.initialize(properties); + hiveCatalogOperations.initialize(properties, null); v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces"); Assertions.assertEquals("20", v); } @Test void testPropertyMeta() { - HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(null); - hiveCatalogOperations.initialize(Maps.newHashMap()); + HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(); + hiveCatalogOperations.initialize(Maps.newHashMap(), null); Map> propertyEntryMap = hiveCatalogOperations.catalogPropertiesMetadata().propertyEntries(); - Assertions.assertEquals(11, propertyEntryMap.size()); + Assertions.assertEquals(12, propertyEntryMap.size()); Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS)); Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE)); + Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL)); Assertions.assertTrue(propertyEntryMap.containsKey(CLIENT_POOL_SIZE)); Assertions.assertTrue(propertyEntryMap.containsKey(IMPERSONATION_ENABLE)); @@ -98,8 +100,8 @@ void testPropertyOverwrite() { maps.put(METASTORE_URIS, "url1"); maps.put(ConfVars.METASTOREURIS.varname, "url2"); maps.put(CATALOG_BYPASS_PREFIX + ConfVars.METASTOREURIS.varname, "url3"); - HiveCatalogOperations op = new HiveCatalogOperations(null); - op.initialize(maps); + HiveCatalogOperations op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertEquals("v2", op.hiveConf.get("a.b")); Assertions.assertEquals("v4", op.hiveConf.get("c.d")); diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java index c99adf6c495..f3663f15a02 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java @@ -32,6 +32,8 @@ import com.datastrato.gravitino.MetalakeChange; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.auth.AuthConstants; +import com.datastrato.gravitino.catalog.BaseCatalog; +import com.datastrato.gravitino.catalog.hive.HiveCatalogOperations; import com.datastrato.gravitino.catalog.hive.HiveClientPool; import com.datastrato.gravitino.catalog.hive.HiveSchemaPropertiesMetadata; import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata; @@ -1364,4 +1366,29 @@ public void testPurgeHiveExternalTable() throws TException, InterruptedException Assertions.assertTrue( hdfs.listStatus(tableDirectory).length > 0, "The table should not be empty"); } + + @Test + void testCustomCatalogOperations() { + String catalogName = "custom_catalog"; + Assertions.assertDoesNotThrow( + () -> createCatalogWithCustomOperation(catalogName, HiveCatalogOperations.class.getName())); + Assertions.assertThrowsExactly( + RuntimeException.class, + () -> + createCatalogWithCustomOperation( + catalogName + "_not_exists", "com.datastrato.gravitino.catalog.not.exists")); + } + + private static void createCatalogWithCustomOperation(String catalogName, String customImpl) { + Map properties = Maps.newHashMap(); + properties.put(METASTORE_URIS, HIVE_METASTORE_URIS); + properties.put(BaseCatalog.CATALOG_OPERATION_IMPL, customImpl); + + metalake.createCatalog( + NameIdentifier.of(metalakeName, catalogName), + Catalog.Type.RELATIONAL, + provider, + "comment", + properties); + } } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java index 758180a972c..a638dfb5945 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java @@ -31,14 +31,12 @@ protected CatalogOperations newOps(Map config) { JdbcTypeConverter jdbcTypeConverter = createJdbcTypeConverter(); JdbcCatalogOperations ops = new JdbcCatalogOperations( - entity(), createExceptionConverter(), jdbcTypeConverter, createJdbcDatabaseOperations(), createJdbcTableOperations(), createJdbcTablePropertiesMetadata(), createJdbcColumnDefaultValueConverter()); - ops.initialize(config); return ops; } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java index 8714ba36ec2..828ad8b1c21 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java @@ -70,7 +70,7 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas private JdbcSchemaPropertiesMetadata jdbcSchemaPropertiesMetadata; - private final CatalogEntity entity; + private CatalogEntity entity; private final JdbcExceptionConverter exceptionConverter; @@ -87,7 +87,6 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas /** * Constructs a new instance of JdbcCatalogOperations. * - * @param entity The catalog entity associated with this operations instance. * @param exceptionConverter The exception converter to be used by the operations. * @param jdbcTypeConverter The type converter to be used by the operations. * @param databaseOperation The database operations to be used by the operations. @@ -95,14 +94,12 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas * @param jdbcTablePropertiesMetadata The table properties metadata to be used by the operations. */ public JdbcCatalogOperations( - CatalogEntity entity, JdbcExceptionConverter exceptionConverter, JdbcTypeConverter jdbcTypeConverter, JdbcDatabaseOperations databaseOperation, JdbcTableOperations tableOperation, JdbcTablePropertiesMetadata jdbcTablePropertiesMetadata, JdbcColumnDefaultValueConverter columnDefaultValueConverter) { - this.entity = entity; this.exceptionConverter = exceptionConverter; this.jdbcTypeConverter = jdbcTypeConverter; this.databaseOperation = databaseOperation; @@ -115,10 +112,12 @@ public JdbcCatalogOperations( * Initializes the Jdbc catalog operations with the provided configuration. * * @param conf The configuration map for the Jdbc catalog operations. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if initialization fails. */ @Override - public void initialize(Map conf) throws RuntimeException { + public void initialize(Map conf, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // Key format like gravitino.bypass.a.b Map prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java index db84b4b3b20..963e5d77bb9 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java @@ -27,8 +27,7 @@ public String shortName() { */ @Override protected CatalogOperations newOps(Map config) { - IcebergCatalogOperations ops = new IcebergCatalogOperations(entity()); - ops.initialize(config); + IcebergCatalogOperations ops = new IcebergCatalogOperations(); return ops; } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index b197da20f29..7baa6204fe6 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -72,27 +72,20 @@ public class IcebergCatalogOperations implements CatalogOperations, SupportsSche private IcebergSchemaPropertiesMetadata icebergSchemaPropertiesMetadata; - private final CatalogEntity entity; + private CatalogEntity entity; private IcebergTableOpsHelper icebergTableOpsHelper; - /** - * Constructs a new instance of IcebergCatalogOperations. - * - * @param entity The catalog entity associated with this operations instance. - */ - public IcebergCatalogOperations(CatalogEntity entity) { - this.entity = entity; - } - /** * Initializes the Iceberg catalog operations with the provided configuration. * * @param conf The configuration map for the Iceberg catalog operations. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if initialization fails. */ @Override - public void initialize(Map conf) throws RuntimeException { + public void initialize(Map conf, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // Key format like gravitino.bypass.a.b Map prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java index 84603958cbb..aec05a9e6d7 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java @@ -36,7 +36,7 @@ public void testListDatabases() { Map conf = Maps.newHashMap(); IcebergCatalog icebergCatalog = new IcebergCatalog().withCatalogConf(conf).withCatalogEntity(entity); - CatalogOperations catalogOperations = icebergCatalog.newOps(Maps.newHashMap()); + CatalogOperations catalogOperations = icebergCatalog.ops(); Assertions.assertTrue(catalogOperations instanceof IcebergCatalogOperations); IcebergTableOps icebergTableOps = @@ -63,8 +63,8 @@ void testCatalogProperty() { Map conf = Maps.newHashMap(); - try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) { - ops.initialize(conf); + try (IcebergCatalogOperations ops = new IcebergCatalogOperations()) { + ops.initialize(conf, entity); Map map1 = Maps.newHashMap(); map1.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, "test"); PropertiesMetadata metadata = ops.catalogPropertiesMetadata(); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java index 6e3d08d6519..3c34c7ae098 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java @@ -144,8 +144,8 @@ void testSchemaProperty() { Map conf = Maps.newHashMap(); - try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) { - ops.initialize(conf); + try (IcebergCatalogOperations ops = new IcebergCatalogOperations()) { + ops.initialize(conf, entity); Map map = Maps.newHashMap(); map.put(IcebergSchemaPropertiesMetadata.COMMENT, "test"); PropertiesMetadata metadata = ops.schemaPropertiesMetadata(); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java index d20c782a12d..590a603717b 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java @@ -501,8 +501,8 @@ public void testAlterIcebergTable() { @Test public void testTableProperty() { CatalogEntity entity = createDefaultCatalogEntity(); - try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) { - ops.initialize(Maps.newHashMap()); + try (IcebergCatalogOperations ops = new IcebergCatalogOperations()) { + ops.initialize(Maps.newHashMap(), entity); Map map = Maps.newHashMap(); map.put(IcebergTablePropertiesMetadata.COMMENT, "test"); map.put(IcebergTablePropertiesMetadata.CREATOR, "test"); diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java index fe2861c7947..ae5c6d90491 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java @@ -21,8 +21,7 @@ public String shortName() { @Override protected CatalogOperations newOps(Map config) { - KafkaCatalogOperations ops = new KafkaCatalogOperations(entity()); - ops.initialize(config); + KafkaCatalogOperations ops = new KafkaCatalogOperations(); return ops; } diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java index b7ee5e934fb..50bd60ca93b 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java @@ -39,16 +39,16 @@ public class KafkaCatalogOperations implements CatalogOperations, SupportsSchema private static final KafkaTopicPropertiesMetadata TOPIC_PROPERTIES_METADATA = new KafkaTopicPropertiesMetadata(); - private final CatalogEntity entity; private final EntityStore store; + private CatalogEntity entity; - public KafkaCatalogOperations(CatalogEntity entity) { - this.entity = entity; + public KafkaCatalogOperations() { this.store = GravitinoEnv.getInstance().entityStore(); } @Override - public void initialize(Map config) throws RuntimeException { + public void initialize(Map config, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // TODO: Implement Kafka catalog initialization, such as creating a default schema. } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index 4090f939c49..9cb936b345d 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -8,10 +8,13 @@ import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.CatalogProvider; import com.datastrato.gravitino.meta.CatalogEntity; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.Map; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The abstract base class for Catalog implementations. @@ -27,6 +30,11 @@ */ public abstract class BaseCatalog implements Catalog, CatalogProvider, HasPropertyMetadata { + private static final Logger LOG = LoggerFactory.getLogger(BaseCatalog.class); + + // A hack way to inject custom operation to Gravitino, the object you used is not stable, don't + // use it unless you know what you are doing. + @VisibleForTesting public static final String CATALOG_OPERATION_IMPL = "ops-impl"; private CatalogEntity entity; @@ -91,7 +99,8 @@ public CatalogOperations ops() { if (ops == null) { Preconditions.checkArgument( entity != null && conf != null, "entity and conf must be set before calling ops()"); - CatalogOperations newOps = newOps(conf); + CatalogOperations newOps = createOps(conf); + newOps.initialize(conf, entity); ops = newProxyPlugin(conf) .map( @@ -106,6 +115,27 @@ public CatalogOperations ops() { return ops; } + private CatalogOperations createOps(Map conf) { + String customCatalogOperationClass = conf.get(CATALOG_OPERATION_IMPL); + return Optional.ofNullable(customCatalogOperationClass) + .map(className -> loadCustomOps(className)) + .orElse(newOps(conf)); + } + + private CatalogOperations loadCustomOps(String className) { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + try { + // Class.forName use classloader of the caller class (BaseCatalog.class), it's global + // classloader not the catalog specific classloader, so we must specify the classloader + // explicitly. + return (CatalogOperations) + Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance(); + } catch (Exception e) { + LOG.error("Failed to load custom catalog operations, {}", className, e); + throw new RuntimeException(e); + } + } + /** * Sets the CatalogEntity for this catalog. * diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java index 699f2eafb7e..e7fae916266 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java @@ -22,6 +22,13 @@ public abstract class BaseCatalogPropertiesMetadata extends BasePropertiesMetada false, null, false, + false), + PropertyEntry.stringImmutablePropertyEntry( + BaseCatalog.CATALOG_OPERATION_IMPL, + "The classname of custom catalog operation to replace the default implementation", + false, + null, + false, false)), PropertyEntry::getName); } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java index 91fd6199e9d..88efcb1bf72 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java @@ -4,6 +4,7 @@ */ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.rel.SupportsSchemas; import com.datastrato.gravitino.rel.TableCatalog; import java.io.Closeable; @@ -23,7 +24,8 @@ public interface CatalogOperations extends Closeable, HasPropertyMetadata { * if the initialization failed. * * @param config The configuration of this Catalog. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if the initialization failed. */ - void initialize(Map config) throws RuntimeException; + void initialize(Map config, CatalogEntity entity) throws RuntimeException; } diff --git a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java index 28541dc3e97..31f625aa7e2 100644 --- a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java +++ b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java @@ -20,6 +20,7 @@ import com.datastrato.gravitino.file.FilesetCatalog; import com.datastrato.gravitino.file.FilesetChange; import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.Schema; import com.datastrato.gravitino.rel.SchemaChange; @@ -71,7 +72,8 @@ public TestCatalogOperations(Map config) { } @Override - public void initialize(Map config) throws RuntimeException {} + public void initialize(Map config, CatalogEntity entity) + throws RuntimeException {} @Override public void close() throws IOException {} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java b/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java new file mode 100644 index 00000000000..726de4e1da4 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.meta.CatalogEntity; +import java.io.IOException; +import java.util.Map; + +public class DummyCatalogOperations implements CatalogOperations { + + @Override + public void initialize(Map config, CatalogEntity entity) + throws RuntimeException {} + + @Override + public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public void close() throws IOException {} +} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java new file mode 100644 index 00000000000..9003d5d8969 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.TestCatalog; +import com.datastrato.gravitino.TestCatalogOperations; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestBaseCatalog { + + @Test + void testCustomCatalogOperations() { + CatalogEntity entity = Mockito.mock(CatalogEntity.class); + + TestCatalog catalog = + new TestCatalog().withCatalogConf(ImmutableMap.of()).withCatalogEntity(entity); + CatalogOperations testCatalogOperations = catalog.ops(); + Assertions.assertTrue(testCatalogOperations instanceof TestCatalogOperations); + + TestCatalog catalog2 = + new TestCatalog() + .withCatalogConf( + ImmutableMap.of( + BaseCatalog.CATALOG_OPERATION_IMPL, DummyCatalogOperations.class.getName())) + .withCatalogEntity(entity); + CatalogOperations dummyCatalogOperations = catalog2.ops(); + Assertions.assertTrue(dummyCatalogOperations instanceof DummyCatalogOperations); + } +} diff --git a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java index 7934a4fa50e..90139f181f8 100644 --- a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java +++ b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.catalog.BaseCatalog; import com.datastrato.gravitino.catalog.CatalogOperations; import com.datastrato.gravitino.catalog.PropertiesMetadata; +import com.datastrato.gravitino.meta.CatalogEntity; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; @@ -41,7 +42,8 @@ public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationE } @Override - public void initialize(Map config) throws RuntimeException {} + public void initialize(Map config, CatalogEntity entity) + throws RuntimeException {} @Override public void close() throws IOException {}