From 56ad65531ea57770f11ca7f6c3c621ed1ddff65d Mon Sep 17 00:00:00 2001 From: fanng Date: Fri, 8 Mar 2024 11:58:43 +0800 Subject: [PATCH 01/11] support pluggable catalog operations --- .../com/datastrato/gravitino/Catalog.java | 16 +++++ .../catalog/hadoop/HadoopCatalog.java | 3 +- .../hadoop/HadoopCatalogOperations.java | 12 ++-- .../hadoop/TestHadoopCatalogOperations.java | 60 +++++++++---------- .../gravitino/catalog/hive/HiveCatalog.java | 3 +- .../catalog/hive/HiveCatalogOperations.java | 15 ++--- .../catalog/hive/TestHiveCatalog.java | 8 +-- .../hive/TestHiveCatalogOperations.java | 30 +++++----- .../hive/integration/test/CatalogHiveIT.java | 45 ++++++++++++++ .../gravitino/catalog/jdbc/JdbcCatalog.java | 2 - .../catalog/jdbc/JdbcCatalogOperations.java | 9 ++- .../lakehouse/iceberg/IcebergCatalog.java | 3 +- .../iceberg/IcebergCatalogOperations.java | 15 ++--- .../lakehouse/iceberg/TestIcebergCatalog.java | 6 +- .../lakehouse/iceberg/TestIcebergSchema.java | 4 +- .../lakehouse/iceberg/TestIcebergTable.java | 4 +- .../datastrato/gravitino/utils/PathUtils.java | 37 ++++++++++++ .../gravitino/utils/TestPathUtils.java | 27 +++++++++ .../gravitino/catalog/BaseCatalog.java | 26 +++++++- .../BaseCatalogPropertiesMetadata.java | 16 +++++ .../gravitino/catalog/CatalogManager.java | 29 +++++++-- .../gravitino/catalog/CatalogOperations.java | 4 +- .../gravitino/TestCatalogOperations.java | 4 +- .../catalog/DummyCatalogOperations.java | 40 +++++++++++++ .../gravitino/catalog/TestBaseCatalog.java | 37 ++++++++++++ .../integration/test/util/ITUtils.java | 6 ++ .../server/web/rest/TestCatalog.java | 4 +- 27 files changed, 361 insertions(+), 104 deletions(-) create mode 100644 common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java create mode 100644 common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java create mode 100644 core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java create mode 100644 
core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java diff --git a/api/src/main/java/com/datastrato/gravitino/Catalog.java b/api/src/main/java/com/datastrato/gravitino/Catalog.java index e9339c6a6c3..56538f298c1 100644 --- a/api/src/main/java/com/datastrato/gravitino/Catalog.java +++ b/api/src/main/java/com/datastrato/gravitino/Catalog.java @@ -41,6 +41,22 @@ enum Type { */ String PROPERTY_PACKAGE = "package"; + /** + * Custom catalog operation class name. It is recommended to extend current Gravitino catalog + * operations like HiveCatalogOperations, and override interfaces like purgeTable, and + * listPartition which should be hacked, but this is not a must. It only needs to implement + * CatalogOperations to initialize itself, implement SupportsSchemas to do schema ops, and + * TableCatalog to do table ops if necessary. + */ + String CATALOG_OPERATION_CLASS_NAME = "catalog.operation.custom.class-name"; + + /** + * Custom catalog operation class path. It will be added to the catalog classpath if both custom + * catalog operation class name and class path are not empty. The path could be absolute or + * relative; if relative, it's supposed to be under Gravitino Home directory. + */ + String CATALOG_OPERATION_CLASS_PATH = "catalog.operation.custom.classpath"; + /** @return The name of the catalog. 
*/ String name(); diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java index 6aef14bb74e..2ec9b26b827 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java @@ -24,8 +24,7 @@ public String shortName() { @Override protected CatalogOperations newOps(Map config) { - HadoopCatalogOperations ops = new HadoopCatalogOperations(entity()); - ops.initialize(config); + HadoopCatalogOperations ops = new HadoopCatalogOperations(); return ops; } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index d0e48745eb1..22281931112 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -68,7 +68,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem private static final HadoopFilesetPropertiesMetadata FILESET_PROPERTIES_METADATA = new HadoopFilesetPropertiesMetadata(); - private final CatalogEntity entity; + private CatalogEntity entity; private final EntityStore store; @@ -77,17 +77,17 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem @VisibleForTesting Optional catalogStorageLocation; // For testing only. 
- HadoopCatalogOperations(CatalogEntity entity, EntityStore store) { - this.entity = entity; + HadoopCatalogOperations(EntityStore store) { this.store = store; } - public HadoopCatalogOperations(CatalogEntity entity) { - this(entity, GravitinoEnv.getInstance().entityStore()); + public HadoopCatalogOperations() { + this(GravitinoEnv.getInstance().entityStore()); } @Override - public void initialize(Map config) throws RuntimeException { + public void initialize(Map config, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // Initialize Hadoop Configuration. this.hadoopConf = new Configuration(); Map bypassConfigs = diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java index e3aadbc06c3..8cb67450a56 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java @@ -98,14 +98,14 @@ public static void tearDown() throws IOException { @Test public void testHadoopCatalogConfiguration() { Map emptyProps = Maps.newHashMap(); - HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store); - ops.initialize(emptyProps); + HadoopCatalogOperations ops = new HadoopCatalogOperations(store); + ops.initialize(emptyProps, null); Configuration conf = ops.hadoopConf; String value = conf.get("fs.defaultFS"); Assertions.assertEquals("file:///", value); emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", "hdfs://localhost:9000"); - ops.initialize(emptyProps); + ops.initialize(emptyProps, null); Configuration conf1 = ops.hadoopConf; String value1 = conf1.get("fs.defaultFS"); Assertions.assertEquals("hdfs://localhost:9000", value1); @@ -113,7 +113,7 @@ public void testHadoopCatalogConfiguration() { 
Assertions.assertFalse(ops.catalogStorageLocation.isPresent()); emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/catalog"); - ops.initialize(emptyProps); + ops.initialize(emptyProps, null); Assertions.assertTrue(ops.catalogStorageLocation.isPresent()); Path expectedPath = new Path("file:///tmp/catalog"); Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get()); @@ -198,8 +198,8 @@ public void testLoadSchema() throws IOException { Assertions.assertEquals(name, schema.name()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name)); Assertions.assertEquals(name, schema1.name()); Assertions.assertEquals(comment, schema1.comment()); @@ -226,8 +226,8 @@ public void testListSchema() throws IOException { createSchema(name, comment, null, null); createSchema(name1, comment1, null, null); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Set idents = Arrays.stream(ops.listSchemas(Namespace.of("m1", "c1"))).collect(Collectors.toSet()); Assertions.assertTrue(idents.size() >= 2); @@ -244,8 +244,8 @@ public void testAlterSchema() throws IOException { Schema schema = createSchema(name, comment, catalogPath, null); Assertions.assertEquals(name, schema.name()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Schema schema1 = ops.loadSchema(NameIdentifier.ofSchema("m1", "c1", name)); Assertions.assertEquals(name, schema1.name()); 
Assertions.assertEquals(comment, schema1.comment()); @@ -294,8 +294,8 @@ public void testDropSchema() throws IOException { Assertions.assertEquals(name, schema.name()); NameIdentifier id = NameIdentifier.ofSchema("m1", "c1", name); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath)); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath), null); Schema schema1 = ops.loadSchema(id); Assertions.assertEquals(name, schema1.name()); Assertions.assertEquals(comment, schema1.comment()); @@ -348,8 +348,8 @@ public void testCreateLoadAndDeleteFilesetWithLocations( } NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(catalogProps); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(catalogProps, null); if (!ops.schemaExists(schemaIdent)) { createSchema(schemaName, comment, catalogPath, schemaPath); } @@ -400,8 +400,8 @@ public void testCreateFilesetWithExceptions() throws IOException { + " when it's catalog and schema " + "location are not set", exception.getMessage()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Throwable e = Assertions.assertThrows( NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent)); @@ -416,8 +416,8 @@ public void testCreateFilesetWithExceptions() throws IOException { Assertions.assertEquals( "Storage location must be set for external fileset " + filesetIdent, exception1.getMessage()); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - 
ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Throwable e = Assertions.assertThrows( NoSuchFilesetException.class, () -> ops.loadFileset(filesetIdent)); @@ -436,8 +436,8 @@ public void testListFilesets() throws IOException { createFileset(fileset, schemaName, comment, Fileset.Type.MANAGED, null, null); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); Set idents = Arrays.stream(ops.listFilesets(Namespace.of("m1", "c1", schemaName))) .collect(Collectors.toSet()); @@ -467,8 +467,8 @@ public void testRenameFileset( } NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", schemaName); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(catalogProps); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(catalogProps, null); if (!ops.schemaExists(schemaIdent)) { createSchema(schemaName, comment, catalogPath, schemaPath); } @@ -513,8 +513,8 @@ public void testAlterFilesetProperties() throws IOException { FilesetChange change1 = FilesetChange.setProperty("k1", "v1"); FilesetChange change2 = FilesetChange.removeProperty("k1"); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Fileset fileset1 = ops.alterFileset(filesetIdent, change1); @@ -578,8 +578,8 @@ public void testUpdateFilesetComment() throws IOException { Fileset fileset = createFileset(name, schemaName, comment, Fileset.Type.MANAGED, null, null); FilesetChange change1 = 
FilesetChange.updateComment("comment26_new"); - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(Maps.newHashMap()); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(Maps.newHashMap(), null); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Fileset fileset1 = ops.alterFileset(filesetIdent, change1); @@ -830,8 +830,8 @@ private Schema createSchema(String name, String comment, String catalogPath, Str props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(props); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(props, null); NameIdentifier schemaIdent = NameIdentifier.ofSchema("m1", "c1", name); Map schemaProps = Maps.newHashMap(); @@ -859,8 +859,8 @@ private Fileset createFileset( props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath); } - try (HadoopCatalogOperations ops = new HadoopCatalogOperations(null, store)) { - ops.initialize(props); + try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store)) { + ops.initialize(props, null); NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name); Map filesetProps = Maps.newHashMap(); diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java index 8e12ba0beb3..57b85d03737 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalog.java @@ -33,8 +33,7 @@ public String shortName() { */ @Override protected CatalogOperations newOps(Map config) { - HiveCatalogOperations ops = new HiveCatalogOperations(entity()); - ops.initialize(config); + 
HiveCatalogOperations ops = new HiveCatalogOperations(); return ops; } diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java index 48dac74a301..08d1c227a9a 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java @@ -89,7 +89,7 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas @VisibleForTesting HiveConf hiveConf; - private final CatalogEntity entity; + private CatalogEntity entity; private HiveTablePropertiesMetadata tablePropertiesMetadata; @@ -111,23 +111,16 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas PRINCIPAL, ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname); - /** - * Constructs a new instance of HiveCatalogOperations. - * - * @param entity The catalog entity associated with this operations instance. - */ - public HiveCatalogOperations(CatalogEntity entity) { - this.entity = entity; - } - /** * Initializes the Hive catalog operations with the provided configuration. * * @param conf The configuration map for the Hive catalog operations. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if initialization fails. 
*/ @Override - public void initialize(Map conf) throws RuntimeException { + public void initialize(Map conf, CatalogEntity entity) throws RuntimeException { + this.entity = entity; this.tablePropertiesMetadata = new HiveTablePropertiesMetadata(); this.catalogPropertiesMetadata = new HiveCatalogPropertiesMeta(); this.schemaPropertiesMetadata = new HiveSchemaPropertiesMetadata(); diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java index 715cba578b3..ab349b7e256 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalog.java @@ -39,8 +39,8 @@ public void testListDatabases() throws TException, InterruptedException { Map conf = Maps.newHashMap(); metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue())); - try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) { - ops.initialize(conf); + try (HiveCatalogOperations ops = new HiveCatalogOperations()) { + ops.initialize(conf, entity); List dbs = ops.clientPool.run(IMetaStoreClient::getAllDatabases); Assertions.assertEquals(2, dbs.size()); Assertions.assertTrue(dbs.contains("default")); @@ -68,8 +68,8 @@ void testCatalogProperty() { metastore.hiveConf().forEach(e -> conf.put(e.getKey(), e.getValue())); - try (HiveCatalogOperations ops = new HiveCatalogOperations(entity)) { - ops.initialize(conf); + try (HiveCatalogOperations ops = new HiveCatalogOperations()) { + ops.initialize(conf, entity); PropertiesMetadata metadata = ops.catalogPropertiesMetadata(); Assertions.assertDoesNotThrow( diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java 
index 09cb7670a79..e1424280875 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java @@ -29,47 +29,49 @@ class TestHiveCatalogOperations { void testGetClientPoolSize() { Map maps = Maps.newHashMap(); maps.put(CLIENT_POOL_SIZE, "10"); - HiveCatalogOperations op = new HiveCatalogOperations(null); - op.initialize(maps); + HiveCatalogOperations op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertEquals(10, op.getClientPoolSize(maps)); maps.clear(); maps.put(CLIENT_POOL_SIZE + "_wrong_mark", "10"); - op = new HiveCatalogOperations(null); - op.initialize(maps); + op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertNotEquals(10, op.getClientPoolSize(maps)); maps.put(CLIENT_POOL_SIZE, "1"); - op = new HiveCatalogOperations(null); - op.initialize(maps); + op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertEquals(1, op.getClientPoolSize(maps)); } @Test void testInitialize() throws NoSuchFieldException, IllegalAccessException { Map properties = Maps.newHashMap(); - HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(null); - hiveCatalogOperations.initialize(properties); + HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(); + hiveCatalogOperations.initialize(properties, null); String v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces"); Assertions.assertEquals("10", v); // Test If we can override the value in hive-site.xml properties.put(CATALOG_BYPASS_PREFIX + "mapreduce.job.reduces", "20"); - hiveCatalogOperations.initialize(properties); + hiveCatalogOperations.initialize(properties, null); v = hiveCatalogOperations.hiveConf.get("mapreduce.job.reduces"); Assertions.assertEquals("20", v); } @Test void testPropertyMeta() { - HiveCatalogOperations hiveCatalogOperations 
= new HiveCatalogOperations(null); - hiveCatalogOperations.initialize(Maps.newHashMap()); + HiveCatalogOperations hiveCatalogOperations = new HiveCatalogOperations(); + hiveCatalogOperations.initialize(Maps.newHashMap(), null); Map> propertyEntryMap = hiveCatalogOperations.catalogPropertiesMetadata().propertyEntries(); - Assertions.assertEquals(11, propertyEntryMap.size()); + Assertions.assertEquals(13, propertyEntryMap.size()); Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS)); Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE)); + Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.CATALOG_OPERATION_CLASS_NAME)); + Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.CATALOG_OPERATION_CLASS_PATH)); Assertions.assertTrue(propertyEntryMap.containsKey(CLIENT_POOL_SIZE)); Assertions.assertTrue(propertyEntryMap.containsKey(IMPERSONATION_ENABLE)); @@ -98,8 +100,8 @@ void testPropertyOverwrite() { maps.put(METASTORE_URIS, "url1"); maps.put(ConfVars.METASTOREURIS.varname, "url2"); maps.put(CATALOG_BYPASS_PREFIX + ConfVars.METASTOREURIS.varname, "url3"); - HiveCatalogOperations op = new HiveCatalogOperations(null); - op.initialize(maps); + HiveCatalogOperations op = new HiveCatalogOperations(); + op.initialize(maps, null); Assertions.assertEquals("v2", op.hiveConf.get("a.b")); Assertions.assertEquals("v4", op.hiveConf.get("c.d")); diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java index c99adf6c495..afa30221014 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java @@ -48,6 +48,7 @@ import com.datastrato.gravitino.integration.test.container.HiveContainer; 
import com.datastrato.gravitino.integration.test.util.AbstractIT; import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.integration.test.util.ITUtils; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.Schema; import com.datastrato.gravitino.rel.SchemaChange; @@ -74,6 +75,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.io.IOException; +import java.nio.file.Paths; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -99,6 +101,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1364,4 +1367,46 @@ public void testPurgeHiveExternalTable() throws TException, InterruptedException Assertions.assertTrue( hdfs.listStatus(tableDirectory).length > 0, "The table should not be empty"); } + + @Test + // Couldn't run under embedded mode, because Iceberg package may not build yet. 
+ @EnabledIf("isDeployMode") + void testCustomCatalogOperations() { + String catalogName = "custom_catalog"; + Assertions.assertDoesNotThrow(() -> createCatalogWithCustomOperation(catalogName)); + } + + private static Catalog createCatalogWithCustomOperation(String catalogName) { + Map properties = Maps.newHashMap(); + properties.put(METASTORE_URIS, HIVE_METASTORE_URIS); + properties.put( + Catalog.CATALOG_OPERATION_CLASS_NAME, + "com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogOperations"); + properties.put(Catalog.CATALOG_OPERATION_CLASS_PATH, getIcebergJarPath()); + + properties.put("catalog-backend", "memory"); + properties.put("warehouse", "mock"); + properties.put("uri", "mock"); + + metalake.createCatalog( + NameIdentifier.of(metalakeName, catalogName), + Catalog.Type.RELATIONAL, + provider, + "comment", + properties); + + return metalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName)); + } + + private static String getIcebergJarPath() { + if (ITUtils.getTestMode().equals(ITUtils.EMBEDDED_TEST_MODE)) { + return Paths.get("catalogs", "catalog-lakehouse-iceberg", "build", "libs").toString(); + } else { + return Paths.get("catalogs", "lakehouse-iceberg", "libs").toString(); + } + } + + private static boolean isDeployMode() { + return !ITUtils.getTestMode().equals(ITUtils.EMBEDDED_TEST_MODE); + } } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java index 758180a972c..a638dfb5945 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java @@ -31,14 +31,12 @@ protected CatalogOperations newOps(Map config) { JdbcTypeConverter jdbcTypeConverter = createJdbcTypeConverter(); JdbcCatalogOperations ops = new JdbcCatalogOperations( - 
entity(), createExceptionConverter(), jdbcTypeConverter, createJdbcDatabaseOperations(), createJdbcTableOperations(), createJdbcTablePropertiesMetadata(), createJdbcColumnDefaultValueConverter()); - ops.initialize(config); return ops; } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java index 8714ba36ec2..828ad8b1c21 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java @@ -70,7 +70,7 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas private JdbcSchemaPropertiesMetadata jdbcSchemaPropertiesMetadata; - private final CatalogEntity entity; + private CatalogEntity entity; private final JdbcExceptionConverter exceptionConverter; @@ -87,7 +87,6 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas /** * Constructs a new instance of JdbcCatalogOperations. * - * @param entity The catalog entity associated with this operations instance. * @param exceptionConverter The exception converter to be used by the operations. * @param jdbcTypeConverter The type converter to be used by the operations. * @param databaseOperation The database operations to be used by the operations. @@ -95,14 +94,12 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas * @param jdbcTablePropertiesMetadata The table properties metadata to be used by the operations. 
*/ public JdbcCatalogOperations( - CatalogEntity entity, JdbcExceptionConverter exceptionConverter, JdbcTypeConverter jdbcTypeConverter, JdbcDatabaseOperations databaseOperation, JdbcTableOperations tableOperation, JdbcTablePropertiesMetadata jdbcTablePropertiesMetadata, JdbcColumnDefaultValueConverter columnDefaultValueConverter) { - this.entity = entity; this.exceptionConverter = exceptionConverter; this.jdbcTypeConverter = jdbcTypeConverter; this.databaseOperation = databaseOperation; @@ -115,10 +112,12 @@ public JdbcCatalogOperations( * Initializes the Jdbc catalog operations with the provided configuration. * * @param conf The configuration map for the Jdbc catalog operations. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if initialization fails. */ @Override - public void initialize(Map conf) throws RuntimeException { + public void initialize(Map conf, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // Key format like gravitino.bypass.a.b Map prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java index db84b4b3b20..963e5d77bb9 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java @@ -27,8 +27,7 @@ public String shortName() { */ @Override protected CatalogOperations newOps(Map config) { - IcebergCatalogOperations ops = new IcebergCatalogOperations(entity()); - ops.initialize(config); + IcebergCatalogOperations ops = new IcebergCatalogOperations(); return ops; } diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index b197da20f29..7baa6204fe6 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -72,27 +72,20 @@ public class IcebergCatalogOperations implements CatalogOperations, SupportsSche private IcebergSchemaPropertiesMetadata icebergSchemaPropertiesMetadata; - private final CatalogEntity entity; + private CatalogEntity entity; private IcebergTableOpsHelper icebergTableOpsHelper; - /** - * Constructs a new instance of IcebergCatalogOperations. - * - * @param entity The catalog entity associated with this operations instance. - */ - public IcebergCatalogOperations(CatalogEntity entity) { - this.entity = entity; - } - /** * Initializes the Iceberg catalog operations with the provided configuration. * * @param conf The configuration map for the Iceberg catalog operations. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if initialization fails. 
*/ @Override - public void initialize(Map conf) throws RuntimeException { + public void initialize(Map conf, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // Key format like gravitino.bypass.a.b Map prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java index 84603958cbb..aec05a9e6d7 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalog.java @@ -36,7 +36,7 @@ public void testListDatabases() { Map conf = Maps.newHashMap(); IcebergCatalog icebergCatalog = new IcebergCatalog().withCatalogConf(conf).withCatalogEntity(entity); - CatalogOperations catalogOperations = icebergCatalog.newOps(Maps.newHashMap()); + CatalogOperations catalogOperations = icebergCatalog.ops(); Assertions.assertTrue(catalogOperations instanceof IcebergCatalogOperations); IcebergTableOps icebergTableOps = @@ -63,8 +63,8 @@ void testCatalogProperty() { Map conf = Maps.newHashMap(); - try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) { - ops.initialize(conf); + try (IcebergCatalogOperations ops = new IcebergCatalogOperations()) { + ops.initialize(conf, entity); Map map1 = Maps.newHashMap(); map1.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, "test"); PropertiesMetadata metadata = ops.catalogPropertiesMetadata(); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java 
index 6e3d08d6519..3c34c7ae098 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergSchema.java @@ -144,8 +144,8 @@ void testSchemaProperty() { Map conf = Maps.newHashMap(); - try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) { - ops.initialize(conf); + try (IcebergCatalogOperations ops = new IcebergCatalogOperations()) { + ops.initialize(conf, entity); Map map = Maps.newHashMap(); map.put(IcebergSchemaPropertiesMetadata.COMMENT, "test"); PropertiesMetadata metadata = ops.schemaPropertiesMetadata(); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java index d20c782a12d..590a603717b 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java @@ -501,8 +501,8 @@ public void testAlterIcebergTable() { @Test public void testTableProperty() { CatalogEntity entity = createDefaultCatalogEntity(); - try (IcebergCatalogOperations ops = new IcebergCatalogOperations(entity)) { - ops.initialize(Maps.newHashMap()); + try (IcebergCatalogOperations ops = new IcebergCatalogOperations()) { + ops.initialize(Maps.newHashMap(), entity); Map map = Maps.newHashMap(); map.put(IcebergTablePropertiesMetadata.COMMENT, "test"); map.put(IcebergTablePropertiesMetadata.CREATOR, "test"); diff --git a/common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java b/common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java new file mode 100644 index 
00000000000..2ec1f5c63a7 --- /dev/null +++ b/common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.utils; + +import com.google.common.base.Preconditions; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.commons.lang3.StringUtils; + +/** Provides Helper methods to process paths. */ +public class PathUtils { + + /** + * Helper methods to transform path to absolutePath, if path is relative, it's supposed to under + * gravitinoHome directory. + * + * @param path The path to transform. + * @param gravitinoHome Gravitino home directory. + * @return The absolute path. + */ + public static String getAbsolutePath(String path, String gravitinoHome) { + Path p = Paths.get(path); + if (p.isAbsolute()) { + return p.toString(); + } + + Preconditions.checkArgument( + StringUtils.isNotBlank(gravitinoHome), "GRAVITINO_HOME should not empty"); + Path newPath = Paths.get(gravitinoHome, path); + return newPath.toString(); + } + + private PathUtils() {} +} diff --git a/common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java b/common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java new file mode 100644 index 00000000000..35f1df18235 --- /dev/null +++ b/common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.utils; + +import java.nio.file.Paths; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestPathUtils { + + @Test + void testGetAbstractPath() { + String gravitinoHome = Paths.get("home", "gravitino").toString(); + String currentDirectory = Paths.get("").toAbsolutePath().toString(); + + String path = PathUtils.getAbsolutePath(currentDirectory, gravitinoHome); + Assertions.assertEquals(currentDirectory, path); + + String relativePath = "abc"; + path = PathUtils.getAbsolutePath(relativePath, gravitinoHome); + String expectedPath = Paths.get(gravitinoHome, relativePath).toString(); + Assertions.assertEquals(expectedPath, path); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index 4090f939c49..232da047027 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -12,6 +12,8 @@ import com.google.common.collect.Maps; import java.util.Map; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The abstract base class for Catalog implementations. 
@@ -28,6 +30,8 @@ public abstract class BaseCatalog implements Catalog, CatalogProvider, HasPropertyMetadata { + private static final Logger LOG = LoggerFactory.getLogger(BaseCatalog.class); + private CatalogEntity entity; private Map conf; @@ -91,7 +95,8 @@ public CatalogOperations ops() { if (ops == null) { Preconditions.checkArgument( entity != null && conf != null, "entity and conf must be set before calling ops()"); - CatalogOperations newOps = newOps(conf); + CatalogOperations newOps = createOps(conf); + newOps.initialize(conf, entity); ops = newProxyPlugin(conf) .map( @@ -106,6 +111,25 @@ public CatalogOperations ops() { return ops; } + private CatalogOperations createOps(Map conf) { + String customCatalogOperationClass = conf.get(CATALOG_OPERATION_CLASS_NAME); + return Optional.ofNullable(customCatalogOperationClass) + .map(className -> loadCustomerOps(className)) + .orElse(newOps(conf)); + } + + private CatalogOperations loadCustomerOps(String className) { + // Couldn't find package if not specifying classloader + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + try { + return (CatalogOperations) + Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance(); + } catch (Exception e) { + LOG.error("Failed to load custom catalog operations, {}", className, e); + throw new RuntimeException(e); + } + } + /** * Sets the CatalogEntity for this catalog. 
* diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java index 699f2eafb7e..a07f7ca21ef 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java @@ -5,6 +5,8 @@ package com.datastrato.gravitino.catalog; +import static com.datastrato.gravitino.Catalog.CATALOG_OPERATION_CLASS_NAME; +import static com.datastrato.gravitino.Catalog.CATALOG_OPERATION_CLASS_PATH; import static com.datastrato.gravitino.Catalog.PROPERTY_PACKAGE; import com.google.common.collect.ImmutableList; @@ -22,6 +24,20 @@ public abstract class BaseCatalogPropertiesMetadata extends BasePropertiesMetada false, null, false, + false), + PropertyEntry.stringImmutablePropertyEntry( + CATALOG_OPERATION_CLASS_NAME, + "The custom catalog operation class name", + false, + null, + false, + false), + PropertyEntry.stringImmutablePropertyEntry( + CATALOG_OPERATION_CLASS_PATH, + "The custom catalog operation class path", + false, + null, + false, false)), PropertyEntry::getName); } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java index 40fb74d80f5..48870b72626 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java @@ -31,6 +31,7 @@ import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.storage.IdGenerator; import com.datastrato.gravitino.utils.IsolatedClassLoader; +import com.datastrato.gravitino.utils.PathUtils; import com.datastrato.gravitino.utils.PrincipalUtils; import com.datastrato.gravitino.utils.ThrowableFunction; import com.github.benmanes.caffeine.cache.Cache; @@ -39,7 +40,6 @@ import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Streams; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -469,9 +470,9 @@ private CatalogWrapper createCatalogWrapper(CatalogEntity entity) { IsolatedClassLoader classLoader; if (config.get(Configs.CATALOG_LOAD_ISOLATED)) { - String pkgPath = buildPkgPath(conf, provider); - String confPath = buildConfPath(provider); - classLoader = IsolatedClassLoader.buildClassLoader(Lists.newArrayList(pkgPath, confPath)); + List catalogClassPath = buildCatalogClassPath(conf, provider); + LOG.info("Catalog {} classpath: {}", provider, catalogClassPath); + classLoader = IsolatedClassLoader.buildClassLoader(catalogClassPath); } else { // This will use the current class loader, it is mainly used for test. classLoader = @@ -559,6 +560,17 @@ static Map mergeConf(Map properties, Map buildCatalogClassPath( + Map catalogProperties, String provider) { + String pkgPath = buildPkgPath(catalogProperties, provider); + String confPath = buildConfPath(provider); + Optional customCatalogOperationPath = + buildCustomCatalogOperationPath(catalogProperties); + return customCatalogOperationPath + .map(path -> Arrays.asList(pkgPath, confPath, path)) + .orElse(Arrays.asList(pkgPath, confPath)); + } + /** * Build the config path from the specific provider. Usually, the configuration file is under the * conf and conf and package are under the same directory. 
@@ -581,6 +593,15 @@ private String buildConfPath(String provider) { return String.join(File.separator, gravitinoHome, "catalogs", provider, "conf"); } + private Optional buildCustomCatalogOperationPath(Map catalogProperties) { + String className = catalogProperties.get(Catalog.CATALOG_OPERATION_CLASS_NAME); + String path = catalogProperties.get(Catalog.CATALOG_OPERATION_CLASS_PATH); + if (StringUtils.isNotBlank(className) && StringUtils.isNotBlank(path)) { + return Optional.of(PathUtils.getAbsolutePath(path, System.getenv("GRAVITINO_HOME"))); + } + return Optional.empty(); + } + private String buildPkgPath(Map conf, String provider) { String gravitinoHome = System.getenv("GRAVITINO_HOME"); Preconditions.checkArgument(gravitinoHome != null, "GRAVITINO_HOME not set"); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java index 91fd6199e9d..88efcb1bf72 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperations.java @@ -4,6 +4,7 @@ */ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.rel.SupportsSchemas; import com.datastrato.gravitino.rel.TableCatalog; import java.io.Closeable; @@ -23,7 +24,8 @@ public interface CatalogOperations extends Closeable, HasPropertyMetadata { * if the initialization failed. * * @param config The configuration of this Catalog. + * @param entity The catalog entity associated with this operations instance. * @throws RuntimeException if the initialization failed. 
*/ - void initialize(Map config) throws RuntimeException; + void initialize(Map config, CatalogEntity entity) throws RuntimeException; } diff --git a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java index 28541dc3e97..31f625aa7e2 100644 --- a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java +++ b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java @@ -20,6 +20,7 @@ import com.datastrato.gravitino.file.FilesetCatalog; import com.datastrato.gravitino.file.FilesetChange; import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.Schema; import com.datastrato.gravitino.rel.SchemaChange; @@ -71,7 +72,8 @@ public TestCatalogOperations(Map config) { } @Override - public void initialize(Map config) throws RuntimeException {} + public void initialize(Map config, CatalogEntity entity) + throws RuntimeException {} @Override public void close() throws IOException {} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java b/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java new file mode 100644 index 00000000000..35ea9a304a3 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.meta.CatalogEntity; +import java.io.IOException; +import java.util.Map; + +public class DummyCatalogOperations implements CatalogOperations { + + @Override + public void initialize(Map config, CatalogEntity entity) + throws RuntimeException {} + + @Override + public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + + @Override + public void close() throws IOException {} +} diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java new file mode 100644 index 00000000000..93741559a7e --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.catalog; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.TestCatalog; +import com.datastrato.gravitino.TestCatalogOperations; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestBaseCatalog { + + @Test + void testCustomCatalogOperations() { + CatalogEntity entity = Mockito.mock(CatalogEntity.class); + + TestCatalog catalog = + new TestCatalog().withCatalogConf(ImmutableMap.of()).withCatalogEntity(entity); + CatalogOperations testCatalogOperations = catalog.ops(); + Assertions.assertTrue(testCatalogOperations instanceof TestCatalogOperations); + + TestCatalog catalog2 = + new TestCatalog() + .withCatalogConf( + ImmutableMap.of( + Catalog.CATALOG_OPERATION_CLASS_NAME, DummyCatalogOperations.class.getName())) + .withCatalogEntity(entity); + CatalogOperations dummyCatalogOperations = catalog2.ops(); + Assertions.assertTrue(dummyCatalogOperations instanceof DummyCatalogOperations); + } +} diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java index 17c219d971d..243e49c33ae 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java @@ -53,5 +53,11 @@ public static void overwriteConfigFile(String configFileName, Properties props) } } + public static String getTestMode() { + return System.getProperty(ITUtils.TEST_MODE) == null + ? 
ITUtils.EMBEDDED_TEST_MODE + : System.getProperty(ITUtils.TEST_MODE); + } + private ITUtils() {} } diff --git a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java index 7934a4fa50e..90139f181f8 100644 --- a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java +++ b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestCatalog.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.catalog.BaseCatalog; import com.datastrato.gravitino.catalog.CatalogOperations; import com.datastrato.gravitino.catalog.PropertiesMetadata; +import com.datastrato.gravitino.meta.CatalogEntity; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; @@ -41,7 +42,8 @@ public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationE } @Override - public void initialize(Map config) throws RuntimeException {} + public void initialize(Map config, CatalogEntity entity) + throws RuntimeException {} @Override public void close() throws IOException {} From 87b78328168cdeb797093d21c46b4106bf58400f Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 13 Mar 2024 14:49:23 +0800 Subject: [PATCH 02/11] remove class path --- .../com/datastrato/gravitino/Catalog.java | 16 -------- .../hive/TestHiveCatalogOperations.java | 4 +- .../hive/integration/test/CatalogHiveIT.java | 40 +++++-------------- .../datastrato/gravitino/utils/PathUtils.java | 37 ----------------- .../gravitino/catalog/BaseCatalog.java | 14 ++++--- .../BaseCatalogPropertiesMetadata.java | 16 -------- .../gravitino/catalog/CatalogManager.java | 17 +------- .../gravitino/catalog/TestBaseCatalog.java | 3 +- .../integration/test/util/ITUtils.java | 6 --- 9 files changed, 22 insertions(+), 131 deletions(-) delete mode 100644 common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java diff --git a/api/src/main/java/com/datastrato/gravitino/Catalog.java 
b/api/src/main/java/com/datastrato/gravitino/Catalog.java index 56538f298c1..e9339c6a6c3 100644 --- a/api/src/main/java/com/datastrato/gravitino/Catalog.java +++ b/api/src/main/java/com/datastrato/gravitino/Catalog.java @@ -41,22 +41,6 @@ enum Type { */ String PROPERTY_PACKAGE = "package"; - /** - * Custom catalog operation class name. It is recommended to extend current Gravitino catalog - * operations like HiveCatalogOperation, and override interfaces like purgeTable, and - * listPartition which should be hacked, but this is not a must. It only needs to implement - * CatalogOperations to initialize itself, implement SupportsSchemas to do schema ops, and - * TableCatalog to do table ops if necessary. - */ - String CATALOG_OPERATION_CLASS_NAME = "catalog.operation.custom.class-name"; - - /** - * Custom catalog operation class path. It will be added to the catalog classpath if both custom - * catalog operation class name and class path are not empty. The path could be absolute or - * relative, if relative, it's supposed to under Gravitino Home directory. - */ - String CATALOG_OPERATION_CLASS_PATH = "catalog.operation.custom.classpath"; - /** @return The name of the catalog. 
*/ String name(); diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java index e1424280875..980850da37d 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java @@ -67,11 +67,9 @@ void testPropertyMeta() { Map> propertyEntryMap = hiveCatalogOperations.catalogPropertiesMetadata().propertyEntries(); - Assertions.assertEquals(13, propertyEntryMap.size()); + Assertions.assertEquals(11, propertyEntryMap.size()); Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS)); Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE)); - Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.CATALOG_OPERATION_CLASS_NAME)); - Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.CATALOG_OPERATION_CLASS_PATH)); Assertions.assertTrue(propertyEntryMap.containsKey(CLIENT_POOL_SIZE)); Assertions.assertTrue(propertyEntryMap.containsKey(IMPERSONATION_ENABLE)); diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java index afa30221014..f3663f15a02 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java @@ -32,6 +32,8 @@ import com.datastrato.gravitino.MetalakeChange; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.auth.AuthConstants; +import com.datastrato.gravitino.catalog.BaseCatalog; +import 
com.datastrato.gravitino.catalog.hive.HiveCatalogOperations; import com.datastrato.gravitino.catalog.hive.HiveClientPool; import com.datastrato.gravitino.catalog.hive.HiveSchemaPropertiesMetadata; import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata; @@ -48,7 +50,6 @@ import com.datastrato.gravitino.integration.test.container.HiveContainer; import com.datastrato.gravitino.integration.test.util.AbstractIT; import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; -import com.datastrato.gravitino.integration.test.util.ITUtils; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.Schema; import com.datastrato.gravitino.rel.SchemaChange; @@ -75,7 +76,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.io.IOException; -import java.nio.file.Paths; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -101,7 +101,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledIf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1369,24 +1368,21 @@ public void testPurgeHiveExternalTable() throws TException, InterruptedException } @Test - // Couldn't run under embedded mode, because Iceberg package may not build yet. 
- @EnabledIf("isDeployMode") void testCustomCatalogOperations() { String catalogName = "custom_catalog"; - Assertions.assertDoesNotThrow(() -> createCatalogWithCustomOperation(catalogName)); + Assertions.assertDoesNotThrow( + () -> createCatalogWithCustomOperation(catalogName, HiveCatalogOperations.class.getName())); + Assertions.assertThrowsExactly( + RuntimeException.class, + () -> + createCatalogWithCustomOperation( + catalogName + "_not_exists", "com.datastrato.gravitino.catalog.not.exists")); } - private static Catalog createCatalogWithCustomOperation(String catalogName) { + private static void createCatalogWithCustomOperation(String catalogName, String customImpl) { Map properties = Maps.newHashMap(); properties.put(METASTORE_URIS, HIVE_METASTORE_URIS); - properties.put( - Catalog.CATALOG_OPERATION_CLASS_NAME, - "com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogOperations"); - properties.put(Catalog.CATALOG_OPERATION_CLASS_PATH, getIcebergJarPath()); - - properties.put("catalog-backend", "memory"); - properties.put("warehouse", "mock"); - properties.put("uri", "mock"); + properties.put(BaseCatalog.CATALOG_OPERATION_IMPL, customImpl); metalake.createCatalog( NameIdentifier.of(metalakeName, catalogName), @@ -1394,19 +1390,5 @@ private static Catalog createCatalogWithCustomOperation(String catalogName) { provider, "comment", properties); - - return metalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName)); - } - - private static String getIcebergJarPath() { - if (ITUtils.getTestMode().equals(ITUtils.EMBEDDED_TEST_MODE)) { - return Paths.get("catalogs", "catalog-lakehouse-iceberg", "build", "libs").toString(); - } else { - return Paths.get("catalogs", "lakehouse-iceberg", "libs").toString(); - } - } - - private static boolean isDeployMode() { - return !ITUtils.getTestMode().equals(ITUtils.EMBEDDED_TEST_MODE); } } diff --git a/common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java 
b/common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java deleted file mode 100644 index 2ec1f5c63a7..00000000000 --- a/common/src/main/java/com/datastrato/gravitino/utils/PathUtils.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ - -package com.datastrato.gravitino.utils; - -import com.google.common.base.Preconditions; -import java.nio.file.Path; -import java.nio.file.Paths; -import org.apache.commons.lang3.StringUtils; - -/** Provides Helper methods to process paths. */ -public class PathUtils { - - /** - * Helper methods to transform path to absolutePath, if path is relative, it's supposed to under - * gravitinoHome directory. - * - * @param path The path to transform. - * @param gravitinoHome Gravitino home directory. - * @return The absolute path. - */ - public static String getAbsolutePath(String path, String gravitinoHome) { - Path p = Paths.get(path); - if (p.isAbsolute()) { - return p.toString(); - } - - Preconditions.checkArgument( - StringUtils.isNotBlank(gravitinoHome), "GRAVITINO_HOME should not empty"); - Path newPath = Paths.get(gravitinoHome, path); - return newPath.toString(); - } - - private PathUtils() {} -} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index 232da047027..4d044dbfb93 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.CatalogProvider; import com.datastrato.gravitino.meta.CatalogEntity; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.Map; @@ -29,9 +30,13 @@ */ public abstract class BaseCatalog implements Catalog, 
CatalogProvider, HasPropertyMetadata { - private static final Logger LOG = LoggerFactory.getLogger(BaseCatalog.class); + // A hack way to inject custom operation to Gravitino, the object you used is not stable, don't + // use + // it unless you know what you are doing. + @VisibleForTesting public static final String CATALOG_OPERATION_IMPL = "ops-impl"; + private CatalogEntity entity; private Map conf; @@ -112,18 +117,15 @@ public CatalogOperations ops() { } private CatalogOperations createOps(Map conf) { - String customCatalogOperationClass = conf.get(CATALOG_OPERATION_CLASS_NAME); + String customCatalogOperationClass = conf.get(CATALOG_OPERATION_IMPL); return Optional.ofNullable(customCatalogOperationClass) .map(className -> loadCustomerOps(className)) .orElse(newOps(conf)); } private CatalogOperations loadCustomerOps(String className) { - // Couldn't find package if not specifying classloader - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { - return (CatalogOperations) - Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance(); + return (CatalogOperations) Class.forName(className).getDeclaredConstructor().newInstance(); } catch (Exception e) { LOG.error("Failed to load custom catalog operations, {}", className, e); throw new RuntimeException(e); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java index a07f7ca21ef..699f2eafb7e 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java @@ -5,8 +5,6 @@ package com.datastrato.gravitino.catalog; -import static com.datastrato.gravitino.Catalog.CATALOG_OPERATION_CLASS_NAME; -import static com.datastrato.gravitino.Catalog.CATALOG_OPERATION_CLASS_PATH; import static 
com.datastrato.gravitino.Catalog.PROPERTY_PACKAGE; import com.google.common.collect.ImmutableList; @@ -24,20 +22,6 @@ public abstract class BaseCatalogPropertiesMetadata extends BasePropertiesMetada false, null, false, - false), - PropertyEntry.stringImmutablePropertyEntry( - CATALOG_OPERATION_CLASS_NAME, - "The custom catalog operation class name", - false, - null, - false, - false), - PropertyEntry.stringImmutablePropertyEntry( - CATALOG_OPERATION_CLASS_PATH, - "The custom catalog operation class path", - false, - null, - false, false)), PropertyEntry::getName); } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java index 48870b72626..0f647ef6e43 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java @@ -31,7 +31,6 @@ import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.storage.IdGenerator; import com.datastrato.gravitino.utils.IsolatedClassLoader; -import com.datastrato.gravitino.utils.PathUtils; import com.datastrato.gravitino.utils.PrincipalUtils; import com.datastrato.gravitino.utils.ThrowableFunction; import com.github.benmanes.caffeine.cache.Cache; @@ -60,7 +59,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -564,11 +562,7 @@ private List buildCatalogClassPath( Map catalogProperties, String provider) { String pkgPath = buildPkgPath(catalogProperties, provider); String confPath = buildConfPath(provider); - Optional customCatalogOperationPath = - buildCustomCatalogOperationPath(catalogProperties); - return customCatalogOperationPath - .map(path -> Arrays.asList(pkgPath, confPath, path)) - 
.orElse(Arrays.asList(pkgPath, confPath)); + return Arrays.asList(pkgPath, confPath); } /** @@ -593,15 +587,6 @@ private String buildConfPath(String provider) { return String.join(File.separator, gravitinoHome, "catalogs", provider, "conf"); } - private Optional buildCustomCatalogOperationPath(Map catalogProperties) { - String className = catalogProperties.get(Catalog.CATALOG_OPERATION_CLASS_NAME); - String path = catalogProperties.get(Catalog.CATALOG_OPERATION_CLASS_PATH); - if (StringUtils.isNotBlank(className) && StringUtils.isNotBlank(path)) { - return Optional.of(PathUtils.getAbsolutePath(path, System.getenv("GRAVITINO_HOME"))); - } - return Optional.empty(); - } - private String buildPkgPath(Map conf, String provider) { String gravitinoHome = System.getenv("GRAVITINO_HOME"); Preconditions.checkArgument(gravitinoHome != null, "GRAVITINO_HOME not set"); diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java index 93741559a7e..9003d5d8969 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestBaseCatalog.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.catalog; -import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.TestCatalog; import com.datastrato.gravitino.TestCatalogOperations; import com.datastrato.gravitino.meta.CatalogEntity; @@ -29,7 +28,7 @@ void testCustomCatalogOperations() { new TestCatalog() .withCatalogConf( ImmutableMap.of( - Catalog.CATALOG_OPERATION_CLASS_NAME, DummyCatalogOperations.class.getName())) + BaseCatalog.CATALOG_OPERATION_IMPL, DummyCatalogOperations.class.getName())) .withCatalogEntity(entity); CatalogOperations dummyCatalogOperations = catalog2.ops(); Assertions.assertTrue(dummyCatalogOperations instanceof DummyCatalogOperations); diff --git 
a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java index 243e49c33ae..17c219d971d 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/ITUtils.java @@ -53,11 +53,5 @@ public static void overwriteConfigFile(String configFileName, Properties props) } } - public static String getTestMode() { - return System.getProperty(ITUtils.TEST_MODE) == null - ? ITUtils.EMBEDDED_TEST_MODE - : System.getProperty(ITUtils.TEST_MODE); - } - private ITUtils() {} } From d0be7c60d4223fed2d77fbb5e91fa61bad288384 Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 13 Mar 2024 14:56:28 +0800 Subject: [PATCH 03/11] polish --- .../gravitino/utils/TestPathUtils.java | 27 ------------------- .../gravitino/catalog/BaseCatalog.java | 3 +-- .../gravitino/catalog/CatalogManager.java | 14 +++------- 3 files changed, 5 insertions(+), 39 deletions(-) delete mode 100644 common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java diff --git a/common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java b/common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java deleted file mode 100644 index 35f1df18235..00000000000 --- a/common/src/test/java/com/datastrato/gravitino/utils/TestPathUtils.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. 
- */ - -package com.datastrato.gravitino.utils; - -import java.nio.file.Paths; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class TestPathUtils { - - @Test - void testGetAbstractPath() { - String gravitinoHome = Paths.get("home", "gravitino").toString(); - String currentDirectory = Paths.get("").toAbsolutePath().toString(); - - String path = PathUtils.getAbsolutePath(currentDirectory, gravitinoHome); - Assertions.assertEquals(currentDirectory, path); - - String relativePath = "abc"; - path = PathUtils.getAbsolutePath(relativePath, gravitinoHome); - String expectedPath = Paths.get(gravitinoHome, relativePath).toString(); - Assertions.assertEquals(expectedPath, path); - } -} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index 4d044dbfb93..8b79452d33d 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -33,8 +33,7 @@ public abstract class BaseCatalog private static final Logger LOG = LoggerFactory.getLogger(BaseCatalog.class); // A hack way to inject custom operation to Gravitino, the object you used is not stable, don't - // use - // it unless you know what you are doing. + // use it unless you know what you are doing. 
@VisibleForTesting public static final String CATALOG_OPERATION_IMPL = "ops-impl"; private CatalogEntity entity; diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java index 0f647ef6e43..40fb74d80f5 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogManager.java @@ -39,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Streams; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -468,9 +469,9 @@ private CatalogWrapper createCatalogWrapper(CatalogEntity entity) { IsolatedClassLoader classLoader; if (config.get(Configs.CATALOG_LOAD_ISOLATED)) { - List catalogClassPath = buildCatalogClassPath(conf, provider); - LOG.info("Catalog {} classpath: {}", provider, catalogClassPath); - classLoader = IsolatedClassLoader.buildClassLoader(catalogClassPath); + String pkgPath = buildPkgPath(conf, provider); + String confPath = buildConfPath(provider); + classLoader = IsolatedClassLoader.buildClassLoader(Lists.newArrayList(pkgPath, confPath)); } else { // This will use the current class loader, it is mainly used for test. classLoader = @@ -558,13 +559,6 @@ static Map mergeConf(Map properties, Map buildCatalogClassPath( - Map catalogProperties, String provider) { - String pkgPath = buildPkgPath(catalogProperties, provider); - String confPath = buildConfPath(provider); - return Arrays.asList(pkgPath, confPath); - } - /** * Build the config path from the specific provider. Usually, the configuration file is under the * conf and conf and package are under the same directory. 
From ac49b8ec572adad80805f06cc321767f08c4a440 Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 13 Mar 2024 15:37:48 +0800 Subject: [PATCH 04/11] add context classloader to class.forname --- .../java/com/datastrato/gravitino/catalog/BaseCatalog.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index 8b79452d33d..f03ea59a3e1 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -123,8 +123,10 @@ private CatalogOperations createOps(Map conf) { } private CatalogOperations loadCustomerOps(String className) { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { - return (CatalogOperations) Class.forName(className).getDeclaredConstructor().newInstance(); + return (CatalogOperations) + Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance(); } catch (Exception e) { LOG.error("Failed to load custom catalog operations, {}", className, e); throw new RuntimeException(e); From a9e7795944685402c68f4eb241bac49b40b21005 Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 13 Mar 2024 16:50:50 +0800 Subject: [PATCH 05/11] add descirption about classloader --- .../java/com/datastrato/gravitino/catalog/BaseCatalog.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index f03ea59a3e1..1534823c2f1 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -125,6 +125,9 @@ private CatalogOperations createOps(Map conf) { private CatalogOperations loadCustomerOps(String className) { ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader(); try { + // Class.forName use classloader of the caller class (BaseCatalog.class), it's global + // classloader not the catalog specific classloader, so we must specify the classloader + // explicitly. return (CatalogOperations) Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance(); } catch (Exception e) { From 244054f3843e4b8f174fecf7ad97cd67853ffbbb Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 13 Mar 2024 17:05:16 +0800 Subject: [PATCH 06/11] fix kafka catalog --- .../gravitino/catalog/kafka/KafkaCatalogOperations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java index b7ee5e934fb..ebcf063f2b2 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java @@ -48,7 +48,7 @@ public KafkaCatalogOperations(CatalogEntity entity) { } @Override - public void initialize(Map config) throws RuntimeException { + public void initialize(Map config, CatalogEntity entity) throws RuntimeException { // TODO: Implement Kafka catalog initialization, such as creating a default schema. 
} From 0358741aeab3c362adde253da30846f1784a2b96 Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 13 Mar 2024 17:07:14 +0800 Subject: [PATCH 07/11] fix kafka catalog --- .../datastrato/gravitino/catalog/kafka/KafkaCatalog.java | 3 +-- .../gravitino/catalog/kafka/KafkaCatalogOperations.java | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java index fe2861c7947..ae5c6d90491 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalog.java @@ -21,8 +21,7 @@ public String shortName() { @Override protected CatalogOperations newOps(Map config) { - KafkaCatalogOperations ops = new KafkaCatalogOperations(entity()); - ops.initialize(config); + KafkaCatalogOperations ops = new KafkaCatalogOperations(); return ops; } diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java index ebcf063f2b2..50bd60ca93b 100644 --- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java +++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java @@ -39,16 +39,16 @@ public class KafkaCatalogOperations implements CatalogOperations, SupportsSchema private static final KafkaTopicPropertiesMetadata TOPIC_PROPERTIES_METADATA = new KafkaTopicPropertiesMetadata(); - private final CatalogEntity entity; private final EntityStore store; + private CatalogEntity entity; - public KafkaCatalogOperations(CatalogEntity entity) { 
- this.entity = entity; + public KafkaCatalogOperations() { this.store = GravitinoEnv.getInstance().entityStore(); } @Override public void initialize(Map config, CatalogEntity entity) throws RuntimeException { + this.entity = entity; // TODO: Implement Kafka catalog initialization, such as creating a default schema. } From aca26aae4289f5166278c46bc33056300251542d Mon Sep 17 00:00:00 2001 From: fanng Date: Wed, 13 Mar 2024 17:16:17 +0800 Subject: [PATCH 08/11] fix kafka catalog --- .../datastrato/gravitino/catalog/DummyCatalogOperations.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java b/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java index 35ea9a304a3..726de4e1da4 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/DummyCatalogOperations.java @@ -35,6 +35,11 @@ public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperatio return null; } + @Override + public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException { + return null; + } + @Override public void close() throws IOException {} } From af7d65293d09f4094e661ab936a37feeb1cb71da Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 14 Mar 2024 08:28:07 +0800 Subject: [PATCH 09/11] correct custom --- .../java/com/datastrato/gravitino/catalog/BaseCatalog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java index 1534823c2f1..9cb936b345d 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalog.java @@ -118,11 +118,11 @@ public CatalogOperations ops() { private CatalogOperations createOps(Map conf) { String 
customCatalogOperationClass = conf.get(CATALOG_OPERATION_IMPL); return Optional.ofNullable(customCatalogOperationClass) - .map(className -> loadCustomerOps(className)) + .map(className -> loadCustomOps(className)) .orElse(newOps(conf)); } - private CatalogOperations loadCustomerOps(String className) { + private CatalogOperations loadCustomOps(String className) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { // Class.forName use classloader of the caller class (BaseCatalog.class), it's global From 5579f32f7bd4762b8b259105809ca6c6ebfb05da Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 14 Mar 2024 15:06:45 +0800 Subject: [PATCH 10/11] add to catalog properties --- .../gravitino/catalog/BaseCatalogPropertiesMetadata.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java index 699f2eafb7e..e7fae916266 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/BaseCatalogPropertiesMetadata.java @@ -22,6 +22,13 @@ public abstract class BaseCatalogPropertiesMetadata extends BasePropertiesMetada false, null, false, + false), + PropertyEntry.stringImmutablePropertyEntry( + BaseCatalog.CATALOG_OPERATION_IMPL, + "The classname of custom catalog operation to replace the default implementation", + false, + null, + false, false)), PropertyEntry::getName); } From 316526797743e3c9a6419cb0ccb48b79fa81d6e8 Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 14 Mar 2024 15:08:12 +0800 Subject: [PATCH 11/11] add to catalog properties --- .../gravitino/catalog/hive/TestHiveCatalogOperations.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java 
b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java index 980850da37d..5833931a048 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveCatalogOperations.java @@ -16,6 +16,7 @@ import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.PRINCIPAL; import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.catalog.BaseCatalog; import com.datastrato.gravitino.catalog.PropertyEntry; import com.google.common.collect.Maps; import java.util.Map; @@ -67,9 +68,10 @@ void testPropertyMeta() { Map> propertyEntryMap = hiveCatalogOperations.catalogPropertiesMetadata().propertyEntries(); - Assertions.assertEquals(11, propertyEntryMap.size()); + Assertions.assertEquals(12, propertyEntryMap.size()); Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS)); Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE)); + Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL)); Assertions.assertTrue(propertyEntryMap.containsKey(CLIENT_POOL_SIZE)); Assertions.assertTrue(propertyEntryMap.containsKey(IMPERSONATION_ENABLE));