Skip to content

Commit

Permalink
[fix](multi-catalog)resolve hive meta store compatibility for differe…
Browse files Browse the repository at this point in the history
…nt version issues (#32551)

Fix hive list partition at different version.
Only Hive3 uses the prependCatalogToDbName() to wrap db_name.
  • Loading branch information
wsjz authored Mar 22, 2024
1 parent c8976d2 commit df56ab3
Showing 1 changed file with 59 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,16 @@
* 7. getSchema()
* 8. getTableColumnStatistics()
* 9. getPartitionColumnStatistics()
* 10. getPartitionsByNames()
* 11. listPartitions()
* 12. alter_partition()
* 13. add_partitions()
* 14. dropPartition()
* 15. alter_table()
* 16. alter_table_with_environmentContext()
* 17. renamePartition()
* 18. truncateTable()
* 19. drop_table_with_environment_context()
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
Expand Down Expand Up @@ -529,7 +539,7 @@ public boolean isLocalMetaStore() {
@Override
public boolean isCompatibleWith(Configuration conf) {
// Make a copy of currentMetaVars, there is a race condition that
// currentMetaVars might be changed during the execution of the method
// currentMetaVars might be changed during the execution of the method
Map<String, String> currentMetaVarsCopy = currentMetaVars;
if (currentMetaVarsCopy == null) {
return false; // recreate
Expand Down Expand Up @@ -600,14 +610,14 @@ public void alter_table_with_environmentContext(String dbname, String tbl_name,
if (hook != null) {
hook.preAlterTable(new_tbl, envContext);
}
client.alter_table_with_environment_context(prependCatalogToDbName(dbname, conf),
tbl_name, new_tbl, envContext);
client.alter_table_with_environment_context(
prependCatalogToDbNameByVersion(hiveVersion, null, dbname, conf), tbl_name, new_tbl, envContext);
}

@Override
public void alter_table(String catName, String dbName, String tblName, Table newTable,
EnvironmentContext envContext) throws TException {
client.alter_table_with_environment_context(prependCatalogToDbName(catName,
client.alter_table_with_environment_context(prependCatalogToDbNameByVersion(hiveVersion, catName,
dbName, conf), tblName, newTable, envContext);
}

Expand All @@ -620,7 +630,8 @@ public void renamePartition(final String dbname, final String tableName, final L
@Override
public void renamePartition(String catName, String dbname, String tableName, List<String> part_vals,
Partition newPart) throws TException {
client.rename_partition(prependCatalogToDbName(catName, dbname, conf), tableName, part_vals, newPart);
client.rename_partition(prependCatalogToDbNameByVersion(hiveVersion, catName, dbname, conf),
tableName, part_vals, newPart);

}

Expand Down Expand Up @@ -882,8 +893,10 @@ public Partition add_partition(Partition new_part, EnvironmentContext envContext
@Override
public int add_partitions(List<Partition> new_parts) throws TException {
if (new_parts != null && !new_parts.isEmpty() && !new_parts.get(0).isSetCatName()) {
final String defaultCat = getDefaultCatalog(conf);
new_parts.forEach(p -> p.setCatName(defaultCat));
if (hiveVersion == HiveVersion.V3_0) {
final String defaultCat = getDefaultCatalog(conf);
new_parts.forEach(p -> p.setCatName(defaultCat));
}
}
return client.add_partitions(new_parts);
}
Expand Down Expand Up @@ -931,24 +944,24 @@ public Partition appendPartition(String dbName, String tableName, String partNam
@Override
public Partition appendPartition(String catName, String dbName, String tableName,
String name) throws TException {
Partition p = client.append_partition_by_name(prependCatalogToDbName(
catName, dbName, conf), tableName, name);
Partition p = client.append_partition_by_name(
prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf), tableName, name);
return deepCopy(p);
}

@Override
public Partition appendPartition(String catName, String dbName, String tableName,
List<String> partVals) throws TException {
Partition p = client.append_partition(prependCatalogToDbName(
catName, dbName, conf), tableName, partVals);
Partition p = client.append_partition(
prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf), tableName, partVals);
return deepCopy(p);
}

@Deprecated
public Partition appendPartition(String dbName, String tableName, List<String> partVals,
EnvironmentContext ec) throws TException {
return client.append_partition_with_environment_context(prependCatalogToDbName(dbName, conf),
tableName, partVals, ec).deepCopy();
return client.append_partition_with_environment_context(
prependCatalogToDbNameByVersion(hiveVersion, null, dbName, conf), tableName, partVals, ec).deepCopy();
}

/**
Expand Down Expand Up @@ -1260,7 +1273,7 @@ public void dropDatabase(String catalogName, String dbName, boolean deleteData,
}
}
}
client.drop_database(prependCatalogToDbName(catalogName, dbName, conf), deleteData, cascade);
client.drop_database(prependCatalogToDbNameByVersion(hiveVersion, catalogName, dbName, conf), deleteData, cascade);
}

@Override
Expand All @@ -1272,7 +1285,7 @@ public boolean dropPartition(String dbName, String tableName, String partName, b
@Override
public boolean dropPartition(String catName, String db_name, String tbl_name, String name,
boolean deleteData) throws TException {
return client.drop_partition_by_name_with_environment_context(prependCatalogToDbName(
return client.drop_partition_by_name_with_environment_context(prependCatalogToDbNameByVersion(hiveVersion,
catName, db_name, conf), tbl_name, name, deleteData, null);
}

Expand All @@ -1289,21 +1302,23 @@ private static EnvironmentContext getEnvironmentContextWithIfPurgeSet() {
@Deprecated
public boolean dropPartition(String db_name, String tbl_name, List<String> part_vals,
EnvironmentContext env_context) throws TException {
return client.drop_partition_with_environment_context(prependCatalogToDbName(db_name, conf),
tbl_name, part_vals, true, env_context);
return client.drop_partition_with_environment_context(
prependCatalogToDbNameByVersion(hiveVersion, null, db_name, conf), tbl_name, part_vals, true, env_context);
}

@Deprecated
public boolean dropPartition(String dbName, String tableName, String partName, boolean dropData,
EnvironmentContext ec) throws TException {
return client.drop_partition_by_name_with_environment_context(prependCatalogToDbName(dbName, conf),
return client.drop_partition_by_name_with_environment_context(
prependCatalogToDbNameByVersion(hiveVersion, null, dbName, conf),
tableName, partName, dropData, ec);
}

@Deprecated
public boolean dropPartition(String dbName, String tableName, List<String> partVals)
throws TException {
return client.drop_partition(prependCatalogToDbName(dbName, conf), tableName, partVals, true);
return client.drop_partition(prependCatalogToDbNameByVersion(hiveVersion, null, dbName, conf),
tableName, partVals, true);
}

@Override
Expand Down Expand Up @@ -1340,7 +1355,7 @@ public boolean dropPartition(String catName, String db_name, String tbl_name,
}
}
}
return client.drop_partition_with_environment_context(prependCatalogToDbName(
return client.drop_partition_with_environment_context(prependCatalogToDbNameByVersion(hiveVersion,
catName, db_name, conf), tbl_name, part_vals, options.deleteData,
options.purgeData ? getEnvironmentContextWithIfPurgeSet() : null);
}
Expand Down Expand Up @@ -1391,7 +1406,9 @@ public List<Partition> dropPartitions(String catName, String dbName, String tblN
}
rps.setExprs(exprs);
DropPartitionsRequest req = new DropPartitionsRequest(dbName, tblName, rps);
req.setCatName(catName);
if (hiveVersion == HiveVersion.V3_0) {
req.setCatName(catName);
}
req.setDeleteData(options.deleteData);
req.setNeedResult(options.returnResults);
req.setIfExists(options.ifExists);
Expand Down Expand Up @@ -1501,7 +1518,7 @@ public void truncateTable(String dbName, String tableName, List<String> partName
@Override
public void truncateTable(String catName, String dbName, String tableName, List<String> partNames)
throws TException {
client.truncate_table(prependCatalogToDbName(catName, dbName, conf), tableName, partNames);
client.truncate_table(prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf), tableName, partNames);
}

/**
Expand Down Expand Up @@ -1581,7 +1598,7 @@ public List<Partition> listPartitions(String db_name, String tbl_name, short max
@Override
public List<Partition> listPartitions(String catName, String db_name, String tbl_name,
int max_parts) throws TException {
List<Partition> parts = client.get_partitions(prependCatalogToDbName(catName, db_name, conf),
List<Partition> parts = client.get_partitions(prependCatalogToDbNameByVersion(hiveVersion, catName, db_name, conf),
tbl_name, shrinkMaxtoShort(max_parts));
return deepCopyPartitions(filterHook.filterPartitions(parts));
}
Expand All @@ -1607,7 +1624,8 @@ public List<Partition> listPartitions(String db_name, String tbl_name,
@Override
public List<Partition> listPartitions(String catName, String db_name, String tbl_name,
List<String> part_vals, int max_parts) throws TException {
List<Partition> parts = client.get_partitions_ps(prependCatalogToDbName(catName, db_name, conf),
List<Partition> parts = client.get_partitions_ps(
prependCatalogToDbNameByVersion(hiveVersion, catName, db_name, conf),
tbl_name, part_vals, shrinkMaxtoShort(max_parts));
return deepCopyPartitions(filterHook.filterPartitions(parts));
}
Expand All @@ -1624,7 +1642,7 @@ public List<Partition> listPartitionsWithAuthInfo(String db_name, String tbl_nam
public List<Partition> listPartitionsWithAuthInfo(String catName, String dbName, String tableName,
int maxParts, String userName,
List<String> groupNames) throws TException {
List<Partition> parts = client.get_partitions_with_auth(prependCatalogToDbName(catName,
List<Partition> parts = client.get_partitions_with_auth(prependCatalogToDbNameByVersion(hiveVersion, catName,
dbName, conf), tableName, shrinkMaxtoShort(maxParts), userName, groupNames);
return deepCopyPartitions(filterHook.filterPartitions(parts));
}
Expand All @@ -1643,8 +1661,8 @@ public List<Partition> listPartitionsWithAuthInfo(String catName, String dbName,
List<String> partialPvals, int maxParts,
String userName, List<String> groupNames)
throws TException {
List<Partition> parts = client.get_partitions_ps_with_auth(prependCatalogToDbName(catName,
dbName, conf), tableName, partialPvals, shrinkMaxtoShort(maxParts), userName, groupNames);
List<Partition> parts = client.get_partitions_ps_with_auth(prependCatalogToDbNameByVersion(hiveVersion,
catName, dbName, conf), tableName, partialPvals, shrinkMaxtoShort(maxParts), userName, groupNames);
return deepCopyPartitions(filterHook.filterPartitions(parts));
}

Expand Down Expand Up @@ -1784,8 +1802,8 @@ public Partition getPartitionWithAuthInfo(String db_name, String tbl_name,
public Partition getPartitionWithAuthInfo(String catName, String dbName, String tableName,
List<String> pvals, String userName,
List<String> groupNames) throws TException {
Partition p = client.get_partition_with_auth(prependCatalogToDbName(catName, dbName, conf), tableName,
pvals, userName, groupNames);
Partition p = client.get_partition_with_auth(prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf),
tableName, pvals, userName, groupNames);
return deepCopy(filterHook.filterPartition(p));
}

Expand Down Expand Up @@ -1866,7 +1884,7 @@ public List<String> listTableNamesByFilter(String dbName, String filter, short m
public List<String> listTableNamesByFilter(String catName, String dbName, String filter,
int maxTables) throws TException {
return filterHook.filterTableNames(catName, dbName,
client.get_table_names_by_filter(prependCatalogToDbName(catName, dbName, conf), filter,
client.get_table_names_by_filter(prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf), filter,
shrinkMaxtoShort(maxTables)));
}

Expand Down Expand Up @@ -2065,8 +2083,8 @@ public void alter_partition(String dbName, String tblName, Partition newPart, En
@Override
public void alter_partition(String catName, String dbName, String tblName, Partition newPart,
EnvironmentContext environmentContext) throws TException {
client.alter_partition_with_environment_context(prependCatalogToDbName(catName, dbName, conf), tblName,
newPart, environmentContext);
client.alter_partition_with_environment_context(prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf),
tblName, newPart, environmentContext);
}

@Override
Expand All @@ -2085,7 +2103,8 @@ public void alter_partitions(String dbName, String tblName, List<Partition> newP
public void alter_partitions(String catName, String dbName, String tblName,
List<Partition> newParts,
EnvironmentContext environmentContext) throws TException {
client.alter_partitions_with_environment_context(prependCatalogToDbName(catName, dbName, conf),
client.alter_partitions_with_environment_context(
prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf),
tblName, newParts, environmentContext);
}

Expand All @@ -2096,7 +2115,7 @@ public void alterDatabase(String dbName, Database db) throws TException {

@Override
public void alterDatabase(String catName, String dbName, Database newDb) throws TException {
client.alter_database(prependCatalogToDbName(catName, dbName, conf), newDb);
client.alter_database(prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf), newDb);
}

@Override
Expand Down Expand Up @@ -2249,7 +2268,8 @@ public boolean deletePartitionColumnStatistics(String dbName, String tableName,
public boolean deletePartitionColumnStatistics(String catName, String dbName, String tableName,
String partName, String colName)
throws TException {
return client.delete_partition_column_statistics(prependCatalogToDbName(catName, dbName, conf),
return client.delete_partition_column_statistics(
prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf),
tableName, partName, colName);
}

Expand All @@ -2262,7 +2282,7 @@ public boolean deleteTableColumnStatistics(String dbName, String tableName, Stri
@Override
public boolean deleteTableColumnStatistics(String catName, String dbName, String tableName,
String colName) throws TException {
return client.delete_table_column_statistics(prependCatalogToDbName(catName, dbName, conf),
return client.delete_table_column_statistics(prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf),
tableName, colName);
}

Expand Down Expand Up @@ -2304,8 +2324,8 @@ public Partition getPartition(String db, String tableName, String partName) thro
@Override
public Partition getPartition(String catName, String dbName, String tblName, String name)
throws TException {
Partition p = client.get_partition_by_name(prependCatalogToDbName(catName, dbName, conf), tblName,
name);
Partition p = client.get_partition_by_name(prependCatalogToDbNameByVersion(hiveVersion, catName, dbName, conf),
tblName, name);
return deepCopy(filterHook.filterPartition(p));
}

Expand Down Expand Up @@ -3144,7 +3164,7 @@ protected void create_table_with_environment_context(Table tbl, EnvironmentConte

protected void drop_table_with_environment_context(String catName, String dbname, String name,
boolean deleteData, EnvironmentContext envContext) throws TException {
client.drop_table_with_environment_context(prependCatalogToDbName(catName, dbname, conf),
client.drop_table_with_environment_context(prependCatalogToDbNameByVersion(hiveVersion, catName, dbname, conf),
name, deleteData, envContext);
}

Expand Down

0 comments on commit df56ab3

Please sign in to comment.