Handle mixed case partition names in sync_partition_metadata
aalbu authored and findepi committed Jun 1, 2020
1 parent 35e8734 commit 86886ef
Showing 2 changed files with 68 additions and 25 deletions.
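Summary: the sync_partition_metadata procedure gains an optional fourth argument, case_sensitive (BOOLEAN, default TRUE). When it is false, partition directory names found on the file system are lower-cased before they are matched against the "column=" prefix, so a path such as COL_X=UPPER/COL_Y=12 is recognized for partition columns col_x/col_y. A test-style sketch of the new call shape, mirroring the product tests below (the table name is illustrative):

    query("CALL system.sync_partition_metadata('default', 'my_table', 'FULL', false)");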
File 1 of 2: sync_partition_metadata procedure (Hive connector)
@@ -53,7 +53,9 @@
 import static io.prestosql.plugin.hive.HivePartitionManager.extractPartitionValues;
 import static io.prestosql.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
 import static io.prestosql.spi.block.MethodHandleUtil.methodHandle;
+import static io.prestosql.spi.type.BooleanType.BOOLEAN;
 import static io.prestosql.spi.type.VarcharType.VARCHAR;
+import static java.lang.Boolean.TRUE;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
 
@@ -71,7 +73,8 @@ public enum SyncMode
             ConnectorSession.class,
             String.class,
             String.class,
-            String.class);
+            String.class,
+            boolean.class);
 
     private final TransactionalMetadataFactory hiveMetadataFactory;
     private final HdfsEnvironment hdfsEnvironment;
@@ -94,18 +97,19 @@ public Procedure get()
                 ImmutableList.of(
                         new Argument("schema_name", VARCHAR),
                         new Argument("table_name", VARCHAR),
-                        new Argument("mode", VARCHAR)),
+                        new Argument("mode", VARCHAR),
+                        new Argument("case_sensitive", BOOLEAN, false, TRUE)),
                 SYNC_PARTITION_METADATA.bindTo(this));
     }
 
-    public void syncPartitionMetadata(ConnectorSession session, String schemaName, String tableName, String mode)
+    public void syncPartitionMetadata(ConnectorSession session, String schemaName, String tableName, String mode, boolean caseSensitive)
     {
         try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
-            doSyncPartitionMetadata(session, schemaName, tableName, mode);
+            doSyncPartitionMetadata(session, schemaName, tableName, mode, caseSensitive);
         }
     }
 
-    private void doSyncPartitionMetadata(ConnectorSession session, String schemaName, String tableName, String mode)
+    private void doSyncPartitionMetadata(ConnectorSession session, String schemaName, String tableName, String mode, boolean caseSensitive)
     {
         SyncMode syncMode = toSyncMode(mode);
         HdfsContext hdfsContext = new HdfsContext(session, schemaName, tableName);
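(In the four-argument Argument constructor above, the third parameter marks case_sensitive as not required and the fourth supplies its default, TRUE, so existing three-argument invocations of the procedure keep their current case-sensitive behavior.)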
@@ -127,7 +131,7 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
         FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
         List<String> partitionsInMetastore = metastore.getPartitionNames(identity, schemaName, tableName)
                 .orElseThrow(() -> new TableNotFoundException(schemaTableName));
-        List<String> partitionsInFileSystem = listDirectory(fileSystem, fileSystem.getFileStatus(tableLocation), table.getPartitionColumns(), table.getPartitionColumns().size()).stream()
+        List<String> partitionsInFileSystem = listDirectory(fileSystem, fileSystem.getFileStatus(tableLocation), table.getPartitionColumns(), table.getPartitionColumns().size(), caseSensitive).stream()
                 .map(fileStatus -> fileStatus.getPath().toUri())
                 .map(uri -> tableLocation.toUri().relativize(uri).getPath())
                 .collect(toImmutableList());
@@ -144,28 +148,31 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
         syncPartitions(partitionsToAdd, partitionsToDrop, syncMode, metastore, session, table);
     }
 
-    private static List<FileStatus> listDirectory(FileSystem fileSystem, FileStatus current, List<Column> partitionColumns, int depth)
+    private static List<FileStatus> listDirectory(FileSystem fileSystem, FileStatus current, List<Column> partitionColumns, int depth, boolean caseSensitive)
     {
         if (depth == 0) {
             return ImmutableList.of(current);
         }
 
         try {
             return Stream.of(fileSystem.listStatus(current.getPath()))
-                    .filter(fileStatus -> isValidPartitionPath(fileStatus, partitionColumns.get(partitionColumns.size() - depth)))
-                    .flatMap(directory -> listDirectory(fileSystem, directory, partitionColumns, depth - 1).stream())
+                    .filter(fileStatus -> isValidPartitionPath(fileStatus, partitionColumns.get(partitionColumns.size() - depth), caseSensitive))
+                    .flatMap(directory -> listDirectory(fileSystem, directory, partitionColumns, depth - 1, caseSensitive).stream())
                     .collect(toImmutableList());
         }
         catch (IOException e) {
             throw new PrestoException(HIVE_FILESYSTEM_ERROR, e);
         }
     }
 
-    private static boolean isValidPartitionPath(FileStatus file, Column column)
+    private static boolean isValidPartitionPath(FileStatus file, Column column, boolean caseSensitive)
     {
-        Path path = file.getPath();
+        String path = file.getPath().getName();
+        if (!caseSensitive) {
+            path = path.toLowerCase(ENGLISH);
+        }
         String prefix = column.getName() + '=';
-        return file.isDirectory() && path.getName().startsWith(prefix);
+        return file.isDirectory() && path.startsWith(prefix);
     }
 
     // calculate relative complement of set b with respect to set a
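The heart of the change is the case-normalizing prefix check in isValidPartitionPath. A minimal, self-contained sketch of that matching rule, under the assumption that metastore column names are stored in lower case (the class, method, and main harness here are illustrative, not part of the commit):

    import static java.util.Locale.ENGLISH;

    public class PartitionPathMatcherSketch
    {
        // Mirrors isValidPartitionPath: a directory such as "COL_X=UPPER" counts as a value
        // of partition column "col_x" when matching case-insensitively, because the
        // directory name is lower-cased before the "column=" prefix comparison.
        static boolean matchesPartitionColumn(String directoryName, String columnName, boolean caseSensitive)
        {
            String name = caseSensitive ? directoryName : directoryName.toLowerCase(ENGLISH);
            return name.startsWith(columnName + '=');
        }

        public static void main(String[] args)
        {
            System.out.println(matchesPartitionColumn("COL_X=UPPER", "col_x", false)); // true
            System.out.println(matchesPartitionColumn("COL_X=UPPER", "col_x", true));  // false
            System.out.println(matchesPartitionColumn("xyz", "col_x", false));         // false: not a column=value directory
        }
    }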
File 2 of 2: TestSyncPartitionMetadata (product test)
@@ -29,6 +29,8 @@
 import static io.prestosql.tests.TestGroups.HIVE_PARTITIONING;
 import static io.prestosql.tests.TestGroups.PRESTO_JDBC;
 import static io.prestosql.tests.TestGroups.SMOKE;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class TestSyncPartitionMetadata
         extends ProductTest
@@ -49,8 +51,8 @@ public void testAddPartition()
 
         query("CALL system.sync_partition_metadata('default', '" + tableName + "', 'ADD')");
         assertPartitions(tableName, row("a", "1"), row("b", "2"), row("f", "9"));
-        assertThat(() -> query("SELECT payload, x, y FROM " + tableName + " ORDER BY 1, 2, 3 ASC"))
-                .failsWithMessage("Partition location does not exist: hdfs://hadoop-master:9000/user/hive/warehouse/" + tableName + "/x=b/y=2");
+        assertThat(() -> query("SELECT payload, col_x, col_y FROM " + tableName + " ORDER BY 1, 2, 3 ASC"))
+                .failsWithMessage("Partition location does not exist: hdfs://hadoop-master:9000/user/hive/warehouse/" + tableName + "/col_x=b/col_y=2");
         cleanup(tableName);
     }
 
@@ -92,25 +94,59 @@ public void testInvalidSyncMode()
         cleanup(tableName);
     }
 
+    @Test(groups = {HIVE_PARTITIONING, SMOKE})
+    public void testMixedCasePartitionNames()
+    {
+        String tableName = "test_sync_partition_mixed_case";
+        prepare(hdfsClient, hdfsDataSourceWriter, tableName);
+        String tableLocation = WAREHOUSE_DIRECTORY_PATH + tableName;
+        HiveDataSource dataSource = createResourceDataSource(tableName, "io/prestosql/tests/hive/data/single_int_column/data.orc");
+        hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/col_x=h/col_Y=11", dataSource);
+        hdfsClient.createDirectory(tableLocation + "/COL_X=UPPER/COL_Y=12");
+        hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/COL_X=UPPER/COL_Y=12", dataSource);
+
+        query("CALL system.sync_partition_metadata('default', '" + tableName + "', 'FULL', false)");
+        assertPartitions(tableName, row("UPPER", "12"), row("a", "1"), row("f", "9"), row("g", "10"), row("h", "11"));
+        assertData(tableName, row(1, "a", "1"), row(42, "UPPER", "12"), row(42, "f", "9"), row(42, "g", "10"), row(42, "h", "11"));
+    }
+
+    @Test(groups = {HIVE_PARTITIONING, SMOKE})
+    public void testConflictingMixedCasePartitionNames()
+    {
+        String tableName = "test_sync_partition_mixed_case";
+        prepare(hdfsClient, hdfsDataSourceWriter, tableName);
+        String tableLocation = WAREHOUSE_DIRECTORY_PATH + tableName;
+        HiveDataSource dataSource = createResourceDataSource(tableName, "io/prestosql/tests/hive/data/single_int_column/data.orc");
+        // this conflicts with a partition that already exists in the metastore
+        hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/COL_X=a/cOl_y=1", dataSource);
+
+        assertThatThrownBy(() -> query("CALL system.sync_partition_metadata('default', '" + tableName + "', 'ADD', false)"))
+                .hasMessageContaining(format("One or more partitions already exist for table 'default.%s'", tableName));
+        assertPartitions(tableName, row("a", "1"), row("b", "2"));
+    }
+
     private static void prepare(HdfsClient hdfsClient, HdfsDataSourceWriter hdfsDataSourceWriter, String tableName)
     {
         query("DROP TABLE IF EXISTS " + tableName);
 
-        query("CREATE TABLE " + tableName + " (payload bigint, x varchar, y varchar) WITH (format = 'ORC', partitioned_by = ARRAY[ 'x', 'y' ])");
+        query("CREATE TABLE " + tableName + " (payload bigint, col_x varchar, col_y varchar) WITH (format = 'ORC', partitioned_by = ARRAY[ 'col_x', 'col_y' ])");
         query("INSERT INTO " + tableName + " VALUES (1, 'a', '1'), (2, 'b', '2')");
 
         String tableLocation = WAREHOUSE_DIRECTORY_PATH + tableName;
-        // remove partition x=b/y=2
-        hdfsClient.delete(tableLocation + "/x=b/y=2");
-        // add partition directory x=f/y=9 with single_int_column/data.orc file
-        hdfsClient.createDirectory(tableLocation + "/x=f/y=9");
+        // remove partition col_x=b/col_y=2
+        hdfsClient.delete(tableLocation + "/col_x=b/col_y=2");
+        // add partition directory col_x=f/col_y=9 with single_int_column/data.orc file
+        hdfsClient.createDirectory(tableLocation + "/col_x=f/col_y=9");
         HiveDataSource dataSource = createResourceDataSource(tableName, "io/prestosql/tests/hive/data/single_int_column/data.orc");
-        hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/x=f/y=9", dataSource);
+        hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/col_x=f/col_y=9", dataSource);
+        // should only be picked up when not in case sensitive mode
+        hdfsClient.createDirectory(tableLocation + "/COL_X=g/col_y=10");
+        hdfsDataSourceWriter.ensureDataOnHdfs(tableLocation + "/COL_X=g/col_y=10", dataSource);
 
         // add invalid partition path
-        hdfsClient.createDirectory(tableLocation + "/x=d");
-        hdfsClient.createDirectory(tableLocation + "/y=3/x=h");
-        hdfsClient.createDirectory(tableLocation + "/y=3");
+        hdfsClient.createDirectory(tableLocation + "/col_x=d");
+        hdfsClient.createDirectory(tableLocation + "/col_y=3/col_x=h");
+        hdfsClient.createDirectory(tableLocation + "/col_y=3");
         hdfsClient.createDirectory(tableLocation + "/xyz");
 
         assertPartitions(tableName, row("a", "1"), row("b", "2"));
@@ -123,13 +159,13 @@ private static void cleanup(String tableName)
 
     private static void assertPartitions(String tableName, QueryAssert.Row... rows)
     {
-        QueryResult partitionListResult = query("SELECT * FROM \"" + tableName + "$partitions\"");
+        QueryResult partitionListResult = query("SELECT * FROM \"" + tableName + "$partitions\" ORDER BY 1, 2");
         assertThat(partitionListResult).containsExactly(rows);
     }
 
     private static void assertData(String tableName, QueryAssert.Row... rows)
     {
-        QueryResult dataResult = query("SELECT payload, x, y FROM " + tableName + " ORDER BY 1, 2, 3 ASC");
+        QueryResult dataResult = query("SELECT payload, col_x, col_y FROM " + tableName + " ORDER BY 1, 2, 3 ASC");
         assertThat(dataResult).containsExactly(rows);
     }
 }
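Because case_sensitive defaults to TRUE, the new argument is backward compatible. For illustration, both call shapes in the test framework's style (the table name is illustrative):

    // unchanged behavior: partition directory names must match the column names' case exactly
    query("CALL system.sync_partition_metadata('default', 'my_table', 'ADD')");
    // opt in to case-insensitive matching of partition directory names
    query("CALL system.sync_partition_metadata('default', 'my_table', 'ADD', false)");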
