[feature](hive) Add support for struct and map column types on the text file format of Hive tables #22347

Merged
7 commits merged on Aug 10, 2023
288 changes: 98 additions & 190 deletions be/src/exec/text_converter.cpp

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions be/src/exec/text_converter.h
@@ -31,7 +31,7 @@ class TextConverter {
 public:
     static constexpr char NULL_STR[3] = {'\\', 'N', '\0'};

-    TextConverter(char escape_char, char array_delimiter = '\2');
+    TextConverter(char escape_char, char collection_delimiter = '\2', char map_kv_delimiter = '\3');

     void write_string_column(const SlotDescriptor* slot_desc,
                              vectorized::MutableColumnPtr* column_ptr, const char* data,
@@ -57,11 +57,23 @@ class TextConverter {
                              size_t rows);
     void unescape_string_on_spot(const char* src, size_t* len);

-    void set_array_delimiter(char array_delimiter) { _array_delimiter = array_delimiter; }
+    void set_collection_delimiter(char collection_delimiter) {
+        _collection_delimiter = collection_delimiter;
+    }
+    void set_map_kv_delimiter(char mapkv_delimiter) { _map_kv_delimiter = mapkv_delimiter; }

 private:
+    bool _write_data(const TypeDescriptor& type_desc, vectorized::IColumn* nullable_col_ptr,
+                     const char* data, size_t len, bool copy_string, bool need_escape, size_t rows,
+                     char array_delimiter);
+
     char _escape_char;
-    char _array_delimiter;
+
+    // struct, array and map delimiter
+    char _collection_delimiter;
+
+    // map key and value delimiter
+    char _map_kv_delimiter;
 };

} // namespace doris
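For background on the two new constructor parameters: Hive's text format nests its separators by level. Fields in a row are split by the field delimiter, the elements of an array, the entries of a map, and the fields of a struct are split by the collection delimiter (default '\2'), and a map key is split from its value by the key-value delimiter (default '\3'). Below is a minimal sketch of how a single map<string,string> cell decomposes under those defaults; the class and its parseMapCell helper are hypothetical illustrations in Java, while the BE does the real work in C++ inside text_converter.cpp.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapCellSketch {
    // Split one map<string,string> cell: entries are separated by '\2'
    // (the collection delimiter), key and value by '\3' (the map kv delimiter).
    static Map<String, String> parseMapCell(String cell) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : cell.split("\u0002")) {
            String[] kv = entry.split("\u0003", 2);
            result.put(kv[0], kv.length > 1 ? kv[1] : null);
        }
        return result;
    }

    public static void main(String[] args) {
        // Raw bytes on disk: k1 \3 v1 \2 k2 \3 v2
        System.out.println(parseMapCell("k1\u0003v1\u0002k2\u0003v2")); // {k1=v1, k2=v2}
    }
}
```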
16 changes: 10 additions & 6 deletions be/src/vec/exec/format/csv/csv_reader.cpp
@@ -200,9 +200,11 @@ Status CsvReader::init_reader(bool is_load) {
     _line_delimiter = _params.file_attributes.text_params.line_delimiter;
     _line_delimiter_length = _line_delimiter.size();

-    //get array delimiter
-    _array_delimiter = _params.file_attributes.text_params.array_delimiter;
-    _text_converter->set_array_delimiter(_array_delimiter[0]);
+    _collection_delimiter = _params.file_attributes.text_params.collection_delimiter;
+    _text_converter->set_collection_delimiter(_collection_delimiter[0]);
+
+    _map_kv_delimiter = _params.file_attributes.text_params.mapkv_delimiter;
+    _text_converter->set_map_kv_delimiter(_map_kv_delimiter[0]);

     if (_params.file_attributes.__isset.trim_double_quotes) {
         _trim_double_quotes = _params.file_attributes.trim_double_quotes;
@@ -689,9 +691,11 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
     _line_delimiter = _params.file_attributes.text_params.line_delimiter;
     _line_delimiter_length = _line_delimiter.size();

-    //get array delimiter
-    _array_delimiter = _params.file_attributes.text_params.array_delimiter;
-    _text_converter->set_array_delimiter(_array_delimiter[0]);
+    _collection_delimiter = _params.file_attributes.text_params.collection_delimiter;
+    _text_converter->set_collection_delimiter(_collection_delimiter[0]);
+
+    _map_kv_delimiter = _params.file_attributes.text_params.mapkv_delimiter;
+    _text_converter->set_map_kv_delimiter(_map_kv_delimiter[0]);

     // create decompressor.
     // _decompressor may be nullptr if this is not a compressed file
6 changes: 5 additions & 1 deletion be/src/vec/exec/format/csv/csv_reader.h
@@ -144,7 +144,11 @@ class CsvReader : public GenericReader {

     std::string _value_separator;
     std::string _line_delimiter;
-    std::string _array_delimiter;
+
+    // struct, array and map delimiter
+    std::string _collection_delimiter;
+    // map key and value delimiter
+    std::string _map_kv_delimiter;

     int _value_separator_length;
     int _line_delimiter_length;
HiveScanNode.java
@@ -20,11 +20,15 @@
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
@@ -71,9 +75,12 @@ public class HiveScanNode extends FileQueryScanNode {
     public static final String PROP_LINE_DELIMITER = "line.delim";
     public static final String DEFAULT_LINE_DELIMITER = "\n";

-    public static final String PROP_ARRAY_DELIMITER_HIVE2 = "colelction.delim";
-    public static final String PROP_ARRAY_DELIMITER_HIVE3 = "collection.delim";
-    public static final String DEFAULT_ARRAY_DELIMITER = "\2";
+    public static final String PROP_COLLECTION_DELIMITER_HIVE2 = "colelction.delim";
+    public static final String PROP_COLLECTION_DELIMITER_HIVE3 = "collection.delim";
+    public static final String DEFAULT_COLLECTION_DELIMITER = "\2";
+
+    public static final String PROP_MAP_KV_DELIMITER = "mapkey.delim";
+    public static final String DEFAULT_MAP_KV_DELIMITER = "\003";

     protected final HMSExternalTable hmsTable;
     private HiveTransaction hiveTransaction = null;
@@ -104,10 +111,46 @@ protected void doInitialize() throws UserException {
         String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
         if (inputFormat.contains("TextInputFormat")) {
             for (SlotDescriptor slot : desc.getSlots()) {
-                if (slot.getType().isMapType() || slot.getType().isStructType()) {
+                if (slot.getType().isScalarType()) {
+                    continue;
+                }
+                boolean supported = true;
+
+                // support array<primitive_type> and array<array<...>>
+                if (slot.getType().isArrayType()) {
+                    ArrayType arraySubType = (ArrayType) slot.getType();
+                    while (true) {
+                        if (arraySubType.getItemType().isArrayType()) {
+                            arraySubType = (ArrayType) arraySubType.getItemType();
+                            continue;
+                        }
+                        if (!arraySubType.getItemType().isScalarType()) {
+                            supported = false;
+                        }
+                        break;
+                    }
+                } else if (slot.getType().isMapType()) { // support map<primitive_type, primitive_type>
+                    if (!((MapType) slot.getType()).getValueType().isScalarType()) {
+                        supported = false;
+                    }
+                } else if (slot.getType().isStructType()) { // support struct<primitive_type, primitive_type, ...>
+                    StructType structSubType = (StructType) slot.getType();
+                    for (StructField f : structSubType.getFields()) {
+                        if (!f.getType().isScalarType()) {
+                            supported = false;
+                        }
+                    }
+                }
+
+                if (!supported) {
                     throw new UserException("For column `" + slot.getColumn().getName()
-                            + "`, The column types MAP/STRUCT are not supported yet"
-                            + " for text input format of Hive. ");
+                            + "`, the column type is not supported yet"
+                            + " for the text input format of Hive.\n"
+                            + "Currently supported complex types are:\n"
+                            + "\t1. array<primitive_type> and array<array<...>>\n"
+                            + "\t2. map<primitive_type, primitive_type>\n"
+                            + "\t3. struct<primitive_type, primitive_type, ...>\n");
                 }
             }
         }
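Restated for clarity, the whitelist enforced by the loop above reduces to a single predicate over the slot type: scalars always pass, an array passes when peeling nested arrays ends at a scalar item type, a map passes when its value type is scalar, and a struct passes when every field is scalar. The following is a rewritten sketch with a hypothetical helper name, not code from the PR, assuming the Doris FE catalog classes are on the classpath:

```java
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;

public class HiveTextTypeCheck {
    // Equivalent of the doInitialize() check: true when the column type
    // can be read from Hive's text input format after this PR.
    static boolean isSupportedForHiveText(Type t) {
        if (t.isScalarType()) {
            return true;
        }
        if (t.isArrayType()) {
            // peel array<array<...>> down to the innermost item type
            Type item = ((ArrayType) t).getItemType();
            while (item.isArrayType()) {
                item = ((ArrayType) item).getItemType();
            }
            return item.isScalarType();
        }
        if (t.isMapType()) {
            // only the value type is constrained, mirroring the check above
            return ((MapType) t).getValueType().isScalarType();
        }
        if (t.isStructType()) {
            for (StructField f : ((StructType) t).getFields()) {
                if (!f.getType().isScalarType()) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }
}
```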
@@ -281,12 +324,15 @@ protected TFileAttributes getFileAttributes() throws UserException {
         java.util.Map<String, String> delimiter = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
         textParams.setColumnSeparator(delimiter.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
         textParams.setLineDelimiter(delimiter.getOrDefault(PROP_LINE_DELIMITER, DEFAULT_LINE_DELIMITER));
-        if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE2) != null) {
-            textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE2));
-        } else if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE3) != null) {
-            textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE3));
+        textParams.setMapkvDelimiter(delimiter.getOrDefault(PROP_MAP_KV_DELIMITER, DEFAULT_MAP_KV_DELIMITER));
+
+        // textParams.collection_delimiter is the delimiter for array, map and struct elements
+        if (delimiter.get(PROP_COLLECTION_DELIMITER_HIVE2) != null) {
+            textParams.setCollectionDelimiter(delimiter.get(PROP_COLLECTION_DELIMITER_HIVE2));
+        } else if (delimiter.get(PROP_COLLECTION_DELIMITER_HIVE3) != null) {
+            textParams.setCollectionDelimiter(delimiter.get(PROP_COLLECTION_DELIMITER_HIVE3));
         } else {
-            textParams.setArrayDelimiter(DEFAULT_ARRAY_DELIMITER);
+            textParams.setCollectionDelimiter(DEFAULT_COLLECTION_DELIMITER);
         }
         TFileAttributes fileAttributes = new TFileAttributes();
         fileAttributes.setTextParams(textParams);
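The lookup order above is deliberate: "colelction.delim" is not a typo introduced by this PR. Hive 2.x stores the collection delimiter under that misspelled serde key, so it is consulted before the corrected Hive 3.x key "collection.delim", with '\2' as the final fallback; the map key-value delimiter comes from "mapkey.delim" or defaults to '\003'. A condensed sketch of the same fallback chain, using hypothetical helper names:

```java
import java.util.Map;

public class DelimiterLookup {
    // Prefer the misspelled Hive 2.x key, then the Hive 3.x key, then '\2'.
    static String resolveCollectionDelimiter(Map<String, String> serdeParams) {
        String hive2 = serdeParams.get("colelction.delim"); // misspelling is Hive's, kept for compatibility
        if (hive2 != null) {
            return hive2;
        }
        return serdeParams.getOrDefault("collection.delim", "\2");
    }

    // Map key-value separator: "mapkey.delim" or the '\003' default.
    static String resolveMapKvDelimiter(Map<String, String> serdeParams) {
        return serdeParams.getOrDefault("mapkey.delim", "\003");
    }
}
```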
3 changes: 2 additions & 1 deletion gensrc/thrift/PlanNodes.thrift
@@ -243,7 +243,8 @@ struct TEsScanRange {
 struct TFileTextScanRangeParams {
     1: optional string column_separator;
     2: optional string line_delimiter;
-    3: optional string array_delimiter;
+    3: optional string collection_delimiter; // array, map and struct delimiter
+    4: optional string mapkv_delimiter;
 }

 struct TFileScanSlotInfo {
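On the wire, the FE fills these fields through the thrift-generated Java bean, as getFileAttributes() does above. A small sketch with Hive's default separators, assuming the generated namespace org.apache.doris.thrift (the setter names follow the generated-bean convention already used in the PR):

```java
import org.apache.doris.thrift.TFileTextScanRangeParams;

public class TextParamsExample {
    public static void main(String[] args) {
        // Populate the text-format scan parameters with Hive's default separators.
        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
        textParams.setColumnSeparator("\1");     // field delimiter
        textParams.setLineDelimiter("\n");       // row delimiter
        textParams.setCollectionDelimiter("\2"); // array / map / struct elements
        textParams.setMapkvDelimiter("\003");    // map key-value separator
    }
}
```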
test_hive_text_complex_type.out
@@ -0,0 +1,15 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql1 --
1 {101:1} {102:10} {"field1":100} {"field2":2000000} {"field3":300000000} {"field4":3.14} {"field5":3.14159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}
2 {201:1} {202:11} {"field1":200} {"field2":9000000} {"field3":8000000000} {"field4":9.13321} {"field5":322.14159} {203:"Hello"} {"field6":2023-07-28 12:34:56.000000} {"field7":2023-07-28} {1, 1, 201, 300011000, 44444444444, 3.14, 3.14159, "world", 2023-07-28 12:34:56.000000, 2023-06-28}
3 {201:1} {202:10} {"field1":120} {"field2":44440000} {"field3":700000000} {"field4":3.100004} {"field5":3.00014159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000} {"field7":2023-07-28} {1, 1, 700, 300011000, 3333333334, 3.00014, 3.3314159, "hello world", 2023-07-28 01:34:56.000000, 2023-07-27}
10 {101:1, 102:1, 103:1} {102:10, 104:1, 105:2} {"field1":100, "field0":100} {"field2":3000000} {"field3":300000000} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}
11 {101:1, 102:1, 13:1, 12:1} {102:10, 14:1, 15:2, 12:10} {"field1":100, "fie88ld0":100, "fieweld0":100, "fieeeld1":100, "fieeeld0":100, "feeield0":100, "feeield1":100, "firreld0":100, "field0":100} {"field2":3000000, "abcd":4000000, "1231":3000000} {"fi7eld3":300000000, "field30":300000000, "fielwwd3":300000000, "fi055":300000000, "field7":300000121323} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello", 0:"hello"} {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}

-- !sql2 --
1 {101:1} {102:10} {"field1":100} {"field2":2000000} {"field3":300000000} {"field4":3.14} {"field5":3.14159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}
2 {201:1} {202:11} {"field1":200} {"field2":9000000} {"field3":8000000000} {"field4":9.13321} {"field5":322.14159} {203:"Hello"} {"field6":2023-07-28 12:34:56.000000} {"field7":2023-07-28} {1, 1, 201, 300011000, 44444444444, 3.14, 3.14159, "world", 2023-07-28 12:34:56.000000, 2023-06-28}
3 {201:1} {202:10} {"field1":120} {"field2":44440000} {"field3":700000000} {"field4":3.100004} {"field5":3.00014159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000} {"field7":2023-07-28} {1, 1, 700, 300011000, 3333333334, 3.00014, 3.3314159, "hello world", 2023-07-28 01:34:56.000000, 2023-07-27}
10 {101:1, 102:1, 103:1} {102:10, 104:1, 105:2} {"field1":100, "field0":100} {"field2":3000000} {"field3":300000000} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}
11 {101:1, 102:1, 13:1, 12:1} {102:10, 14:1, 15:2, 12:10} {"field1":100, "fie88ld0":100, "fieweld0":100, "fieeeld1":100, "fieeeld0":100, "feeield0":100, "feeield1":100, "firreld0":100, "field0":100} {"field2":3000000, "abcd":4000000, "1231":3000000} {"fi7eld3":300000000, "field30":300000000, "fielwwd3":300000000, "fi055":300000000, "field7":300000121323} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello", 0:"hello"} {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}

test_hive_text_complex_type.groovy
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_text_complex_type", "p2,external,hive,external_remote,external_remote_hive") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "test_hive_text_complex_type"

sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hadoop.username' = 'hadoop',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
logger.info("catalog " + catalog_name + " created")
sql """switch ${catalog_name};"""
logger.info("switched to catalog " + catalog_name)

sql """ use multi_catalog """

qt_sql1 """ select * from hive_text_complex_type order by column1; """

qt_sql2 """ select * from hive_text_complex_type_delimiter order by column1; """


}
}