Skip to content

Commit

Permalink
Upgrade substrait to 0.7.0 (apache#148)
Browse files Browse the repository at this point in the history
* upgrade substrait to 0.7.0

* fix the uts
  • Loading branch information
JkSelf authored Aug 25, 2022
1 parent c4bd7cc commit 37cac97
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cpp/examples/arrow/engine_substrait_consumption.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> GetSubstraitFromServer(
"items": [
{
"uri_file": "file://FILENAME_PLACEHOLDER",
"format": "FILE_FORMAT_PARQUET"
"parquet": {}
}
]
}
Expand Down
38 changes: 29 additions & 9 deletions cpp/src/arrow/engine/substrait/expression_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,35 @@ Result<compute::Expression> FromProto(const substrait::Expression& expr,
ARROW_ASSIGN_OR_RAISE(auto decoded_function,
ext_set.DecodeFunction(scalar_fn.function_reference()));

std::vector<compute::Expression> arguments(scalar_fn.args_size());
for (int i = 0; i < scalar_fn.args_size(); ++i) {
ARROW_ASSIGN_OR_RAISE(arguments[i], FromProto(scalar_fn.args(i), ext_set));
std::vector<compute::Expression> arguments(scalar_fn.arguments_size());
for (int i = 0; i < scalar_fn.arguments_size(); ++i) {
const auto& argument = scalar_fn.arguments(i);
switch (argument.arg_type_case()) {
case substrait::FunctionArgument::kValue: {
ARROW_ASSIGN_OR_RAISE(arguments[i], FromProto(argument.value(), ext_set));
break;
}
default:
return Status::NotImplemented(
"only value arguments are currently supported for functions");
}
}

if (decoded_function.name.to_string() == "alias") {
if (scalar_fn.args_size() != 1) {
if (scalar_fn.arguments_size() != 1) {
return arrow::Status::Invalid("Alias should have exact 1 arg, but got " +
std::to_string(scalar_fn.args_size()));
std::to_string(scalar_fn.arguments_size()));
}

const auto& argument = scalar_fn.arguments(0);
switch (argument.arg_type_case()) {
case substrait::FunctionArgument::kValue: {
return FromProto(argument.value(), ext_set);
}
default:
return Status::NotImplemented(
"only value arguments are currently supported for functions");
}
return FromProto(scalar_fn.args().at(0), ext_set);
}
if (decoded_function.name.to_string() == "is_in") {
const auto& in_list =
Expand Down Expand Up @@ -907,9 +925,11 @@ Result<std::unique_ptr<substrait::Expression>> ToProto(const compute::Expression

auto scalar_fn = internal::make_unique<substrait::Expression::ScalarFunction>();
scalar_fn->set_function_reference(anchor);
scalar_fn->mutable_args()->Reserve(static_cast<int>(arguments.size()));
scalar_fn->mutable_arguments()->Reserve(static_cast<int>(arguments.size()));
for (auto& arg : arguments) {
scalar_fn->mutable_args()->AddAllocated(arg.release());
auto argument = internal::make_unique<substrait::FunctionArgument>();
argument->set_allocated_value(arg.release());
scalar_fn->mutable_arguments()->AddAllocated(argument.release());
}

out->set_allocated_scalar_function(scalar_fn.release());
Expand Down
21 changes: 10 additions & 11 deletions cpp/src/arrow/engine/substrait/relation_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,16 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
"path_type other than uri_file");
}

if (item.format() ==
substrait::ReadRel::LocalFiles::FileOrFiles::FILE_FORMAT_PARQUET) {
format = std::make_shared<dataset::ParquetFileFormat>();
} else if (util::string_view{item.uri_file()}.ends_with(".arrow")) {
format = std::make_shared<dataset::IpcFileFormat>();
} else if (util::string_view{item.uri_file()}.ends_with(".feather")) {
format = std::make_shared<dataset::IpcFileFormat>();
} else {
return Status::NotImplemented(
"substrait::ReadRel::LocalFiles::FileOrFiles::format "
"other than FILE_FORMAT_PARQUET");
switch (item.file_format_case()) {
case substrait::ReadRel_LocalFiles_FileOrFiles::kParquet:
format = std::make_shared<dataset::ParquetFileFormat>();
break;
case substrait::ReadRel_LocalFiles_FileOrFiles::kArrow:
format = std::make_shared<dataset::IpcFileFormat>();
break;
default:
return Status::NotImplemented(
"unknown substrait::ReadRel::LocalFiles::FileOrFiles::file_format");
}

if (!util::string_view{item.uri_file()}.starts_with("file:///")) {
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/arrow/engine/substrait/serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,12 @@ TEST(Substrait, SupportedExtensionTypes) {
auto anchor = ext_set.num_types();

EXPECT_THAT(ext_set.EncodeType(*expected_type), ResultWith(Eq(anchor)));

ASSERT_OK_AND_ASSIGN(
auto buf,
internal::SubstraitFromJSON(
"Type", "{\"user_defined_type_reference\": " + std::to_string(anchor) + "}"));

"Type", "{\"user_defined\": { \"type_reference\": " + std::to_string(anchor) +
", \"nullability\": \"NULLABILITY_NULLABLE\" } }"));
ASSERT_OK_AND_ASSIGN(auto type, DeserializeType(*buf, ext_set));
EXPECT_EQ(*type, *expected_type);

Expand Down Expand Up @@ -258,8 +259,9 @@ TEST(Substrait, NamedStruct) {
}

TEST(Substrait, NoEquivalentArrowType) {
ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON(
"Type", R"({"user_defined_type_reference": 99})"));
ASSERT_OK_AND_ASSIGN(
auto buf,
internal::SubstraitFromJSON("Type", R"({"user_defined": {"type_reference": 99}})"));
ExtensionSet empty;
ASSERT_THAT(
DeserializeType(*buf, empty),
Expand Down Expand Up @@ -629,11 +631,11 @@ TEST(Substrait, ReadRel) {
"items": [
{
"uri_file": "file:///tmp/dat1.parquet",
"format": "FILE_FORMAT_PARQUET"
"parquet": {}
},
{
"uri_file": "file:///tmp/dat2.parquet",
"format": "FILE_FORMAT_PARQUET"
"parquet": {}
}
]
}
Expand Down
13 changes: 9 additions & 4 deletions cpp/src/arrow/engine/substrait/type_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ Result<std::pair<std::shared_ptr<DataType>, bool>> FromProto(
field("value", std::move(value_nullable.first), value_nullable.second));
}

case substrait::Type::kUserDefinedTypeReference: {
uint32_t anchor = type.user_defined_type_reference();
case substrait::Type::kUserDefined: {
const auto& user_defined = type.user_defined();
uint32_t anchor = user_defined.type_reference();
ARROW_ASSIGN_OR_RAISE(auto type_record, ext_set.DecodeType(anchor));
return std::make_pair(std::move(type_record.type), true);
return std::make_pair(std::move(type_record.type), IsNullable(user_defined));
}

default:
Expand Down Expand Up @@ -394,7 +395,11 @@ struct DataTypeToProtoImpl {
template <typename T>
Status EncodeUserDefined(const T& t) {
ARROW_ASSIGN_OR_RAISE(auto anchor, ext_set_->EncodeType(t));
type_->set_user_defined_type_reference(anchor);
auto user_defined = internal::make_unique<::substrait::Type_UserDefined>();
user_defined->set_type_reference(anchor);
user_defined->set_nullability(nullable_ ? ::substrait::Type::NULLABILITY_NULLABLE
: ::substrait::Type::NULLABILITY_REQUIRED);
type_->set_allocated_user_defined(user_defined.release());
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/thirdparty/versions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ ARROW_SNAPPY_BUILD_SHA256_CHECKSUM=75c1fbb3d618dd3a0483bff0e26d0a92b495bbe5059c8
# There is a bug in GCC < 4.9 with Snappy 1.1.9, so revert to 1.1.8 for those (ARROW-14661)
ARROW_SNAPPY_OLD_BUILD_VERSION=1.1.8
ARROW_SNAPPY_OLD_BUILD_SHA256_CHECKSUM=16b677f07832a612b0836178db7f374e414f94657c138e6993cbfc5dcc58651f
ARROW_SUBSTRAIT_BUILD_VERSION=f7a74dd8
ARROW_SUBSTRAIT_BUILD_SHA256_CHECKSUM=7a969afca305135c4933cffa4316ee6c6c9ee38813348c62401ff9a552cf6fdb
ARROW_SUBSTRAIT_BUILD_VERSION=v0.7.0
ARROW_SUBSTRAIT_BUILD_SHA256_CHECKSUM=15657168b0158e26b2b2e3b19887e7118126284e4094abf2a2a3edddca9d33c2
ARROW_THRIFT_BUILD_VERSION=0.13.0
ARROW_THRIFT_BUILD_SHA256_CHECKSUM=7ad348b88033af46ce49148097afe354d513c1fca7c607b59c33ebb6064b5179
ARROW_UTF8PROC_BUILD_VERSION=v2.7.0
Expand Down

0 comments on commit 37cac97

Please sign in to comment.