Skip to content

Commit

Permalink
Folder: substrait
Browse files Browse the repository at this point in the history
relative pr:

Check a fallback case in validation: using literal partition key in window function facebookincubator#148
Fix might_contain validate fallback and support struct literal facebookincubator#137
Implement datetime functions in velox/sparksql. facebookincubator#81
Parse options in SingularOrList correctly facebookincubator#48
Add SingularOrList support facebookincubator#45
Support if then in filter facebookincubator#74
Fix semi join output type and support existence join facebookincubator#67
Support decimal as partition column facebookincubator#167
Add the window support facebookincubator#61
Add expand operator facebookincubator#65
Support more cases of filter and its pushdown facebookincubator#14
  • Loading branch information
zhejiangxiaomai authored and rui-mo committed Jul 21, 2023
1 parent fd0ccc7 commit a9a05b1
Show file tree
Hide file tree
Showing 30 changed files with 5,434 additions and 367 deletions.
11 changes: 5 additions & 6 deletions velox/substrait/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ get_filename_component(PROTO_DIR ${substrait_proto_directory}/, DIRECTORY)
# Generate Substrait headers
add_custom_command(
OUTPUT ${PROTO_OUTPUT_FILES}
COMMAND
${Protobuf_PROTOC_EXECUTABLE} --proto_path ${PROJECT_SOURCE_DIR}/
--proto_path ${Protobuf_INCLUDE_DIRS} --cpp_out ${PROJECT_SOURCE_DIR}
${PROTO_FILES}
COMMAND ${Protobuf_PROTOC_EXECUTABLE} --proto_path ${proto_directory}/ --cpp_out ${PROTO_OUTPUT_DIR}
${PROTO_FILES}
DEPENDS ${PROTO_DIR}
COMMENT "Running PROTO compiler"
VERBATIM)
Expand All @@ -54,13 +52,14 @@ set(SRCS
VeloxToSubstraitPlan.cpp
VeloxToSubstraitType.cpp
VeloxSubstraitSignature.cpp
VariantToVectorConverter.cpp)
VariantToVectorConverter.cpp
SubstraitToVeloxPlanValidator.cpp)

