Skip to content

Commit

Permalink
[feature](insert)implement hive table sink plan (#31765)
Browse files Browse the repository at this point in the history
Issue Number: #31442
  • Loading branch information
wsjz authored Mar 18, 2024
1 parent 79a8620 commit 0deac15
Show file tree
Hide file tree
Showing 42 changed files with 1,268 additions and 504 deletions.
17 changes: 3 additions & 14 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2216,21 +2216,10 @@ public class Config extends ConfigBase {
"Enable external table DDL"})
public static boolean enable_external_ddl = false;


@ConfField(mutable = true, masterOnly = true, description = {
"Hive创建外部表默认指定的input format",
"Default hive input format for creating table."})
public static String hive_default_input_format = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";

@ConfField(mutable = true, masterOnly = true, description = {
"Hive创建外部表默认指定的output format",
"Default hive output format for creating table."})
public static String hive_default_output_format = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";

@ConfField(mutable = true, masterOnly = true, description = {
"Hive创建外部表默认指定的SerDe类",
"Default hive serde class for creating table."})
public static String hive_default_serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
"Hive创建外部表默认指定的文件格式",
"Default hive file format for creating table."})
public static String hive_default_file_format = "orc";

@ConfField
public static int statistics_sql_parallel_exec_instance_num = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
Expand Down Expand Up @@ -176,6 +177,8 @@ default CatalogLog constructEditLog() {
return log;
}

TableName getTableNameByTableId(Long tableId);

// Return a copy of all db collection.
Collection<DatabaseIf<? extends TableIf>> getAllDbs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
Expand Down Expand Up @@ -396,6 +397,16 @@ public List<String> getDbNamesOrEmpty() {
}
}

@Override
public TableName getTableNameByTableId(Long tableId) {
    // Linear scan over every database in this external catalog until the
    // table id is found. O(#tables) — NOTE(review): confirm call sites are
    // not on a hot path before relying on this for frequent lookups.
    for (DatabaseIf<?> db : idToDb.values()) {
        TableIf table = db.getTableNullable(tableId);
        if (table != null) {
            // Fully-qualified name: catalog, database, table.
            return new TableName(getName(), db.getFullName(), table.getName());
        }
    }
    // Table id not present in any database of this catalog.
    return null;
}

