From 52ab5f4dc168f4b47729110dc69a9d3817776638 Mon Sep 17 00:00:00 2001 From: serhiish Date: Thu, 22 Jul 2021 15:35:07 +0300 Subject: [PATCH] Spark: Added ability to add uuid suffix to the table location in Hive catalog --- .../org/apache/iceberg/hive/HiveCatalog.java | 22 +++++++++++++-- .../iceberg/spark/SparkCatalogTestBase.java | 7 +++++ .../iceberg/spark/sql/TestCreateTable.java | 27 +++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 68ff6255f766..c1dac57cc1b8 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -54,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,10 +63,14 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + public static final String APPEND_UUID_SUFFIX_TO_TABLE_LOCATION = "append-uuid-suffix-to-table-location"; + public static final boolean APPEND_UUID_SUFFIX_TO_TABLE_LOCATION_DEFAULT = false; + private String name; private Configuration conf; private FileIO fileIO; private ClientPool clients; + private boolean useUniqueTableName; public HiveCatalog() { } @@ -107,6 +113,10 @@ public void initialize(String inputName, Map properties) { this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); this.clients = new CachedClientPool(conf, properties); + if (properties.containsKey(APPEND_UUID_SUFFIX_TO_TABLE_LOCATION)) { + this.useUniqueTableName = PropertyUtil.propertyAsBoolean(properties, + APPEND_UUID_SUFFIX_TO_TABLE_LOCATION, APPEND_UUID_SUFFIX_TO_TABLE_LOCATION_DEFAULT); + } } @Override @@ -438,7 +448,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { Database databaseData = clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0])); if (databaseData.getLocationUri() != null) { // If the database location is set use it as a base. - return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name()); + return String.format("%s/%s", databaseData.getLocationUri(), getTableName(tableIdentifier)); } } catch (TException e) { @@ -455,7 +465,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { "%s/%s.db/%s", warehouseLocation, tableIdentifier.namespace().levels()[0], - tableIdentifier.name()); + getTableName(tableIdentifier)); } private String getWarehouseLocation() { @@ -464,6 +474,14 @@ private String getWarehouseLocation() { return warehouseLocation; } + private String getTableName(TableIdentifier tableIdentifier) { + if (useUniqueTableName) { + return String.format("%s-%s", tableIdentifier.name(), UUID.randomUUID()); + } else { + return tableIdentifier.name(); + } + } + private Map convertToMetadata(Database database) { Map meta = Maps.newHashMap(); diff --git a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java index 106ba122d206..20229d1855a8 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java @@ -27,6 +27,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.AfterClass; import org.junit.Assert; @@ -71,6 +72,12 @@ public static Object[][] parameters() { "default-namespace", "default", "parquet-enabled", "true", "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync + ) }, + { "spark_catalog_with_unique_location", SparkCatalog.class.getName(), + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION, "true" ) } }; } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java index 9d8711c24403..da87dba19f26 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java @@ -27,6 +27,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; @@ -38,8 +39,14 @@ import org.junit.Test; public class TestCreateTable extends SparkCatalogTestBase { + private boolean shouldHaveUniqueTableLocation; + public TestCreateTable(String catalogName, String implementation, Map config) { super(catalogName, implementation, config); + + if ("true".equals(config.get(HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION))) { + this.shouldHaveUniqueTableLocation = true; + } } @After @@ -221,4 +228,24 @@ public void testCreateTableProperties() { Assert.assertEquals("Should have property p1", "2", table.properties().get("p1")); Assert.assertEquals("Should have property p2", "x", table.properties().get("p2")); } + + @Test + public void testCreateTableWithUniqueLocation() { + Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent)); + sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertNotNull("Should load the new table", table); + + String location = table.location(); + if (shouldHaveUniqueTableLocation) { + String tableNameWithUuidSuffix = ".*[0-9a-f]{8}\\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\\b[0-9a-f]{12}"; + Assert.assertTrue( + "Should have uuid suffix in table name", + location.matches(tableNameWithUuidSuffix)); + } else { + Assert.assertTrue("Should end with table name ", + location.endsWith(tableIdent.name())); + } + } }