Spark: Added ability to add uuid suffix to the table location in Hive catalog
sshkvar committed Jul 22, 2021
1 parent 2a39712 commit 52ab5f4
Showing 3 changed files with 54 additions and 2 deletions.
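
In short, the Hive catalog gains an opt-in property, append-uuid-suffix-to-table-location, that appends a random UUID to the table-name segment of a table's default location. A minimal usage sketch with direct catalog construction, assuming a reachable Hive metastore behind the default Hadoop Configuration (names are illustrative, not from this commit):

import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

HiveCatalog catalog = new HiveCatalog();
// The property key is the constant added below; "true" turns the suffixing on.
catalog.initialize("hive", ImmutableMap.of(
    HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION, "true"));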
HiveCatalog.java

@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -54,17 +55,22 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  public static final String APPEND_UUID_SUFFIX_TO_TABLE_LOCATION = "append-uuid-suffix-to-table-location";
+  public static final boolean APPEND_UUID_SUFFIX_TO_TABLE_LOCATION_DEFAULT = false;
+
   private String name;
   private Configuration conf;
   private FileIO fileIO;
   private ClientPool<HiveMetaStoreClient, TException> clients;
+  private boolean useUniqueTableName;
 
   public HiveCatalog() {
   }
@@ -107,6 +113,10 @@ public void initialize(String inputName, Map<String, String> properties) {
     this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
 
     this.clients = new CachedClientPool(conf, properties);
+    if (properties.containsKey(APPEND_UUID_SUFFIX_TO_TABLE_LOCATION)) {
+      this.useUniqueTableName = PropertyUtil.propertyAsBoolean(properties,
+          APPEND_UUID_SUFFIX_TO_TABLE_LOCATION, APPEND_UUID_SUFFIX_TO_TABLE_LOCATION_DEFAULT);
+    }
   }
 
   @Override
@@ -438,7 +448,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
       Database databaseData = clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0]));
       if (databaseData.getLocationUri() != null) {
         // If the database location is set use it as a base.
-        return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name());
+        return String.format("%s/%s", databaseData.getLocationUri(), getTableName(tableIdentifier));
       }
 
     } catch (TException e) {
@@ -455,7 +465,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
"%s/%s.db/%s",
warehouseLocation,
tableIdentifier.namespace().levels()[0],
tableIdentifier.name());
getTableName(tableIdentifier));
}

private String getWarehouseLocation() {
@@ -464,6 +474,14 @@ private String getWarehouseLocation() {
     return warehouseLocation;
   }
 
+  private String getTableName(TableIdentifier tableIdentifier) {
+    if (useUniqueTableName) {
+      return String.format("%s-%s", tableIdentifier.name(), UUID.randomUUID());
+    } else {
+      return tableIdentifier.name();
+    }
+  }
+
   private Map<String, String> convertToMetadata(Database database) {
 
     Map<String, String> meta = Maps.newHashMap();
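
The effect on generated locations is easiest to see by replaying the two format strings above. A hedged sketch with hypothetical warehouse, database, and table names:

import java.util.UUID;

// Hypothetical inputs; real values come from the metastore and catalog config.
String warehouse = "hdfs://nn:8020/warehouse";
String db = "sales";
String table = "events";

String plain = String.format("%s/%s.db/%s", warehouse, db, table);
String unique = String.format("%s/%s.db/%s-%s", warehouse, db, table, UUID.randomUUID());
// plain  -> hdfs://nn:8020/warehouse/sales.db/events
// unique -> hdfs://nn:8020/warehouse/sales.db/events-<random uuid>, so two tables
// created with the same name get distinct directories.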
SparkCatalogTestBase.java

@@ -27,6 +27,7 @@
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -71,6 +72,12 @@ public static Object[][] parameters() {
"default-namespace", "default",
"parquet-enabled", "true",
"cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
) },
{ "spark_catalog_with_unique_location", SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hive",
"default-namespace", "default",
HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION, "true"
) }
};
}
Expand Down
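
The new row registers one more Hive-backed test catalog with the flag enabled. Outside the test harness, the equivalent wiring would go through Spark's catalog configuration, which passes every spark.sql.catalog.<name>.<key> entry to the catalog's initialize call; a sketch under that assumption, with an arbitrary catalog name:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .config("spark.sql.catalog.hive_uuid", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hive_uuid.type", "hive")
    .config("spark.sql.catalog.hive_uuid.default-namespace", "default")
    // same key as HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION
    .config("spark.sql.catalog.hive_uuid.append-uuid-suffix-to-table-location", "true")
    .getOrCreate();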
TestCreateTable.java

@@ -27,6 +27,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
@@ -38,8 +39,14 @@
 import org.junit.Test;
 
 public class TestCreateTable extends SparkCatalogTestBase {
+  private boolean shouldHaveUniqueTableLocation;
+
   public TestCreateTable(String catalogName, String implementation, Map<String, String> config) {
     super(catalogName, implementation, config);
+
+    if ("true".equals(config.get(HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION))) {
+      this.shouldHaveUniqueTableLocation = true;
+    }
   }
 
   @After
@@ -221,4 +228,24 @@ public void testCreateTableProperties() {
     Assert.assertEquals("Should have property p1", "2", table.properties().get("p1"));
     Assert.assertEquals("Should have property p2", "x", table.properties().get("p2"));
   }
+
+  @Test
+  public void testCreateTableWithUniqueLocation() {
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+    sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    String location = table.location();
+    if (shouldHaveUniqueTableLocation) {
+      String tableNameWithUuidSuffix = ".*[0-9a-f]{8}\\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\\b[0-9a-f]{12}";
+      Assert.assertTrue(
+          "Should have uuid suffix in table name",
+          location.matches(tableNameWithUuidSuffix));
+    } else {
+      Assert.assertTrue("Should end with table name ",
+          location.endsWith(tableIdent.name()));
+    }
+  }
 }
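
For intuition about the assertion, the pattern accepts any location whose tail is an 8-4-4-4-12 lowercase-hex UUID. A quick illustration with hypothetical locations:

String pattern = ".*[0-9a-f]{8}\\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\\b[0-9a-f]{12}";

// Hypothetical locations, for illustration only.
boolean withUuid = "file:/wh/default.db/table-1b4e28ba-2fa1-11d2-883f-0016d3cca427".matches(pattern); // true
boolean plain = "file:/wh/default.db/table".matches(pattern); // false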