add_library(velox_substrait_plan_converter ${SRCS})
target_include_directories(velox_substrait_plan_converter
PUBLIC ${PROTO_OUTPUT_DIR})
target_link_libraries(velox_substrait_plan_converter velox_connector
velox_dwio_dwrf_common)
velox_dwio_dwrf_common velox_functions_spark)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
132 changes: 130 additions & 2 deletions velox/substrait/SubstraitParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ std::shared_ptr<SubstraitParser::SubstraitType> SubstraitParser::parseType(
nullability = substraitType.date().nullability();
break;
}
case ::substrait::Type::KindCase::kTimestamp: {
typeName = "TIMESTAMP";
nullability = substraitType.timestamp().nullability();
break;
}
case ::substrait::Type::KindCase::kDecimal: {
auto precision = substraitType.decimal().precision();
auto scale = substraitType.decimal().scale();
if (precision <= 18) {
typeName = "SHORT_DECIMAL<" + std::to_string(precision) + "," +
std::to_string(scale) + ">";
} else {
typeName = "LONG_DECIMAL<" + std::to_string(precision) + "," +
std::to_string(scale) + ">";
}

nullability = substraitType.decimal().nullability();
break;
}
default:
VELOX_NYI(
"Parsing for Substrait type not supported: {}",
Expand All @@ -144,6 +163,14 @@ std::shared_ptr<SubstraitParser::SubstraitType> SubstraitParser::parseType(
return std::make_shared<SubstraitType>(type);
}

/// Looks up the type keyword `substraitType` in typeMap_ and returns the
/// mapped type name. Raises a "not yet implemented" error through VELOX_NYI
/// when the keyword has no entry in the map.
std::string SubstraitParser::parseType(const std::string& substraitType) {
  const auto entry = typeMap_.find(substraitType);
  const bool isKnown = (entry != typeMap_.end());
  if (!isKnown) {
    VELOX_NYI("Substrait parsing for type {} not supported.", substraitType);
  }
  return entry->second;
}

std::vector<std::shared_ptr<SubstraitParser::SubstraitType>>
SubstraitParser::parseNamedStruct(const ::substrait::NamedStruct& namedStruct) {
// Note that "names" are not used.
Expand All @@ -160,6 +187,36 @@ SubstraitParser::parseNamedStruct(const ::substrait::NamedStruct& namedStruct) {
return substraitTypeList;
}

/// Builds one flag per column of `namedStruct` telling whether that column is
/// a partition column.
///
/// When the struct carries no partition column types, every column is
/// reported as non-partitioned (one `false` per entry in names()). Otherwise
/// the number of column types must equal the number of names, and each entry
/// is translated: NORMAL_COL -> false, PARTITION_COL -> true. Any other value
/// fails through VELOX_FAIL.
std::vector<bool> SubstraitParser::parsePartitionColumns(
    const ::substrait::NamedStruct& namedStruct) {
  const auto& columnKinds = namedStruct.partition_columns().column_type();

  std::vector<bool> partitionFlags;
  if (columnKinds.size() == 0) {
    // No partition information at all: treat every column as a normal one.
    partitionFlags.resize(namedStruct.names().size(), false);
    return partitionFlags;
  }

  VELOX_CHECK(
      columnKinds.size() == namedStruct.names().size(),
      "Invalid partion columns.");

  partitionFlags.reserve(columnKinds.size());
  for (const auto& kind : columnKinds) {
    if (kind == ::substrait::PartitionColumns::NORMAL_COL) {
      partitionFlags.push_back(false);
    } else if (kind == ::substrait::PartitionColumns::PARTITION_COL) {
      partitionFlags.push_back(true);
    } else {
      VELOX_FAIL("Patition column type is not supported.");
    }
  }
  return partitionFlags;
}

int32_t SubstraitParser::parseReferenceSegment(
const ::substrait::Expression::ReferenceSegment& refSegment) {
auto typeCase = refSegment.reference_type_case();
Expand Down Expand Up @@ -219,17 +276,73 @@ const std::string& SubstraitParser::findFunctionSpec(
return map[id];
}

/// Returns the portion of `subFuncSpec` that precedes the first ":".
/// A spec that contains no ":" is returned unchanged.
std::string SubstraitParser::getSubFunctionName(
    const std::string& subFuncSpec) const {
  const std::size_t colonPos = subFuncSpec.find(":");
  const bool hasColon = (colonPos != std::string::npos);
  return hasColon ? subFuncSpec.substr(0, colonPos) : subFuncSpec;
}

/// Appends to `types` the "_"-separated tokens found after the first ":" in
/// `subFuncSpec`.
///
/// - If the spec has no ":", the whole spec is split.
/// - If ":" is the last character, nothing is appended.
/// - Tokens equal to "opt" or "req" that are followed by a delimiter are
///   dropped; the final token (the text after the last "_") is always
///   appended as-is, even when it is empty.
void SubstraitParser::getSubFunctionTypes(
    const std::string& subFuncSpec,
    std::vector<std::string>& types) const {
  const std::size_t colonPos = subFuncSpec.find(":");
  const bool hasColon = (colonPos != std::string::npos);
  if (hasColon && colonPos + 1 == subFuncSpec.size()) {
    // Spec ends right after the colon: there are no parameter types.
    return;
  }

  const std::string typeList =
      hasColon ? subFuncSpec.substr(colonPos + 1) : subFuncSpec;

  // Walk the list with a cursor instead of erasing consumed prefixes.
  const std::string delimiter = "_";
  std::size_t tokenStart = 0;
  for (std::size_t delimPos = typeList.find(delimiter, tokenStart);
       delimPos != std::string::npos;
       delimPos = typeList.find(delimiter, tokenStart)) {
    const std::string token =
        typeList.substr(tokenStart, delimPos - tokenStart);
    const bool isMarker = (token == "opt" || token == "req");
    if (!isMarker) {
      types.emplace_back(token);
    }
    tokenStart = delimPos + delimiter.length();
  }
  // Whatever remains after the last delimiter is the final type.
  types.emplace_back(typeList.substr(tokenStart));
}

std::string SubstraitParser::findVeloxFunction(
const std::unordered_map<uint64_t, std::string>& functionMap,
uint64_t id) const {
std::string funcSpec = findFunctionSpec(functionMap, id);
std::string_view funcName = getNameBeforeDelimiter(funcSpec, ":");
return mapToVeloxFunction({funcName.begin(), funcName.end()});
std::vector<std::string> types;
getSubFunctionTypes(funcSpec, types);
bool isDecimal = false;
for (auto& type : types) {
if (type.find("dec") != std::string::npos) {
isDecimal = true;
break;
}
}
return mapToVeloxFunction({funcName.begin(), funcName.end()}, isDecimal);
}

std::string SubstraitParser::mapToVeloxFunction(
const std::string& substraitFunction) const {
const std::string& substraitFunction,
bool isDecimal) const {
auto it = substraitVeloxFunctionMap_.find(substraitFunction);
if (isDecimal) {
if (substraitFunction == "add" || substraitFunction == "subtract" ||
substraitFunction == "multiply" || substraitFunction == "divide" ||
substraitFunction == "avg" || substraitFunction == "avg_merge" ||
substraitFunction == "sum" || substraitFunction == "sum_merge" ||
substraitFunction == "round") {
return "decimal_" + substraitFunction;
}
}
if (it != substraitVeloxFunctionMap_.end()) {
return it->second;
}
Expand All @@ -239,4 +352,19 @@ std::string SubstraitParser::mapToVeloxFunction(
return substraitFunction;
}

/// Returns true when `extension` has an optimization payload whose text,
/// unpacked as a google::protobuf::StringValue, contains `config` with the
/// character "1" directly after its first occurrence. Returns false in every
/// other case, including when no optimization is present.
bool SubstraitParser::configSetInOptimization(
    const ::substrait::extensions::AdvancedExtension& extension,
    const std::string& config) const {
  if (!extension.has_optimization()) {
    return false;
  }

  google::protobuf::StringValue unpacked;
  extension.optimization().UnpackTo(&unpacked);

  const std::size_t keyPos = unpacked.value().find(config);
  if (keyPos == std::string::npos) {
    return false;
  }
  // Only the single character right after the matched key is inspected.
  return unpacked.value().substr(keyPos + config.size(), 1) == "1";
}

} // namespace facebook::velox::substrait
79 changes: 70 additions & 9 deletions velox/substrait/SubstraitParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "velox/substrait/proto/substrait/type.pb.h"
#include "velox/substrait/proto/substrait/type_expressions.pb.h"

#include <google/protobuf/wrappers.pb.h>

namespace facebook::velox::substrait {

/// This class contains some common functions used to parse Substrait
Expand All @@ -37,14 +39,21 @@ class SubstraitParser {
bool nullable;
};

/// Parse Substrait NamedStruct.
std::vector<std::shared_ptr<SubstraitParser::SubstraitType>> parseNamedStruct(
/// Used to parse Substrait NamedStruct.
std::vector<std::shared_ptr<SubstraitType>> parseNamedStruct(
const ::substrait::NamedStruct& namedStruct);

/// Used to parse partition columns from Substrait NamedStruct.
std::vector<bool> parsePartitionColumns(
const ::substrait::NamedStruct& namedStruct);

/// Parse Substrait Type.
std::shared_ptr<SubstraitType> parseType(
const ::substrait::Type& substraitType);

/// Parse a Substrait type keyword such as i32 into its mapped type name.
std::string parseType(const std::string& substraitType);

/// Parse Substrait ReferenceSegment.
int32_t parseReferenceSegment(
const ::substrait::Expression::ReferenceSegment& refSegment);
Expand All @@ -70,26 +79,78 @@ class SubstraitParser {
const std::unordered_map<uint64_t, std::string>& functionMap,
uint64_t id) const;

/// Find the Velox function name according to the function id
/// Extracts the function name for a function from specified compound name.
/// When the input is a simple name, it will be returned.
std::string getSubFunctionName(const std::string& functionSpec) const;

/// This function is used to get the types from the compound name.
void getSubFunctionTypes(
const std::string& subFuncSpec,
std::vector<std::string>& types) const;

/// Used to find the Velox function name according to the function id
/// from a pre-constructed function map.
std::string findVeloxFunction(
const std::unordered_map<uint64_t, std::string>& functionMap,
uint64_t id) const;

/// Map the Substrait function keyword into Velox function keyword.
std::string mapToVeloxFunction(const std::string& substraitFunction) const;
std::string mapToVeloxFunction(
const std::string& substraitFunction,
bool isDecimal) const;

/// @brief Return whether a config is set as true in AdvancedExtension
/// optimization.
/// @param extension Substrait advanced extension.
/// @param config the key string of a config.
/// @return Whether the config is set as true.
bool configSetInOptimization(
const ::substrait::extensions::AdvancedExtension& extension,
const std::string& config) const;

private:
/// A map used for mapping Substrait function keywords into Velox functions'
/// keywords. Key: the Substrait function keyword, Value: the Velox function
/// keyword. For those functions with different names in Substrait and Velox,
/// a mapping relation should be added here.
std::unordered_map<std::string, std::string> substraitVeloxFunctionMap_ = {
{"add", "plus"},
{"subtract", "minus"},
{"modulus", "mod"},
{"not_equal", "neq"},
{"equal", "eq"}};
{"is_not_null", "isnotnull"}, /*Spark functions.*/
{"is_null", "isnull"},
{"equal", "equalto"},
{"lt", "lessthan"},
{"lte", "lessthanorequal"},
{"gt", "greaterthan"},
{"gte", "greaterthanorequal"},
{"not_equal", "notequalto"},
{"char_length", "length"},
{"strpos", "instr"},
{"ends_with", "endswith"},
{"starts_with", "startswith"},
{"datediff", "date_diff"},
{"named_struct", "row_constructor"},
{"bit_or", "bitwise_or_agg"},
{"bit_or_merge", "bitwise_or_agg_merge"},
{"bit_and", "bitwise_and_agg"},
{"bit_and_merge", "bitwise_and_agg_merge"},
{"modulus", "mod"} /*Presto functions.*/};

// The map is used for mapping substrait type.
// Key: type in function name.
// Value: substrait type name.
const std::unordered_map<std::string, std::string> typeMap_ = {
{"bool", "BOOLEAN"},
{"i8", "TINYINT"},
{"i16", "SMALLINT"},
{"i32", "INTEGER"},
{"i64", "BIGINT"},
{"fp32", "REAL"},
{"fp64", "DOUBLE"},
{"date", "DATE"},
{"ts", "TIMESTAMP"},
{"str", "VARCHAR"},
{"vbin", "VARBINARY"},
{"decShort", "SHORT_DECIMAL"},
{"decLong", "LONG_DECIMAL"}};
};

} // namespace facebook::velox::substrait
Loading

0 comments on commit a9a05b1

Please sign in to comment.