@Override
public String getResource() {
return catalogProperty.getResource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public TableName getTableNameByTableId(Long tableId) {
for (Database db : fullNameToDb.values()) {
Table table = db.getTableNullable(tableId);
if (table != null) {
return new TableName("", db.getFullName(), table.getName());
return new TableName(INTERNAL_CATALOG_NAME, db.getFullName(), table.getName());
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface HMSCachedClient {

List<String> listPartitionNames(String dbName, String tblName);

List<Partition> listPartitions(String dbName, String tblName);

List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum);

Partition getPartition(String dbName, String tblName, List<String> partitionValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,13 @@ public void createTable(CreateTableStmt stmt) throws UserException {
}
try {
Map<String, String> props = stmt.getExtProperties();
String inputFormat = props.getOrDefault("input_format", Config.hive_default_input_format);
String outputFormat = props.getOrDefault("output_format", Config.hive_default_output_format);
String serDe = props.getOrDefault("serde", Config.hive_default_serde);
String fileFormat = props.getOrDefault("file_format", Config.hive_default_file_format);
HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName,
tblName,
stmt.getColumns(),
parsePartitionKeys(props),
props,
inputFormat,
outputFormat,
serDe);
fileFormat);

client.createTable(catalogTable, stmt.isSetIfNotExists());
db.setUnInitialized(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ public class HiveTableMetadata implements TableMetadata {
private String tableName;
private List<Column> columns;
private List<FieldSchema> partitionKeys;
private String inputFormat;
private String outputFormat;
private String serDe;
private String fileFormat;
private Map<String, String> properties;
// private String viewSql;

Expand All @@ -41,16 +39,12 @@ public HiveTableMetadata(String dbName,
List<Column> columns,
List<FieldSchema> partitionKeys,
Map<String, String> props,
String inputFormat,
String outputFormat,
String serDe) {
String fileFormat) {
this.dbName = dbName;
this.tableName = tblName;
this.columns = columns;
this.partitionKeys = partitionKeys;
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.serDe = serDe;
this.fileFormat = fileFormat;
this.properties = props;
}

Expand All @@ -77,26 +71,16 @@ public List<FieldSchema> getPartitionKeys() {
return partitionKeys;
}

public String getInputFormat() {
return inputFormat;
}

public String getOutputFormat() {
return outputFormat;
}

public String getSerDe() {
return serDe;
public String getFileFormat() {
return fileFormat;
}

public static HiveTableMetadata of(String dbName,
String tblName,
List<Column> columns,
List<FieldSchema> partitionKeys,
Map<String, String> props,
String inputFormat,
String outputFormat, String serDe) {
return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props,
inputFormat, outputFormat, serDe);
String fileFormat) {
return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, fileFormat);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public List<String> listPartitionNames(String dbName, String tblName) {
return listPartitionNames(dbName, tblName, (long) -1);
}

@Override
public List<Partition> listPartitions(String dbName, String tblName) {
    // Passing an empty name list makes getPartitionsByNames skip its
    // PART_NAME filter, so this returns every partition of the table.
    return getPartitionsByNames(dbName, tblName, ImmutableList.of());
}

@Override
public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
String sql = String.format("SELECT \"PART_NAME\" from \"PARTITIONS\" WHERE \"TBL_ID\" = ("
Expand Down Expand Up @@ -173,15 +177,26 @@ public List<Partition> getPartitions(String dbName, String tblName, List<String>
private List<Partition> getPartitionsByNames(String dbName, String tblName, List<String> partitionNames) {
List<String> partitionNamesWithQuote = partitionNames.stream().map(partitionName -> "'" + partitionName + "'")
.collect(Collectors.toList());
String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote);
String sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+ " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+ " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+ " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+ " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
+ " AND \"PART_NAME\" in (%s);",
dbName, tblName, partitionNamesString);
String sql;
if (partitionNamesWithQuote.isEmpty()) {
sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+ " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+ " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+ " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+ " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s';",
dbName, tblName);
} else {
String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote);
sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+ " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+ " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+ " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+ " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
+ " AND \"PART_NAME\" in (%s);",
dbName, tblName, partitionNamesString);
}
if (LOG.isDebugEnabled()) {
LOG.debug("getPartitionsByNames exec sql: {}", sql);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,7 @@ private static Table toHiveTable(HiveTableMetadata hiveTable) {
// table.setRetention(0);
String location = hiveTable.getProperties().get("external_location");
table.setSd(toHiveStorageDesc(hiveTable.getColumns(),
hiveTable.getInputFormat(),
hiveTable.getOutputFormat(),
hiveTable.getSerDe(),
hiveTable.getFileFormat(),
location));
table.setPartitionKeys(hiveTable.getPartitionKeys());
// table.setViewOriginalText(hiveTable.getViewSql());
Expand All @@ -213,15 +211,10 @@ private static Table toHiveTable(HiveTableMetadata hiveTable) {
return table;
}

private static StorageDescriptor toHiveStorageDesc(List<Column> columns, String inputFormat, String outputFormat,
String serDe, String location) {
private static StorageDescriptor toHiveStorageDesc(List<Column> columns, String fileFormat, String location) {
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(toHiveColumns(columns));
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(serDe);
sd.setSerdeInfo(serDeInfo);
sd.setInputFormat(inputFormat);
sd.setOutputFormat(outputFormat);
setFileFormat(fileFormat, sd);
if (StringUtils.isNotEmpty(location)) {
sd.setLocation(location);
}
Expand All @@ -231,6 +224,28 @@ private static StorageDescriptor toHiveStorageDesc(List<Column> columns, String
return sd;
}

/**
 * Populates {@code sd} with the Hive input format, output format and SerDe
 * class names that correspond to the given file format.
 *
 * @param fileFormat case-insensitive format name; only "orc" and "parquet"
 *                   are supported
 * @param sd         storage descriptor to mutate
 * @throws IllegalArgumentException if {@code fileFormat} is unsupported
 */
private static void setFileFormat(String fileFormat, StorageDescriptor sd) {
    final String inputFormat;
    final String outputFormat;
    final String serDe;
    if (fileFormat.equalsIgnoreCase("orc")) {
        inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
        outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
        serDe = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
    } else if (fileFormat.equalsIgnoreCase("parquet")) {
        inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
        // BUGFIX: the class name previously began with a stray apostrophe
        // ("'org.apache...."), which would have stored an unloadable output
        // format class in the metastore.
        outputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
        serDe = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
    } else {
        throw new IllegalArgumentException("Creating table with an unsupported file format: " + fileFormat);
    }
    SerDeInfo serDeInfo = new SerDeInfo();
    serDeInfo.setSerializationLib(serDe);
    sd.setSerdeInfo(serDeInfo);
    sd.setInputFormat(inputFormat);
    sd.setOutputFormat(outputFormat);
}

private static List<FieldSchema> toHiveColumns(List<Column> columns) {
List<FieldSchema> result = new ArrayList<>();
for (Column column : columns) {
Expand Down Expand Up @@ -297,6 +312,19 @@ public List<String> listPartitionNames(String dbName, String tblName) {
return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
}

@Override
public List<Partition> listPartitions(String dbName, String tblName) {
    // NOTE(review): the thrift call caps the result at MAX_LIST_PARTITION_NUM;
    // confirm that is intended to mean "all partitions" for large tables.
    try (ThriftHMSClient client = getClient()) {
        try {
            return ugiDoAs(() -> client.client.listPartitions(dbName, tblName, MAX_LIST_PARTITION_NUM));
        } catch (Exception e) {
            // Mark the pooled client as broken so it is not reused.
            client.setThrowable(e);
            throw e;
        }
    } catch (Exception e) {
        // BUGFIX: message previously read "failed to check if table %s in db %s
        // exists" (copy-paste from a tableExists check); report the actual operation.
        throw new HMSClientException("failed to list partitions for table %s in db %s", e, tblName, dbName);
    }
}

@Override
public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
// list all parts when the limit is greater than the short maximum
Expand Down
Loading

0 comments on commit 0deac15

Please sign in to comment.